The polling event container performs polling receive operations against the space. If a receive operation succeeds (a value is returned from the receive operation), the Data Event Listener is invoked with the event. A polling event operation is mainly used when simulating Queue semantics or when using the master-worker design pattern.
Here is a simple example of polling event container configuration:
<!-- Enable scan for OpenSpaces and Spring components --><context:component-scan base-package="com.mycompany"/><!-- Enable support for @Polling annotation --><os-events:annotation-support /><os-core:space id="space" url="/./space" /><os-core:giga-space id="gigaSpace" space="space"/>
@EventDriven @Polling
public class SimpleListener {
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
GigaSpace gigaSpace = // either create the GigaSpace or get it by injection
SimplePollingEventListenerContainer pollingEventListenerContainer = new SimplePollingContainerConfigurer(gigaSpace)
.template(new Data(false))
.eventListenerAnnotation(newObject() {
@SpaceDataEvent
public void eventHappened() {
eventCalled.set(true);
}
}).pollingContainer();
// when needed dispose of the notification container
pollingEventListenerContainer.destroy();
The example above performs single take operations (see below) using the provided template (a Data object with its processed flag set to false). If the take operation succeeds (a value is returned), the SimpleListener is invoked. The operations are performed on the configured GigaSpace bean (in this case, if working in a clustered topology, it is performed directly on the cluster member).
Primary/Backup
The polling event container, by default, performs receive operations only when the relevant space it is working against is in primary mode. When the space is in backup mode, no receive operations are performed. If the space moves from backup mode to primary mode, the receive operations are started.
This mostly applies when working with an embedded space directly with a cluster member. When working with a clustered space (performing operations against the whole cluster), the mode of the space is always primary.
Concurrent Consumers
By default, the polling event container starts a single thread that performs the receive operations and invokes the event listener. It can be configured to start several concurrent consumer threads and have an upper limit to the concurrent consumer threads. This provides faster processing of events, however, any FIFO behavior that might be configured in the space and/or template is lost. Here is an example of a polling container with 3 concurrent consumers and 5 maximum concurrent consumers:
@EventDriven @Polling(concurrentConsumers = 3, maxConcurrentConsumers = 5)
public class SimpleListener {
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
Sometimes, it is very convenient to have a listener instance per concurrent polling thread. This allows a thread-safe instance variable to be constructed without worrying about concurrent access. In such a case, the prototype Spring scope can be used in conjunction with a listenerRef bean name injection. Here is an example:
When performing receive operations, a template is defined, creating a virtualized subset of data within the space that matches it. GigaSpaces supports templates based on the actual domain model (with null values denoting wildcards), which are shown in the examples. GigaSpaces allows the use of SQLQuery in order to query the space, which can be easily used with the event container as the template. Here is an example of how it can be defined:
@EventDriven @Polling
public class SimpleListener {
@EventTemplate
SQLQuery<Data> unprocessedData() {
SQLQuery<Data> template = new SQLQuery<Data>(Data.class, "processed = true");
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
The polling receive container performs receive operations. The actual implementation of the receive operation is abstracted using the following interface:
publicinterface ReceiveOperationHandler {
/**
* Performs the actual receive operation. Return values allowed are single object or an array of
* objects.
*
* @param template
* The template to use for the receive operation.
* @param gigaSpace
* The GigaSpace interface to perform the receive operations with
* @param receiveTimeout
* Receive timeout value
* @return The receive result. <code>null</code> indicating no receive occured. Single object
* or an array of objects indicating the receive operation result.
* @throws DataAccessException
*/
Object receive(Object template, GigaSpace gigaSpace, long receiveTimeout) throws DataAccessException;
}
OpenSpaces comes with several built-in receive operation-handler implementations:
Receive Operation Handler
Description
SingleTakeReceiveOperationHandler
Performs a single blocking take operation with the receive timeout.
SingleReadReceiveOperationHandler
Performs a single blocking read operation with the receive timeout.
ExclusiveReadReceiveOperationHandler
Performs a single read operation under an exclusive read lock (similar to "select for update" in databases) with the receive timeout. Exclusive read lock mimics the take operation without actually taking the Entry from the space. This receive operation handler must be used within a transaction.
MultiTakeReceiveOperationHandler
First tries to perform takeMultiple (using a configured max Entries). If no values are returned, performs a blocking take operation with the receive timeout.
MultiReadReceiveOperationHandler
First tries to perform readMultiple (using a configured max Entries). If no values are returned, performs a blocking read operation with the receive timeout.
MultiExclusiveReadReceiveOperationHandler
First tries to perform readMultiple (using a configured max Entries). If no values are returned, performs a blocking read operation with the receive timeout. Both read operations are performed under an exclusive read lock (similar to "select for update" in databases) which mimics a take operation without actually taking the Entry from the space. Note, this receive operation handler must be used within a transaction.
When using the ExclusiveReadReceiveOperationHandler or even the SingleReadReceiveOperationHandler, it is important to remember that the actual event still remains in the space. If the data event is not taken from the space, or one of its properties changes in order not to match the container template, the same data event is read again.
Here is an example of how the receive operation handler can be configured with MultiTakeReceiveOperationHandler:
@EventDriven @Polling
public class SimpleListener {
@ReceiveHandler
ReceiveOperationHandler receiveHandler() {
MultiTakeReceiveOperationHandler receiveHandler = new MultiTakeReceiveOperationHandler();
receiveHandler.setMaxEntries(100);
return receiveHandler;
}
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
When working with a partitioned cluster and configuring the polling container to work against the whole cluster, blocking operations are not allowed (when the routing index is not set in the template). The default receive operation handlers support performing the receive operation in a non-blocking manner, by sleeping between non-blocking operations. For example, the SingleTakeReceiveOperationHandler performs a non-blocking take operation against the space and then sleeps for a configurable amount of time. Here is an example of how it can be configured:
@EventDriven @Polling
public class SimpleListener {
@ReceiveHandler
ReceiveOperationHandler receiveHandler() {
SingleTakeReceiveOperationHandler receiveHandler = new SingleTakeReceiveOperationHandler();
receiveHandler.setNonBlocking(true);
receiveHandler.setNonBlockingFactor(10);
return receiveHandler;
}
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
The above example uses a receive timeout of 10 seconds (10000 milliseconds). The SingleTakeReceiveOperationHandler is configured to be non-blocking with a non-blocking factor of 10. This means that the receive handler performs 10 non-blocking takes within 10 seconds and sleeps the rest of the time (~1 second each time).
Pass Array as is
Certain receive operation handlers might return an array as a result of the receive operation. A prime example is the MultiTakeReceiveOperationHandler, which might return an array as a result of a takeMultiple operation. By default, the polling container serializes the execution of the array into invocation of the event listener for each element in the array. If you want the event to operate on the whole array (receive the array as a parameter), the pass-array-as-is configuration attribute should be set to true.
Transaction Support
Both the receive operation and the actual event action can be configured to be performed under a transaction. Transaction support is required when, for example, an exception occurs in the event listener, and the receive operation needs to be to rolled back (and the actual data event is returned to the space). Adding transaction support is very simple in the polling container, and can be done by injecting a transaction manager into it. For example:
<!-- Enable scan for OpenSpaces and Spring components --><context:component-scan base-package="com.mycompany"/><!-- Enable support for @Polling annotation --><os-events:annotation-support /><os-core:space id="space" url="/./space" /><os-core:local-tx-manager id="transactionManager" space="space"/><os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>
@EventDriven @Polling @TransactionalEvent
public class SimpleListener {
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
When using transactions with polling container a special care should be taken with timeout values. Transactions started by the polling container can have a timeout value associated with them (if not set will default to the default timeout value of the transaction manager, which is usually "forever"). If setting a specific timeout value, make sure the timeout value is higher than the timeout value for blocking operations and includes the expected execution time of the associated listener.
Note the timeout value is in seconds as per Spring spec for TransactionDefinition.
Here is an example how timeout value (and transaction isolation) can be set with polling container:
<!-- Enable scan for OpenSpaces and Spring components --><context:component-scan base-package="com.mycompany"/><!-- Enable support for @Polling annotation --><os-events:annotation-support /><os-core:space id="space" url="/./space" /><os-core:local-tx-manager id="transactionManager" space="space"/><os-core:giga-space id="gigaSpace" space="space" tx-manager="transactionManager"/>
@EventDriven @Polling @TransactionalEvent(isolation = Isolation.READ_COMMITTED, timeout = 1000)
public class SimpleListener {
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
When configuring the polling event container to perform its receive operation and event actions under a transaction, a transaction is started and rolled back for each unsuccessful receive operation, which results in a higher load on the space. The polling event container allows pluggable logic to be used in order to decide if the actual receive operation should be performed or not. This logic, called the trigger receive operation, is performed outside the receive transaction boundaries. The following interface is provided for custom implementation of this logic:
publicinterface TriggerOperationHandler {
/**
* Allows to perform a trigger receive operation which control if the active receive operation
* will be performed in a polling event container. This feature is mainly used when having
* polling event operations with transactions where the trigger receive operation is performed
* outside of a transaction thus reducing the creation of transactions did not perform the
* actual receive operation.
*
* <p>
* If this operation returns a non <code>null</code> value, it means that the receive
* operation should take place. If it returns a <code>null</code> value, no receive operation
* will be attempted.
*
* @param template
* The template to use for the receive operation.
* @param gigaSpace
* The GigaSpace interface to perform the receive operations with
* @param receiveTimeout
* Receive timeout value
* @throws DataAccessException
*
*/
Object triggerReceive(Object template, GigaSpace gigaSpace, long receiveTimeout) throws DataAccessException;
/**
* Controls if the object returned from
* {@link #triggerReceive(Object,org.openspaces.core.GigaSpace,long)} will be used as the
* template for the receive operation by returning <code>true</code>. If <code>false</code>
* is returned, the actual template configured in the polling event container will be used.
*/
boolean isUseTriggerAsTemplate();
}
OpenSpaces comes with a built-in implementation of this interface, called ReadTriggerOperationHandler. It performs a single blocking read operation (using the provided receive timeout), thus "peeking" into the space for relevant event data. If the read operation returns a value, this means that there is higher probability that the receive operation will succeed, and the transaction won't be started without a purpose. Here is how it can be configured:
@EventDriven @Polling @TransactionalEvent
public class SimpleListener {
@TriggerHandler
TriggerOperationHandler receiveHandler() {
ReadTriggerOperationHandler triggerHandler = new ReadTriggerOperationHandler();
return triggerHandler;
}
@EventTemplate
Data unprocessedData() {
Data template = new Data();
template.setProcessed(false);
return template;
}
@SpaceDataEvent
public Data eventListener(Data event) {
//process Data here
}
}
Add Comment