Access Keys:
Skip to content (Access Key - 0)

GemFire Versions

About GemFire Enterprise 6.5

Getting Started

Documentation

Resources

Articles

Developer Notes

Tools & Libraries

Toggle Sidebar

Continuous Querying

Overview of Continuous Querying

A query when run against a data region yields a resultset. This resultset is essentially a static snapshot of the query result based on the existing data in the region. If the data changes after the query was executed, then the caller has to re-execute the query at periodic intervals to obtain the new changes. This is neither scalable, and not a particularly efficient way to keep the resultset up to date for the caller. A Continuous Query solves this problem by allowing the caller to register the query with the server.
With the Continuous Query, the query returns a resultset to the caller, but continues to evaluate any data change to the region, if it qualifies as a result for the Continuous Query it sends the changes to the caller for inclusion in its original resultset, thus ensuring the resultset to be always in sync with the contents of the data region. For this reason, Continuous Queries are also referred as Active Queries.

Continuous Querying in GemFire Enterprise provides applications with a way to run active queries on server regions. Updates satisfying the query conditions are sent to client listener callbacks that are programmed to process the events received. Some of the key features of Continuous queries (CQs) are:

  • Standard query syntax and semantics:
    CQ queries are expressed in the same language as other GemFire Enterprise queries.
  • Complete integration with the client/server architecture:
    CQ functionality is fully integrated with the client/server architecture. As a results, CQs are highly available, provide transparent failover capabilities, and durable CQs can be registered by clients that are configured to be durable.
  • Interest criteria based on the values in your data values:
    CQ queries are run against the region's entry values.

Implementing Continuous Queries

To implement a CQ, follow these basic steps:

  1. Configuring CQ
    The CQ functionality requires standard client/server distributed system and cache configuration settings. In addition, clients and servers must be configured for callbacks from the server, referred to generally as notify-by-subscription. These are all of the things to consider:
    Callback events (events from server to client)
    The server must have notify-by-subscription set to true. Notify By Subscription
    Additionally, the client must use a Pool with subscription-enabled set to true.Subscription Connection
    High Availability (HA)
    For high available CQs, configure server for high availability Highly Available Subscription Delivery When servers are highly available, CQs are registered on primary and secondary servers and server failover is performed without any interruption to CQ messaging. CQ events messaging uses the same queues used for server-to-client messaging.
    Durable CQ
    For durable CQs, configure the clients for durable messaging and create durable CQs. The events for these CQs maintained during client disconnects and replayed for the client when it reconnects.
  2. Writing CQ
    Each CQ uses a query and any number of listeners. The query filters the events on the server and the listener handles the events that make it through the query filter.
    Writing the Query
    This is the basic syntax for the CQ query "SELECT * FROM [From Clause] WHERE [WHERE clause]"
    There are few Query restriction that are not currently supported with the CQ.
    CQ Query Restrictions
    The FROM clause must contain only a single region path specification, with optional iterator variable.
    The query must be a SELECT expression only, preceded by zero or more IMPORT statements. This means the query cannot be a statement like "/tradeOrder.name" or "(SELECT * from /tradeOrder).size".
    The CQ query can not use:
    --Cross region joins
    --Drill-downs into nested collections
    --DISTINCT
    --Projections
    --Bind parameters
    Queries not meeting the constraints generate an UnsupportedOperationException. Future versions of the product will address most of these constraints.
    Writing the CQ Listener
    Each of the CQs can have any number of listeners. This example shows how you might program a simple CqListener to update a display screen based on the CQ events it receives. The listener retrieves the queryOperation and entry key and value from the CqEvent and then updates the screen according to the operation type provided in queryOperation.
    public class TradeEventListener implements CqListener {
      public void onEvent(CqEvent cqEvent) {
        // com.gemstone.gemfire.cache.Operation associated with the query op
        Operation queryOperation = cqEvent.getQueryOperation();
        // key and new value from the event
        Object key = cqEvent.getKey();
        TradeOrder tradeOrder = (TradeOrder)cqEvent.getNewValue();
        if (queryOperation.isUpdate()) {
          // update data on the screen for the trade order . . .
        } else if (queryOperation.isCreate()) {
          // add the trade order to the screen . . .
        } else if (queryOperation.isDestroy()) {
          // remove the trade order from the screen . . .
        }
      }
      public void onError(CqEvent cqEvent) {
        // handle the error
      }
      // From CacheCallback
      public void close() {
        // close the output screen for the trades . . .
      }
    }
    

    CQ events do not change client cache. They are provided as an event service only. This allows to have any collection of CQs without storing large amounts of data in the regions. If information from CQ events needs to be stored, program the listener to store the information where it makes the most sense for the required application.
    The CqEvent Object
    The CqEvent contains this information:
    --Entry key and new value.
    --Base operation that triggered the cache event in the server. This is the standard Operation class instance used for cache events in GemFire Enterprise.
    --CqQuery object associated with this CQ event.
    --Query operation associated with this CQ event. This operation describes the change affected to the query results by the cache event. Application can use the query operation to decide what to do with the CqEvent in the listeners.
    --Throwable object, returned if an error occurred running the CqQuery for the cache event. This is non-null only for CqListener.onError calls.

