An AsyncEventListener receives callbacks for events that change region data. You can use an AsyncEventListener implementation as a write-behind cache event handler to synchronize region updates with a database.
It documented as follows.
http://pubs.vmware.com/vfabricNoSuite/index.jsp?topic=/com.vmware.vfabric.gemfire.7.0/developing/events/implementing_write_behind_event_handler.html
So how would my cache.xml file for a member look like here.
<?xml version="1.0"?> <!DOCTYPE cache PUBLIC "-//GemStone Systems, Inc.//GemFire Declarative Caching 7.0//EN" "http://www.gemstone.com/dtd/cache7_0.dtd"> <cache> <async-event-queue id="GreenplumQueue" parallel="true" batch-size="500"> <async-event-listener> <class-name>vmware.pivotal.example.listener.GreenplumGatewayListener</class-name> </async-event-listener> </async-event-queue> <cache-server port="40001" notify-by-subscription="true"/> <region name="greenplumRegion"> <region-attributes refid="PARTITION_REDUNDANT" async-event-queue-ids="GreenplumQueue"/> </region> </cache>
Finally the code to write an AsyncEventListener would be as follows.
package vmware.pivotal.example.listener; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.logging.Level; import java.util.logging.Logger; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import vmware.pivotal.example.dao.jdbcbatch.JdbcBatch; import vmware.pivotal.example.dao.jdbcbatch.JdbcBatchDAO; import com.gemstone.gemfire.cache.Declarable; import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent; import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; public class GreenplumGatewayListener implements AsyncEventListener, Declarable { private Logger logger = Logger.getLogger(this.getClass().getSimpleName()); private ApplicationContext context; private static final String BEAN_NAME = "jdbcBatchDAOImpl"; private JdbcBatchDAO jdbcBatchDAO; public GreenplumGatewayListener() { context = new ClassPathXmlApplicationContext("application-context.xml"); jdbcBatchDAO = (JdbcBatchDAO) context.getBean(BEAN_NAME); logger.log (Level.INFO, "GreenplumGatewayListener started..."); } @Override public boolean processEvents(@SuppressWarnings("rawtypes") List<AsyncEvent> list) { logger.log (Level.INFO, String.format("Size of List<GatewayEvent> = %s", list.size())); List<JdbcBatch> newEntries = new ArrayList<JdbcBatch>(); List<JdbcBatch> updatedEntries = new ArrayList<JdbcBatch>(); List<String> destroyedEntries = new ArrayList<String>(); int possibleDulicates = 0; for (@SuppressWarnings("rawtypes") AsyncEvent ge: list) { if (ge.getPossibleDuplicate()) possibleDulicates++; if ( ge.getOperation().equals(Operation.UPDATE)) { updatedEntries.add((JdbcBatch) ge.getDeserializedValue()); } else if ( ge.getOperation().equals(Operation.CREATE)) { newEntries.add((JdbcBatch) ge.getDeserializedValue()); } else if ( ge.getOperation().equals(Operation.DESTROY)) { destroyedEntries.add(ge.getKey().toString()); } } if (newEntries.size() > 0) { jdbcBatchDAO.storeInsertBatch(newEntries); } if (updatedEntries.size() > 0) { jdbcBatchDAO.storeUpdateBatch(updatedEntries); } if (destroyedEntries.size() > 0) { jdbcBatchDAO.storeDeleteBatch(destroyedEntries); } logger.log (Level.INFO, String.format("New Entries = [%s], Updated Entries = [%s], Destroyed Entries = [%s], Possible Duplicates = [%s]", newEntries.size(), updatedEntries.size(), destroyedEntries.size(), possibleDulicates)); return true; } public void init(Properties arg0) { // TODO Auto-generated method stub } public void close() { // TODO Auto-generated method stub } }