Lets start with a very basic function which in this example takes a parameter of Type Map and inserts into into the Region using Region.putAll method.
ServerPutAllFunction.java
package pivotal.au.gemfire.performance.functions; import java.util.Map; import java.util.Properties; import com.gemstone.gemfire.cache.CacheFactory; import com.gemstone.gemfire.cache.Declarable; import com.gemstone.gemfire.cache.GemFireCache; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.execute.FunctionAdapter; import com.gemstone.gemfire.cache.execute.FunctionContext; public class ServerPutAllFunction extends FunctionAdapter implements Declarable { private static final long serialVersionUID = 1L; public static final String ID = "serverputall-function"; private GemFireCache cache; public ServerPutAllFunction() { this.cache = CacheFactory.getAnyInstance(); } @Override public void init(Properties arg0) { // TODO Auto-generated method stub } @Override public void execute(FunctionContext context) { Map testMap = (Map) context.getArguments(); Region region = this.cache.getRegion("testRegionFunction"); region.putAll(testMap); context.getResultSender().lastResult(testMap.size()); } @Override public String getId() { // TODO Auto-generated method stub return ID; } }
Cahe.xml showing how to define the function
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE cache PUBLIC "-//GemStone Systems, Inc.//GemFire Declarative Cache 7.0//EN" "http://www.gemstone.com/dtd/cache7_0.dtd"> <cache is-server="true"> <cache-server port="0" notify-by-subscription="true"/> <pdx read-serialized="false"> <pdx-serializer> <class-name>com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer</class-name> <parameter name="classes"> <string>pivotal\.au\.gemfire\.performance\..*</string> </parameter> </pdx-serializer> </pdx> <region name="testRegion"> <region-attributes data-policy="partition" statistics-enabled="true" concurrency-level="16"> <partition-attributes redundant-copies="1" total-num-buckets="113"/> </region-attributes> </region> <region name="testRegionFunction"> <region-attributes data-policy="partition" statistics-enabled="true" concurrency-level="16"> <partition-attributes redundant-copies="1" total-num-buckets="113"/> </region-attributes> </region> <function-service> <function> <class-name>pivotal.au.gemfire.performance.functions.SizeFunction</class-name> </function> <function> <class-name>pivotal.au.gemfire.performance.functions.ServerPutAllFunction</class-name> </function> </function-service> </cache>
Now lets connect to GFSH and display the functions available on all members.
gfsh>connect --locator=localhost[10334] Connecting to Locator at [host=localhost, port=10334] .. Connecting to Manager at [host=192-168-1-5.tpgi.com.au, port=1099] .. Successfully connected to: [host=192-168-1-5.tpgi.com.au, port=1099] Cluster-1 gfsh>list functions Member | Function ------- | --------------------- server1 | serverputall-function server1 | size-function server2 | serverputall-function server2 | size-function
Calling the function from a client would be done as follows
public void run() { Map buffer = new HashMap(); Execution execution = null; int dataSize = SAMPLE_SIZE / nThreads; System.out.printf("Start: %d End: %d \n",(dataSize * (increment - 1)), (dataSize * increment)); for (int i = (dataSize * (increment - 1)); i < (dataSize * increment); i++) { buffer.put(String.valueOf(i), new Test(i, "name" + i)); if (buffer.size() % BATCH_SIZE == 0) { try { execution = FunctionService.onServer(cache).withArgs(buffer); ResultCollector<?, ?> collector = execution.execute(ServerPutAllFunction.ID); counter += buffer.size(); buffer.clear(); } catch (Exception e) { e.printStackTrace(); } } } if (!buffer.isEmpty()) { execution = FunctionService.onServer(cache).withArgs(buffer); ResultCollector<?, ?> collector = execution.execute(ServerPutAllFunction.ID); counter += buffer.size(); } }
No comments:
Post a Comment