|
Search XAP.NET 7.0
Offline Documentation
Download latest offline documentation in HTML format:
|
Summary: Space task executors allow you to easily execute grid-wide tasks on the space.
OverviewGigaSpaces support executing tasks in a collocated Space (processing unit that started an embedded Space). Space tasks can be executed either directly on a specific cluster member using typical routing value. Space tasks can also be distributed, which means it is executed in a "broadcast" mode on all the primary cluster members concurrently and reduced to a single result on the client side, which is also known as the map reduce pattern which is used in many applications that does parallel processing. Space tasks are dynamic in terms of content, it contains user code that will be executed at the Space as is.
Space Task APIThe ISpaceTask interface is defined as follows: public interface ISpaceTask<T> { /// <summary> /// Computes a result, or throws an exception if unable to do so. /// </summary> /// <param name="spaceProxy">A proxy to the space in which this task is being executed.</param> /// <param name="tx">The transaction (if any) under which to work.</param> /// <returns>Computed result.</returns> T Execute(ISpaceProxy spaceProxy, ITransaction tx); } Here is a simple implementation of a space task that calculates the number of objects on a single node and prints a message at the single node output. [Serializable] public class CountTask : ISpaceTask<int> { private String _message; public CountTask(String message) { _message = message; } public int Execute(ISpaceProxy spaceProxy, ITransaction tx) { System.Console.WriteLine(message); return spaceProxy.Count(); } }
Executing the space task ISpaceProxy spaceProxy = // obtain a proxy to a space //Execute the task on a specific node using a specified routing value (2) //And inserting the calculation result to count variable int count = spaceProxy.Execute(new CountTask("hello world"), 2); Distributed Space Task API (Map Reduce)A IDistributedSpaceTask is a space task that ends up executing more than once (concurrently) and returns a result that is a reduced operation of all the different execution. Phase 1 - Sending the Space tasks to be executed: Phase 2 - Getting the results back to be reduced: The IDistributedSpaceTask interface is a composition both ISpaceTask and ISpaceTaskResultsReducer interfaces. The ISpaceTaskResultsReducer is defined as follows: public interface ISpaceTaskResultsReducer<R, T> { /// <summary> /// Reduce a list of <see cref="SpaceTaskResult{T}"/> into a single result. /// </summary> /// <param name="results">The list of space task results to reduce.</param> /// <returns>Reduced result.</returns> R Reduce(SpaceTaskResultsCollection<T> results); } Here is a simple example of a distributed space task that extends our previous example: [Serializable] public class DistributedCountTask : IDistributedSpaceTask<long, int> { private String _message; public CountTask(String message) { _message = message; } public int Execute(ISpaceProxy spaceProxy, ITransaction tx) { System.Console.WriteLine(message); return spaceProxy.Count(new Object()); } public long Reduce(SpaceTaskResultsCollection<int> results) { int sum = 0; foreach(SpaceTaskResult<int> result in results) { if (result.Exception != null) throw result.Exception; sum += result.Result; } return result; } } This task will execute on each one of the primary nodes in the cluster, Executing the distributed task ISpaceProxy spaceProxy = // obtain a proxy to a space //Execute the task on all the primary nodes with in the cluster //and inserting the calculation result to count variable long count = spaceProxy.Execute(new DistributedCountTask("hello world")); Space Task Results FilterWhen executing a distributed space task, results arrive in an asynchronous manner and once all the results have arrived, the ISpaceTaskResultsReducer is used to reduce them. The ISpaceTasukResultsFilter can be used as a callback and filter mechanism to be invoked for each result that arrives. public interface ISpaceTaskResultsFilter<T> { /// <summary> /// A callback invoked for each result that arrives as a result of a distributed space task execution allowing /// to access the result that caused this event, the events received so far, and the total expected results. /// </summary> /// <param name="info">Current filter info.</param> /// <returns>Filter's decision</returns> SpaceTaskFilterDecision GetFilterDecision(SpaceTaskFilterInfo<T> info); } /// <summary> /// Controls what should be done with the results of an <see cref="IDistributedSpaceTask{R,T}"/> execution. /// </summary> public enum SpaceTaskFilterDecision { /// <summary> /// Continue processing the distributed task. /// </summary> Continue = 0, /// <summary> /// Break out of the processing of the distributed task and move /// to the reduce phase including the current result. /// </summary> Break = 1, /// <summary> /// Skip this result and continue processing the rest of the results. /// </summary> Skip = 2, /// <summary> /// Skip this result and breaks out of the processing of the distributed task /// and move to the reduce phase. /// </summary> SkipAndBreak = 3, } The filter can be used to control if a result should be used or not (the Skip decision). If we have enough results and we can move to the reduce phase (the Break decision). If we should continue accumulating results (the Continue decision). Or if we dont want to use the current result and move to the reduce phase (the SkipAndBreak decision). The filter can also be used as a way to be identify that results have arrived and we can do something within our application as a result of that. Note, in this case, make sure that heavy processing should be performed on a separate (probably pooled) thread. TransactionsSpace tasks fully support transactions, an execute request can receive a When executing a single space task, usally a local transaction will suffice, while when executing a distributed space task, a distributed transaction will be required. The transaction creation, commit and abort normally should be done at the client according to the result. Here's a simple example ISpaceProxy spaceProxy = // obtain a proxy to a space ITransaction tx = spaceProxy.LocalTransactionManager.Create(); try { //Execute the task on a specific node using a specified routing value (2) //And inserting the calculation result to count variable int count = spaceProxy.Execute(new ClearMyObjectTask(), 2, tx); tx.Commit(); } catch(Exception ex) { tx.Abort(); } [Serializable] public class ClearMyObjectTask : ISpaceTask<int> { public int Execute(ISpaceProxy spaceProxy, ITransaction tx) { MyObject template = new MyObject(); int result spaceProxy.Count(template, tx); spaceProxy.Clear(template, tx); return result; } } Asynchronous ExecutionA space task can also be executed asynchronously with the corresponding BeginExecute EndExecute method. This follows the standard .NET asynchronous API, once the execution is complete the execute invoker is notified by the async result which is received from the BeginExecute method or to a supplied callback. This will be similiar to executing a task in a seperate thread, allowing to continue local process while waiting for the result to be calculated at the space nodes. Executing asynchronous space using async result ISpaceProxy spaceProxy = // obtain a proxy to a space //Execute the task on all the primary nodes with in the cluster IAsyncResult<long> asyncResult = spaceProxy.BeginExecute(new DistributedCountTask("hello world"), null /*callback*/, null /*state object*/); ... //This will block until the result execution has arrived long count = spaceProxy.EndExecute(asyncResult); Executing asynchronous space using async result wait handle ISpaceProxy spaceProxy = // obtain a proxy to a space //Execute the task on all the primary nodes with in the cluster IAsyncResult<long> asyncResult = spaceProxy.BeginExecute(new DistributedCountTask("hello world"), null /*callback*/, null /*state object*/); ... //This will block until the result execution has arrived asyncResult.AsyncWaitHandle.WaitOne(); //Gets the actual result of the async execution long count = spaceProxy.EndExecute(asyncResult); Executing asynchronous space using async call back and a state object ISpaceProxy spaceProxy = // obtain a proxy to a space //Execute the task on all the primary nodes with in the cluster spaceProxy.BeginExecute(new DistributedCountTask("hello world"),ResultCallBack, new MyStateObject()); ... public void ResultCallBack<long>(IAsyncResult<long> asyncResult) { //Gets the state object attached to this execution MyStateObject stateObject = (MyStateObject)asyncResult.AsyncState; ... //Gets the actual result of the async execution long count = spaceProxy.EndExecute(asyncResult); ... } |
Task Execution Over The Space
IMPORTANT: This is an old version of GigaSpaces XAP. Click here for the latest version.
(None)



