ProcedureResultProcessor

ProcedureResultProcessor collates results and OUT parameters from multiple servers.

A custom ProcedureResultProcessor can be implemented to collate results and OUT parameters from multiple servers in a way that is different from the default behavior. It has these methods:

Method Description
getOutParameters() Provide the out parameters for this procedure to the client as an Object[].
getNextResultRow(int resultSetNumber) Provide the next row for result set number resultSetNumber. The processor should do whatever processing is required on the incoming data to provide the next row.
close() Called by GemFire XD when this statement is closed.

Example

–  A custom Processor Example
–  Example code showing how to implement a Procedure and a custom Collector that does a MergeSort.

   CREATE PROCEDURE MergeSort () 
   LANGUAGE JAVA   PARAMETER STYLE JAVA 
   READS SQL DATA   DYNAMIC RESULT SETS 1 
   EXTERNAL NAME 'examples.MergeSortProcedure.mergeSort' 

– MergeSortProcedure class

package examples; 
import com.pivotal.gemfirexd.*; 
import java.sql.*; 

public class MergeSortProcedure { 
  static final String LOCAL = "<local>"; 
  public static void mergeSort(ResultSet[] outResults, 
                                ProcedureExecutionContext context) 
  throws SQLException { 
    String queryString = LOCAL 
                         + "SELECT * FROM " 
                         + context.getTableName(); 
    Connection cxn = context.getConnection(); 
    Statement stmt = cxn.createStatement(); 
    ResultSet rs = stmt.executeQuery(queryString); 
    outResults[0] = rs; 
  } 
} 

MergeSortProcessor class

package examples; 
import com.pivotal.gemfirexd.*; 
import java.sql.*; 
import java.util.*; 

public class MergeSortProcessor implements ProcedureResultProcessor { 
 
  private ProcedureProcessorContext context; 
 
  public void init(ProcedureProcessorContext context) { 
    this.context = context; 
  } 
 
  public Object[] getOutParameters() { 
    throw new AssertionError("this procedure has no out parameters"); 
  } 
 
  public Object[] getNextResultRow(int resultSetNumber) 
    throws InterruptedException { 
    // this procedure deals with only result set number 1 
    assert resultSetNumber == 1; 
    IncomingResultSet[] inSets = context.getIncomingResultSets(1); 
    Object[] lesserRow = null; 
    Comparator cmp = getComparator(); 
    IncomingResultSet setWithLeastRow = null; 
    for (IncomingResultSet inSet : inSets) { 
      Object[] nextRow = inSet.waitPeekRow(); 
      if (nextRow == IncomingResultSet.END_OF_RESULTS) { 
        // no more rows in this incoming results 
        continue; 
      } 
      // find the least row so far 
      if (lesserRow == null || cmp.compare(nextRow, lesserRow) <= 0) { 
        lesserRow = nextRow; 
        setWithLeastRow = inSet; 
      } 
    } 
    if (setWithLeastRow != null) { 
      // consume the lesserRow by removing lesserRow from the incoming result set 
      Object[] takeRow = setWithLeastRow.takeRow(); 
      assert takeRow == lesserRow; 
    } 
    // if lesserRow is null, then there are no more rows in any incoming results 
    return lesserRow; 
  } 

  public boolean getMoreResults(int nextResultSetNumber) { 
    return false; // only one result set 
  } 
  public void close() { 
    this.context = null; 
  } 
}