Search This Blog

Monday, 4 March 2013

Implementing an AsyncEventListener for Write-Behind Cache Event Handling

The GemFire 7.0 release introduced the AsyncEventListener for write-behind capability, which is very similar to the Gateway Listener in GemFire 6.x.

An AsyncEventListener receives callbacks for events that change region data. You can use an AsyncEventListener implementation as a write-behind cache event handler to synchronize region updates with a database.

It is documented as follows.

http://pubs.vmware.com/vfabricNoSuite/index.jsp?topic=/com.vmware.vfabric.gemfire.7.0/developing/events/implementing_write_behind_event_handler.html

So what would the cache.xml file for a member look like? Here it is.
  
<?xml version="1.0"?>
<!DOCTYPE cache PUBLIC
    "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN"
    "http://www.gemstone.com/dtd/cache7_0.dtd">

<cache>
    <!-- Declares the async event queue "GreenplumQueue" and the listener class that receives its events.
         NOTE(review): exact semantics of parallel / batch-size are defined by GemFire; confirm against the 7.0 docs. -->
    <async-event-queue id="GreenplumQueue" parallel="true" batch-size="500">
       <async-event-listener>
             <!-- Fully qualified name of the AsyncEventListener implementation to plug into this queue. -->
             <class-name>vmware.pivotal.example.listener.GreenplumGatewayListener</class-name>
       </async-event-listener>      
    </async-event-queue>
    <!-- Cache server endpoint for this member, on port 40001. -->
    <cache-server port="40001" notify-by-subscription="true"/>
    <!-- Region attached to the queue above: async-event-queue-ids must match the queue's id ("GreenplumQueue"). -->
    <region name="greenplumRegion">
      <region-attributes refid="PARTITION_REDUNDANT" async-event-queue-ids="GreenplumQueue"/>
   </region>
</cache>

Finally the code to write an AsyncEventListener would be as follows.
  
package vmware.pivotal.example.listener;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import vmware.pivotal.example.dao.jdbcbatch.JdbcBatch;
import vmware.pivotal.example.dao.jdbcbatch.JdbcBatchDAO;

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;

/**
 * Write-behind {@link AsyncEventListener} that forwards batched region events to a
 * database through a {@link JdbcBatchDAO} obtained from a Spring application context.
 *
 * <p>Each batch is split into created, updated and destroyed entries, and each non-empty
 * group is handed to the DAO in a single call. Note that the groups are applied in the
 * fixed order insert, update, delete, so the relative order of operations on the same key
 * within one batch is not preserved.
 */
public class GreenplumGatewayListener implements AsyncEventListener, Declarable
{

    private static final String BEAN_NAME = "jdbcBatchDAOImpl";

    private final Logger logger = Logger.getLogger(this.getClass().getSimpleName());

    // Kept as the concrete type so that close() can release the context and its beans.
    private final ClassPathXmlApplicationContext context;
    private final JdbcBatchDAO jdbcBatchDAO;

    /**
     * Loads {@code application-context.xml} from the classpath and looks up the DAO bean
     * named {@code jdbcBatchDAOImpl}.
     */
    public GreenplumGatewayListener()
    {
        context = new ClassPathXmlApplicationContext("application-context.xml");
        jdbcBatchDAO = (JdbcBatchDAO) context.getBean(BEAN_NAME);
        logger.log(Level.INFO, "GreenplumGatewayListener started...");
    }

    /**
     * Sorts the events of one batch by operation type and passes each non-empty group to the DAO.
     *
     * <p>Operations are classified with {@code Operation.isCreate()}, {@code isUpdate()} and
     * {@code isDestroy()} rather than by equality with a single constant, so that variants such
     * as putAll, putIfAbsent, replace and remove are written to the database as well instead of
     * being silently skipped. Events of any other kind are ignored.
     *
     * @param list the batch of events delivered by the async event queue
     * @return always {@code true}; any exception thrown by the DAO propagates to the caller
     */
    @Override
    public boolean processEvents(@SuppressWarnings("rawtypes") List<AsyncEvent> list)
    {
        logger.log(Level.INFO, String.format("Size of List<GatewayEvent> = %s", list.size()));

        List<JdbcBatch> newEntries = new ArrayList<JdbcBatch>();
        List<JdbcBatch> updatedEntries = new ArrayList<JdbcBatch>();
        List<String> destroyedEntries = new ArrayList<String>();
        int possibleDuplicates = 0;

        for (@SuppressWarnings("rawtypes") AsyncEvent ge : list)
        {
            if (ge.getPossibleDuplicate())
            {
                possibleDuplicates++;
            }

            Operation operation = ge.getOperation();
            if (operation.isUpdate())
            {
                updatedEntries.add((JdbcBatch) ge.getDeserializedValue());
            }
            else if (operation.isCreate())
            {
                newEntries.add((JdbcBatch) ge.getDeserializedValue());
            }
            else if (operation.isDestroy())
            {
                // NOTE(review): isDestroy() also covers non-user destroys (e.g. expiration) if the
                // queue is ever configured to forward them; confirm that deleting rows is wanted then.
                destroyedEntries.add(ge.getKey().toString());
            }
        }

        if (!newEntries.isEmpty())
        {
            jdbcBatchDAO.storeInsertBatch(newEntries);
        }

        if (!updatedEntries.isEmpty())
        {
            jdbcBatchDAO.storeUpdateBatch(updatedEntries);
        }

        if (!destroyedEntries.isEmpty())
        {
            jdbcBatchDAO.storeDeleteBatch(destroyedEntries);
        }

        logger.log(Level.INFO,
            String.format("New Entries = [%s], Updated Entries = [%s], Destroyed Entries = [%s], Possible Duplicates = [%s]",
                newEntries.size(),
                updatedEntries.size(),
                destroyedEntries.size(),
                possibleDuplicates));

        return true;
    }

    /**
     * No-op: this listener takes no declarative parameters; all setup happens in the constructor.
     *
     * @param arg0 properties supplied by the cache declaration; ignored
     */
    @Override
    public void init(Properties arg0)
    {
        // Nothing to initialise from cache.xml properties.
    }

    /**
     * Closes the Spring application context created in the constructor so that the beans it
     * manages are released when the listener is shut down.
     */
    @Override
    public void close()
    {
        context.close();
    }

}