Executors Example

Search Solutions & Best Practices
Searching XAP 7.0.X Documentation
Browse Solutions & Best Practices

                                                              

Summary: This example illustrates the usage of Executors Remoting (Service Executors) and Task Executors
Author: Shay Hasidim, Deputy CTO, GigaSpaces
Recently tested with GigaSpaces version: XAP 7.0 M8
Last Update: May 2009
Your Rating: Results: PatheticBadOKGoodOutstanding! 19 rates

Overview

This example illustrates the usage of Executors Remoting (Service Executors) and Task Executors to execute your business logic on a remote process that is collocated with a space.

The Executors Remoting should be used when you would like to export service method(s) for remote clients to be invoked. The Task Executors should be used when you would like to transport business logic to the server side to be executed remotely. In both cases, the business logic will be invoked with a collocated space.

Installing the Example

  1. Download the example and extract the zip file. Open your IDE and import the project files.
  2. Set the project libraries to have the correct GigaSpaces libraries location by setting the GS_HOME environment variable to point to GigaSpaces Root folder.

Executors Task Example

The example will illustrate a simple map-reduce implementation.

A client writing some Account objects into the Data Grid. Later it will calculate the average balance for all the Accounts via a DistributedTask that is sent to each partition to be executed:

The Task will read all the Account objects at the collocated partition and calculate the average balance for these objects and return the result back to the client. The results sent from each partition back to the client will be aggregated (via the DistributedTask reducer implementation at the client side) and the final result will be displayed.

The example code will have the following implemented:

  1. Task
  2. Client

The Task Implementation

The Task implements the DistributedTask interface. It includes the execute and the reduce methods:

package org.test.executor;

import java.sql.Time;
import java.util.List;

import org.openspaces.core.GigaSpace;
import org.openspaces.core.executor.DistributedTask;
import org.openspaces.core.executor.TaskGigaSpace;
import com.gigaspaces.annotation.pojo.SpaceRouting;
import com.gigaspaces.async.AsyncResult;

public class MyTask implements DistributedTask<Integer, Integer>{

	@TaskGigaSpace
	transient GigaSpace space;
	
	public Integer execute() throws Exception {
		Account templ = new Account();
		
		Account accounts[] = space.readMultiple(templ , Integer.MAX_VALUE);
		int total = 0;
		for (Account account : accounts) {
			total += account.getBalance();
		}
		Time t = new Time(System.currentTimeMillis());
		System.out.println(t + " MyTask execute called at "+space.getSpace().getURL().getContainerName() + 
			" - total is:" + total );
		return total/accounts.length;
	}

	int routing;

	@SpaceRouting
	public Integer routing() {
		return routing;
	}

	public Integer reduce(List<AsyncResult<Integer>> results) throws Exception {
		Integer total_result =0;
		int partitions=0;
		for (AsyncResult<Integer> result : results) {
			if (result.getException() != null) {
				throw result.getException();
			}
			partitions++;
			int temp_result = result.getResult().intValue();
			total_result +=  temp_result ;
		}
		return total_result/partitions;
	}
}

The Client

Sync mode

The client invokes the Task on the remote space in sync mode using the following:

space = new UrlSpaceConfigurer("jini://*/*/space").space();
gigaSpace = new GigaSpaceConfigurer(space).gigaSpace(); 
AsyncFuture<Integer> future =gigaSpace.execute(new MyTask());
Integer result = future.get();

A-Sync mode

The client invokes the Task on the remote space in A-sync mode using the following:

space = new UrlSpaceConfigurer("jini://*/*/space").space();
gigaSpace = new GigaSpaceConfigurer(space).gigaSpace(); 
gigaSpace.execute(new MyTask(),new ExecutorTaskClientMain ());

ExecutorTaskClientMain implements AsyncFutureListener:

public void onResult(AsyncResult<Integer> result) {
	System.out.println(new Time(System.currentTimeMillis()) + " - Client got Result:" + result.getResult() );
}

Running the Example

Deploying the Space

Using IDE:
Set your IDe to have the Following:


Click Run. This will start the clustered space within your IDE.

Using CLI:
To start the clustered space with 2 partitions run the following:

\gigaspaces-xap\bin\bin>puInstance -cluster schema=partitioned total_members=2 ..\deploy\templates\datagrid

Run the Client Application

Run the Client Application (ExecutorTaskClientMain.java).

The ExecutorTaskClientMain requires the following as application arguments:
Sync mode:

org.test.executor.ExecutorTaskClientMain sync

A-Sync mode:

org.test.executor.ExecutorTaskClientMain async

Service Executors Example

The example will have a clustered space with a collocated service running. A client will be invoking the service:

The results created at each partition will be sent to the client. These will be aggregated via the reducer implementation at the client side and displayed:

This example illustrates simple Service Executors usage in Synchronous mode and Asynchronous mode.
Your code should have the following implemented:

  1. Service Interface
  2. Service Implementation
  3. Service Result Reducer (Client Side)
  4. Client

The Service Interface

