Search This Blog

Monday, 12 December 2011

GemFire Write-Behind with an Oracle RDBMS

Many in-memory data fabrics use an Oracle RDBMS to persist data and make it available as required. In this demo we explore an effective way to use GemFire's write-behind functionality with an Oracle RDBMS, along with strategies for increasing throughput. GemFire's write-behind technology simply translates what in-memory data management does uniquely well (low-latency, high-throughput handling of small data updates) into what disk-based data management does best (higher-latency, high-throughput handling of large data updates).

Here is an example config of such a setup.

cache XML file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE cache PUBLIC
    "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.6//EN" 
"http://www.gemstone.com/dtd/cache6_6.dtd">
<cache>
  <!-- Gateway hub whose single gateway hands queued events to DBGatewayListener -->
  <gateway-hub id="DBWriterHub" port="-1" startup-policy="none">
     <gateway id="DBWriter">
        <gateway-listener>
           <class-name>pas.au.gemfire.demo.cachewriter.DBGatewayListener</class-name>
        </gateway-listener>
        <!-- Batches hold up to 5000 events; at most 10000 ms (10 seconds) may elapse between sending batches -->
        <gateway-queue batch-size="5000" batch-time-interval="10000"/>
     </gateway>
  </gateway-hub>
  <!-- Region attached to the DBWriterHub gateway hub; LRU heap eviction overflows entries to disk -->
  <region name="firstRegion" refid="PARTITION_REDUNDANT"> 
    <region-attributes enable-gateway="true" hub-id="DBWriterHub">
      <eviction-attributes>
        <lru-heap-percentage action="overflow-to-disk" />
      </eviction-attributes>
    </region-attributes>
  </region>
  <!-- Registers SizeFunction with the function service -->
  <function-service>
 <function>
   <class-name>pas.au.gemfire.demo.cachewriter.SizeFunction</class-name>
 </function>
  </function-service> 
  <!-- Heap thresholds (percent): eviction at 65, critical at 75 -->
  <resource-manager critical-heap-percentage="75" eviction-heap-percentage="65"/>
</cache>

Gateway Listener Code
package pas.au.gemfire.demo.cachewriter;

import com.gemstone.gemfire.cache.Declarable;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.util.GatewayEvent;
import com.gemstone.gemfire.cache.util.GatewayEventListener;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import pas.au.gemfire.demo.cachewriter.dao.jdbcbatch.JdbcBatch;
import pas.au.gemfire.demo.cachewriter.dao.jdbcbatch.JdbcBatchDAO;
import pas.au.gemfire.demo.cachewriter.dao.jdbcbatch.JdbcBatchDAOImpl;

public class DBGatewayListener implements GatewayEventListener, Declarable 
{
  private Logger logger = Logger.getLogger(this.getClass().getSimpleName());
  private ApplicationContext context;
  private static final String BEAN_NAME = "jdbcBatchDAO";
  private JdbcBatchDAO jdbcBatchDAO;

  public DBGatewayListener()
  {
    context = new ClassPathXmlApplicationContext("application-config.xml");
    jdbcBatchDAO = (JdbcBatchDAOImpl) context.getBean(BEAN_NAME);
  }

  @Override
  public boolean processEvents(List<GatewayEvent> list)
  {
    logger.log (Level.INFO, String.format("Size of List<GatewayEvent> = %s", list.size()));
    List<JdbcBatch> newEntries = new ArrayList<JdbcBatch>();
    
    List<JdbcBatch> updatedEntries = new ArrayList<JdbcBatch>();
    List<String> destroyedEntries = new ArrayList<String>();
    @SuppressWarnings("unused")
 int possibleDulicates = 0;
    
    for (GatewayEvent ge: list)
    {
      
      if (ge.getPossibleDuplicate())
       possibleDulicates++;
       
      if ( ge.getOperation().equals(Operation.UPDATE)) 
      {
     updatedEntries.add((JdbcBatch) ge.getDeserializedValue());
      }
      else if ( ge.getOperation().equals(Operation.CREATE))
      {
        newEntries.add((JdbcBatch) ge.getDeserializedValue());
      }
      else if ( ge.getOperation().equals(Operation.DESTROY))
      {
     destroyedEntries.add(ge.getKey().toString());
      }
     
    }
    
    if (newEntries.size() > 0)
    {
     jdbcBatchDAO.storeInsertBatch(newEntries); 
    }
    
    if (updatedEntries.size() > 0)
    {
     jdbcBatchDAO.storeUpdateBatch(updatedEntries);
    }
    
    if (destroyedEntries.size() > 0)
    {
     jdbcBatchDAO.storeDeleteBatch(destroyedEntries);
    }
    
    logger.log (Level.INFO, 
          String.format("New Entries = [%s], Updated Entries = [%s], Destroyed Entries = [%s], Possible Duplicates = [%s]", 
                  newEntries.size(), 
                  updatedEntries.size(), 
                  destroyedEntries.size(), 
                  possibleDulicates));
    
    return true;
  }

  @Override
  public void close()
  {
  }

  @Override
  public void init(Properties properties)
  {
  }
}

For more information on vFabric GemFire use the link below.

http://www.vmware.com/products/application-platform/vfabric-gemfire/overview.html

2 comments:

Anonymous said...

How does eviction from the cache work? What triggers an eviction?

Pas Apicella said...

It's described in the GemFire 7 documentation here:

http://pubs.vmware.com/vfabric53/index.jsp?topic=/com.vmware.vfabric.gemfire.7.0/developing/eviction/chapter_overview.html