|
Summary: IP supports multicasting, where a source device can send to a group of devices. With Multicast, traffic is sent to a single address, but is processed by multiple hosts.
Deprecated in GigaSpaces 6.0 OverviewThe vast majority of traffic on IP internet is of the unicast variety: one source device sending to one destination device. IP also supports multicasting, where a source device can send to a group of devices. With multicast, traffic is sent to a single address, but is processed by multiple hosts. Hosts listening on a specific multicast address make up a multicast group, and they receive and process traffic sent to that group address. Group membership is dynamic, allowing hosts to join and leave the group at any time. Groups are not limited by size, and members can be from multiple network segments (links or sub-nets) if the connecting routers support forwarding of multicast traffic and group membership information. A host can send traffic to a group address without belonging to the group. In fact, to join a group, a host sends a group membership message. Multicast routers periodically poll membership status. GigaSpaces supports both unicast-TCP and multicast-UDP notification transmissions when using the JavaSpace.Notify() method. The unicast-TCP has a drawback when there are several clients registered for notifications with the same template. In this special case, the main problem is server side overhead with CPU and memory consumption, since notifications need to be sent to all registered clients. Moreover, clients receive these TCP notifications using a dedicated server-side thread taken from a fixed size thread pool. When there are massive amounts of notifications to be delivered, the pool may be fully consumed, causing a backlog of accumulating notifications to be sent. This generates unexpected message delivery delays. In this transmission mode the network utilization is high, since the server performs a remote call for each registered client to deliver the notification. These problems result in scalability issues. Multicast-UDP notification transmissions are targeted to solve these problems
How it WorksA special component within the space, Multicast Notify Worker, receives UDP notify registration requests. One of the most important features of the Multicast Notify Worker is its ability to identify templates that have already been registered and to utilize one subscription for many subscribers.
On the client side, the DataEventSession instance in UDP transmission mode receives transmitted events for the subscribed multicast channel and calls the relevant RemoteEventListener implementation. The Multicast Notify Worker sends notifications to the clients periodically or when a specific number of notifications have accumulated in chunks. You can define the interval-dispatch-time or the chunk-event-size via the space configuration file. By using the NotifyDelegator with UDP transmission mode you may provide INotifyDelegatorFilter implementation that runs at the server side when space destructive events are triggered (write, take, update, Lease expiration). This user defined business logic (e.g., based on Entry content, system current status, client identification, etc.) allows notifications to be filtered out at the server eliminating needless transmissions to the client.
MulticastNotifyWorker PropertiesThe space configuration file holds the following properties for the Multicast Notify Worker: <workers> <worker-names>MulticastNotifyWorker</worker-names> <MulticastNotifyWorker> <enabled>true</enabled> <class-name>com.j_spaces.worker.multicast.MulticastNotifyWorker</class-name> <!--To reduce latency, dispatch intervals should be set to accommodate latency requirements--> <interval-dispatch-time>1000</interval-dispatch-time> <chunk-event-size>100</chunk-event-size> <multicast-group>224.0.0.1</multicast-group> <multicast-port>34721</multicast-port> <ttl>4</ttl> <discovery-interface>localhost</discovery-interface> <description>Multicast Notify Manager</description> </MulticastNotifyWorker> </workers>
Multicast address table:
INotifyDelegatorFilter in UDP Mode.The com.j_spaces.core.client.INotifyDelegatorFilter interface includes the following methods:
Implement INotifyDelegatorFilter according to the desired business logic. The com.j_spaces.core.client.INotifyDelegatorFilter.process(...) method should return true when the event meets a criteria – i.e., event should be transmitted to the client; otherwise, false (see reference to example below). Multicast in a Clustered Space ConfigurationEfficient cluster wide filtering may be achieved by maintaining filtered logic in a centralized component, thus encapsulating business logic for members of that cluster. Registering for notifications under a clustered space configuration is done through the clustered proxy reference. All space members must have the same multicast group properties (i.e., multicast-group, multicast-port, and discovery-interface).
Multicast Notify ExampleThis example demonstrate the Multicast NotifyDelegator receiveing notifications for requested templates. The Multicast NotifyDelegator support Filtered and non Filtered listener. This example has 2 modes:
The listeners register the notification (filtered or non-filtered) and a writer application write 10 Entries with each Hit on the <Enter> key to the space. This triggers the notification listeners. To start the GigasSpaces Server: gsInstance "/./multicastnotifySpace?schema=cache&properties=multicastnotify" ".;..\classes" The space properties file: space-config.workers.MulticastNotifyWorker.enabled=true
To start the client that receives filtered messages: @%JAVACMD% -Dcom.gs.home=%JSHOMEDIR% -classpath %JARS%;classes com.j_spaces.examples.
multicast_notify.MulticastNotify
rmi://localhost/./multicastnotifySpace -f
To start the client that receives non-filtered messages: @%JAVACMD% -Dcom.gs.home=%JSHOMEDIR%
-classpath %JARS%;classes com.j_spaces.examples.multicast_notify.MulticastNotify
rmi://localhost/./multicastnotifySpace -a
The Entry to be used as part of the Example: import com.j_spaces.core.client.MetaDataEntry; public class Message extends MetaDataEntry { public Integer id; public String content; public Message() { this.setFifo(true); } public Message(int id) { this(id, null); } public Message(int id, String msg) { this.id = new Integer(id); this.content = msg; this.setFifo(true); } public String toString() { return "Message(" + id +", " + content+")"; } } The Application: public class MulticastNotify { /** * MulticastNotifyListener, is invoked each time an event arrives * at the client. The events are numbered as they arrive, and * printed out with relevant information. */ public class MulticastNotifyListener implements RemoteEventListener { volatile int numberOfNotifications; public final int listenerId; private long leaseExpTime; public MulticastNotifyListener(Integer handback) { this.listenerId = handback.intValue(); } public void setLeaseExpirationTime(long lease) { this.leaseExpTime = lease; } public long getLeaseExpirationTime() { return this.leaseExpTime - System.currentTimeMillis(); } public void notify(RemoteEvent theEvent) throws UnknownEventException, RemoteException { try { Message message = (Message)((EntryArrivedRemoteEvent)theEvent).getEntry(); System.out.println( "\n[" + (++numberOfNotifications) + "]. " + "Message : " + message + "\n\tEventId : " + theEvent.getID() + "\n\tSequenceNumber : " + theEvent.getSequenceNumber() + "\n\tHandBack : " + theEvent.getRegistrationObject().get() + "\n\tLease Expires in : " + getLeaseExpirationTime() + " ms"); } catch (Exception e) { e.printStackTrace(); } } } /** * MulticastNotifyFilter, is invoked at the server * Excludes odd event-ids from reaching the listener, but allows * even event-ids. */ static public class MulticastNotifyFilter implements INotifyDelegatorFilter { private IJSpace spaceProxy; //leave this false to exclude PASSED messages from log public final boolean logIncludingPassed = false; //listener id this filter is attached to private int listenerId; public MulticastNotifyFilter(Integer listenerId) { this.listenerId = listenerId.intValue(); } public void init(IJSpace space, Entry notifyTemplate) { spaceProxy = space; Thread.currentThread().setName("MulticastNotifyFilter-"+listenerId); } /** * process is called on each arrived event at the space. * Excludes odd event ids, allow even event ids. * @return true if this event is valid by this filter, false otherwise **/ public boolean process(EntryArrivedRemoteEvent theEvent) { boolean isEven = false; try { Message message = (Message)theEvent.getEntry(); /* filter's process method returns TRUE if and only if * Message Id is even = 2,4,6 ... otherwise FALSE. */ isEven = (message.id.intValue() % 2 == 0); } catch (Exception e) { System.err.println("MulticastNotifyFilter - Exception: "+e); e.printStackTrace(); } return isEven; } public void close() { } } private IJSpace spaceProxy; public static int globalId = 1; public static int notifyListenerId; private MulticastNotifyDelegator mDelegator; private MulticastNotifyFilter multicastNotifyFilter; private MulticastNotifyListener multicastNotifyListener; public static final int NUMBER_OF_MESSAGES = 10; public MulticastNotify(IJSpace space) { spaceProxy = space; } /** * Register notify delegator for the Message() template * * @throws TransactionException * @throws IOException * @throws ClassNotFoundException */ private void registerNotifyTemplates(boolean filterEnabled) throws TransactionException, IOException, ClassNotFoundException { // Get an instance of the delegator mDelegator = MulticastNotifyDelegator.getInstance(spaceProxy); //define registration template Message template = new Message(); //prepare notify listener Integer listenerId = new Integer( (++MulticastNotify.notifyListenerId) ); MarshalledObject marshObj = new MarshalledObject( listenerId ); multicastNotifyListener = new MulticastNotifyListener( listenerId ); EventRegistration eventReg; if (filterEnabled) { multicastNotifyFilter = new MulticastNotifyFilter(listenerId); eventReg = mDelegator.notify( template, null, //transaction multicastNotifyListener, Lease.FOREVER, marshObj, //could also be null false, //fifo mode - not support NotifyModifiers.NOTIFY_WRITE, multicastNotifyFilter); } else { eventReg = mDelegator.notify( template, null, //transaction multicastNotifyListener, Lease.FOREVER, marshObj, //could also be null false, //fifo mode - not support NotifyModifiers.NOTIFY_WRITE ); } multicastNotifyListener.setLeaseExpirationTime( eventReg.getLease().getExpiration() ); System.out.println( "Template : " + template.getClass() + "\n\tEvent Registration Id : " + eventReg.getID() + "\n\tSequenceNumber : " + eventReg.getSequenceNumber() + "\n\tHandBack : " + marshObj.get() + "\n\tLease Expires in : " + new Date( multicastNotifyListener.getLeaseExpirationTime() ) + " ms"); } /** * Called by the main application. * Writes NUMBER_OF_MESSAGES to the space, each with an increasing id * and a creation timestamp. * * @throws RemoteException * @throws TransactionException */ private void writeMessages() throws RemoteException, TransactionException { int startId = MulticastNotify.globalId; MulticastNotify.globalId += NUMBER_OF_MESSAGES; int endId = MulticastNotify.globalId; // global id <= i < global id + NUMBER_OF_MESSAGES for (int i=startId; i < endId; i++) { String messageCreationTime = (new Date( System.currentTimeMillis())).toString(); Message message = new Message( i, messageCreationTime ); spaceProxy.write( message, null, 30 * 1000); System.out.println("Message :\t" + message +" \tWritten to space"); } } /** * see usage for help * @param args <URL> <OPTIONS> */ public static void main(String[] args) { if ( args.length != 2 ) { printUsage(); } try { IJSpace space = (IJSpace)SpaceFinder.find( args[0] ); if ( space == null ) { System.out.println("Space not found: " + args[0]); System.exit(-1); } MulticastNotify mNotify = new MulticastNotify(space); if ( args[Look And Feel - ServiceGrid].equals("-m") ) { Thread.sleep(1000); // sleep interval for load time System.out.println("\nStarted the writer Application... \n\tConnected to space: " + args[0]); System.out.println("\tMake sure listeners (filtered or non-filtered) are started!\n"); while(true) { System.out.println("\nPress <Enter> to Write " + NUMBER_OF_MESSAGES + " Message entries"); System.in.read(); mNotify.writeMessages(); } } else if ( args[Look And Feel - ServiceGrid].equals("-f") ) { System.out.println("\nRegistering filtered listener..."); mNotify.registerNotifyTemplates(true); System.out.println("\nPress Enter to end Demo!"); System.in.read(); } else if ( args[Look And Feel - ServiceGrid].equals("-a") ) { System.out.println("\nRegistering non-filtered listener..."); mNotify.registerNotifyTemplates(false); System.out.println("\nPress Enter to end Demo!"); System.in.read(); } else { printUsage(); } } catch( FinderException ex ) { ex.printStackTrace(); System.out.println("Could not find space: " + args[0]); System.out.println("Please check that GigaSpaces Server is running."); } catch (Exception e) { e.printStackTrace(); } } public static void printUsage() { System.out.println("Usage: MulticastNotify <URL> <OPTION>"); System.out.println("Load the main application and then load as many listeners as you wish."); System.out.println("\nOptions are:"); System.out.println("\t-m\tMain application"); System.out.println("\t-f\tFiltered notification listeners"); System.out.println("\t-a\tAll notifications, non-filtered listeners"); System.out.println("\tURL in the form of rmi://lookup host/container name/space name"); System.exit(-1); } } Related Topics
|
(works on Firefox 2 and Internet Explorer 7)