Let's start with a very basic function which, in this example, takes a parameter of type Map and inserts its entries into the Region using the Region.putAll method.
ServerPutAllFunction.java
package pivotal.au.gemfire.performance.functions;
import java.util.Map;
import java.util.Properties;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
/**
 * Server-side function that bulk-inserts the entries of a {@link Map} argument into the
 * {@code testRegionFunction} region with a single {@code Region.putAll} call and replies
 * with the number of entries in the map it received.
 *
 * <p>The class has a public no-argument constructor and implements {@link Declarable} so
 * that it can be declared in a {@code function-service} element of cache.xml.
 */
public class ServerPutAllFunction extends FunctionAdapter implements Declarable
{
    private static final long serialVersionUID = 1L;

    /** Identifier under which this function is registered and invoked by clients. */
    public static final String ID = "serverputall-function";

    /** Name of the region that receives the entries. */
    private static final String REGION_NAME = "testRegionFunction";

    private GemFireCache cache;

    /**
     * Creates the function and binds it to the cache instance that exists in this JVM.
     */
    public ServerPutAllFunction()
    {
        this.cache = CacheFactory.getAnyInstance();
    }

    /**
     * No configuration is required; any properties supplied from cache.xml are ignored.
     *
     * @param arg0 declarative properties (unused)
     */
    @Override
    public void init(Properties arg0)
    {
        // Nothing to initialise.
    }

    /**
     * Puts every entry of the map passed as the function argument into the target region
     * and sends the map's size back as the last (and only) result.
     *
     * @param context execution context carrying the arguments and the result sender
     * @throws IllegalArgumentException if the function argument is missing or is not a {@link Map}
     * @throws IllegalStateException if the target region does not exist in this cache
     */
    @Override
    public void execute(FunctionContext context)
    {
        Object arguments = context.getArguments();
        if (!(arguments instanceof Map))
        {
            // Fail with a message naming the problem instead of an opaque
            // ClassCastException / NullPointerException further down.
            throw new IllegalArgumentException(ID + " expects a java.util.Map argument but received "
                + (arguments == null ? "null" : arguments.getClass().getName()));
        }

        // Key and value types are chosen by the caller, so the cast cannot be checked here.
        @SuppressWarnings("unchecked")
        Map<Object, Object> entries = (Map<Object, Object>) arguments;

        Region<Object, Object> region = this.cache.getRegion(REGION_NAME);
        if (region == null)
        {
            throw new IllegalStateException("Region '" + REGION_NAME + "' is not defined in this cache");
        }

        region.putAll(entries);
        context.getResultSender().lastResult(entries.size());
    }

    /**
     * @return the identifier under which this function is registered, {@link #ID}
     */
    @Override
    public String getId()
    {
        return ID;
    }
}
cache.xml showing how to define the function
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE cache PUBLIC "-//GemStone Systems, Inc.//GemFire Declarative Cache 7.0//EN"
"http://www.gemstone.com/dtd/cache7_0.dtd">
<!-- Declarative configuration for a cache server member: PDX serialization settings,
     two partitioned regions, and the functions registered on the member at startup. -->
<cache is-server="true">
<!-- NOTE(review): port="0" presumably lets the server choose its own port; confirm against the GemFire docs. -->
<cache-server port="0" notify-by-subscription="true"/>
<!-- Classes whose names match the pattern below are serialized by the reflection-based PDX auto-serializer. -->
<pdx read-serialized="false">
<pdx-serializer>
<class-name>com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer</class-name>
<parameter name="classes">
<string>pivotal\.au\.gemfire\.performance\..*</string>
</parameter>
</pdx-serializer>
</pdx>
<!-- Partitioned region with one redundant copy spread over 113 buckets. -->
<region name="testRegion">
<region-attributes data-policy="partition"
statistics-enabled="true"
concurrency-level="16">
<partition-attributes redundant-copies="1" total-num-buckets="113"/>
</region-attributes>
</region>
<!-- Same attributes as testRegion; this is the region ServerPutAllFunction writes into. -->
<region name="testRegionFunction">
<region-attributes data-policy="partition"
statistics-enabled="true"
concurrency-level="16">
<partition-attributes redundant-copies="1" total-num-buckets="113"/>
</region-attributes>
</region>
<!-- Functions declared here are instantiated and registered on this member. -->
<function-service>
<function>
<class-name>pivotal.au.gemfire.performance.functions.SizeFunction</class-name>
</function>
<function>
<class-name>pivotal.au.gemfire.performance.functions.ServerPutAllFunction</class-name>
</function>
</function-service>
</cache>
Now let's connect to GFSH and display the functions available on all members.
gfsh>connect --locator=localhost[10334]
Connecting to Locator at [host=localhost, port=10334] ..
Connecting to Manager at [host=192-168-1-5.tpgi.com.au, port=1099] ..
Successfully connected to: [host=192-168-1-5.tpgi.com.au, port=1099]

Cluster-1 gfsh>list functions
Member  | Function
------- | ---------------------
server1 | serverputall-function
server1 | size-function
server2 | serverputall-function
server2 | size-function
Calling the function from a client would be done as follows
/**
 * Generates this worker's slice of the sample data and ships it to the server in batches
 * by invoking {@link ServerPutAllFunction}.
 *
 * <p>The slice covers keys {@code dataSize * (increment - 1)} (inclusive) up to
 * {@code dataSize * increment} (exclusive), where {@code dataSize = SAMPLE_SIZE / nThreads}.
 * Entries are buffered and sent whenever the buffer size is a multiple of
 * {@code BATCH_SIZE}; whatever is left over is sent once the loop finishes.
 */
public void run() {
    Map<String, Test> buffer = new HashMap<String, Test>();
    int dataSize = SAMPLE_SIZE / nThreads;
    int start = dataSize * (increment - 1);
    int end = dataSize * increment;
    System.out.printf("Start: %d End: %d \n", start, end);

    for (int i = start; i < end; i++) {
        buffer.put(String.valueOf(i), new Test(i, "name" + i));
        // After a failed send the entries stay buffered, so they are retried
        // the next time the buffer size reaches a multiple of BATCH_SIZE.
        if (buffer.size() % BATCH_SIZE == 0) {
            sendBatch(buffer);
        }
    }

    // Flush the final partial batch (or entries left over from a failed send).
    if (!buffer.isEmpty()) {
        sendBatch(buffer);
    }
}

/**
 * Sends one batch to the server through {@link ServerPutAllFunction} and waits for the
 * reply. On success the batch size is added to {@code counter} and the batch is emptied;
 * on failure the stack trace is printed and the batch is left untouched.
 *
 * @param batch entries to send; cleared only when the call succeeds
 */
private void sendBatch(Map<String, Test> batch) {
    try {
        Execution execution = FunctionService.onServer(cache).withArgs(batch);
        ResultCollector<?, ?> collector = execution.execute(ServerPutAllFunction.ID);
        // Consume the result so that a failure reported by the server surfaces here
        // rather than being counted as a successful batch.
        collector.getResult();
        counter += batch.size();
        batch.clear();
    } catch (Exception e) {
        e.printStackTrace();
    }
}