The Service Interface includes 2 methods. One used to invoke the Service method in Synchronous mode and another used to invoke the Service method Asynchronous mode:

import com.gigaspaces.async.AsyncFuture;

public interface IDataProcessor {
    Integer processData(Object data);
    AsyncFuture<Integer>  asyncProcessData(Object data);
}

The Service Implementation

The Service Implementation includes some business logic for both of these methods:

import java.sql.Time;
import org.openspaces.core.GigaSpace;
import org.openspaces.core.cluster.ClusterInfo;
import org.openspaces.core.cluster.ClusterInfoContext;
import org.openspaces.core.context.GigaSpaceContext;
import org.openspaces.remoting.RemotingService;
import com.gigaspaces.async.AsyncFuture;

@RemotingService
public class DataProcessorService implements IDataProcessor {

	@ClusterInfoContext
	public ClusterInfo clusteinfo;

	@GigaSpaceContext
	transient GigaSpace gigaSpace;
	
    public AsyncFuture<Integer>   asyncProcessData(Object data)
    {
    	return null;
    }

    public Integer processData(Object data) {
		Account templ = new Account();
		Account accounts[] = gigaSpace.readMultiple(templ , Integer.MAX_VALUE);
		int total = 0;
		for (Account account : accounts) {
			total += account.getBalance();
		}
		Time t = new Time(System.currentTimeMillis());
		System.out.println(t + " MyTask execute called at "+gigaSpace.getSpace().getURL().getContainerName() + " - total is:" + total );
		return total/accounts.length;
	}
}

The Service Declaration

The pu.xml used to export the Service and start the space described below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:os-core="http://www.openspaces.org/schema/core"
       xmlns:os-events="http://www.openspaces.org/schema/events"
       xmlns:os-remoting="http://www.openspaces.org/schema/remoting"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://www.openspaces.org/schema/core http://www.openspaces.org/schema/core/openspaces-core.xsd
       http://www.openspaces.org/schema/events http://www.openspaces.org/schema/events/openspaces-events.xsd
       http://www.openspaces.org/schema/remoting http://www.openspaces.org/schema/remoting/openspaces-remoting.xsd">

<!-- Support @RemotingService component scanning -->
<context:component-scan base-package="org.test.executor"/>

<os-core:giga-space-context/>

<!-- Support the @RemotingService annotation on a service-->
<os-remoting:annotation-support />

<os-core:space id="space" url="/./space" />

<os-core:giga-space id="gigaSpace" space="space"/>

<os-remoting:service-exporter id="serviceExporter" />

</beans>
The context:component-scan , os-remoting:service-exporter and os-remoting:annotation-support used to allow the system to locate classes annotated with RemotingService and export these implicitly.

Service Result Reducer

The Service Result Reducer is called at the client side and aggregates results sent from all invoked services (collocated with all space partitions).

The Reducer implements the RemoteResultReducer interface:

import org.openspaces.remoting.RemoteResultReducer;
import org.openspaces.remoting.SpaceRemotingInvocation;
import org.openspaces.remoting.SpaceRemotingResult;

public class DataProcessorServiceReducer implements RemoteResultReducer<Integer, Integer>{

	public Integer reduce(SpaceRemotingResult<Integer>[] results, SpaceRemotingInvocation sri) throws Exception {
		int total_result =0;
		for (int i =0 ;i<results.length ; i++)
		{
			int temp_result = results[i].getResult().intValue();
			total_result +=  temp_result ;
		}
		return total_result/results.length  ;
	}
}

The Client

The client invokes the service in Synchronous mode using the following:

IJSpace space = new UrlSpaceConfigurer("jini://*/*/space").space();
GigaSpace gigaSpace = new GigaSpaceConfigurer(space).gigaSpace(); 
IDataProcessor dataProcessor = new ExecutorRemotingProxyConfigurer<IDataProcessor>
	(gigaSpace, IDataProcessor.class).broadcast(new DataProcessorServiceReducer()).proxy(); 
Integer result = dataProcessor.processData("A" + count);
System.out.println(new Time(System.currentTimeMillis()) + " - Client got Result:" + result.intValue() );
  1. The client getting a proxy to a remote space
  2. The client constructing Service proxy using the ExecutorRemotingProxyConfigurer. The DataProcessorServiceReducer is used when constructing Service proxy .
  3. The client invokes the service in Synchronous mode
  4. The result is being displayed.

Running the Example

Deploying the Space and Services

Using IDE:
Set your IDe to have the Following:


Click Run. This will start the clustered space and the Services within your IDE.

Using CLI:
To start the clustered space with 2 partitions and export the Service run the following:

\gigaspaces-xap\bin\bin>puInstance -cluster schema=partitioned total_members=2 \ExecutorExample\classes

Where the \ExecutorExample\classes should include the processing unit pu.xml under META-INF\spring\pu.xml and relevant Service class files.

Run the Client Application

Run the Client Application (ExecutorClientMain.java) using the following:

org.test.executor.ExecutorClientMain sync
Rate this page:
Your Rating: Results: PatheticBadOKGoodOutstanding! 19 rates

Labels

 
(None)