Overview
This example illustrates how to use Executors Remoting (Service Executors) and Task Executors to run your business logic in a remote process that is collocated with a space. Use Executors Remoting when you want to export service methods for remote clients to invoke. Use Task Executors when you want to ship business logic to the server side and execute it there. In both cases, the business logic runs collocated with a space.

Installing the Example
Executors Task Example
This example illustrates a simple map-reduce implementation. A client writes some Account objects into the Data Grid. It then calculates the average balance of all the Accounts via a DistributedTask that is sent to each partition for execution. The example code consists of the task implementation and the client, both described below.
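The map-reduce flow described above can be sketched in plain Java without a running space. This is only an illustration under stated assumptions: the class name LocalMapReduceSketch and its methods are hypothetical, each int[] stands in for the Account balances held by one partition, and nothing here uses the GigaSpaces API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation of the execute/reduce flow; no space is involved.
 * All names here are hypothetical and exist only for illustration.
 */
final class LocalMapReduceSketch {

    /** "execute" step: runs once per partition and returns that partition's average balance. */
    static int execute(int[] partitionBalances) {
        if (partitionBalances.length == 0) {
            return 0; // empty partition: avoid dividing by zero
        }
        int total = 0;
        for (int balance : partitionBalances) {
            total += balance;
        }
        return total / partitionBalances.length;
    }

    /** "reduce" step: runs once on the client and averages the per-partition results. */
    static int reduce(List<Integer> partitionResults) {
        if (partitionResults.isEmpty()) {
            throw new IllegalArgumentException("no partition results to reduce");
        }
        int total = 0;
        for (int result : partitionResults) {
            total += result;
        }
        return total / partitionResults.size();
    }

    /** Runs execute on every partition, then reduce on the collected results. */
    static int averageBalance(int[][] partitions) {
        List<Integer> results = new ArrayList<>();
        for (int[] partition : partitions) {
            results.add(execute(partition));
        }
        return reduce(results);
    }

    public static void main(String[] args) {
        int[][] partitions = { {100, 200, 300}, {400, 500, 600} };
        // Partition averages are 200 and 500, so this prints 350
        System.out.println("Average balance: " + averageBalance(partitions));
    }
}
```

In the real example the per-partition step runs inside each partition's process, next to the data, and only the small per-partition result travels back to the client.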
The Task Implementation
The Task implements the DistributedTask interface. It includes the execute and the reduce methods:

    package org.test.executor;

    import java.sql.Time;
    import java.util.List;

    import org.openspaces.core.GigaSpace;
    import org.openspaces.core.executor.DistributedTask;
    import org.openspaces.core.executor.TaskGigaSpace;

    import com.gigaspaces.annotation.pojo.SpaceRouting;
    import com.gigaspaces.async.AsyncResult;

    public class MyTask implements DistributedTask<Integer, Integer> {

        // Injected on the server side with the collocated space
        @TaskGigaSpace
        transient GigaSpace space;

        int routing;

        // Runs on each partition: returns the average balance of the local Accounts
        public Integer execute() throws Exception {
            Account templ = new Account();
            Account[] accounts = space.readMultiple(templ, Integer.MAX_VALUE);
            if (accounts.length == 0) {
                return 0; // no Accounts in this partition: avoid dividing by zero
            }
            int total = 0;
            for (Account account : accounts) {
                total += account.getBalance();
            }
            Time t = new Time(System.currentTimeMillis());
            System.out.println(t + " MyTask execute called at "
                    + space.getSpace().getURL().getContainerName() + " - total is:" + total);
            return total / accounts.length;
        }

        @SpaceRouting
        public Integer routing() {
            return routing;
        }

        // Runs on the client: averages the per-partition averages
        public Integer reduce(List<AsyncResult<Integer>> results) throws Exception {
            int totalResult = 0;
            int partitions = 0;
            for (AsyncResult<Integer> result : results) {
                if (result.getException() != null) {
                    throw result.getException();
                }
                partitions++;
                totalResult += result.getResult().intValue();
            }
            return totalResult / partitions;
        }
    }

The Client

Sync mode
The client invokes the Task on the remote space in sync mode using the following:

    space = new UrlSpaceConfigurer("jini://*/*/space").space();
    gigaSpace = new GigaSpaceConfigurer(space).gigaSpace();
    AsyncFuture<Integer> future = gigaSpace.execute(new MyTask());
    Integer result = future.get(); // blocks until the reduced result is available

Async mode
The client invokes the Task on the remote space in async mode using the following:

    space = new UrlSpaceConfigurer("jini://*/*/space").space();
    gigaSpace = new GigaSpaceConfigurer(space).gigaSpace();
    gigaSpace.execute(new MyTask(), new ExecutorTaskClientMain());

ExecutorTaskClientMain implements AsyncFutureListener:

    public void onResult(AsyncResult<Integer> result) {
        System.out.println(new Time(System.currentTimeMillis())
                + " - Client got Result:" + result.getResult());
    }

Running the Example

Deploying the Space
Using IDE:
Using CLI:

    \gigaspaces-xap\bin\bin>puInstance -cluster schema=partitioned total_members=2 ..\deploy\templates\datagrid

Run the Client Application
Run the Client Application (ExecutorTaskClientMain.java). The ExecutorTaskClientMain requires one of the following as application arguments:

Sync mode:

    org.test.executor.ExecutorTaskClientMain sync

Async mode:

    org.test.executor.ExecutorTaskClientMain async

Service Executors Example
The Service Interface | The Service Implementation | The Service Declaration | Service Result Reducer | The Client | Running the Example
The example has a clustered space with a collocated service running alongside it. A client invokes the service. This example illustrates simple Service Executors usage in synchronous and asynchronous modes.
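The difference between the two invocation modes can be sketched with the standard java.util.concurrent classes. This is a stand-in only, not the GigaSpaces API: the class SyncAsyncInvocationSketch and its methods are hypothetical, and the "service" simply returns the length of its input.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in that contrasts blocking and non-blocking invocation styles. */
final class SyncAsyncInvocationSketch {

    /** Stand-in for the remote business logic; here it just returns the input length. */
    static int processData(String data) {
        return data.length();
    }

    /** Synchronous style: the call returns only once the result is ready. */
    static int invokeSync(String data) {
        return processData(data);
    }

    /** Asynchronous style: the call returns a future right away; the work runs on another thread. */
    static CompletableFuture<Integer> invokeAsync(String data) {
        return CompletableFuture.supplyAsync(() -> processData(data));
    }

    public static void main(String[] args) {
        System.out.println("sync result: " + invokeSync("A1"));

        CompletableFuture<Integer> future = invokeAsync("A1");
        // The caller could do other work here before consuming the result.
        // join() waits for the callback to finish so the program does not exit early.
        future.thenAccept(result -> System.out.println("async result: " + result)).join();
    }
}
```

The synchronous style is simpler to read, while the asynchronous style lets the client continue working while the partitions compute their results.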
The Service Interface
The Service Interface includes two methods: one invokes the service in synchronous mode and the other invokes it in asynchronous mode:

    import com.gigaspaces.async.AsyncFuture;

    public interface IDataProcessor {
        Integer processData(Object data);
        AsyncFuture<Integer> asyncProcessData(Object data);
    }

The Service Implementation
The Service Implementation includes some business logic for both of these methods:

    import java.sql.Time;

    import org.openspaces.core.GigaSpace;
    import org.openspaces.core.cluster.ClusterInfo;
    import org.openspaces.core.cluster.ClusterInfoContext;
    import org.openspaces.core.context.GigaSpaceContext;
    import org.openspaces.remoting.RemotingService;

    import com.gigaspaces.async.AsyncFuture;

    @RemotingService
    public class DataProcessorService implements IDataProcessor {

        @ClusterInfoContext
        public ClusterInfo clusterInfo;

        @GigaSpaceContext
        transient GigaSpace gigaSpace;

        // Returns null here; the business logic lives in processData
        public AsyncFuture<Integer> asyncProcessData(Object data) {
            return null;
        }

        // Returns the average balance of the Accounts in the collocated space
        public Integer processData(Object data) {
            Account templ = new Account();
            Account[] accounts = gigaSpace.readMultiple(templ, Integer.MAX_VALUE);
            if (accounts.length == 0) {
                return 0; // no Accounts in this partition: avoid dividing by zero
            }
            int total = 0;
            for (Account account : accounts) {
                total += account.getBalance();
            }
            Time t = new Time(System.currentTimeMillis());
            System.out.println(t + " processData called at "
                    + gigaSpace.getSpace().getURL().getContainerName() + " - total is:" + total);
            return total / accounts.length;
        }
    }

The Service Declaration
The pu.xml used to export the Service and start the space is shown below:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:os-core="http://www.openspaces.org/schema/core"
           xmlns:os-events="http://www.openspaces.org/schema/events"
           xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
           http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
           http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd">

        <!-- Support @RemotingService component scanning -->
        <context:component-scan base-package="org.test.executor"/>

        <os-core:giga-space-context/>

        <!-- Support the @RemotingService annotation on a service -->
        <os-remoting:annotation-support />

        <os-core:space id="space" url="/./space" />

        <os-core:giga-space id="gigaSpace" space="space"/>

        <os-remoting:service-exporter id="serviceExporter" />
    </beans>
Service Result Reducer
The Service Result Reducer is called on the client side and aggregates the results sent from all invoked services (one collocated with each space partition). The Reducer implements the RemoteResultReducer interface:

    import org.openspaces.remoting.RemoteResultReducer;
    import org.openspaces.remoting.SpaceRemotingInvocation;
    import org.openspaces.remoting.SpaceRemotingResult;

    public class DataProcessorServiceReducer implements RemoteResultReducer<Integer, Integer> {

        // Averages the per-partition averages returned by the services
        public Integer reduce(SpaceRemotingResult<Integer>[] results, SpaceRemotingInvocation sri) throws Exception {
            int totalResult = 0;
            for (int i = 0; i < results.length; i++) {
                totalResult += results[i].getResult().intValue();
            }
            return totalResult / results.length;
        }
    }

The Client
The client invokes the service in synchronous mode using the following:

    IJSpace space = new UrlSpaceConfigurer("jini://*/*/space").space();
    GigaSpace gigaSpace = new GigaSpaceConfigurer(space).gigaSpace();
    IDataProcessor dataProcessor = new ExecutorRemotingProxyConfigurer<IDataProcessor>(gigaSpace, IDataProcessor.class)
            .broadcast(new DataProcessorServiceReducer())
            .proxy();
    Integer result = dataProcessor.processData("A" + count);
    System.out.println(new Time(System.currentTimeMillis()) + " - Client got Result:" + result.intValue());
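The reducer above averages the per-partition averages, which matches the overall average only when every partition holds the same number of Accounts. The following plain-Java sketch shows the difference, assuming each partition could return both a sum and a count. The WeightedAverageSketch class and its Partial type are hypothetical and are not part of the example code or the GigaSpaces API.

```java
/** Hypothetical comparison of two ways to combine per-partition results. */
final class WeightedAverageSketch {

    /** Partial result from one partition: the sum of balances and the number of accounts read. */
    static final class Partial {
        final long sum;
        final int count;

        Partial(long sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Averages the per-partition averages, as the reducer in the example does. */
    static long averageOfAverages(Partial[] partials) {
        long total = 0;
        for (Partial p : partials) {
            total += (p.count == 0) ? 0 : p.sum / p.count;
        }
        return total / partials.length;
    }

    /** Divides the overall sum by the overall count, so larger partitions weigh more. */
    static long weightedAverage(Partial[] partials) {
        long sum = 0;
        long count = 0;
        for (Partial p : partials) {
            sum += p.sum;
            count += p.count;
        }
        return (count == 0) ? 0 : sum / count;
    }

    public static void main(String[] args) {
        // One partition holds 1 account (balance 100), the other holds 3 (balances total 900)
        Partial[] partials = { new Partial(100, 1), new Partial(900, 3) };
        System.out.println("average of averages: " + averageOfAverages(partials)); // 200
        System.out.println("weighted average:    " + weightedAverage(partials));   // 250
    }
}
```

With evenly populated partitions the two methods agree, so the simpler reducer is adequate for this example's data set.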
Running the Example

Deploying the Space and Services
Using IDE:
Using CLI:

    \gigaspaces-xap\bin\bin>puInstance -cluster schema=partitioned total_members=2 \ExecutorExample\classes

The \ExecutorExample\classes directory should include the processing unit descriptor at META-INF\spring\pu.xml and the relevant Service class files.

Run the Client Application
Run the Client Application (ExecutorClientMain.java) using the following:

    org.test.executor.ExecutorClientMain sync