Running the Continuous Query Code

The CQ is created from an instance of the QueryService. Once created, the CQ is maintained primarily through the CqQuery interface.

  1. CQ Execution Options
    CQ execution can be done with or without an initial result set, by calling CqQuery.execute or CqQuery.executeWithInitialResults. The initial SelectResults returned from executeWithInitialResults is the same as the one that will be returned by running the ad hoc query on the server cache. The SelectResults returned from executeWithInitialResults contain only the entry values, compared to the CqEvent, which contains the key and value.
    If the initial results is maintained as starting data set to be updated from subsequent CQ events, application must be able to uniquely identify the entries by their values alone, one way to do this is to include keys in the values.
    Just as with the standalone query, however, the initial SelectResults represents a possibly moving snapshot of the cache. If there are updates to the server region while the result set is being created, the result set and the subsequent event-by-event CQ query execution might miss some events.
  2. When an Error Occurs in a Running CQ
    When an error occurs in CQ execution on the server, specific information on the error itself is stored in the server's log file. The server passes the event information to the client and the client's CqListeneronError method is invoked. From the onError method, a call to the CqEvent.getThrowable method returns an error string of this type:
    Exception while processing the Event. Please see server log for more detail.
    The server log will contain an error with "Error while processing CQ" like this.
    [error 2007/12/18 12:03:18.903 PST gemfire1 <RMI TCP Connection(2)-10.80.10.91> tid=0x18] Error while processing CQ on the event, key :
    key-1, CqName :testCQEvents_0, ClientId :identity(carlos(3249):52623/35391,connection=1,durableAttributes=null)
    Error :No public attribute named 'ID' was found in class java.lang.Object

Usually, errors in CQ execution are caused by data errors, like invalid object types that are stored in the server region. In this case, the query is trying to read into an Object of type Portfolio for an entry where an empty object has been stored. These kind of errors can be avoided by placing constraints on the region entries or by otherwise controlling the types of objects stored in the server regions.

Examples

CQ Creation, Execution, and Close

Running Examples:

  1. Copy the above source to appropriate java files.
    CqServer.java - Source for Server
    CqClient.java - Source for CQ Client
    SimpleCqListener.java - Source for CQ Listener
    DurableCqClient.java - Durable CQ Client
  2. Compile the files.
    Make sure to have gemfire.jar and antlr.jar in your classpath (along with above add "continuousQuery" class package for running tests).
  3. Open two console windows

To run CQ Test (Non durable)
Start the server in one console window
Ex: java -classpath /product/lib/gemfire.jar:/product/lib/antlr.jar:/continuousQuery continuousQuery.CqServer
This will start CqServer and asks to start the CqClient

Start CqClient on another window.
Ex: java -classpath /product/lib/gemfire.jar:/product/lib/antlr.jar:/continuousQuery continuousQuery.CqClient

Follow the instruction to type "Enter" on server to modify the server region. While data is modified on the server region, client CqListener prints the events that are received by server (that satisfies the CQ query condition).

To run durable CQ Test
Start the server one console window
Ex: java -classpath /product/lib/gemfire.jar:/product/lib/antlr.jar:/continuousQuery continuousQuery.CqServer
This will start CqServer and asks to start the CqClient

Start DurableCqClient on another window.
Ex: java -classpath /product/lib/gemfire.jar:/product/lib/antlr.jar:/continuousQuery continuousQuery.CqClient

