Search This Blog

Thursday, 26 September 2013

Functions in GemFire

You can execute data-independent functions or data-dependent functions in vFabric GemFire. You run data-independent functions by targeting a specific member or specific members in your distributed system, or by targeting logical member groups on which to execute your function. If you are executing a data-dependent function, you specify a region on which to execute the function.

Let's start with a very basic function which, in this example, takes a parameter of type Map and inserts it into the Region using the Region.putAll method.

ServerPutAllFunction.java
  
package pivotal.au.gemfire.performance.functions;

import java.util.Map;
import java.util.Properties;

import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;

/**
 * Server-side function that bulk-loads the {@link Map} passed as the function
 * argument into the {@code testRegionFunction} region with a single
 * {@link Region#putAll(Map)} call, then sends the number of entries in that map
 * back to the caller as its only result.
 *
 * <p>The instance keeps no reference to the cache: the cache is resolved on every
 * invocation, so the object carries no state beyond its constants.
 */
public class ServerPutAllFunction extends FunctionAdapter implements Declarable
{

    private static final long serialVersionUID = 1L;

    /** Identifier under which this function is registered and invoked. */
    public static final String ID = "serverputall-function";

    /** Name of the region that receives the entries. */
    private static final String REGION_NAME = "testRegionFunction";

    /** Public no-argument constructor; nothing is initialised eagerly. */
    public ServerPutAllFunction()
    {
    }

    /**
     * Declarable callback; this function takes no configuration properties.
     *
     * @param arg0 ignored
     */
    @Override
    public void init(Properties arg0)
    {
        // Nothing to configure.
    }

    /**
     * Writes every entry of the argument map into the target region and reports
     * the map size as the last (and only) result.
     *
     * @param context execution context supplying the argument map and the result sender
     * @throws IllegalArgumentException if the function argument is not a {@link Map}
     * @throws IllegalStateException if the target region does not exist in the cache
     */
    @Override
    public void execute(FunctionContext context)
    {
        Object arguments = context.getArguments();
        if (!(arguments instanceof Map)) {
            // Fail with a descriptive message instead of a ClassCastException / NPE.
            throw new IllegalArgumentException(
                    ID + " expects a java.util.Map argument but received "
                            + (arguments == null ? "null" : arguments.getClass().getName()));
        }
        Map<?, ?> entries = (Map<?, ?>) arguments;

        // Resolve the cache at call time rather than caching it in a field.
        GemFireCache cache = CacheFactory.getAnyInstance();
        Region<Object, Object> region = cache.getRegion(REGION_NAME);
        if (region == null) {
            throw new IllegalStateException(
                    "Region '" + REGION_NAME + "' is not defined in this cache");
        }

        region.putAll(entries);
        context.getResultSender().lastResult(entries.size());
    }

    /**
     * @return the identifier of this function, {@link #ID}
     */
    @Override
    public String getId()
    {
        return ID;
    }

}

Cache.xml showing how to define the function
  
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE cache PUBLIC "-//GemStone Systems, Inc.//GemFire Declarative Cache 7.0//EN"
        "http://www.gemstone.com/dtd/cache7_0.dtd">
<!-- Declarative cache for a server member: cache server, PDX settings, two
     partitioned regions and the functions to register. -->
<cache is-server="true">

    <cache-server port="0" notify-by-subscription="true"/>    
    <!-- PDX: reflection-based auto serializer, limited to classes whose names
         match the pivotal.au.gemfire.performance.* pattern below. -->
    <pdx read-serialized="false">
        <pdx-serializer>
            <class-name>com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer</class-name>
            <parameter name="classes">
               <string>pivotal\.au\.gemfire\.performance\..*</string>
            </parameter>
        </pdx-serializer>
    </pdx>

    <!-- Partitioned region with one redundant copy and 113 buckets. -->
    <region name="testRegion">
        <region-attributes data-policy="partition"
                           statistics-enabled="true"
                           concurrency-level="16">
            <partition-attributes redundant-copies="1" total-num-buckets="113"/>
        </region-attributes>
    </region>

    <!-- Same attributes as testRegion; this is the region name that
         ServerPutAllFunction looks up and writes into. -->
    <region name="testRegionFunction">
        <region-attributes data-policy="partition"
                           statistics-enabled="true"
                           concurrency-level="16">
            <partition-attributes redundant-copies="1" total-num-buckets="113"/>
        </region-attributes>
    </region>
    
    <!-- Functions declared by class name. The source of SizeFunction is not
         shown alongside this file. -->
    <function-service>
   <function>
     <class-name>pivotal.au.gemfire.performance.functions.SizeFunction</class-name>
   </function>
   <function>
     <class-name>pivotal.au.gemfire.performance.functions.ServerPutAllFunction</class-name>
   </function>
    </function-service> 
   
</cache>

Now let's connect to GFSH and display the functions available on all members.
  
gfsh>connect --locator=localhost[10334]
Connecting to Locator at [host=localhost, port=10334] ..
Connecting to Manager at [host=192-168-1-5.tpgi.com.au, port=1099] ..
Successfully connected to: [host=192-168-1-5.tpgi.com.au, port=1099]

Cluster-1 gfsh>list functions
Member  | Function
------- | ---------------------
server1 | serverputall-function
server1 | size-function
server2 | serverputall-function
server2 | size-function

Calling the function from a client would be done as follows:
  
/**
 * Loads this worker's slice of the sample data by shipping it to the server in
 * batches through {@link ServerPutAllFunction}.
 *
 * <p>The slice covers the keys from {@code dataSize * (increment - 1)} (inclusive)
 * to {@code dataSize * increment} (exclusive), where
 * {@code dataSize = SAMPLE_SIZE / nThreads}. A batch is sent each time the buffer
 * size reaches a multiple of {@code BATCH_SIZE}; any remainder is sent after the loop.
 *
 * <p>If a batch inside the loop fails, the stack trace is printed and the buffer is
 * kept, so those entries go out again with the next batch. A failure of the final
 * batch propagates to the caller.
 */
public void run() {
    Map<String, Test> buffer = new HashMap<String, Test>();

    int dataSize = SAMPLE_SIZE / nThreads;
    int start = dataSize * (increment - 1);
    int end = dataSize * increment;
    System.out.printf("Start: %d  End: %d \n", start, end);

    for (int i = start; i < end; i++) {
        buffer.put(String.valueOf(i), new Test(i, "name" + i));
        if (buffer.size() % BATCH_SIZE == 0) {
            try {
                sendBatch(buffer);
                // Only drop the entries once the batch has been sent successfully.
                buffer.clear();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Send whatever is left over after the last full batch.
    if (!buffer.isEmpty()) {
        sendBatch(buffer);
    }
}

/**
 * Executes {@link ServerPutAllFunction} on the server with the given batch as its
 * argument, waits for the function's result, and then adds the batch size to
 * {@code counter}.
 *
 * <p>NOTE(review): {@code counter} is updated without synchronisation here; confirm
 * whether it is shared between worker threads.
 *
 * @param batch entries to send; left unmodified by this method
 */
private void sendBatch(Map<String, Test> batch) {
    Execution execution = FunctionService.onServer(cache).withArgs(batch);
    ResultCollector<?, ?> collector = execution.execute(ServerPutAllFunction.ID);
    // Wait for the result so a failed execution is reported before counting.
    collector.getResult();
    counter += batch.size();
}

No comments: