Design Pattern: the Asynchronous Dispatcher

Today we’ll revisit a common design pattern, the Observer, and one of its derivatives for multi-threaded processing: the Asynchronous Dispatcher.

I Observer pattern

The classical Observer pattern features two actors: the subject, called the Observable, and one or more Observers.

The classical UML diagram for such a pattern is:

Observer Pattern Diagram

Original picture credits: http://en.wikipedia.org/wiki/File:Observer.svg

This pattern is quite simple and powerful. The only requirement is that the subject being observed (the Observable) should:

  • manage the registration/removal of its Observers
  • be able to detect any state change and notify its Observers

This is a push model: state changes are pushed to the Observers, which are just passive event listeners.

This pattern is suitable for one subject having many Observers. If more than one subject should be monitored, the Observers must be registered on each of them.
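To make the two responsibilities of the Observable concrete, here is a minimal sketch of the push model. The names ObserverDemo, register, unregister and setState are ours, chosen for illustration only; they are not taken from the diagram.

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverDemo {

    /** Passive listener: it only reacts to what the subject pushes to it. */
    interface Observer {
        void update(String newState);
    }

    /** The subject: manages its observers and notifies them on every state change. */
    static class Observable {
        private final List<Observer> observers = new ArrayList<>();
        private String state;

        void register(Observer observer) {
            observers.add(observer);
        }

        void unregister(Observer observer) {
            observers.remove(observer);
        }

        String getState() {
            return state;
        }

        void setState(String newState) {
            this.state = newState;
            // Push model: the new state is sent to every registered observer
            for (Observer observer : observers) {
                observer.update(newState);
            }
        }
    }

    public static void main(String[] args) {
        Observable subject = new Observable();
        Observer printer = newState -> System.out.println("State changed to " + newState);

        subject.register(printer);
        subject.setState("SAVED");    // printer is notified

        subject.unregister(printer);
        subject.setState("DELETED");  // nobody is notified any more
    }
}
```

Note that an Observer interested in several subjects would have to call register on each of them, which is the limitation mentioned above.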

 

II The Dispatcher

To relieve this limitation of the Observer pattern, we can look at the Dispatcher pattern.

Dispatcher

This pattern introduces a third actor, the Dispatcher. It is responsible for:

  • managing registration of EventManagers (equivalent to the Observers)
  • dispatching the event to all EventManagers

Note that we define an enum for the event type (EventTypeEnum): the Dispatcher keeps one list of EventManagers for each distinct EventTypeEnum value.

The routing based on the EventTypeEnum is done in the dispatch(EventTypeEnum eventType, Subject object) method body.

public void dispatch(EventTypeEnum event, Subject object)
{
	switch (event) {
		case SAVE:
			for(EventManager manager: this.saveEventManagers)
			{
				manager.executeEvent(event,object);
			}
			break;
		case UPDATE:
			for(EventManager manager: this.updateEventManagers)
			{
				manager.executeEvent(event,object);
			}
			break;
		case DELETE:
			for(EventManager manager: this.deleteEventManagers)
			{
				manager.executeEvent(event,object);
			}
			break;
		default:
			logger.error("Unknown EventType of type " + event.name());
	}
}

The executeEvent(EventTypeEnum event, Subject object) method body in EventManager is quite straightforward:

public void executeEvent(EventTypeEnum event, Subject object) {
	...
	// Processing here
	...
}

Alternatively it is possible to delegate the routing of EventTypeEnum processing to each EventManager.

In this case, the Dispatcher only needs a single list of all EventManagers.

Dispatcher_Alternative

The routing is now done in the body of each EventManager's executeEvent(EventTypeEnum event, Subject object) method:

public void executeEvent(EventTypeEnum event, Subject object) {
	switch (event) {
		case SAVE:
			// Processing here
			break;
		case UPDATE:
		case DELETE:
			// Do nothing
			break;
		default:
			logger.error("Unknown EventType of type " + event.name());
	}
}

The only drawback of this pattern is that introducing a new EventType requires changing either the Dispatcher code (for the routing) or the code of each existing EventManager (if routing is done at the EventManager level).
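One possible way to soften this drawback on the Dispatcher side is to replace the switch with a map keyed by the event type. The sketch below is our own variation, not the design shown in the diagrams: the register method, the nested stub classes and the EnumMapDispatcherDemo name are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class EnumMapDispatcherDemo {

    enum EventTypeEnum { SAVE, UPDATE, DELETE }

    /** Stand-in for the article's Subject type. */
    static class Subject {
        final String name;

        Subject(String name) {
            this.name = name;
        }
    }

    interface EventManager {
        void executeEvent(EventTypeEnum event, Subject object);
    }

    static class Dispatcher {
        // One list of managers per event type, created lazily on registration
        private final Map<EventTypeEnum, List<EventManager>> managers =
                new EnumMap<>(EventTypeEnum.class);

        void register(EventTypeEnum event, EventManager manager) {
            managers.computeIfAbsent(event, key -> new ArrayList<>()).add(manager);
        }

        void dispatch(EventTypeEnum event, Subject object) {
            // No switch: an event type without managers simply notifies nobody
            List<EventManager> targets =
                    managers.getOrDefault(event, Collections.<EventManager>emptyList());
            for (EventManager manager : targets) {
                manager.executeEvent(event, object);
            }
        }
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(EventTypeEnum.SAVE,
                (event, subject) -> System.out.println(event + " handled for " + subject.name));

        dispatcher.dispatch(EventTypeEnum.SAVE, new Subject("invoice"));   // one manager runs
        dispatcher.dispatch(EventTypeEnum.DELETE, new Subject("invoice")); // no manager registered
    }
}
```

With this layout, adding a constant to EventTypeEnum does not touch dispatch at all. The price is that an unknown or unhandled event type is no longer logged as an error.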

 

III Asynchronous Dispatcher

In real projects, some processing can take time. With the Dispatcher pattern, the main thread is blocked until all EventManagers finish their task.

It would be interesting if we could delegate the processing to a separate thread. This is where the Asynchronous Dispatcher pattern comes in.

AsyncDispatcher

Here we rely on a ThreadPoolTaskExecutor (Spring's thread pool executor) in each EventManager to delegate the work to another thread. Note that the dispatching itself is still done in the main thread: the hand-over to the new thread only happens at the last step, inside each EventManager.

One important point to take care of when working in a multi-threaded context is concurrency. When we pass the Subject object to a new thread for processing, the main thread resumes its own processing flow, and the same Subject object may be changed there while the other thread is still working on it.

Either you ensure that the processing flow in the main thread never changes the Subject object, or you make a deep copy. A deep copy is quite expensive, but short of that guarantee there is no other way to guard against concurrent modifications.

Below is the modified dispatch(EventTypeEnum eventType, Subject object, Object... args) method:

public void dispatch(EventTypeEnum event, Subject object, Object...args)
{
	Subject copy = ObjectCloner.deepCopy(object);
	for(EventManager manager: this.eventManagers)
	{
		manager.executeEvent(event, copy, args);
	}
}

At line 3, the Subject object is copied and passed to the manager instead of the original.
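The ObjectCloner helper is not shown in this article. As an illustration only, one common way to write such a deepCopy is a serialization round-trip, which assumes that the Subject and everything it references implement Serializable. The Subject fields and the DeepCopyDemo class below are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class DeepCopyDemo {

    /** Hypothetical Subject; every object it references must be Serializable too. */
    static class Subject implements Serializable {
        private static final long serialVersionUID = 1L;

        String name;
        List<String> tags = new ArrayList<>();

        Subject(String name) {
            this.name = name;
        }
    }

    /** Copies an object graph by writing it to a byte array and reading it back. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException("Deep copy failed", ex);
        }
    }

    public static void main(String[] args) {
        Subject original = new Subject("invoice");
        original.tags.add("draft");

        Subject copy = deepCopy(original);

        // Later changes made by the main thread no longer reach the copy
        original.name = "credit note";
        original.tags.add("sent");

        System.out.println(copy.name + " " + copy.tags); // prints "invoice [draft]"
    }
}
```

The round-trip through a byte array is also a good reminder of why the copy is expensive: the whole object graph is walked and rebuilt on every dispatch.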

Notice: it is strongly recommended not to use JPA entities as the Subject object. Making a deep copy of a JPA entity, with its proxy and possibly un-initialized collections, is clearly not a good idea. It is better to pass the technical ID of the object as a parameter and reload the whole object in the new thread.

 

You have probably noticed that the existing methods now have an additional Object... args vararg. These extra arguments are useful to pass additional parameters for the processing. They also serve to pass the technical IDs of JPA entities to be reloaded in the new thread, if any.

Now let’s look at the new executeEvent(EventTypeEnum event, Subject object, Object... args) method body:
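The "pass the ID, reload in the new thread" idea can be sketched as follows. This is only an illustration under our own assumptions: CustomerRepository is a hypothetical in-memory stand-in for a JPA lookup, the method signature is simplified to the varargs alone, a plain JDK ExecutorService replaces the Spring executor, and the method returns a Future only so that the demo can wait for the result.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ReloadByIdDemo {

    /** Hypothetical stand-in for a JPA repository or EntityManager lookup. */
    static class CustomerRepository {
        private final Map<Long, String> rows = new ConcurrentHashMap<>();

        void save(Long id, String name) {
            rows.put(id, name);
        }

        String findById(Long id) {
            return rows.get(id);
        }
    }

    static class ReloadingManager {
        private final ExecutorService executor;
        private final CustomerRepository repository;

        ReloadingManager(ExecutorService executor, CustomerRepository repository) {
            this.executor = executor;
            this.repository = repository;
        }

        /** By convention of this sketch, the first vararg is the technical ID. */
        Future<String> executeEvent(Object... args) {
            final Long id = (Long) args[0];
            // Only the ID crosses the thread boundary; the entity is loaded in the worker thread
            return executor.submit(() -> "Processed " + repository.findById(id));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CustomerRepository repository = new CustomerRepository();
            repository.save(42L, "ACME");

            ReloadingManager manager = new ReloadingManager(executor, repository);
            Future<String> result = manager.executeEvent(42L);

            System.out.println(result.get()); // blocks here only for the sake of the demo
        } finally {
            executor.shutdown();
        }
    }
}
```

Since a Long is immutable, nothing needs to be copied, and the worker thread always sees the state currently stored, not a snapshot taken at dispatch time.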

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class AsyncTaskManager implements EventManager {

    // Both fields are expected to be injected; the logger is declared as in the other classes
    private ThreadPoolTaskExecutor taskExecutor;

    private MyService myService;

    @Override
    public void executeEvent(EventTypeEnum event, Subject object, Object... args) {
        // The routing still runs in the calling (main) thread
        switch (event) {
            case SAVE:
                this.saveSubject(object, args);
                break;
            case UPDATE:
            case DELETE:
                // Do nothing
                break;
            default:
                logger.error("Unknown EventTypeEnum of type " + event.name());
        }
    }

    // Hands the actual work over to a thread from the pool and returns immediately
    private void saveSubject(final Subject object, Object... args) {
        this.taskExecutor.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    AsyncTaskManager.this.myService.save(object);
                } catch (Exception ex) {
                    // Log here: the calling thread never sees this exception
                    logger.error(ex.getMessage(), ex);
                }
            }
        });
    }
}

In saveSubject, we create a new Runnable object and pass it to the task executor, which runs it on a thread from its pool. Inside run(), please notice the special reference to the enclosing class (AsyncTaskManager.this) used to access the MyService service.
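For readers who want to try the idea without Spring, here is a small self-contained sketch that uses the JDK's ExecutorService in place of ThreadPoolTaskExecutor. The class names, the String subject (immutable, so no copy is needed) and the CountDownLatch used to wait for the worker are our own assumptions for the demo.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncDispatcherDemo {

    enum EventTypeEnum { SAVE, UPDATE, DELETE }

    interface EventManager {
        void executeEvent(EventTypeEnum event, String subject);
    }

    /** Routes in the caller's thread, then delegates the work to the pool. */
    static class AsyncEventManager implements EventManager {
        private final ExecutorService executor;
        private final CountDownLatch done;
        final List<String> processed = Collections.synchronizedList(new ArrayList<String>());

        AsyncEventManager(ExecutorService executor, CountDownLatch done) {
            this.executor = executor;
            this.done = done;
        }

        @Override
        public void executeEvent(EventTypeEnum event, final String subject) {
            if (event != EventTypeEnum.SAVE) {
                return; // this manager only cares about SAVE
            }
            executor.execute(() -> {
                processed.add(subject); // runs on a pool thread
                done.countDown();       // signals the demo that the work is finished
            });
        }
    }

    static class Dispatcher {
        private final List<EventManager> eventManagers = new ArrayList<>();

        void register(EventManager manager) {
            eventManagers.add(manager);
        }

        void dispatch(EventTypeEnum event, String subject) {
            for (EventManager manager : eventManagers) {
                manager.executeEvent(event, subject);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch done = new CountDownLatch(1);
        AsyncEventManager manager = new AsyncEventManager(executor, done);

        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(manager);

        dispatcher.dispatch(EventTypeEnum.SAVE, "invoice");   // returns without waiting
        dispatcher.dispatch(EventTypeEnum.DELETE, "invoice"); // ignored by this manager

        done.await(5, TimeUnit.SECONDS);                      // demo only: wait for the worker
        executor.shutdown();
        System.out.println(manager.processed);
    }
}
```

In a real application the main thread would of course not wait on a latch; it is there only so that the demo can print the result before exiting.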

 
 

1 Comment

  1. Anonymous

    Hi Duyhai, in dispatcher diagram “composition” diamond should be in the “Dispatcher” side I guess.
