I needed to ensure that my Coherence cache, which was storing a table from a database, was kept in sync. To do that I used Database Change Notification (DCN) in the 11g JDBC driver with an 11.2 RDBMS. A few things I needed to ensure were as follows.
1. First, I made sure that the DCN listener on the client, which determines what refresh to perform on the cache, does this by using the
Executor interface to call a runnable task in its own thread. That way, if the operation takes time, it runs in its own thread and won't hold up the listener itself.
package support.au.coherence.dcn.server.db;
import java.util.concurrent.Executor;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.RowChangeDescription.RowOperation;
import oracle.jdbc.dcn.TableChangeDescription;
/**
 * Database Change Notification listener that turns row-level change events
 * into cache refresh tasks.
 *
 * For every changed row reported in an event, a {@link HandleDBRefresh} task
 * carrying the row's ROWID and the kind of change ("insert", "update" or
 * "delete") is handed to a {@link DBExecutor}. After the whole event has been
 * processed, any thread waiting on the {@link DeptDCNRegister} monitor is
 * notified.
 */
public class DCNListener implements DatabaseChangeListener
{
    /** Registration owner; its monitor is notified after each event. */
    DeptDCNRegister demo;

    /**
     * @param dem the registration object whose monitor is notified once an
     *            event has been handled
     */
    DCNListener(DeptDCNRegister dem)
    {
        demo = dem;
    }

    /**
     * Handles a single change event delivered by the JDBC driver.
     *
     * @param databaseChangeEvent the event describing which tables and rows
     *                            changed
     */
    public void onDatabaseChangeNotification
        (DatabaseChangeEvent databaseChangeEvent)
    {
        System.out.println("DCNListener: got an event (" + this + ")");
        System.out.println(databaseChangeEvent.toString());

        // One executor serves every row of this event; there is no need to
        // allocate a new one per row.
        Executor executor = new DBExecutor();

        // The Oracle JDBC Javadoc states this can be null when the event is
        // not a table change event, so guard before iterating.
        TableChangeDescription[] tableChanges =
            databaseChangeEvent.getTableChangeDescription();
        if (tableChanges != null)
        {
            for (TableChangeDescription tableChange : tableChanges)
            {
                // Row-level detail may be unavailable for a table change
                // (assumption based on the Oracle Javadoc); skip such entries
                // instead of failing the whole event.
                RowChangeDescription[] rcds = tableChange.getRowChangeDescription();
                if (rcds == null)
                {
                    continue;
                }
                for (RowChangeDescription rcd : rcds)
                {
                    String rowid = rcd.getRowid().stringValue();
                    System.out.println("Affected row -> " + rowid);
                    RowOperation ro = rcd.getRowOperation();
                    if (ro.equals(RowOperation.INSERT))
                    {
                        System.out.println("INSERT occurred");
                        executor.execute(new HandleDBRefresh(rowid, "insert"));
                    }
                    else if (ro.equals(RowOperation.UPDATE))
                    {
                        System.out.println("UPDATE occurred");
                        executor.execute(new HandleDBRefresh(rowid, "update"));
                    }
                    else if (ro.equals(RowOperation.DELETE))
                    {
                        System.out.println("DELETE occurred");
                        executor.execute(new HandleDBRefresh(rowid, "delete"));
                    }
                    else
                    {
                        System.out.println("Only handling INSERT/DELETE/UPDATE");
                    }
                }
            }
        }

        // Wake up anything waiting on the registration object for an event.
        synchronized (demo)
        {
            demo.notify();
        }
    }
}
2. The "DBExecutor" is defined as follows.
package support.au.coherence.dcn.server.db;
import java.util.concurrent.Executor;
/**
 * Minimal {@link Executor} that runs every submitted task on a brand-new
 * thread, so the caller (the DCN listener) is never blocked by the task.
 */
public class DBExecutor implements Executor
{
    /**
     * Starts a new thread that runs the given task and returns immediately.
     *
     * @param command the task to run asynchronously
     * @throws NullPointerException if {@code command} is null, as required by
     *                              the {@link Executor} contract
     */
    public void execute(Runnable command)
    {
        if (command == null)
        {
            throw new NullPointerException("command");
        }
        // start(), not run(): run() would execute the task synchronously on
        // the calling thread and defeat the purpose of this executor.
        new Thread(command).start();
    }
}
3. The runnable class "HandleDBRefresh" is defined as follows.
package support.au.coherence.dcn.server.db;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheFactoryBuilder;
import com.tangosol.net.NamedCache;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import support.au.coherence.dcn.server.CacheHelper;
import support.au.coherence.dcn.server.Dept;
/**
 * Runnable task that brings one cache entry in line with the DEPT table.
 *
 * The task is described by a ROWID and an action. For "delete" the entry
 * keyed by the ROWID is removed from the cache. For "insert" and "update" the
 * row is re-read from the database and written to the cache under the ROWID
 * returned by the query. Action names are matched case-insensitively; any
 * other action is ignored.
 */
