Concurrency patterns - Active Object

Active Object pattern decouples method execution from its invocation. Think asynchronous method invocation, callbacks etc. To avoid race conditions, incoming client requests are queued and handled by a scheduler. The scheduler picks a queued object and makes it run its logic. It is object's responsibility to know what to do when it gets invoked, hence the Active Object.


An important aspect is that each of the invoked objects resides in its own thread of control. The scheduler guarantees a serialised access to the queued tasks.

Key Components



Example Implementation

A simple counter implementing a sub-set of the AtomicLong. The counter keeps its internal state which is then a subject to race conditions:
 public class ThreadSafeCounter implements Counter {  
   private long value;  
The challenge is to ensure the counter consistently yields the correct results, even when many threads access and modify counter's intrinsic value. represents a naive implementation which fails to handle concurrent access. The failure is proved by a multi-threaded test:
 public class ThreadUnsafeCounterMultiThreadedTest {  
   Counter counter = new ThreadUnsafeCounter(INITIAL_VALUE);  
   // Note that a test failure is expected  
   @Test(expected = AssertionError.class)  
   public void incrementAndGet() {  
     assertEquals(getExpectedIncrementedValue(), counter.get());  
On the other hand relies on the Active Object pattern when handling concurrent requests:
 public class ThreadSafeCounter implements Counter {  
   // The internal state, subject to race conditions.  
   private long value;  
   // Activation List: incoming requests (tasks)   
   // are put into a queue  
   private BlockingQueue<Callable<Long>> taskQueue =   
                 new LinkedBlockingQueue<>();  
   // Callback: provides access to the calculated results   
   // (incrementAndGet, etc.)  
   private BlockingQueue<Long> resultQueue =   
                 new LinkedBlockingQueue<>();  
   // Scheduler: a dedicated thread created and started   
   // when the counter is instantiated  
   public ThreadSafeCounter(long value) {  
     new Thread(new Runnable() {  
       public void run() {  
         while (true) {  
           // Constantly watching for incoming requests  
   // Proxy: allows the clients to submit new tasks  
   private long enqueueTask(Callable<Long> task) {..}  
The implementation offloads the actual task scheduling to the Executor framework. The execution results are handled asynchronously via futures. For simplicity, I chose to block the clients until the results become available. Still in the
   // This is the actual task scheduler. It only allows for a single task at a time.  
   ExecutorService executorService = Executors.newSingleThreadExecutor();  
   // At some point in the future the counter's new value will be available  
   Future<Long> future = executorService.submit(taskQueue.take());  
   // Meanwhile, the client is blocked until the result is ready  
   while (true) {  
     Long result = resultQueue.poll(500, TimeUnit.MILLISECONDS);  
     if (result != null) break;  

Source Code
