Tuesday, 2 February 2010

Using Database Change Notification (DCN) with a Coherence Cache

I needed to keep a Coherence cache that stores a database table in sync with that table. To do that I used Database Change Notification (DCN) in the 11g JDBC driver against an 11.2 RDBMS. The things I needed to put in place were as follows.

1. First, I made sure the DCN listener on the client, which determines what refresh to perform on the cache, uses the Executor interface to run a Runnable task in its own thread. That way, if the refresh takes time, it happens in its own thread and won't hold up the listener itself.


package support.au.coherence.dcn.server.db;

import java.util.concurrent.Executor;

import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.RowChangeDescription.RowOperation;
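import oracle.jdbc.dcn.TableChangeDescription;

import support.au.coherence.dcn.server.DeptDCNRegister;

public class DCNListener implements DatabaseChangeListener
{
private final DeptDCNRegister demo;

// public so that DeptDCNRegister, which is in a different package, can construct it
public DCNListener(DeptDCNRegister dem)
{
demo = dem;
}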

public void onDatabaseChangeNotification
(DatabaseChangeEvent databaseChangeEvent)
{
System.out.println("DCNListener: got an event (" + this + ")");
System.out.println(databaseChangeEvent.toString());
TableChangeDescription [] tableChanges =
databaseChangeEvent.getTableChangeDescription();

for (TableChangeDescription tableChange : tableChanges)
{
RowChangeDescription[] rcds = tableChange.getRowChangeDescription();
for (RowChangeDescription rcd : rcds)
{
System.out.println("Affected row -> " +
rcd.getRowid().stringValue());
RowOperation ro = rcd.getRowOperation();

Executor executor = new DBExecutor();
String rowid = rcd.getRowid().stringValue();

if (ro.equals(RowOperation.INSERT))
{

System.out.println("INSERT occurred");
executor.execute(new HandleDBRefresh(rowid, "insert"));
}
else if (ro.equals(RowOperation.UPDATE))
{
System.out.println("UPDATE occurred");
executor.execute(new HandleDBRefresh(rowid, "update"));
}
else if (ro.equals(RowOperation.DELETE))
{
System.out.println("DELETE occurred");
executor.execute(new HandleDBRefresh(rowid, "delete"));
}
else
{
System.out.println("Only handling INSERT/DELETE/UPDATE");
}
}
}

synchronized( demo )
{
demo.notify();
}

}
}

2. The "DBExecutor" is defined as follows.


package support.au.coherence.dcn.server.db;

import java.util.concurrent.Executor;

public class DBExecutor implements Executor
{
public void execute(Runnable command)
{
// start() runs the task on a new thread; run() would execute it on the caller's thread
new Thread(command).start();
}
}
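
A detail worth checking in any Executor written like this is which thread really runs the task: Thread.run() just invokes the Runnable on the calling thread, whereas Thread.start() spawns a new thread for it. The following is a minimal, standalone sketch of that difference. The class name RunVersusStart and the thread name "dcn-worker" are made up for illustration and are not part of the code in this post.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class (not part of the original code): records which
// thread actually executes a task under Thread.run() versus Thread.start().
class RunVersusStart
{
    // Thread.run() simply invokes the Runnable on the calling thread.
    static String threadUsedByRun()
    {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(Thread.currentThread().getName()), "dcn-worker");
        t.run();
        return seen.get();
    }

    // Thread.start() spawns a new thread, which then invokes the Runnable.
    static String threadUsedByStart()
    {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread t = new Thread(() -> seen.set(Thread.currentThread().getName()), "dcn-worker");
        t.start();
        try
        {
            t.join(); // wait so the recorded name is visible to the caller
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }

    public static void main(String[] args)
    {
        System.out.println("caller:  " + Thread.currentThread().getName());
        System.out.println("run():   " + threadUsedByRun());
        System.out.println("start(): " + threadUsedByStart());
    }
}
```

With run(), the reported thread is the caller's own, so a slow cache refresh would still block the DCN listener. Only start() moves the work off the listener thread.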

3. The runnable class "HandleDBRefresh" is defined as follows.


package support.au.coherence.dcn.server.db;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.CacheFactoryBuilder;
import com.tangosol.net.NamedCache;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import support.au.coherence.dcn.server.CacheHelper;
import support.au.coherence.dcn.server.Dept;

public class HandleDBRefresh implements Runnable
{
private DBConnectionManager connMgr = null;
private String rowId;
private String action;

public HandleDBRefresh()
{
}

public HandleDBRefresh(String rowId, String action)
{
super();
this.rowId = rowId;
this.action = action;
}

public void run()
{
PreparedStatement stmt = null;
ResultSet rset = null;
Connection conn = null;

try
{
connMgr = DBConnectionManager.getInstance();
if (!action.toLowerCase().equals("delete"))
{
conn = connMgr.getConnection();
stmt = conn.prepareStatement
("select rowid, deptno, dname from dept where rowid = ?");
stmt.setString(1, rowId);
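rset = stmt.executeQuery();
if (!rset.next())
{
// the row no longer exists (for example it was deleted after the
// notification was raised), so there is nothing to refresh
return;
}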
}

CacheHelper cacheHelper = CacheHelper.getInstance();

// check if action
if (action.toLowerCase().equals("delete"))
{
cacheHelper.removeEntry(rowId);
System.out.println("Cache record delete");
}
else if (action.toLowerCase().equals("insert"))
{
// add to cache
if (rset != null)
{
Dept d = new Dept(rset.getInt(2), rset.getString(3));
cacheHelper.updateEntry(rset.getString(1), d);
System.out.println("Cache updated with new record");
}
}
else if (action.toLowerCase().equals("update"))
{
// refresh record in cache
if (rset != null)
{
Dept d = new Dept(rset.getInt(2), rset.getString(3));
cacheHelper.updateEntry(rset.getString(1), d);
System.out.println("Cache record updated");
}
}
}
catch (Exception e)
{
// pass the exception as the cause so the original stack trace is preserved
throw new RuntimeException
("Error updating cache: rowid [" + rowId + "]", e);
}
finally
{
if (rset != null)
{
try
{
rset.close();
}
catch (SQLException se)
{
}
}

if (stmt != null)
{
try
{
stmt.close();
}
catch (SQLException se)
{
}
}

if (conn != null)
{
try
{
connMgr.returnConnection(conn);
}
catch (SQLException se)
{
}
}

}

}

public void setRowId(String rowId)
{
this.rowId = rowId;
}

public String getRowId()
{
return rowId;
}

public void setAction(String action)
{
this.action = action;
}

public String getAction()
{
return action;
}
}


4. "DeptDCNRegister" is defined as follows.

  
package support.au.coherence.dcn.server;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

import java.sql.Statement;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.DatabaseChangeRegistration;

import support.au.coherence.dcn.server.db.DCNListener;

@SuppressWarnings("unchecked")
public class DeptDCNRegister
{
private DBConnectionManager connMgr = null;
//private final String depSQL = "select * from dept";
private DatabaseChangeRegistration dcr = null;
private static DeptDCNRegister instance = null;

static
{
try
{
instance = new DeptDCNRegister();
}
catch (Exception e)
{
throw new RuntimeException("Error creating instance of DeptDCNRegister", e);
}
}

private DeptDCNRegister () throws SQLException
{
connMgr = DBConnectionManager.getInstance();
OracleConnection conn = (OracleConnection) connMgr.getConnection();
if (dcr == null)
{
registerDCN(conn);
}
}

public static DeptDCNRegister getInstance()
{
return instance;
}

private void registerDCN (OracleConnection conn) throws SQLException
{
/*
* Register a listener for change notifications; events are printed to
* standard out for testing purposes.
*/
Properties props = new Properties();
props.put(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
props.put(OracleConnection.NTF_QOS_RELIABLE, "false");
props.setProperty(OracleConnection.DCN_BEST_EFFORT, "true");

dcr = conn.registerDatabaseChangeNotification(props);

// Add the listener (the DCNListener class shown above)
DCNListener list = new DCNListener(this);
dcr.addListener(list);

Statement stmt = conn.createStatement();
// Associate the statement with the registration.
((OracleStatement)stmt).setDatabaseChangeRegistration(dcr);
ResultSet rs = stmt.executeQuery("select * from dept where 1 = 2");
while (rs.next())
{
// nothing to fetch; the query is executed only to register the DEPT table
}

String[] tableNames = dcr.getTables();
for(int i=0; i < tableNames.length; i++)
{
System.out.println(tableNames[i]+" successfully registered.");
}

// close resources, result set first
rs.close();
stmt.close();

}

public void closeDCN (OracleConnection conn) throws SQLException
{
conn.unregisterDatabaseChangeNotification(dcr);
conn.close();
}
}


5. Finally, the key for Dept cache records is the database ROWID of the row. That guarantees a unique key, and because the ROWID is exactly what the listener receives for each table change, using it as the key within the Coherence cache makes it easy to apply each change to the right entry.
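
To illustrate why keying on the ROWID is convenient, here is a minimal sketch that applies the three actions to a map keyed by ROWID. It uses a plain ConcurrentHashMap as a stand-in for the cache (a Coherence NamedCache is also a java.util.Map) and a String in place of the Dept object. The class name RowidKeyedCache and the ROWID values in the usage example are made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the Dept cache (not the original CacheHelper):
// entries are keyed by the ROWID that the change notification delivers.
class RowidKeyedCache
{
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Applies one notified change; the ROWID alone identifies the entry.
    void apply(String action, String rowid, String value)
    {
        if ("delete".equalsIgnoreCase(action))
        {
            cache.remove(rowid);
        }
        else if ("insert".equalsIgnoreCase(action) || "update".equalsIgnoreCase(action))
        {
            // insert and update are the same operation on a keyed cache
            cache.put(rowid, value);
        }
        else
        {
            throw new IllegalArgumentException("Unsupported action: " + action);
        }
    }

    String get(String rowid)
    {
        return cache.get(rowid);
    }

    int size()
    {
        return cache.size();
    }

    public static void main(String[] args)
    {
        RowidKeyedCache depts = new RowidKeyedCache();
        depts.apply("insert", "ROWID-1", "ACCOUNTING"); // made-up ROWID value
        depts.apply("update", "ROWID-1", "FINANCE");
        System.out.println(depts.get("ROWID-1"));
        depts.apply("delete", "ROWID-1", null);
        System.out.println(depts.size());
    }
}
```

Because a delete notification carries only the ROWID, no database query is needed to find the entry to remove, which matches how HandleDBRefresh skips the query for deletes.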

I did consider running the DCN listener inside the database, but having to load the required JARs put me off the idea. To be honest, I also wasn't sure how I could connect to a Coherence cluster as a storage-disabled member from within the database. Otherwise it would have been a good option, as the registration would have stayed active for as long as the database was running.

7 comments:

eggmatters said...

This code is a little more lucid than the similar code provided by Oracle. I'm trying to follow along, as my listener hangs and doesn't seem to acknowledge the event it is listening for. I'm curious, though: what specifically is defined in your class object DeptDCNRegister demo? I imagine it is the registered event?

Pas Apicella said...

I added the code to the main entry above; that's a lot easier than adding it to the comments, where it's difficult to format.

DeptDCNRegister.java does the registration itself.

eggmatters said...

Excellent, that makes perfect sense. Thank you!

eggmatters said...

I believe I have found a bug in 11g. My call from the change registration object to getTables:

String[] tableNames = dcr.getTables();
for(int i=0; i < tableNames.length; i++)
{
System.out.println(tableNames[i]+" successfully registered.");
}

returns a truncated value. My table name is .CUSTOM_TESTER, but getTables() returns .CUSTOM, truncating the table name after the underscore. A post on the Oracle forums went unanswered, and a search for similar issues turned up posts that went unanswered as well. This is probably why my listener is not responding.

Pas Apicella said...

I tested this with the 11.1.0.7 and 11.2.0.1 JDBC driver and it worked ok for me. Output as follows:

Started DCNDemo at Tue Apr 13 09:28:08 EST 2010
Showing tables which are part of the registration thus far
SCOTT.CUSTOM_TESTER is part of the registration.
Ended DCNDemo at Tue Apr 13 09:28:09 EST 2010

I suggest you log a support ticket with Oracle, giving your test case that shows it failing. My registration uses the same code as in this blog entry above, so there is nothing unique about it. The table was created as follows in an 11.1.0.7 RDBMS.

create table CUSTOM_TESTER (col1 number);

amitstechblog said...

Good post. This is something I am also working on at the moment. I am currently facing an issue: if the application registers for database change notification and aborts before unregistering, the next attempt to register fails with "ORA-29972: user does not have privilege to change/ create registration", and further attempts fail with "ORA-29970: Specified registration id does not exist".

Real-world examples of the application aborting before unregistering would be power failures or network failures.

I am not sure if you have faced this issue.

Anonymous said...

Hi Pas,


As for the problem that database registrations do not die if the listener exits abnormally (see amitstechblog's posting above), I've found the following measures can help:


1. The Java application should register a shutdown hook to make sure the registrations are unregistered if the application is killed. Here's how you could write such code:


Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
/* unregister here */
}
});
(Sorry for the lousy formatting. I really, really resent Blogger's lack of formatting options for comments!)

Note that a shutdown hook does not catch all cases of abnormal exit, but it is certainly better than nothing.


2. On startup, the Java application should query the USER_CHANGE_NOTIFICATION_REGS dictionary view to find orphaned registrations and then unregister those before proceeding.

I think that together these two measures will solve the problem of orphaned registrations.
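
As a rough sketch of the first measure: the unregister step should run at most once, whether it is triggered by a normal close or by the shutdown hook. The class name RegistrationCleanup is hypothetical, and the Runnable passed in is assumed to wrap the real unregister call (for example the closeDCN method from the blog entry). It is deliberately not tied to the Oracle driver here so that the sketch stays self-contained.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper (not from the original post): runs an unregister
// action at most once, from either an explicit call or a JVM shutdown hook.
class RegistrationCleanup
{
    private final Runnable unregister;
    private final AtomicBoolean done = new AtomicBoolean(false);

    // The Runnable is assumed to perform the real unregistration.
    RegistrationCleanup(Runnable unregister)
    {
        this.unregister = unregister;
    }

    // Returns true only for the call that actually performed the unregister.
    boolean unregisterOnce()
    {
        if (done.compareAndSet(false, true))
        {
            unregister.run();
            return true;
        }
        return false;
    }

    // Installs the hook and returns it, so a caller can remove it again.
    Thread installShutdownHook()
    {
        Thread hook = new Thread(() -> unregisterOnce(), "dcn-unregister");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args)
    {
        RegistrationCleanup cleanup = new RegistrationCleanup(
            () -> System.out.println("unregistering DCN registration"));
        cleanup.installShutdownHook();
        // normal exit: the hook fires as the JVM shuts down
    }
}
```

The guard matters because the hook also fires on a normal exit, so an application that already unregistered explicitly would otherwise try to unregister a second time. As noted above, a hook does not run if the JVM is killed outright or the machine loses power, which is where the second measure comes in.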