public class HandleDBRefresh implements Runnable
{
    private DBConnectionManager connMgr = null;
    private String rowId;
    private String action;

    /** Creates an empty task; row id and action must be set before running. */
    public HandleDBRefresh()
    {
    }

    /**
     * @param rowId  ROWID of the changed DEPT row
     * @param action one of "insert", "update" or "delete"
     */
    public HandleDBRefresh(String rowId, String action)
    {
        super();
        this.rowId = rowId;
        this.action = action;
    }

    /**
     * Applies the configured action to the cache.
     *
     * @throws IllegalStateException if no action has been set
     * @throws RuntimeException      if the database lookup or the cache
     *                               operation fails; the original exception is
     *                               kept as the cause
     */
    public void run()
    {
        if (action == null)
        {
            throw new IllegalStateException
                ("Error updating cache: rowid [" + rowId + "] no action set");
        }

        // equalsIgnoreCase is locale-independent, unlike toLowerCase(), which
        // maps "INSERT" to a dotless-i string under a Turkish default locale.
        boolean isDelete = "delete".equalsIgnoreCase(action);
        boolean isInsert = "insert".equalsIgnoreCase(action);
        boolean isUpdate = "update".equalsIgnoreCase(action);
        if (!isDelete && !isInsert && !isUpdate)
        {
            // Unknown actions never touched the cache; skip the query too.
            return;
        }

        PreparedStatement stmt = null;
        ResultSet rset = null;
        Connection conn = null;
        try
        {
            connMgr = DBConnectionManager.getInstance();

            if (isDelete)
            {
                // No database round trip needed: the ROWID is the cache key.
                CacheHelper.getInstance().removeEntry(rowId);
                System.out.println("Cache record delete");
                return;
            }

            conn = connMgr.getConnection();
            stmt = conn.prepareStatement
                ("select rowid, deptno, dname from dept where rowid = ?");
            stmt.setString(1, rowId);
            rset = stmt.executeQuery();

            if (!rset.next())
            {
                // The row disappeared between the notification and this
                // query (e.g. it was deleted straight afterwards). There is
                // nothing to load, and reading from the empty result set
                // would fail.
                System.out.println
                    ("Row no longer exists, skipping refresh: rowid [" + rowId + "]");
                return;
            }

            CacheHelper cacheHelper = CacheHelper.getInstance();
            Dept d = new Dept(rset.getInt(2), rset.getString(3));
            cacheHelper.updateEntry(rset.getString(1), d);
            if (isInsert)
            {
                System.out.println("Cache updated with new record");
            }
            else
            {
                System.out.println("Cache record updated");
            }
        }
        catch (Exception e)
        {
            // Keep the original exception as the cause so the stack trace of
            // the real failure is not lost.
            throw new RuntimeException
                ("Error updating cache: rowid [" + rowId + "] " + e, e);
        }
        finally
        {
            if (rset != null)
            {
                try
                {
                    rset.close();
                }
                catch (SQLException ignored)
                {
                    // best-effort cleanup; nothing useful can be done here
                }
            }
            if (stmt != null)
            {
                try
                {
                    stmt.close();
                }
                catch (SQLException ignored)
                {
                    // best-effort cleanup; nothing useful can be done here
                }
            }
            if (conn != null)
            {
                try
                {
                    connMgr.returnConnection(conn);
                }
                catch (SQLException ignored)
                {
                    // best-effort cleanup; nothing useful can be done here
                }
            }
        }
    }

    /** @param rowId ROWID of the row this task refreshes */
    public void setRowId(String rowId)
    {
        this.rowId = rowId;
    }

    /** @return ROWID of the row this task refreshes */
    public String getRowId()
    {
        return rowId;
    }

    /** @param action one of "insert", "update" or "delete" */
    public void setAction(String action)
    {
        this.action = action;
    }

    /** @return the action this task performs */
    public String getAction()
    {
        return action;
    }
}
4. "DeptDCNRegister" is defined as follows.
package support.au.coherence.dcn.server;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.DatabaseChangeRegistration;
/**
 * Singleton that registers the DEPT table for Database Change Notification
 * and attaches a {@link DCNListener} to the registration.
 *
 * The single instance is created, and the registration performed, when the
 * class is initialised.
 */
