Programming Java threads in the real world, Part 9

More threads in an object-oriented world: Synchronous dispatchers, active objects, detangling console I/O

This month I'm picking up the architectural theme from the May 1999 Java Toolbox column, with a look at two additional architectural solutions to thread-synchronization problems. You'll remember from last month that object-oriented systems are designed in terms of synchronous and asynchronous messages, not in terms of threads of execution. (If you don't remember that, you should read the previous article.) The two strategies I presented for implementing asynchronous messages (the one-thread-per-message and thread-pool techniques) work just fine in many applications, but since multiple threads are running concurrently -- perhaps accessing the same object -- you still have to worry about interthread synchronization. This month I'll look at two additional approaches that can all but eliminate the need for synchronization between messages.

Synchronous dispatching

A synchronous dispatcher, or round-robin scheduler, solves the synchronization problem by simulating multithreading within a single Java thread. Let's start out by considering two tasks that need to be executed in parallel:

Figure 1. Two tasks to be executed in parallel

Each of these tasks naturally divides into four chunks, and each could be executed on its own thread. Let's also imagine that the four chunks have to be atomic (they cannot tolerate interruption while they're executing), but that it's not a problem if the task is preempted between chunks. The only way to get this atomicity in a normal preemptive multitasking environment is to synchronize the operations, with all the concomitant overhead and complexity.

To move from a single task divided into chunks to a synchronous dispatcher, imagine that we can break up the single task into four independent tasks, as illustrated in Figure 2.

Figure 2. A single task broken out into four independent tasks

It isn't too hard to imagine how we could implement the chunks; you could define each chunk as the run() method of a Runnable object, for example, put the objects into an array, and then write a simple scheduler to execute the objects one at a time with a sleep() or yield() between chunks:

    Runnable[] first_task = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do chunk 4 here */ } },
    };
    for( int i = 0; i < first_task.length; ++i )
    {   first_task[i].run();
        Thread.yield();
    }

Doug Schmidt coined the term Reactor for this design pattern (see Resources). The Reactor pattern emerged from Schmidt's work on the ACE framework (see Resources), as a way to accumulate various operations that should occur when a given event is triggered. The event handler then executes the for loop. The effect is essentially the same as several threads waiting on a single condition variable that's set true by the event. However, unlike a condition variable, you have control over both the sequence of execution and the moment at which you give up that control.
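
To make the idea concrete, here's a minimal sketch of a reactor-style registry of my own devising (the class and method names are hypothetical, not Schmidt's ACE classes): operations accumulate in a list, and when the event of interest fires, the event handler runs them one at a time on the current thread.

    import java.util.Iterator;
    import java.util.LinkedList;

    // A bare-bones reactor: operations are registered ahead of time and
    // executed sequentially, on the caller's thread, when the event fires.
    class Reactor_sketch
    {
        private LinkedList handlers = new LinkedList();  // Runnable objects

        public void register( Runnable handler )
        {   handlers.add( handler );
        }

        public void event_occurred()     // the "event handler" executes the loop
        {   for( Iterator i = handlers.iterator(); i.hasNext(); )
                ((Runnable) i.next()).run();
        }
    }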

Now let's imagine that I want to execute two complex tasks concurrently, each broken up into chunks. I do this simply by interleaving the two tasks, as illustrated in Figure 3.

Figure 3. Two tasks interleaved

I can implement this strategy as follows:

    Runnable[] first_task = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do task 1, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 4 here */ } },
    };
    Runnable[] second_task = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do task 2, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 4 here */ } },
    };
    for( int i = 0; i < first_task.length; ++i )
    {   first_task[i].run();
        second_task[i].run();
        Thread.yield();
    }

The foregoing example isn't a particularly good general strategy because the loop has to know individually about each array, but I can also merge the two arrays into one large array before entering the loop:

    Runnable[] both_tasks = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do task 1, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 4 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 4 here */ } },
    };
    for( int i = 0; i < both_tasks.length; ++i )
    {   both_tasks[i].run();
        Thread.yield();
    }

Note that I've interleaved the operations: task 1, chunk 1 runs before task 2, chunk 1; then task 1, chunk 2 runs, followed by task 2, chunk 2; and so forth. What's going on here, at least with respect to the two tasks, is a lot like what you'd see in a preemptive scheduler on a single-CPU concurrent system in which each task was executing on its own thread. The big difference is that each chunk has control over when it gets "preempted": you don't lose control until you return from the run() method that implements the chunk. In fact, this behavior is a lot like a cooperative multithreading system, in which you don't give up control until you explicitly yield to another thread. Because of this explicit yielding of control, you don't have to synchronize any of the fields used by either task, provided that you divide up the chunks carefully: you can always leave any referenced objects in a stable state before you give up control.
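
If you'd rather not merge the arrays by hand, a small helper can build the interleaved array for you. This is just a sketch of the idea, and it assumes the two tasks have the same number of chunks; the Synchronous_dispatcher class discussed below handles uneven sizes as well.

    // Interleave two equal-length chunk arrays:
    // t1[0], t2[0], t1[1], t2[1], and so on.
    static Runnable[] interleave( Runnable[] t1, Runnable[] t2 )
    {   Runnable[] merged = new Runnable[ t1.length + t2.length ];
        for( int i = 0; i < t1.length; ++i )
        {   merged[ 2*i     ] = t1[i];
            merged[ 2*i + 1 ] = t2[i];
        }
        return merged;
    }

With that in hand, the loop only needs to know about a single array: Runnable[] both_tasks = interleave( first_task, second_task );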

Synchronous dispatching is surprisingly useful, especially when speed is of the essence. All the chunks are executing on a single OS-level thread. There is no synchronization overhead at all, and no expensive context swaps into the OS kernel. The main downside is that it's sometimes tricky to break up a single task into neat chunks.

I've solved the problem of implementing a synchronous dispatcher with the Synchronous_dispatcher class in Listing 1. You can use the dispatcher in two ways. First of all, tasks can be added independently and then executed. The following code prints the string "hello world" three times.

    Synchronous_dispatcher dispatcher = new Synchronous_dispatcher();
    dispatcher.add_handler( new Runnable()
                           {   public void run(){ System.out.print("hello "); }
                            }
                          );
    dispatcher.add_handler( new Runnable()
                           {   public void run(){ System.out.print("world\n"); }
                            } 
                          );
    dispatcher.dispatch( 3 );

You can also add a time delay between chunks as follows:

    dispatcher.metered_dispatch( 2, 1000 ); // one second delay between chunks

This call prints the following:

hello <pause> world <pause> hello <pause> world

A second version of add_handler() is provided to add an entire array of Runnable chunks to the dispatcher. The elements of the array are distributed as evenly as possible among the previously added chunks. For example, the following code prints "Hello (Bonjour) world (monde)":

    Runnable[] first_task =
    {   new Runnable(){ public void run(){ System.out.print("Hello"); }},
        new Runnable(){ public void run(){ System.out.print(" world");}}
    };
    Runnable[] second_task =
    {   new Runnable(){ public void run(){ System.out.print(" (Bonjour)");}},
        new Runnable(){ public void run(){ System.out.print(" (monde)"  );}}
    };
    dispatcher = new Synchronous_dispatcher();
    dispatcher.add_handler( first_task  );
    dispatcher.add_handler( second_task );
    dispatcher.dispatch( 1 );

The chunks of a task can share data, if necessary, through the instance variables of the class that contains the array definitions.
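
For example, here's a hypothetical two-chunk task (not from the article's code) whose chunks communicate through a field of the enclosing class; no synchronization is needed because both chunks run on the dispatcher's single thread.

    class Two_chunk_task
    {
        private String intermediate_result;     // shared by the chunks below

        public Runnable[] chunks = new Runnable[]
        {   new Runnable(){ public void run(){ intermediate_result = "computed by chunk 1"; }},
            new Runnable(){ public void run(){ System.out.println( intermediate_result );   }}
        };
    }

You'd hand the task to a dispatcher with dispatcher.add_handler( new Two_chunk_task().chunks ).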

Of course, several dispatchers can each run on their own threads, in which case you have a situation much like Sun's "green thread" model, where cooperative and preemptive threads both share the same process.
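
For example, something like the following (a sketch, not code from the listing) runs two dispatchers preemptively with respect to each other, while the chunks within each dispatcher are still scheduled cooperatively:

    final Synchronous_dispatcher first  = new Synchronous_dispatcher();
    final Synchronous_dispatcher second = new Synchronous_dispatcher();

    // ... add_handler() calls for each dispatcher go here ...

    new Thread(){ public void run(){ first.dispatch ( 1 ); }}.start();
    new Thread(){ public void run(){ second.dispatch( 1 ); }}.start();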

Looking at the implementation, the individual Runnable chunks are kept in the events LinkedList (Listing 1, line 9). The one-element version of add_handler(Runnable) (Listing 1, line 12) just tacks the new element onto the end of the list. The array version, add_handler(Runnable[]) (Listing 1, line 17), is trickier to implement than you might expect, because the incoming array could be larger than the existing list (in which case you want the existing list elements dispersed evenly among the array elements) or smaller than the existing list (in which case you want the incoming array elements dispersed evenly throughout the existing list). I've chosen to make matters easier by first converting the list to an array so that I can work with two similar data structures. I then rebuild the linked list, first copying one or more elements from the larger array into the list, then copying one element from the smaller array, then one or more elements from the larger array, and so forth. An earlier version of this method did the same thing without first doing the list-to-array conversion, but it was both considerably more complex and slower than the current version.

The problem of dispatching is essentially the same problem that I discussed in the context of Observer notifications (in the March 1999 column). I don't want to synchronize dispatch(...) (Listing 1, line 56) because I don't want to disallow the addition of new operations while dispatching is in progress. Here, I've taken the easy way out and copied the list inside the synchronized statement (Listing 1, line 67). A multicaster-based solution, as discussed in the March column, could also work.
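
The dispatch() method itself appears later in the listing, but the core of the copy-based approach looks roughly like this (a sketch, not the actual listing code):

    public void dispatch( int iterations )
    {   Object[] copy;
        synchronized( this )            // hold the lock only long enough to
        {   copy = events.toArray();    // snapshot the current handler list
        }
        while( --iterations >= 0 )                  // the lock is released while
            for( int i = 0; i < copy.length; ++i )  // the chunks run, so calls to
                ((Runnable)copy[i]).run();          // add_handler() aren't blocked
    }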

The metered_dispatch(...) (Listing 1, line 79) variant on dispatch just uses an Alarm (discussed in February 1999) to dispatch events at a fixed interval.

Listing 1: /src/com/holub/asynch/Synchronous_dispatcher.java

    001  package com.holub.asynch;
    002
    003  import java.util.*;
    004  import com.holub.asynch.Alarm;
    005

    /*********************************************************************
     |  A synchronous notification dispatcher executes a sequence of
     |  operations sequentially. This allows two sets of linked operations
     |  to be interspersed and effectively executed in parallel, but
     |  without using multiple threads. This class is built on the JDK 1.2
     |  LinkedList class, which must be present in the system.
     |
     |  (c) 1999, Allen I. Holub.
     |  You may not distribute this code except in binary form,
     |  incorporated into a Java .class file. You may use this code freely
     |  for personal purposes, but you may not incorporate it into any
     |  commercial product without my express permission in writing.
     |
     |  @author Allen I. Holub
     */

    006  public class Synchronous_dispatcher
    007  {
    008
    009      private LinkedList events = new LinkedList();  // list of Runnable objects
    010

        /** Add a new handler to the end of the current list of subscribers. */

    011
    012      public synchronized void add_handler( Runnable handler )
    013      {   events.add( handler );
    014      }
    015

        /** Add several listeners to the dispatcher, distributing them as
            evenly as possible with respect to the current list. */

    016
    017      public synchronized void add_handler( Runnable[] handlers )
    018      {
    019          if( events.size() == 0 )
    020          {   for( int i = 0; i < handlers.length; )
    021                  events.add( handlers[i++] );
    022          }
    023          else
    024          {   Object[] larger  = events.toArray();
    025              Object[] smaller = handlers;
    026
    027              if( larger.length < smaller.length )
    028              {   Object[] tmp = larger;
    029                  larger  = smaller;
    030                  smaller = tmp;
    031              }
    032
    033              int distribution = larger.length / smaller.length;
    034
    035              LinkedList new_list = new LinkedList();
    036
    037              int large_source = 0;
    038              int small_source = 0;
    039
    040              while( small_source < smaller.length )
    041              {   for( int skip = 0; skip < distribution; ++skip )
    042                      new_list.add( larger[large_source++] );
    043
    044                  new_list.add( smaller[small_source++] );
    045              }
    046
    047              events = new_list;
    048          }
    049      }

        /*****************************************************************
         |  Remove all handlers from the current dispatcher.
         */