You implement an AsyncEventListener by defining a class that implements the AsyncEventListener interface, installing the AsyncEventListener configuration, and associating a table with it.
The example procedures show how to configure an AsyncEventListener with redundancy 1 in a GemFire XD system that has one peer-client member and three data store members. Two of the data stores, "DataStore1" and "DataStore2", belong to server group "SG1". The AsyncEventListener is installed only on members that belong to server group "SG1". A single table, "TESTTABLE", is associated with the AsyncEventListener.
When testing your own AsyncEventListener implementation, you will also need to stop the listener configuration. See Stop or Remove an AsyncEventListener Configuration for a description of this process.
Install the AsyncEventListener implementation by executing the CREATE ASYNCEVENTLISTENER statement on any member of the GemFire XD system. You must specify the server groups on which to deploy the implementation, as well as a unique identification and class name for the implementation.
For example:
CREATE ASYNCEVENTLISTENER MyListener
(
  LISTENERCLASSNAME 'com.pivotal.gemfirexd.callbacks.TestAsyncEventListener'
  INITPARAMS 'file=/usr/local/my-listener-params.txt'
)
SERVER GROUPS ( SG1 );
GemFire XD installs the AsyncEventListener instance on each data store in the specified SERVER GROUPS. Ensure that the server groups have at least two data stores in order to provide redundancy for listener operations. As a best practice, install no more than two standby listener instances (redundancy of at most two) for performance and memory reasons.
When multiple data stores host an event listener, there is no direct way to ensure that a specific GemFire XD data store hosts the active instance. If you require a specific member to host the active instance, ensure that only that member is running in the target server groups when you deploy the listener and attach the table. You can then start the remaining data stores in the server group, and they will host redundant, standby instances.
The above example installs TestAsyncEventListener with the identifier "MyListener" on members of the server group "SG1", using initialization parameters defined in /usr/local/my-listener-params.txt. See CREATE ASYNCEVENTLISTENER for more information about the other parameters to the statement.
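If you install listeners from application code rather than an interactive SQL session, the same statement can be issued over JDBC. The following is a minimal sketch, not part of the product: the class and method names (ListenerInstaller, createListenerSql, install) are hypothetical, and the DDL is assembled using only the clauses shown in the example above. Values are concatenated without escaping, so pass only trusted, fixed strings.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper, shown for illustration only.
class ListenerInstaller {

    // Builds the DDL text using the clause layout of the example above.
    // No escaping is performed; pass only trusted values.
    static String createListenerSql(String id, String className,
                                    String initParams, String serverGroups) {
        return "CREATE ASYNCEVENTLISTENER " + id + " ( "
             + "LISTENERCLASSNAME '" + className + "' "
             + "INITPARAMS '" + initParams + "' "
             + ") SERVER GROUPS ( " + serverGroups + " )";
    }

    // Executes the DDL on an existing connection to any member of the system.
    static void install(Connection conn, String id, String className,
                        String initParams, String serverGroups) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(createListenerSql(id, className, initParams, serverGroups));
        }
    }
}
```

Keeping the statement text in one method makes it easy to log or review exactly what will be sent before it is executed.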
After you install the AsyncEventListener implementation, you can start it by specifying the unique identifier for the implementation in the SYS.START_ASYNC_EVENT_LISTENER procedure. For example:
java.sql.Connection conn = getConnection();
CallableStatement cs = conn.prepareCall("call SYS.START_ASYNC_EVENT_LISTENER( ? )");
cs.setString(1, "MyListener");
cs.execute();
On execution, a single thread services the queue of the AsyncEventListener and dispatches events to the callback class in FIFO order.
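The dispatch behavior described above can be pictured with a toy model. This sketch is not GemFire XD code and says nothing about the product's internals; FifoDispatcher and its methods are hypothetical names. It shows only the general pattern of one worker thread draining a queue and handing each event to a callback in arrival order.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model: a single worker thread delivers queued events in FIFO order.
class FifoDispatcher<E> {
    private final BlockingQueue<E> queue = new LinkedBlockingQueue<>();
    private final Thread worker;
    private volatile boolean running = true;

    FifoDispatcher(Consumer<E> callback) {
        worker = new Thread(() -> {
            try {
                // Keep going until stop() was called and the queue is drained.
                while (running || !queue.isEmpty()) {
                    E event = queue.poll(20, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        callback.accept(event);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
    }

    // Producers append events; the worker sees them in insertion order.
    void enqueue(E event) {
        queue.add(event);
    }

    // Lets the worker drain what is already queued, then waits for it to exit.
    void stop() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because only one thread calls the callback, the callback never runs concurrently with itself, but a slow callback delays every event queued behind it.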
You associate a table with an AsyncEventListener configuration using the AsyncEventListener clause in the table's CREATE TABLE statement. For example, to associate the table "TESTTABLE" with an AsyncEventListener configuration identified by "MyListener", use the statement:
java.sql.Connection conn = getConnection();
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE TESTTABLE (ID int NOT NULL, DESCRIPTION varchar(1024), "
    + "ADDRESS varchar(1024) ) AsyncEventListener(MyListener) ");
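To list the members whose server groups host a listener that is associated with a table, you can join the SYS.SYSTABLES, SYS.MEMBERS, and SYS.ASYNCEVENTLISTENERS tables, replacing <table> with the table name:
select t.*, m.ID DSID
from SYS.SYSTABLES t, SYS.MEMBERS m, SYS.ASYNCEVENTLISTENERS a
where t.tablename='<table>'
  and groupsintersect(a.SERVER_GROUPS, m.SERVERGROUPS)
  and groupsintersect(t.ASYNCLISTENERS, a.ID);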
See SYSTABLES and ASYNCEVENTLISTENERS.
To stop a running AsyncEventListener configuration, first use the SYS.WAIT_FOR_SENDER_QUEUE_FLUSH procedure to ensure that no queued events are lost. Then call SYS.STOP_ASYNC_EVENT_LISTENER, specifying the identifier of the configuration to stop:
java.sql.Connection conn = getConnection();
// Wait for the listener's queue to flush before stopping it.
CallableStatement cs1 = conn.prepareCall("call SYS.WAIT_FOR_SENDER_QUEUE_FLUSH(?,?,?)");
cs1.setString(1, "MyListener");
cs1.setString(2, "TRUE");
cs1.setString(3, "0");
cs1.execute();
// Stop dispatching events to the listener.
CallableStatement cs2 = conn.prepareCall("call SYS.STOP_ASYNC_EVENT_LISTENER( ? )");
cs2.setString(1, "MyListener");
cs2.execute();
This waits for all queued events to flush, and then stops dispatching events to the listener callback class. GemFire XD also invokes the close() method of the AsyncEventListener callback class so that the implementation can perform any necessary cleanup.
You can also remove a listener implementation from the cluster using DROP ASYNCEVENTLISTENER.
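As a rough sketch of removing a listener from application code, the helper below issues the statement over JDBC. It assumes that DROP ASYNCEVENTLISTENER takes the listener identifier as its only required argument; check the DROP ASYNCEVENTLISTENER reference for the exact syntax and any prerequisites. ListenerRemover and its methods are hypothetical names, and the identifier check is a defensive choice of this example rather than a product rule.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper, shown for illustration only.
class ListenerRemover {

    // Builds the DROP statement. The identifier is restricted to a plain
    // SQL-identifier shape because it is concatenated into the statement text.
    static String dropListenerSql(String id) {
        if (id == null || !id.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("Unexpected listener identifier: " + id);
        }
        // Assumed syntax: DROP ASYNCEVENTLISTENER <identifier>
        return "DROP ASYNCEVENTLISTENER " + id;
    }

    // Executes the DROP statement on an existing connection.
    static void drop(Connection conn, String id) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            stmt.execute(dropListenerSql(id));
        }
    }
}
```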
An AsyncEventListener implementation can choose to act on or ignore events based on information about the events it receives. As an alternative, individual connections to the GemFire XD system can set the skip-listeners property to true, which prevents all DML operations executed over that connection from being queued and delivered to configured listeners.
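One way to set the property is to pass it in the java.util.Properties object given to DriverManager.getConnection. This is a minimal sketch: SkipListenersExample and its methods are hypothetical names, and the connection URL is left to the caller because its form depends on your driver and deployment (a thin-client URL such as jdbc:gemfirexd://localhost:1527/ is an assumption, not something this section specifies).

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical helper, shown for illustration only.
class SkipListenersExample {

    // Connection properties that ask GemFire XD not to queue this
    // connection's DML operations for configured listeners.
    static Properties skipListenersProps() {
        Properties props = new Properties();
        props.setProperty("skip-listeners", "true");
        return props;
    }

    // Opens a connection with skip-listeners enabled. The URL is supplied by
    // the caller and must match the JDBC driver available on the classpath.
    static Connection connect(String url) throws SQLException {
        return DriverManager.getConnection(url, skipListenersProps());
    }
}
```

Because the property applies per connection, other connections to the same system continue to have their DML operations queued for listeners as usual.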