Implementing an AsyncEventListener

You implement an AsyncEventListener by defining a class that implements the AsyncEventListener interface, installing the AsyncEventListener configuration, and associating a table with it.

Note: GemFire XD also includes sample code for the built-in DBSynchronizer listener, which uses the latest AsyncEventListener API. This sample listener is installed in the examples/com/pivotal/gemfirexd/callbacks subdirectory of your GemFire XD installation directory. This also includes source code for the AsyncEventHelper class. Keep in mind that you cannot compile the AsyncEventHelper.java code as-is, because it uses internal GemFire XD classes. You can either extend the installed AsyncEventHelper implementation or copy the public API code from the example into your own class, modifying the code as necessary to create a custom helper class.

The example procedures show how to configure an AsyncEventListener with redundancy 1 in a GemFire XD system having one peer-client member and three data store members. Two data stores "DataStore1" and "DataStore2" are available in server group "SG1." The AsyncEventListener will be installed on only those members that belong to server group 'SG1'. A single table, "TESTTABLE" is associated with the AsyncEventListener.

When testing your own AsyncEventListener implementation you will also need to stop the listener configuration. Stop or Remove an AsyncEventListener Configuration describes this process.

Implement the AsyncEventListener Interface

Define a class that implements the AsyncEventListener interface. GemFire XD installs the source code for the DBSynchronizer AsyncEventListener implementation in the /examples/storedprocedure/com/pivotal/gemfirexd/callbacks directory of your GemFire XD installation directory.
Note: The DBSynchronizer.java file uses no internal classes. However, you must copy the example code into your own class in order to avoid conflicts with the built-in DBSynchronizer implementation.

Install the AsyncEventListener Configuration

Install the AsyncEventListener implementation by executing the CREATE ASYNCEVENTLISTENER statement on any member of the GemFire XD system. You must specify the server groups on which to deploy the implementation, as well as a unique identification and class name for the implementation.

For example:

CREATE ASYNCEVENTLISTENER MyListener
(
   LISTENERCLASSNAME 'com.pivotal.gemfirexd.callbacks.TestAsyncEventListener'
   INITPARAMS 'file=/usr/local/my-listener-params.txt'
)  SERVER GROUPS ( SG1 );
Note: You must include the INITPARAMS argument, even if your AsyncEventListener implementation defines no parameters. Pivotal recommends that you define parameters in a separate text file, and then use INITPARAMS 'file=path-to-file' to reference the file. This enables you to change or add parameters after you have configured the listener implementation in GemFire XD. See CREATE ASYNCEVENTLISTENER.
Note: If you send user credentials in the initialization parameters, consider encrypting the password using gfxd encrypt-password with the external option.
Note: You can optionally install the AsyncEventListener configuration after you associate a table with the listener name. Make sure that you use the same listener name with both the CREATE ASYNCEVENTLISTNER command and the CREATE TABLE command.

This example installs TestAsyncEventListener with the identifier "MyListener" on members of the server group "SG1," using initialization parameters defined in /usr/local/my-listener-params.txt. See CREATE ASYNCEVENTLISTENER for more information about other parameters to the procedure.

Start the AsyncEventListener Configuration

After you install the AsyncEventListener implementation, you can start it by specifying the unique identifier for the implementation in the SYS.START_ASYNC_EVENT_LISTENER procedure. For example:

java.sql.Connection conn = getConnection();    
    CallableStatement cs = conn.prepareCall("call SYS.START_ASYNC_EVENT_LISTENER( ? )");
    cs.setString(1, "MyListener");
    cs.execute();

On execution, a thread would service the queue of the AsyncEventListener and dispatch events to the callback class.

Associate a Table with an AsyncEventListener Configuration

You associate a table with an AsyncEventListener configuration using the AsyncEventListener clause in the table's CREATE TABLE statement. For example, to associate the table "TestTable" with an AsyncEventListenerConfiguration identified by "MyListener" use the statement:

 java.sql.Connection conn = getConnection();    
 Statement stmt = conn.createStatement();
 stmt.execute("CREATE TABLE TESTTABLE (ID int NOT NULL, DESCRIPTION varchar(1024),
 ADDRESS varchar(1024) ) AsyncEventListener(MyListener) " ); 
Note: Because the associated listener configuration does not have to be available at the time you create the table, GemFire XD does not display an error message if the specified listener name does not yet exist. Make sure that you use the same listener name with both the CREATE ASYNCEVENTLISTNER command and the CREATE TABLE command.
Note: You can optionally associate the table with an AsyncEventListener configuration after you create the table, using the GemFire XD ALTER TABLE statement.

Display Installed AsyncEventListeners

You can join the SYSTABLES, MEMBERS, and ASYNCEVENTLISTENERS tables to display the listener implementations associated with a table as well as the data store ID on which the listener is installed:
select t.*, m.ID DSID from SYS.SYSTABLES t, SYS.MEMBERS m, SYS.ASYNCEVENTLISTENERS a
       where t.tablename='<table>' and groupsintersect(a.SERVER_GROUPS, m.SERVERGROUPS)
       and groupsintersect(t.ASYNCLISTENERS, a.ID);

See SYSTABLES and ASYNCEVENTLISTENERS.

Stop or Remove an AsyncEventListener Configuration

To stop a running AsyncEventListener configuration, first use the SYS.WAIT_FOR_SENDER_QUEUE_FLUSH procedure to ensure that no queued events are lost. Then use SYS.STOP_ASYNC_EVENT_LISTENER and specify the identifier to the configuration to stop:

    java.sql.Connection conn = getConnection();    
    CallableStatement cs1 = conn.prepareCall("call SYS.WAIT_FOR_SENDER_QUEUE_FLUSH(?,?,?);
    cs1.setString(1, "MyListener");
    cs1.setString(2, "TRUE");
    cs1.setString(3, "0");
    cs1.execute();
    CallableStatement cs2 = conn.prepareCall("call SYS.STOP_ASYNC_EVENT_LISTENER( ? )");
    cs2.setString(1, "MyListener");
    cs2.execute();

This waits for all queued events to flush, and then stops dispatching events to the callback listener class. Also, the close() method of the Callback AsyncEventListener class is invoked so that the implementation can perform any necessary cleanup.

You can also remove a listener implementation from the cluster using DROP ASYNCEVENTLISTENER.