public class DeptDCNRegister
{
    private DBConnectionManager connMgr = null;
    //private final String depSQL = "select * from dept";
    private DatabaseChangeRegistration dcr = null;
    private static DeptDCNRegister instance = null;

    static
    {
        try
        {
            instance = new DeptDCNRegister();
        }
        catch (Exception e)
        {
            throw new RuntimeException("Error creating instance of DeptDCNRegister", e);
        }
    }

    /**
     * Obtains a connection, registers for change notification and hands the
     * connection back to the connection manager.
     *
     * @throws SQLException if the registration fails or the connection cannot
     *                      be returned
     */
    private DeptDCNRegister () throws SQLException
    {
        connMgr = DBConnectionManager.getInstance();
        OracleConnection conn = (OracleConnection) connMgr.getConnection();
        try
        {
            registerDCN(conn);
        }
        finally
        {
            // Return the connection instead of leaking it. Oracle's own DCN
            // example closes the registering connection and still receives
            // events, so the registration should not depend on it
            // (NOTE(review): confirm against the pool in use).
            connMgr.returnConnection(conn);
        }
    }

    /** @return the single instance of this class */
    public static DeptDCNRegister getInstance()
    {
        return instance;
    }

    /**
     * Creates the change registration, attaches the listener and associates
     * the DEPT table with the registration by running a query against it.
     *
     * @param conn connection used to create the registration
     * @throws SQLException if any step fails; in that case the partially
     *                      configured registration is unregistered again
     */
    private void registerDCN (OracleConnection conn) throws SQLException
    {
        /*
         * Register a listener for change notification to be displayed to
         * standard out for testing purposes.
         */
        Properties props = new Properties();
        props.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
        props.setProperty(OracleConnection.NTF_QOS_RELIABLE, "false");
        props.setProperty(OracleConnection.DCN_BEST_EFFORT, "true");

        DatabaseChangeRegistration registration =
            conn.registerDatabaseChangeNotification(props);

        Statement stmt = null;
        ResultSet rs = null;
        try
        {
            // Attach the listener implemented by DCNListener.java.
            registration.addListener(new DCNListener(this));

            stmt = conn.createStatement();
            // Associate the statement with the registration.
            ((OracleStatement) stmt).setDatabaseChangeRegistration(registration);
            rs = stmt.executeQuery("select * from dept where 1 = 2");
            while (rs.next())
            {
                // Nothing to read: the query only exists to register DEPT.
            }

            String[] tableNames = registration.getTables();
            for (int i = 0; i < tableNames.length; i++)
            {
                System.out.println(tableNames[i] + " successfully registered.");
            }

            // Publish the registration only once it is fully set up.
            dcr = registration;
        }
        catch (SQLException e)
        {
            // Do not leave a half-configured registration behind on the server.
            try
            {
                conn.unregisterDatabaseChangeNotification(registration);
            }
            catch (SQLException ignored)
            {
                // the original failure is the one worth reporting
            }
            throw e;
        }
        finally
        {
            // Close the result set before its statement, even on failure.
            if (rs != null)
            {
                try
                {
                    rs.close();
                }
                catch (SQLException ignored)
                {
                    // best-effort cleanup
                }
            }
            if (stmt != null)
            {
                try
                {
                    stmt.close();
                }
                catch (SQLException ignored)
                {
                    // best-effort cleanup
                }
            }
        }
    }

    /**
     * Removes the change registration and closes the given connection.
     *
     * @param conn connection used to unregister; it is closed even if the
     *             unregistration fails
     * @throws SQLException if unregistering or closing fails
     */
    public void closeDCN (OracleConnection conn) throws SQLException
    {
        try
        {
            conn.unregisterDatabaseChangeNotification(dcr);
        }
        finally
        {
            conn.close();
        }
    }
}
5. Finally, the key for Dept cache records is the ROWID from the database table. This guarantees the key is always unique, and because the ROWID is what drives each specific operation, it makes things easier. The ROWID is what the listener is given for table changes, so it makes sense to use it as the key within the Coherence cache.
I did give some thought to making the DCN listener run within the DB, but loading the required JARs turned me off that idea. To be honest, I also wasn't sure how I could connect to a Coherence cluster as a storage-disabled member from within the DB itself. Otherwise this would have been a good option, as the registration would have been active for as long as the DB was running.