Programming Java threads in the real world, Part 9

More threads in an object-oriented world: Synchronous dispatchers, active objects, detangling console I/O

1 2 3 4 Page 2
Page 2 of 4

001 002 package com.holub.asynch; 003 import java.util.*; 004 import com.holub.asynch.Alarm; 005

/************************************************** A synchronous notification dispatcher executes a sequence of operations sequentially. This allows two sets of linked operations to be interspersed and effectively executed in parallel, but without using multiple threads. This class is built on the JDK 1.2x LinkedList class, which must be present in the system.

(c) 1999, Allen I. Holub. This code may not be distributed by yourself except in binary form, incorporated into a Java .class file. You may use this code freely for personal purposes, but you may not incorporate it into any commercial product without my express permission in writing.

@author Allen I. Holub */

006 007 public class Synchronous_dispatcher 008 { 009 private LinkedList events = new LinkedList(); // list of Runnable objects 010

/** Add a new handler to the end of the current list of subscribers. */

011 012 public synchronized void add_handler( Runnable handler ) 013 { events.add( handler ); 014 } 015

/** Add several listeners to the dispatcher, distributing them as evenly as possible with respect to the current list. */

016 017 public synchronized void add_handler( Runnable[] handlers ) 018 { 019 if( events.size() == 0 ) 020 { for( int i=0; i < handlers.length; ) 021 events.add( handlers[i++] ); 022 } 023 else 024 { Object[] larger = events.toArray(); 025 Object[] smaller = handlers; 026 027 if( larger.length < smaller.length ) 028 { Object[] tmp = larger; 029 larger = smaller; 030 smaller = tmp; 031 } 032 033 int distribution = larger.length / smaller.length; 034 035 LinkedList new_list = new LinkedList(); 036 037 int large_source = 0; 038 int small_source = 0; 039 040 while( small_source < smaller.length ) 041 { for( int skip = 0; skip < distribution; ++skip ) 042 new_list.add( larger[large_source++] ); 043 new_list.add( smaller[small_source++] ); 044 } 045 046 events = new_list; 047 } 048 } 049

/******************************************************************* Remove all handlers from the current dispatcher. */

050 051 public synchronized void remove_all_handlers() 052 { events.clear(); 053 } 054

/** Dispatch the actions "iterations" times. Use -1 for "forever." This function is not synchronized so that the list of events can be modified while the dispatcher is running. The method makes a clone of the event list and then executes from the clone on each iteration through the list of subscribers. Events added to the list will be executed starting with the next iteration. */

055 056 public void dispatch( int iterations ) 057 { 058 // Dispatch operations. A simple copy-and-dispatch-from-copy 059 // strategy is used, here. Eventually, I'll replace this code 060 // with a <code>Multicaster</code>. 061 062 063 if( events.size() > 0 ) 064 while( iterations==-1 || --iterations >= 0 ) 065 { 066 Object[] snapshot; 067 synchronized( this ) 068 { snapshot = events.toArray(); 069 } 070 071 for( int i = 0; i < snapshot.length; ++i ) 072 { ((Runnable)snapshot[i]).run(); 073 Thread.currentThread().yield(); 074 } 075 } 076 } 077

/** Dispatch actions "iterations" number of times, with an action dispatched every "interval" milliseconds. Note that the last action executed takes up the entire time slot, even if the run() function itself doesn't take "interval" milliseconds to execute. Also note that the timing will be irregular if any run() method executes in more than "interval" milliseconds. If you want a time interval between iterations, but not between the operations performed in a single iteration, just insert a Runnable action that sleeps for a fixed number of milliseconds.

@param iterations` Number of times to loop through the actions executing them. Use -1 to mean "forever."

#param interval An action is executed every "interval" milliseconds. */

078 079

public void metered_dispatch( int iterations, int interval )

080 { 081 Alarm timer = new Alarm( interval, Alarm.MULTI_SHOT ); 082 timer.start(); 083 084 while( iterations==-1 || --iterations >= 0 ) 085 { 086 Object[] snapshot; 087 synchronized( this ) 088 { snapshot = events.toArray(); 089 } 090 091 for( int i = 0; i < snapshot.length; ++i ) 092 { ((Runnable)snapshot[i]).run(); 093 timer.await(); 094 timer.start(); 095 } 096 } 097 098 timer.stop(); 099 } 100 101

static public class Test

102 { 103 // Execute the test with: 104 // java "com.holub.asynch.Synchronous_dispatcher\$Test" 105 // 106 107

public static void main( String[] args )

108 { 109 Synchronous_dispatcher dispatcher = 110 new Synchronous_dispatcher(); 111 112 dispatcher.add_handler( 113 new Runnable() 114

{ public void run()

115 { System.out.print("hello"); 116 } 117 } 118 ); 119 120 dispatcher.add_handler( 121 new Runnable() 122

{ public void run()

123 { System.out.println(" world"); 124 } 125 } 126 ); 127 128 dispatcher.dispatch( 1 ); 129 dispatcher.metered_dispatch( 2, 1000 ); 130 131 //------------------------------------------------ 132 // Test two tasks, passed to the dispatcher as arrays 133 // of chunks. Should print: 134 // Hello (Bonjour) world (monde) 135 136 Runnable[] first_task = 137 { new Runnable(){ public void run(){ System.out.print("Hello"); }}, 138 new Runnable(){ public void run(){ System.out.print(" World");}} 139 }; 140 141 Runnable[] second_task = 142 { new Runnable(){ public void run(){ System.out.print(" Bonjour");}}, 143 new Runnable(){ public void run(){ System.out.print(" Monde" );}} 144 }; 145 146 dispatcher = new Synchronous_dispatcher(); 147 dispatcher.add_handler( first_task ); 148 dispatcher.add_handler( second_task ); 149 dispatcher.dispatch( 1 ); 150 } 151 } 152 } 153

Active objects

The second architecture I'd like to discuss this month is the active object architecure. Though we can thank Greg Lavender and Doug Schmidt for the name (see Resources), the architecture has been around for a while -- the first time I saw it (dinosaur that I am) was in Intel's RMX operating system, circa 1979.

Though none of us will be programming for RMX, Intel's terminology is useful in describing the pattern: RMX (the Real-time Multitasking Executive) was an OS for embedded systems -- think of a multithreaded operating system that supports only a single process that remains resident in core. All threads share a single address space, and there is no virtual memory. Figure 4, below, shows the general architecture. RMX sees the world as a series of "tasks," not threads. Each task has an input queue. And each task effectively runs on a single thread. You activate a task by sending it an asynchronous message. You literally put a data structure onto the task's input queue. The task starts out idle, waiting for a request to be queued. It wakes up when a message arrives, performs the requested operation, then goes back to sleep. If messages arrive while the task is busy, they remain in the queue until the current operation is complete, and are dequeued and executed sequentially. Each message carries with it a return address -- a message queue to which the task posts the original message data structure (perhaps modified by the task to contain a completion status or output data) when it completes the requested operation.

Figure 4. The RMX system architecture

You use this system by creating a task for every desired operation. For example, you might create a "file I/O" task whose job was to access a single file. You'd send data to that file by queueing up a write request on that task's input queue. The I/O task would perform the operation, then it would send the data structure representing the write-request to the task whose input queue was listed in the message as the return address. A special "garbage collector" task was included solely to provide a place to send these reply messages to when the originating task wasn't interested in the reply. (The garbage collector simply freed any memory that arrived in its queue.) You would read the file by posting an empty read-request message to the file I/O task's input queue. The file I/O task would then fill that message with data and post it back to the included return address.

The main advantage of the active-object architecture is that the individual operations don't have to be synchronized since they're executing sequentially on the task object's thread. In fact, the only synchronization necessary in the entire system are in the "enqueue" and "dequeue" operations.

Of course, from an object-oriented-design perspective, a "task" is simply an object that can accept nothing but asynchronous messages. There's no requirement that asynchronous requests must execute in parallel. From the perspective of the object that sends the message, as long as the request returns immediately, it's okay for the receiving object to perform the operation whenever it gets around to it.

A general solution

I've adapted the idea of the active object to my own ends, with a general solution to the dispatch problem. Figure 5, below, shows how my implementation works. The main difference from RMX is that, rather than queuing up passive data structures that contain an ID identifying the requested operation, I'm queuing up Runnable objects that actually do the operation. This way, a given active object can handle many requests without knowing what those requests actually do. This notion of passing in an object that encapsulates a request is an example of the Command design pattern (see Resources).

Figure 5. Active objects in Java

The implementation of an active object dispatcher (Listing 2) isn't much more complicated than the picture above. Most of Listing 2, as you will see, is taken up with comments.

Create and start up a dispatcher like this:

    Active_object dispatcher = new Active_object();

Ask the active object to do something for you like this:

    (   new Runnable()
       {   public void run()
            {   System.out.println("hello world");

When you're done with the dispatcher, close it:


Thereafter, any attempts to dispatch() a new request will be rejected with an exception toss, though the requests that are waiting to be serviced will be executed.

Looking at the implementation, the input queue in Figure 5 is an instance of the Blocking_queue class (discussed last month) declared as requests on Listing 2, line 8. The dispatch() Listing 2, line 29 and close() Listing 2, line 34 methods are simple wrappers around the equivalent Blocking_queue methods. The enqueued "request" is just a Runnable object, whose run() method is executed on the active object's thread.

Note that I had to add a method to last month's Blocking_queue class to make the close() function work elegantly without having to synchronize. (The source code in the "Articles" section of my Web site has been patched to include the new method. See my bio at the end of this article for a link to my site.) Once a Blocking_queue is closed by calling enqueue_final_item(), any further attempts to enqueue new items are rejected with an exception toss. Moreover, the queue is closed automatically when the final item is dequeued.

The only other method in the Active_object class is run() (Listing 2, line 13). (An Active_object class extends Thread, so it implements run() directly.) The run() method sits in a tight loop, dequeueing and executing requests as they come in, and most of the time the run() will be blocked waiting to dequeue a request. The loop terminates when the null, enqueued by the close() method, is dequeued. In order to be a good citizen, I yield() after every request is executed to give other threads at my priority level a chance to run.

Notice that none of the methods of Active_object are synchronized. They don't need to be because, with the single exception of run(), all the methods of Active_object are simple wrappers around calls to the Blocking_queue, which is synchronized. Moreover, the Active_object itself is the only thread that dequeues from the queue, so extra synchronization on the dequeue operation is also unnecessary.

Listing 2: /src/com/holub/asynch/

001 002 package com.holub.asynch; 003 import com.holub.asynch.Blocking_queue; 004

/*********************************************************************** You use a dispatcher to implement the message-queueing and dispatching part of an active object. Create and start up a dispatcher like this:

Active_object dispatcher = new Active_object(); dispatcher.start();

Ask the active object to do something for you like this:

dispatcher.dispatch ( new Runnable() { public void run() { System.out.println("hello world"); } } );

When you're done with the dispatcher, close it:


Variations on these themes are also possible. See the article text for more details.

(c) 1999, Allen I. Holub.

This code may not be distributed by yourself except in binary form, incorporated into a Java .class file. You may use this code freely for personal purposes, but you may not incorporate it into any commercial product without my express written permission.

@author Allen I. Holub */

005 006 public class Active_object extends Thread 007 { 008 private Blocking_queue requests = new Blocking_queue(); 009

/****************************************************************** Create an active object. The object is a Daemon thread that waits for dispatch() requests, then executes them in the order that they were enqueued. Since the thread is a daemon, it will not keep the process alive. */

010 011 public Active_object() { setDaemon( true ); } 012

/****************************************************************** Do not call this method. This method is public only because it's an override of a public method in the Thread base class. I'd rather it were private, but Java doesn't permit an override to have a more-restrictive access privilege than the template method in the base class. run() encapsulates the event loop. */

013 public void run() 014 { try 015 { Runnable the_request; 016 while( (the_request = (Runnable)(requests.dequeue())) != null ) 017 {; 018 the_request = null; 019 yield(); // give other threads a chance to run 020 } 021 } 022 catch( InterruptedException e ) 023 { // Thread was interrupted while blocked on a dequeue, 024 // Treat it as a close request and ignore it. 025 } 026 } 027

/***************************************************************** Cause an operation to be performed on the current active object's event-handler thread. Operations are executed serially in the order received.

@param operation A Runnable "command" object that encapsulates the operation to perform. If operation is null the active object will shut itself down when that request is dequeued (in its turn). Any operations posted after a null request is dispatched are not executed.

@throws Blocking_queue.Closed If you attempt to dispatch on a closed object. */

028 029 public final void dispatch( Runnable operation ) 030 { requests.enqueue( operation ); 031 } 032

/***************************************************************** Close the active object (render it unwilling to accept new "dispatch" requests). All pending requests are executed, but new ones are not accepted. This method returns immediately. Before the pending requests have been processed Active_object shuts down. You can block until the pending requests are handled by sending the Active_object object a join() message:

Active_object dispatcher = new Active_object(); //... dispatcher.close(); // cause it to reject new requests dispatcher.join(); // wait for existing request to be processed

You can also cause the Active_object to terminate gracefully (without blocking) by calling dispatch(null). Any requests that are "dispatched" after a dispatch(null) is issued are silently ignored, however.

Attempts to close a closed queue are silently ignored. **/

033 034

public final void close()

035 { requests.enqueue_final_item( null ); 036 } 037 }

Detangling console output

A more realistic example of using an Active_object is in the Console class in Listing 3. The main problem with console-based I/O in multithreaded applications is that the strings that are being printed by different threads tend to get mixed up with one another, particularly if these strings are being printed one piece at a time rather than as single, monolithic units. The active-object pattern can be used to solve this problem by creating a console task whose job is to write to the console. Rather than calling System.out.println() (or equivalent) directly, you send the output to the console task. The task keeps buffers for every thread that's using it, and flushes the buffer to the actual console when a newline is encountered. Since the console task is single threaded, strings created by separate threads won't be mixed together.

My implementation of the console task is in Listing 3. The Console class is a singleton, so you can't create one with new; use Console.out() to get a reference to the actual Console object. I've chosen to implement the Console as an extension of This way, you can wrap the single Console object in any of the standard wrappers (such as DataOutputStream) to add functionality to it. For example, use

    DataOutputStream data_writer = new DataOutputStream( Console.out() );


    PrintWriter writer = new PrintWriter( Console.out() );

Looking at the code, out() creates the object (Listing 3, line 24) using the double-check locking strategy discussed in the April 1999 column. I've also used the JDK_11_unloading_bug_fix class discussed in the same column. The interesting code -- at least for this month -- concerns the active object: dispatcher (Listing 3, line 16).

The only method that's doing real work is write(int) (Listing 3, line 47), which creates a Handler command object and passes it to the dispatcher for execution. The run() method of the Handler class (Listing 3, line 63) is called by the dispatcher, in its turn, to handle a single-character write request.

If the character isn't a newline, it's buffered up in a Vector that's associated with the current thread (it's the value component of a Map that uses a reference to the thread that issued the request as a key). Note that I can't call Thread.currentThread() in the Handler's run() method because I would get a reference to the active object's thread, not the thread that's issuing the write request. The current-thread reference is determined on line 49 when I dispatch the request. (The write() method runs on the thread that requests the write operation; runs on the active object's thread at some later time.)

1 2 3 4 Page 2
Page 2 of 4