Follow the instruction, to type "Enter" on server to modify the region. While data is modified on the server region, client CqListener prints the events that are received by server (that satisfies the CQ query condition).

Note Server will ask to stop the DurableCqClient, while client is stopped data is modified at the server region.
After re-starting the DurableCqClient it will receive all the events that are satisfied when it was disconnected.

The GemFire CQ API

The GemFire Java CQ API allows clients to create and manage CQs. This section lists the primary GemFire Enterprise API for CQ management. For complete information on the classes and interfaces described here, see the online Java API documentation.
CQ API javadoc

QueryService This interface provides methods to:
--Create a new CQ and specify whether it is durable (available for durable clients)
--List all the CQs registered by the client
--Close and stop CQs at the cache and region level
--Get a handle on CqStatistics for the client
CqQuery This interface provides methods for managing a continuous query once it is created through the QueryService. This is the interface that is normally used to begin and end CQ execution and to retrieve other CQ-related objects such as the CQ attributes, CQ statistics, and the CQ state.
CqListener This application plug-in interface is programmed to handle continuous query events after they occur. This listener is usually implemented using the CqListenerAdapter interface in com.gemstone.gemfire.cache.util, which also inherits the close method from CacheCallback in com.gemstone.gemfire.cache.
CqEvent This interface provides all the information sent from the server about the CQ event, which is passed to the CQ's CqListener methods.
CqAttributes, CqAttributesFactory, CqAttributesMutator These interfaces allow you to manage CQ attributes. The attributes are composed of CqListener plug-in specifications.
CqStatistics, CqServiceStatistics These interfaces allow you to access statistics information for a single CQ and for the query service's management of CQs as a whole. For a discussion of GemFire Enterprise statistics as they apply to the developer, see Statistics. For details on statistics, see See System Statistics..

State and Life Cycle of a CQ

A CQ has three possible states. These are maintained on the server and can be accessed from the client by calling CqQuery.getState.

  • STOPPED--The CQ has been created but not yet executed or it has been explicitly stopped from executing. The stopped CQ uses system resources. The CQ can be started or restarted by calling the execute method on CqQuery.
  • RUNNING--The CQ is being executed on the server for all events in the region referenced by the query. Results are sent to all client listeners associated with the CqQuery.
  • CLOSED--The CQ is stopped and is not using system resources. Invoking an execute or stop method on closed CqQuery throws an exception.

A CQ life cycle usually flows like this:

  1. The client creates the CQ. This sets up everything for running the query and provides the client with a CqQuery object, but does not execute the CQ. At this point, the query is in a STOPPED state, ready to be closed or run.
  2. The client runs the CQ with an API call to one of the CqQueryexecute* methods. This puts the query into a RUNNING state on the client and on the server.
  3. The CQ is closed by a client call to CqQuery.close. This deallocates all resources in use for the CQ on the client and server. At this point, the cycle could begin again with the creation of a new CqQuery instance.

Administering CQs

CQ Statistics
CQ runtime statistics are available for the client through the CqServiceStatistics and CqStatistics interfaces described under The GemFire CQ API. Using these apis client can get information on the events generated by a specific CQ, high-level information about the CQs like number of CQs registered, running, and so on.

Modifying CQ Attributes
The attributes for an existing CQ can be modified using the methods provided by CqQuery.getCqAttributesMutator. The attributes consist of a list of listeners.

Closing CQs
Individual CQs are closed using the CqQuery method close. Closed CQs cannot be executed. CQs are also closed in the following cases:
--The client closes its cache before closing all of its CQs: Closing the client cache closes the QueryService and all associated CQs on the client and server.
--The client disconnects from its server: When a client disconnects all the CQs created by the client are removed from the server and put into a CLOSED state on the client.
--The server region is destroyed: When a server region is destroyed, all associated CQs are also cleaned up on the server and the region destroy event is sent to the client. On the client, the CqListener.close method is called for all CQs on the region.

References

For additional information please refer Continuous Querying chapter in GemFire product Developer's guide. Continuous Querying.

Child Pages

Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.
Adaptavist Theme Builder (3.4.0-M2-conf210) Powered by Atlassian Confluence 2.10.3, the Enterprise Wiki.