Compute Server - Master-Worker Pattern

  GigaSpaces 5.X


Summary: Demonstrates how the master-worker pattern is used to implement a parallel computing engine that determines whether a given number is prime.

This page is specific to:
GigaSpaces 5.x


Overview

One of the most common patterns when using GigaSpaces is the master-worker pattern. In this section, the master-worker pattern is used to implement a parallel computing engine that determines whether a given number is prime.

Master-Worker Pattern

The master-worker pattern follows the typical master-worker scheme: a master dispatches pieces of work to several workers. Each worker processes its piece of work. When finished, it sends the result to the master and requests a new piece of work, and so on.

When all the work is done, the master sends a stop-signal to the workers.

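This scheme is important in practice because it balances the load automatically: a worker requests a new piece of work only when it has finished the previous one, so faster or less busy workers naturally process more pieces.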
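The scheme described above can be sketched with the JDK alone. This is a minimal illustration, not the GigaSpaces implementation: two in-memory queues stand in for the space, and the class name MasterWorkerSketch, the STOP sentinel, and the sum-of-squares workload are assumptions made up for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// JDK-only illustration: two in-memory queues stand in for the shared space.
class MasterWorkerSketch {
    // Hypothetical sentinel used as the stop signal; real inputs must be non-negative.
    static final int STOP = -1;

    // Master: dispatches one task per input, gathers the results, then stops the workers.
    static long sumOfSquares(int[] inputs, int numOfWorkers) {
        final BlockingQueue<Integer> tasks = new LinkedBlockingQueue<Integer>();
        final BlockingQueue<Long> results = new LinkedBlockingQueue<Long>();

        for (int i = 0; i < numOfWorkers; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            int task = tasks.take();          // request a piece of work
                            if (task == STOP) {
                                return;                       // stop signal received
                            }
                            results.put((long) task * task);  // send the result back
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }).start();
        }

        try {
            for (int input : inputs) {
                tasks.put(input);
            }
            long sum = 0;
            for (int i = 0; i < inputs.length; i++) {
                sum += results.take();                        // wait for every result
            }
            for (int i = 0; i < numOfWorkers; i++) {
                tasks.put(STOP);                              // one stop signal per worker
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("master interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(new int[] {1, 2, 3, 4}, 3));
    }
}
```

Because every worker pulls its next task from the shared queue only when it is idle, no explicit scheduling is needed. The sketch assumes at least one worker whenever there is work to do.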

Prime Calculation Engine

In this example, an engine is implemented for calculating whether or not an argument is prime, based on the master-worker pattern above.

The master-worker pattern illustrates how the space can be used to perform parallel tasks.

The same architecture can also be used in peer-to-peer applications such as search engines and agent-based applications.

In this specific example, the master is the calculation engine. It receives a number and is responsible for checking if that number is prime. It does so by splitting the possible divisors of this number into a group of ranges – a range for each worker. Each worker searches for a divisor in its range. If it finds a divisor, it immediately writes it back to the space. Otherwise, it writes an indication that no divisor has been found in its range. The master collects the workers' results and returns the final result.
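The range-splitting idea can be tried out without a space. The sketch below runs sequentially and borrows its range arithmetic from the Master listing in Step 3; the class and method names (DivisorRanges, split, findDivisor) are hypothetical and exist only for this illustration.

```java
class DivisorRanges {
    // Splits the candidate divisors into inclusive {start, end} ranges.
    // Assumes candidate >= 3, so that at least one range exists.
    static int[][] split(int candidate) {
        int numOfRanges = (int) Math.sqrt((double) candidate - 2);
        int numbersPerRange = (candidate - 2) / numOfRanges;
        int[][] ranges = new int[numOfRanges][];
        for (int i = 0; i < numOfRanges; i++) {
            int start = i * numbersPerRange + 2;
            ranges[i] = new int[] {start, start + numbersPerRange - 1};
        }
        return ranges;
    }

    // One worker's job: the first divisor in [start, end], or 0 if there is none.
    static int findDivisor(int value, int start, int end) {
        for (int d = start; d <= end; d++) {
            if (value % d == 0) {
                return d;
            }
        }
        return 0;
    }

    // The master's job, done here in a plain loop instead of in parallel.
    static boolean isPrime(int candidate) {
        if (candidate <= 1) {
            return false;
        }
        if (candidate == 2) {
            return true;
        }
        for (int[] range : split(candidate)) {
            if (findDivisor(candidate, range[0], range[1]) != 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        for (int[] range : split(35)) {
            System.out.println(range[0] + ".." + range[1]);
        }
        System.out.println("35 prime? " + isPrime(35));
        System.out.println("37 prime? " + isPrime(37));
    }
}
```

For the candidate 35, this produces five ranges of six divisors each, starting with 2..7, and the divisor 5 is found in the first range.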

Development Process

Step 1: Define the Entries (Packet interface)

Entries in a space-based application play a role similar to that of remote interfaces in a remote-invocation application. The main difference is that an Entry is a loosely coupled communication mechanism: it is a packet that can move from one place to another, with no static binding to or from an interface. This characteristic enables a highly dynamic and more associative type of communication among distributed components.

Rules for defining an Entry

  • Each Entry class must implement the Entry interface (net.jini.core.entry.Entry).
  • An Entry class must contain a default (no-arg) constructor.
  • Entry fields must be public, non-static, non-transient, and non-final Java objects. All other data members are passed by value and are not used for matching.

In this case, the Entry packets are both the tasks and the task results.
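The rules above can be checked mechanically with reflection. The following is a rough sketch under stated assumptions: the nested Entry interface is a stand-in for net.jini.core.entry.Entry so that the code needs only the JDK, and followsRules, GoodTask, and BadTask are hypothetical names. It reads the third rule as "a public instance field must not be a primitive", which is one interpretation rather than the product's own validation logic.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

class EntryRules {
    // Hypothetical stand-in for net.jini.core.entry.Entry, so the sketch needs only the JDK.
    interface Entry extends java.io.Serializable {}

    // Follows all three rules.
    static class GoodTask implements Entry {
        public Integer valueToCheck;
        public Integer startDivisor;
        public GoodTask() {}
    }

    // Breaks two rules: no no-arg constructor, and a primitive public field.
    static class BadTask implements Entry {
        public int valueToCheck;
        public BadTask(int valueToCheck) { this.valueToCheck = valueToCheck; }
    }

    static boolean followsRules(Class<?> type) {
        // Rule 1: the class must implement Entry.
        if (!Entry.class.isAssignableFrom(type)) {
            return false;
        }
        // Rule 2: a public no-arg constructor must exist.
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: public, non-static, non-transient, non-final fields must be objects.
        for (Field field : type.getFields()) {
            int mods = field.getModifiers();
            boolean entryField = !Modifier.isStatic(mods)
                    && !Modifier.isTransient(mods)
                    && !Modifier.isFinal(mods);
            if (entryField && field.getType().isPrimitive()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("GoodTask: " + followsRules(GoodTask.class));
        System.out.println("BadTask: " + followsRules(BadTask.class));
    }
}
```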

CheckIfPrimeTask Class

The CheckIfPrimeTask class checks whether a given number has a divisor in a given range.

The m_StartDivisor and m_EndDivisor fields define the range, and m_ValueToCheck is the prime candidate.

The run() method is the actual implementation that performs the test.

package com.j_spaces.examples.primes;
import java.util.Map;
import net.jini.core.entry.Entry;
/**
 * This object encapsulates a task entry in space. The performer of this task
 * checks if the m_ValueToCheck is divisible by any integer from m_StartDivisor
 * to m_EndDivisor. It writes the result as a CheckIfPrimeResult object.
 */
public class CheckIfPrimeTask
  //extends com.sun.rio.jsb.space.SpaceChain // required for rio compatibility
  implements Entry
{
  public Integer      m_ValueToCheck;
  public Integer      m_StartDivisor;
  public Integer      m_EndDivisor;
  public TaskUID   m_uid;
  public CheckIfPrimeTask()
  {
  }
  // called by the master
  public CheckIfPrimeTask(TaskUID uid, int valueToCheck, int startDivisor, int endDivisor)
  {
    m_ValueToCheck = new Integer(valueToCheck);
    m_StartDivisor = new Integer(startDivisor);
    m_EndDivisor = new Integer(endDivisor);
    m_uid = uid;
  }
  // called by the server
  public Entry execute(Map map) throws Exception {
        return run();
  }
  public CheckIfPrimeResult run()
  {  

    for (int i=m_StartDivisor.intValue(); i<=m_EndDivisor.intValue(); i++)
    {
      if (m_ValueToCheck.intValue() % i == 0)
      { /* divisor found */
        return new CheckIfPrimeResult(m_uid, m_ValueToCheck.intValue(),
          m_StartDivisor.intValue(), m_EndDivisor.intValue(), i);
      }
    }
    /* divisor not found */
    return new CheckIfPrimeResult(m_uid, m_ValueToCheck.intValue(),
          m_StartDivisor.intValue(), m_EndDivisor.intValue(), 0);
  }  

  public String toString()
  {
    return getClass().getName() + m_uid;
  }
}

CheckIfPrimeResult Entry

The result Entry stores the result of the associated task. It must therefore indicate whether a divisor was found, and it must also identify the specific task to which the result belongs. In this example, each result carries the divisor range of its task: each task is associated with a specific range, and each result is associated with that same range. The result also carries the task's TaskUID and the checked value, which are the fields the master sets in its template when it collects results.

In space-based programming, this association is extremely common, since most operations are performed asynchronously. The template matching mechanism is the natural way to implement this association. One of the key issues in designing an interface is defining the association keys that are used by the template Entries.
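To make the association concrete, here is a simplified sketch of how template matching behaves: a null field in the template acts as a wildcard, and a non-null field must be equal in the entry. It is an illustration only. The Result class and the matches method are hypothetical, a String replaces TaskUID, and equals() is used where a real space compares fields in its own way.

```java
import java.lang.reflect.Field;

class TemplateMatch {
    // Hypothetical, simplified result entry (a String uid instead of TaskUID).
    static class Result {
        public String uid;
        public Integer checkedValue;
        public Integer divisor;
        public Result() {}
        public Result(String uid, Integer checkedValue, Integer divisor) {
            this.uid = uid;
            this.checkedValue = checkedValue;
            this.divisor = divisor;
        }
    }

    // A null template field matches anything; a non-null field must be equal.
    static boolean matches(Object template, Object entry) {
        if (!template.getClass().isInstance(entry)) {
            return false;
        }
        try {
            for (Field field : template.getClass().getFields()) {
                Object wanted = field.get(template);
                if (wanted != null && !wanted.equals(field.get(entry))) {
                    return false;
                }
            }
            return true;
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The association keys: request id and checked value. The divisor is left as a wildcard.
        Result template = new Result();
        template.uid = "master-1";
        template.checkedValue = Integer.valueOf(35);

        System.out.println(matches(template, new Result("master-1", 35, 5)));
        System.out.println(matches(template, new Result("master-2", 35, 5)));
    }
}
```

With the uid set in the template, a master sees only the results of its own request, whatever divisor they report.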

package com.j_spaces.examples.primes;
import net.jini.core.entry.Entry;
/**
 * This object encapsulates the result of a CheckIfPrimeTask entry.
 * If m_Divisor != 0, then m_CheckedValue is divisible by m_Divisor;
 * otherwise m_CheckedValue is not divisible by any integer from
 * m_StartDivisor to m_EndDivisor.
 */
public class CheckIfPrimeResult
     implements Entry
{
  public Integer      m_CheckedValue;
  public Integer      m_StartDivisor;
  public Integer      m_EndDivisor;
  public Integer      m_Divisor;
  public TaskUID   m_uid;
  public CheckIfPrimeResult()
  {
  }
  public CheckIfPrimeResult(TaskUID uid, int checkedValue, int startDivisor,
    int endDivisor, int divisor)
  {
    m_uid = uid;
    m_CheckedValue = new Integer(checkedValue);
    m_StartDivisor = new Integer(startDivisor);
    m_EndDivisor = new Integer(endDivisor);
    m_Divisor = new Integer(divisor);
  }
  public boolean isDivisable()
  {
    return m_Divisor.intValue() != 0;
  }
  public String toString()
  {
    return getClass().getName() + m_uid;
  }
}

The master assigns each request a TaskUID, a simple concatenation of the thread name, a counter, and a timestamp.

Because the TaskUID is unique, several masters can work concurrently on different isPrime() requests.

package com.j_spaces.examples.primes;
import java.io.Serializable;
public class TaskUID implements Serializable
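{
  private String m_tid;
  private int m_taskCount;
  private long m_timeStmp;
  // shared counter; static fields are never serialized, so 'transient' is not needed
  private static int m_globalCount = 0;
  public TaskUID()
  {
    m_tid = Thread.currentThread().getName();
    m_taskCount = nextCount();
    m_timeStmp = System.currentTimeMillis();
  }
  // synchronized so that concurrent threads in one JVM never receive the same count
  private static synchronized int nextCount()
  {
    return m_globalCount++;
  }
  public static TaskUID newUID()
  {
    return new TaskUID();
  }
  public String toString()
  {
    return "[ " + m_tid + "." + m_taskCount + "." + m_timeStmp + " ] ";
  }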
}

Step 2: Generate Starting-Point Code

The starting point of an application includes the application mainline. Since the application is always the space's client, initialization should be common for both the master and the worker. This initialization deals with the way we find a space or any other service required during the lifecycle of the application.

It is good practice to build a helper class for each service that deals with these lookup issues.

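package com.j_spaces.examples.primes;
import java.rmi.RMISecurityManager;
import net.jini.core.transaction.server.TransactionManager;
import net.jini.space.JavaSpace;
import com.j_spaces.core.IJSpace;
import com.j_spaces.core.client.FinderException;
import com.j_spaces.core.client.LocalTransactionManager;
import com.j_spaces.core.client.SpaceFinder;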
public class Main
{
  public static void main(String args[])
     throws Exception
  {
    // check usage
    if (args.length != 2)
        printUsageAndExit();
    // extract params
    String lookupHost = args[0];
    int primeCandidate = Integer.parseInt(args[1]);
    // set RMI Security Manager
    if ( System.getSecurityManager() == null )
    {
      System.setSecurityManager( new RMISecurityManager() );
    }
    // get references to space and transaction services
    JavaSpace space = null;
    try
    {
      space = (JavaSpace)SpaceFinder.find( lookupHost );
    }
    catch( FinderException ex )
    {
      ex.printStackTrace();
      System.out.println("Could not find space: " + lookupHost);
      System.out.println("Please check that GigaSpaces Server is running.");
      System.exit(1);
    }
    System.out.println("Looking for Transaction Manager...");
    /*
     * TransactionManager trManager = (TransactionManager)LookupFinder.find(
     *     "Transaction Manager", // service name
     *     null,                  // service classes, e.g. new Class[] {
     *                            //   net.jini.core.transaction.server.TransactionManager.class }
     *     null,                  // service attributes
     *     "localhost",           // unicast lookup host
     *     null,                  // lookup groups
     *     10*1000                // timeout 10 seconds
     * );
     */
    TransactionManager trManager = new LocalTransactionManager((IJSpace)space);    

    if ( trManager == null )
    {
     System.out.println("Transaction Manager can not be found...");
     System.exit(1);
    }
    else
     System.out.println("Found: Transaction Manager." );
    // check if prime and print result
    boolean isPrime = Master.isPrime(space, trManager, primeCandidate);
    if (isPrime)
      System.out.println("prime");
    else
      System.out.println("not prime");
    // finish
    System.exit(0);
  }
  private static void printUsageAndExit()
  {
    System.out.println("Main <Jini lookup host> <prime candidate>");
    System.exit(1);
  }
}

Step 3: Develop Master Program

The role of the Master is to provide a unified interface that determines whether a number is prime. The Master splits this task into sub-tasks, one per range of divisors, which the workers process in parallel.

package com.j_spaces.examples.primes;
import net.jini.core.transaction.Transaction;
import net.jini.core.transaction.TransactionFactory;
import net.jini.core.transaction.server.TransactionManager;
import net.jini.space.JavaSpace;
public class Master {
  public static boolean isPrime(JavaSpace space,
      TransactionManager trManager, int primeCandidate) throws Exception {

    if (primeCandidate <= 1)
      return false;
    if (primeCandidate == 2)
      return true;

    // break the task into several tasks and write them to the space in one
    // transaction
    int numOfTasks = (int) Math.sqrt((double) primeCandidate - 2);
    int numbersPerTask = (primeCandidate - 2) / numOfTasks;
    // create the task ID shared by all tasks of this request
    TaskUID uid = TaskUID.newUID();
    System.out.println("Creating " + numOfTasks + " tasks, "
        + numbersPerTask + " numbers per task with id " + uid);

    // create transaction
    Transaction.Created tCreated = TransactionFactory.create(trManager,
        3600 * 1000);
    Transaction tr = tCreated.transaction;
    for (int i = 0; i < numOfTasks; i++) {
      // task i covers the divisors startDivisor..endDivisor (inclusive)
      int startDivisor = i * numbersPerTask + 2;
      int endDivisor = i * numbersPerTask + 2 + numbersPerTask - 1;
      CheckIfPrimeTask task = new CheckIfPrimeTask(uid, primeCandidate,
          startDivisor, endDivisor);
      space.write(task, tr, 60 * 1000);
    }
    // commit transaction
    tr.commit();
    // create and start worker threads
    int numOfWorkers = Math.min(numOfTasks, 10);
    System.out.println("Starting " + numOfWorkers + " worker threads");
    for (int i = 0; i < numOfWorkers; i++) {
      Thread worker = new Thread(new Worker(space, trManager, 60 * 1000));
      worker.start();
    }
    // Wait for results. If any result reports a divisor, return false.
    // If all results report no divisor, return true.
    // The result template matches on the checked value and the task ID.
    CheckIfPrimeResult template = new CheckIfPrimeResult();
    template.m_CheckedValue = new Integer(primeCandidate);
    template.m_uid = uid;
    // template matching this request's tasks (not used further in this listing)
    CheckIfPrimeTask templateTask = new CheckIfPrimeTask();
    templateTask.m_uid = uid;
    System.out.println("Master : Waiting for results...");
    // create a new transaction
    tCreated = TransactionFactory.create(trManager, 3600 * 1000);
    tr = tCreated.transaction;
    int numOfResults = 0;
    // take the results
    while (numOfResults < numOfTasks) {
      CheckIfPrimeResult taskResult = (CheckIfPrimeResult) space.take(
          template, tr, Long.MAX_VALUE);
      System.out.println("Result (startDivisor="
          + taskResult.m_StartDivisor + " ; endDivisor="
          + taskResult.m_EndDivisor + ") arrived");
      numOfResults++;
      if (taskResult.isDivisable()) {
        // found a divisor - candidate is not prime
        System.out.println("Master : Found divisor "
            + taskResult.m_Divisor.intValue() + " for "
            + primeCandidate);

        // note: tasks and results of this request that are still in the
        // space are not removed here
        tr.commit();
        return false;
      }
    }
    tr.commit();
    return true;
  }
}

Step 4: Develop the Worker

A Worker is a thread responsible for taking CheckIfPrimeTask tasks from the space, performing these tasks, and writing back CheckIfPrimeResult objects. It uses the task's run method to perform the task and receive a result, which is then written back to the space for the master to gather.

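package com.j_spaces.examples.primes;
import net.jini.core.lease.Lease;
import net.jini.core.transaction.Transaction;
import net.jini.core.transaction.TransactionFactory;
import net.jini.core.transaction.server.TransactionManager;
import net.jini.space.JavaSpace;
public class Worker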
  implements Runnable
{
  private JavaSpace           m_Space;
  private TransactionManager  m_TrManager;
  private long                m_Timeout;
  public Worker(JavaSpace space, TransactionManager trManager, long timeout)
  {
    m_Space = space;
    m_TrManager = trManager;
    m_Timeout = timeout;
  }
  public void run()
  {
    try {
      CheckIfPrimeTask template = new CheckIfPrimeTask();
      while (true)
      {
        // create a new transaction
        Transaction.Created tCreated = TransactionFactory.create(
              m_TrManager, 3600 * 1000);
        Transaction tr = tCreated.transaction;
        // take a task from space
        System.out.println("Worker " + Thread.currentThread().getName()
            + " waiting for task : " + template.getClass().getName());
        CheckIfPrimeTask task = (CheckIfPrimeTask) m_Space.take(template, tr, m_Timeout);
        if (task == null)
        {
          // no task arrived within the timeout: finish this worker
          tr.commit();
          return;
        }
        System.out.println("Worker " + Thread.currentThread().getName()
            + " processing task :" + task);
        // check result
        CheckIfPrimeResult result = task.run();
        m_Space.write(result, tr, Lease.FOREVER);
        // commit transaction
        tr.commit();
      }
    }
    catch (Exception ex) {
      ex.printStackTrace();
    }
  }
}

In this example, Jini transactions are used to protect the application from partial failures. A transaction is a "unit of work". All operations in a transaction are visible only to members of the transaction. Moreover, all these operations succeed or fail together, depending on whether the transaction is committed or aborted. This maintains data integrity and protects the application and the space from partial failures.
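The all-or-nothing behavior can be pictured with a toy model. The sketch below is not the Jini transaction API: Space and Tx are hypothetical in-memory classes in which writes made under a transaction stay private until commit() publishes them together, while abort() discards them.

```java
import java.util.ArrayList;
import java.util.List;

class ToyTransactions {
    // Hypothetical in-memory space that holds only committed entries.
    static class Space {
        private final List<String> committed = new ArrayList<String>();

        synchronized List<String> snapshot() {
            return new ArrayList<String>(committed);
        }

        synchronized void publish(List<String> entries) {
            committed.addAll(entries);
        }
    }

    // Hypothetical transaction: buffers writes until commit or abort.
    static class Tx {
        private final Space space;
        private final List<String> pending = new ArrayList<String>();

        Tx(Space space) {
            this.space = space;
        }

        void write(String entry) {
            pending.add(entry);      // visible only inside this transaction
        }

        void commit() {
            space.publish(pending);  // all buffered writes become visible together
            pending.clear();
        }

        void abort() {
            pending.clear();         // none of the buffered writes become visible
        }
    }

    public static void main(String[] args) {
        Space space = new Space();

        Tx tx = new Tx(space);
        tx.write("task 2..7");
        tx.write("task 8..13");
        System.out.println("before commit: " + space.snapshot().size());
        tx.commit();
        System.out.println("after commit: " + space.snapshot().size());

        Tx failed = new Tx(space);
        failed.write("task 14..19");
        failed.abort();
        System.out.println("after abort: " + space.snapshot().size());
    }
}
```

In the example above, this is why the master writes all tasks under one transaction: workers either see the complete set of tasks or none of them.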

For a list of the required libraries, refer to the Setting Classpath section.
