Programming Java threads in the real world, Part 9

More threads in an object-oriented world: Synchronous dispatchers, active objects, detangling console I/O

This month I'm picking up the architectural theme from the May 1999 Java Toolbox column, with a look at two additional architectural solutions to thread-synchronization problems. You'll remember from last month that object-oriented systems are designed in terms of synchronous and asynchronous messages, not in terms of threads of execution. (If you don't remember that, you should read the previous article.) The two strategies I presented for implementing asynchronous messages (the one-thread-per-message and thread-pool techniques) work just fine in many applications, but since multiple threads are running concurrently -- perhaps accessing the same object -- you still have to worry about interthread synchronization. This month I'll look at two additional approaches that can all but eliminate the need for synchronization between messages.

Synchronous dispatching

A synchronous dispatcher, or round-robin scheduler, solves the synchronization problem by simulating multithreading within a single Java thread. Let's start out by considering two tasks that need to be executed in parallel:

Figure 1. Two tasks to be executed in parallel

Each of these tasks naturally divides into four chunks, and each could be executed on its own thread. Let's also imagine that the four chunks have to be atomic (they cannot tolerate interruption while they're executing), but that it's not a problem if the task is preempted between chunks. The only way to get this atomicity in a normal preemptive multitasking environment is to synchronize the operations, with all the concomitant overhead and complexity.

To move from a single task divided into chunks to a synchronous dispatcher, imagine that we can break up the single task into four independent tasks, as illustrated in Figure 2.

Figure 2. A single task broken out into four independent tasks

It isn't too hard to imagine how we could implement the chunks; you could define each chunk as the run() method of a Runnable object, for example, put the objects into an array, and then write a simple scheduler to execute the objects one at a time with a sleep() or yield() between chunks:

    Runnable[] first_task = new Runnable[]
    {   new Runnable(){ public void run(){  /* do chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do chunk 4 here */ } },
    };

    for( int i = 0; i < first_task.length; ++i )
    {   first_task[i].run();
        Thread.yield();     // or sleep() between chunks
    }

Doug Schmidt coined the term Reactor for this design pattern (see Resources). The Reactor pattern emerged from Schmidt's work on the ACE framework (see Resources), as a way to accumulate various operations that should occur when a given event is triggered. The event handler then executes the for loop. The effect is essentially the same as several threads waiting on a single condition variable that's set true by the event. However, unlike a condition variable, you have control over both the sequence of execution and the moment at which you give up that control.
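In outline, the pattern is tiny. The following is my own minimal, self-contained sketch of the idea (the class and method names are mine, not ACE's): handlers accumulate in a list, and "triggering the event" simply runs the accumulated handlers in registration order.

```java
import java.util.*;

public class ReactorSketch
{
    private final List<Runnable> handlers = new LinkedList<Runnable>();

    public void register( Runnable handler )
    {   handlers.add( handler );
    }

    public void trigger()               // the event handler's for loop
    {   for( Runnable handler : handlers )
  ;
    }

    public static void main( String[] args )
    {   final StringBuffer trace = new StringBuffer();
        ReactorSketch on_shutdown = new ReactorSketch();

        on_shutdown.register( new Runnable()
                              {   public void run(){ trace.append("flush;"); } } );
        on_shutdown.register( new Runnable()
                              {   public void run(){ trace.append("close;"); } } );

        on_shutdown.trigger();
        System.out.println( trace );    // prints: flush;close;
    }
}
```

Unlike threads blocked on a condition variable, the handlers here run in exactly the order you registered them, and nothing runs until trigger() is called.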

Now let's imagine that I want to execute two complex tasks concurrently, each broken up into chunks. I do this simply by interleaving the two tasks, as illustrated in Figure 3.

Figure 3. Two tasks interleaved

I can implement this strategy as follows:

    Runnable[] first_task = new Runnable[]
    {   new Runnable(){ public void run(){  /* do task 1, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 4 here */ } },
    };

    Runnable[] second_task = new Runnable[]
    {   new Runnable(){ public void run(){  /* do task 2, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 4 here */ } },
    };

    for( int i = 0; i < first_task.length; ++i )
    {   first_task[i].run();
        second_task[i].run();
    }

The foregoing example isn't a particularly good general strategy because the loop has to know individually about each array, but I can also merge the two arrays into one large array before entering the loop:

    Runnable[] both_tasks = new Runnable[]
    {   new Runnable(){ public void run(){  /* do task 1, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 4 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 4 here */ } },
    };

    for( int i = 0; i < both_tasks.length; ++i )
    {   both_tasks[i].run();
    }

Note that I've interleaved the operations: task 1, chunk 1 runs before task 2, chunk 1; then task 1, chunk 2 runs, and so forth. What's going on here, at least with respect to the two tasks, is a lot like what you'd see in a preemptive scheduler on a concurrent (one-CPU) system in which each task was executing on its own thread. The big difference is that each chunk has control over when it gets "preempted." You don't lose control until you return from the run() method that implements the chunk. In fact, this behavior is a lot like a cooperative multithreading system (in which you don't give up control until you explicitly yield to another thread). Because of the explicit yielding of control, you don't have to synchronize any of the fields used by either task, provided that you divide up the chunks carefully: you can always leave any referenced objects in a stable state when you give up control.
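Here's a compilable sketch of that point (my own illustration, not code from the column's listings): both tasks update the same unsynchronized field, yet no locking is needed, because the chunks execute one at a time on a single thread and each chunk leaves the field in a stable state before returning.

```java
public class InterleavedTasks
{
    private int shared = 0;                         // deliberately unsynchronized
    private final StringBuffer trace = new StringBuffer();

    public String run_both()
    {
        Runnable[] both_tasks =                     // task 1 adds 1, task 2 adds 10
        {   new Runnable(){ public void run(){ shared += 1;  trace.append( shared + " " ); }},
            new Runnable(){ public void run(){ shared += 10; trace.append( shared + " " ); }},
            new Runnable(){ public void run(){ shared += 1;  trace.append( shared + " " ); }},
            new Runnable(){ public void run(){ shared += 10; trace.append( shared );       }},
        };

        for( int i = 0; i < both_tasks.length; ++i )    // one thread: no races possible
            both_tasks[i].run();

        return trace.toString();
    }

    public static void main( String[] args )
    {   System.out.println( new InterleavedTasks().run_both() );    // prints: 1 11 12 22
    }
}
```

If the same four chunks ran on two preemptive threads, the updates to shared could interleave mid-chunk and you'd need synchronization; here the chunk boundary is the only "preemption" point.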

Synchronous dispatching is surprisingly useful, especially when speed is of the essence. All the chunks are executing on a single OS-level thread. There is no synchronization overhead at all, and no expensive context swaps into the OS kernel. The main downside is that it's sometimes tricky to break up a single task into neat chunks.

I've solved the problem of implementing a synchronous dispatcher with the Synchronous_dispatcher class in Listing 1. You can use the dispatcher in two ways. First of all, tasks can be added independently and then executed. The following code prints the string "hello world" three times.

    Synchronous_dispatcher dispatcher = new Synchronous_dispatcher();

    dispatcher.add_handler( new Runnable()
                            {   public void run(){ System.out.print("hello "); }
                            }
                          );
    dispatcher.add_handler( new Runnable()
                            {   public void run(){ System.out.print("world\n"); }
                            }
                          );
    dispatcher.dispatch( 3 );

You can add a time delay between each chunk as follows:

    dispatcher.metered_dispatch( 2, 1000 ); // one second delay between chunks

This call prints the following:

hello <pause> world <pause> hello <pause> world

A second version of add_handler() is provided to add an entire array of Runnable chunks to the dispatcher. The elements of the array are distributed as evenly as possible among the previously added chunks. For example, the following code prints "Hello (Bonjour) world (monde)":

    Runnable[] first_task =
    {   new Runnable(){ public void run(){ System.out.print("Hello"); }},
        new Runnable(){ public void run(){ System.out.print(" world");}}
    };

    Runnable[] second_task =
    {   new Runnable(){ public void run(){ System.out.print(" (Bonjour)");}},
        new Runnable(){ public void run(){ System.out.print(" (monde)"  );}}
    };

    dispatcher = new Synchronous_dispatcher();
    dispatcher.add_handler( first_task  );
    dispatcher.add_handler( second_task );
    dispatcher.dispatch( 1 );

The chunks of a task can share data, if necessary, through the instance variables of the class that contains the array definitions.

Of course, several dispatchers can each run on their own threads, in which case you have a situation much like Sun's "green thread" model, where cooperative and preemptive threads both share the same process.
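The hybrid model can be sketched without any of the column's classes. Each loop below is a cooperative dispatcher internally (chunks yield only between runs), but the two loops preempt each other because each runs on its own Java thread. All names here are hypothetical:

```java
public class TwoDispatchers
{
    public static Thread dispatcherThread( final Runnable[] chunks,
                                           final int iterations )
    {   return new Thread()
        {   public void run()
            {   for( int i = 0; i < iterations; ++i )
                    for( Runnable chunk : chunks )
                    {;
                        yield();        // cooperative *within* this dispatcher
                    }
            }
        };
    }

    public static void main( String[] args ) throws InterruptedException
    {   final StringBuffer log = new StringBuffer();    // StringBuffer appends are atomic

        Thread a = dispatcherThread( new Runnable[]
                   { new Runnable(){ public void run(){ log.append('a'); }} }, 3 );
        Thread b = dispatcherThread( new Runnable[]
                   { new Runnable(){ public void run(){ log.append('b'); }} }, 3 );

        a.start(); b.start();
        a.join();  b.join();
        System.out.println( log.length() );     // 6: three appends from each loop
    }
}
```

The a's and b's may interleave in any order (that part is preemptive), but within one dispatcher the chunks still run strictly in sequence.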

Looking at the implementation, the individual Runnable chunks are kept in the events LinkedList (Listing 1, line 9). The one-element version of add_handler(Runnable) (Listing 1, line 12) just tacks the new element onto the end of the list. The array version, add_handler(Runnable[]) (Listing 1, line 17), is trickier to implement than you might expect. This is because the incoming array could be larger than the existing list (in which case you want the existing list elements dispersed evenly between array elements), or smaller than the existing list (in which case you want the incoming array elements dispersed evenly throughout the existing list). I've chosen to make matters easier by first converting the list to an array so that I can work with two similar data structures. I then rebuild the linked list, first copying one or more elements from the larger array into the list, then copying one element from the smaller array, then one or more elements from the larger array, and so forth. An earlier version of this method did the same thing without first doing the list-to-array conversion, but it was both considerably more complex and slower than the current version.
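If you want to experiment with the distribution logic in isolation, here's a hypothetical standalone version that works over strings so the result is easy to inspect. It mirrors the rebuild loop described above, with one small addition: it also copies any elements of the larger array left over when the lengths don't divide evenly.

```java
import java.util.*;

public class Interleave
{
    public static List<String> merge( String[] a, String[] b )
    {
        String[] larger  = a.length >= b.length ? a : b;
        String[] smaller = a.length >= b.length ? b : a;
        int distribution = larger.length / smaller.length;

        List<String> result = new ArrayList<String>();
        int large_source = 0;
        int small_source = 0;

        while( small_source < smaller.length )
        {   for( int skip = 0; skip < distribution; ++skip )
                result.add( larger[large_source++] );
            result.add( smaller[small_source++] );
        }
        while( large_source < larger.length )   // append any leftovers
            result.add( larger[large_source++] );

        return result;
    }

    public static void main( String[] args )
    {   System.out.println( merge( new String[]{ "L1", "L2", "L3", "L4" },
                                   new String[]{ "S1", "S2" } ) );
        // prints: [L1, L2, S1, L3, L4, S2]
    }
}
```

With a 4-element and a 2-element input, distribution is 2, so two "large" elements are copied for every "small" one, which is exactly the Hello/(Bonjour) ordering shown earlier.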

The problem of dispatching is essentially the same problem that I discussed in the context of Observer notifications (in the March 1999 column). I don't want to synchronize dispatch(...) (Listing 1, line 56) because I don't want to disallow the addition of new operations while dispatching is in progress. Here, I've taken the easy way out and copied the list inside the synchronized statement (Listing 1, line 67). A multicaster-based solution, as discussed in the March column, could also work.

The metered_dispatch(...) (Listing 1, line 79) variant on dispatch just uses an Alarm (discussed in February 1999) to dispatch events at a fixed interval.

Listing 1: /src/com/holub/asynch/

    001
    002 package com.holub.asynch;
    003 import java.util.*;
    004 import com.holub.asynch.Alarm;
    005

/************************************************** A synchronous notification dispatcher executes a sequence of operations sequentially. This allows two sets of linked operations to be interspersed and effectively executed in parallel, but without using multiple threads. This class is built on the JDK 1.2x LinkedList class, which must be present in the system.

(c) 1999, Allen I. Holub. This code may not be distributed by yourself except in binary form, incorporated into a Java .class file. You may use this code freely for personal purposes, but you may not incorporate it into any commercial product without my express permission in writing.

@author Allen I. Holub */

    006
    007 public class Synchronous_dispatcher
    008 {
    009     private LinkedList events = new LinkedList();   // list of Runnable objects
    010

/** Add a new handler to the end of the current list of subscribers. */

    011
    012     public synchronized void add_handler( Runnable handler )
    013     {   events.add( handler );
    014     }
    015

/** Add several listeners to the dispatcher, distributing them as evenly as possible with respect to the current list. */

    016
    017     public synchronized void add_handler( Runnable[] handlers )
    018     {
    019         if( events.size() == 0 )
    020         {   for( int i = 0; i < handlers.length; )
    021                 events.add( handlers[i++] );
    022         }
    023         else
    024         {   Object[] larger  = events.toArray();
    025             Object[] smaller = handlers;
    026
    027             if( larger.length < smaller.length )
    028             {   Object[] tmp = larger;
    029                 larger  = smaller;
    030                 smaller = tmp;
    031             }
    032
    033             int distribution = larger.length / smaller.length;
    034
    035             LinkedList new_list = new LinkedList();
    036
    037             int large_source = 0;
    038             int small_source = 0;
    039
    040             while( small_source < smaller.length )
    041             {   for( int skip = 0; skip < distribution; ++skip )
    042                     new_list.add( larger[large_source++] );
    043                 new_list.add( smaller[small_source++] );
    044             }
    045
    046             events = new_list;
    047         }
    048     }
    049

/******************************************************************* Remove all handlers from the current dispatcher. */

    050
    051     public synchronized void remove_all_handlers()
    052     {   events.clear();
    053     }
    054

/** Dispatch the actions "iterations" times. Use -1 for "forever." This function is not synchronized so that the list of events can be modified while the dispatcher is running. The method makes a clone of the event list and then executes from the clone on each iteration through the list of subscribers. Events added to the list will be executed starting with the next iteration. */

    055
    056     public void dispatch( int iterations )
    057     {
    058         // Dispatch operations. A simple copy-and-dispatch-from-copy
    059         // strategy is used, here. Eventually, I'll replace this code
    060         // with a Multicaster.
    061
    062
    063         if( events.size() > 0 )
    064             while( iterations == -1 || --iterations >= 0 )
    065             {
    066                 Object[] snapshot;
    067                 synchronized( this )
    068                 {   snapshot = events.toArray();
    069                 }
    070
    071                 for( int i = 0; i < snapshot.length; ++i )
    072                 {   ((Runnable)snapshot[i]).run();
    073                     Thread.currentThread().yield();
    074                 }
    075             }
    076     }
    077

/** Dispatch actions "iterations" number of times, with an action dispatched every "interval" milliseconds. Note that the last action executed takes up the entire time slot, even if the run() function itself doesn't take "interval" milliseconds to execute. Also note that the timing will be irregular if any run() method executes in more than "interval" milliseconds. If you want a time interval between iterations, but not between the operations performed in a single iteration, just insert a Runnable action that sleeps for a fixed number of milliseconds.

@param iterations Number of times to loop through the actions, executing them. Use -1 to mean "forever."

@param interval An action is executed every "interval" milliseconds. */

    078
    079     public void metered_dispatch( int iterations, int interval )
    080     {
    081         Alarm timer = new Alarm( interval, Alarm.MULTI_SHOT );
    082         timer.start();
    083
    084         while( iterations == -1 || --iterations >= 0 )
    085         {
    086             Object[] snapshot;
    087             synchronized( this )
    088             {   snapshot = events.toArray();
    089             }
    090
    091             for( int i = 0; i < snapshot.length; ++i )
    092             {   ((Runnable)snapshot[i]).run();
    093                 timer.await();
    094                 timer.start();
    095             }
    096         }
    097
    098         timer.stop();
    099     }
    100

    101     static public class Test
    102     {
    103         // Execute the test with:
    104         //      java "com.holub.asynch.Synchronous_dispatcher\$Test"
    105         //
    106
    107         public static void main( String[] args )
    108         {
    109             Synchronous_dispatcher dispatcher =
    110                                 new Synchronous_dispatcher();
    111
    112             dispatcher.add_handler(
    113                 new Runnable()
    114                 {   public void run()
    115                     {   System.out.print("hello");
    116                     }
    117                 }
    118             );
    119
    120             dispatcher.add_handler(
    121                 new Runnable()
    122                 {   public void run()
    123                     {   System.out.println(" world");
    124                     }
    125                 }
    126             );
    127
    128             dispatcher.dispatch( 1 );
    129             dispatcher.metered_dispatch( 2, 1000 );
    130
    131             //------------------------------------------------
    132             // Test two tasks, passed to the dispatcher as arrays
    133             // of chunks. Should print:
    134             //      Hello (Bonjour) world (monde)
    135
    136             Runnable[] first_task =
    137             {   new Runnable(){ public void run(){ System.out.print("Hello"); }},
    138                 new Runnable(){ public void run(){ System.out.print(" World");}}
    139             };
    140
    141             Runnable[] second_task =
    142             {   new Runnable(){ public void run(){ System.out.print(" Bonjour");}},
    143                 new Runnable(){ public void run(){ System.out.print(" Monde" );}}
    144             };
    145
    146             dispatcher = new Synchronous_dispatcher();
    147             dispatcher.add_handler( first_task );
    148             dispatcher.add_handler( second_task );
    149             dispatcher.dispatch( 1 );
    150         }
    151     }
    152 }
    153

Active objects

The second architecture I'd like to discuss this month is the active object architecture. Though we can thank Greg Lavender and Doug Schmidt for the name (see Resources), the architecture has been around for a while -- the first time I saw it (dinosaur that I am) was in Intel's RMX operating system, circa 1979.

Though none of us will be programming for RMX, Intel's terminology is useful in describing the pattern: RMX (the Real-time Multitasking Executive) was an OS for embedded systems -- think of a multithreaded operating system that supports only a single process that remains resident in core. All threads share a single address space, and there is no virtual memory. Figure 4, below, shows the general architecture. RMX sees the world as a series of "tasks," not threads. Each task has an input queue. And each task effectively runs on a single thread. You activate a task by sending it an asynchronous message. You literally put a data structure onto the task's input queue. The task starts out idle, waiting for a request to be queued. It wakes up when a message arrives, performs the requested operation, then goes back to sleep. If messages arrive while the task is busy, they remain in the queue until the current operation is complete, and are dequeued and executed sequentially. Each message carries with it a return address -- a message queue to which the task posts the original message data structure (perhaps modified by the task to contain a completion status or output data) when it completes the requested operation.

Figure 4. The RMX system architecture

You use this system by creating a task for every desired operation. For example, you might create a "file I/O" task whose job was to access a single file. You'd send data to that file by queueing up a write request on that task's input queue. The I/O task would perform the operation, then it would send the data structure representing the write-request to the task whose input queue was listed in the message as the return address. A special "garbage collector" task was included solely to provide a place to send these reply messages to when the originating task wasn't interested in the reply. (The garbage collector simply freed any memory that arrived in its queue.) You would read the file by posting an empty read-request message to the file I/O task's input queue. The file I/O task would then fill that message with data and post it back to the included return address.

The main advantage of the active-object architecture is that the individual operations don't have to be synchronized, since they're executing sequentially on the task object's thread. In fact, the only synchronization necessary in the entire system is in the enqueue and dequeue operations.
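That single point of synchronization can be sketched in a few lines. (Holub's Blocking_queue, discussed last month, is richer than this; the following is my own reduction to just the synchronized core, using Java's built-in wait()/notify() guarded-block idiom.)

```java
import java.util.LinkedList;

public class TinyBlockingQueue
{
    private final LinkedList<Object> items = new LinkedList<Object>();

    public synchronized void enqueue( Object item )
    {   items.addLast( item );
        notify();                   // wake one thread blocked in dequeue()
    }

    public synchronized Object dequeue() throws InterruptedException
    {   while( items.isEmpty() )
            wait();                 // releases the lock while blocked
        return items.removeFirst();
    }

    public static void main( String[] args )
    {   final TinyBlockingQueue queue = new TinyBlockingQueue();

        new Thread()                // a consumer "active object" thread
        {   public void run()
            {   try { System.out.println( queue.dequeue() ); }
                catch( InterruptedException e ){}
            }
        }.start();

        queue.enqueue( "dequeued on the consumer's thread" );
    }
}
```

Everything else in an active-object system -- the operations themselves -- can remain entirely unsynchronized, because only the queue is ever touched by more than one thread.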

Of course, from an object-oriented-design perspective, a "task" is simply an object that can accept nothing but asynchronous messages. There's no requirement that asynchronous requests must execute in parallel. From the perspective of the object that sends the message, as long as the request returns immediately, it's okay for the receiving object to perform the operation whenever it gets around to it.

A general solution

I've adapted the idea of the active object to my own ends, with a general solution to the dispatch problem. Figure 5, below, shows how my implementation works. The main difference from RMX is that, rather than queuing up passive data structures that contain an ID identifying the requested operation, I'm queuing up Runnable objects that actually do the operation. This way, a given active object can handle many requests without knowing what those requests actually do. This notion of passing in an object that encapsulates a request is an example of the Command design pattern (see Resources).

Figure 5. Active objects in Java

The implementation of an active object dispatcher (Listing 2) isn't much more complicated than the picture above. Most of Listing 2, as you will see, is taken up with comments.

Create and start up a dispatcher like this:

    Active_object dispatcher = new Active_object();
    dispatcher.start();

Ask the active object to do something for you like this:

    dispatcher.dispatch
    (   new Runnable()
        {   public void run()
            {   System.out.println("hello world");
            }
        }
    );
When you're done with the dispatcher, close it:

    dispatcher.close();
Thereafter, any attempts to dispatch() a new request will be rejected with an exception toss, though the requests that are waiting to be serviced will be executed.
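For readers on current JDKs, the same contract is visible in java.util.concurrent (which postdates this column): a single-thread executor is, in effect, an active object, and it rejects work submitted after shutdown while still draining pending requests. A quick sketch:

```java
import java.util.concurrent.*;

public class CloseDemo
{
    public static void main( String[] args ) throws InterruptedException
    {
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();

        dispatcher.execute( () -> System.out.println("queued before close: runs") );
        dispatcher.shutdown();      // like close(): pending work still executes

        try
        {   dispatcher.execute( () -> System.out.println("never runs") );
        }
        catch( RejectedExecutionException e )       // the "exception toss"
        {   System.out.println("rejected after close");
        }

        dispatcher.awaitTermination( 5, TimeUnit.SECONDS );
    }
}
```

awaitTermination() plays the role of the join() call described below: it blocks until the already-queued requests have been processed.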

Looking at the implementation, the input queue in Figure 5 is an instance of the Blocking_queue class (discussed last month), declared as requests (Listing 2, line 8). The dispatch() (Listing 2, line 29) and close() (Listing 2, line 34) methods are simple wrappers around the equivalent Blocking_queue methods. The enqueued "request" is just a Runnable object, whose run() method is executed on the active object's thread.

Note that I had to add a method to last month's Blocking_queue class to make the close() function work elegantly without having to synchronize. (The source code in the "Articles" section of my Web site has been patched to include the new method. See my bio at the end of this article for a link to my site.) Once a Blocking_queue is closed by calling enqueue_final_item(), any further attempts to enqueue new items are rejected with an exception toss. Moreover, the queue is closed automatically when the final item is dequeued.

The only other method in the Active_object class is run() (Listing 2, line 13). (Active_object extends Thread, so it overrides run() directly.) The run() method sits in a tight loop, dequeueing and executing requests as they come in; most of the time, run() will be blocked waiting to dequeue a request. The loop terminates when the null enqueued by the close() method is dequeued. To be a good citizen, I yield() after every request is executed to give other threads at my priority level a chance to run.

Notice that none of the methods of Active_object are synchronized. They don't need to be because, with the single exception of run(), all the methods of Active_object are simple wrappers around calls to the Blocking_queue, which is synchronized. Moreover, the Active_object itself is the only thread that dequeues from the queue, so extra synchronization on the dequeue operation is also unnecessary.

Listing 2: /src/com/holub/asynch/

    001
    002 package com.holub.asynch;
    003 import com.holub.asynch.Blocking_queue;
    004

/*********************************************************************** You use a dispatcher to implement the message-queueing and dispatching part of an active object. Create and start up a dispatcher like this:

    Active_object dispatcher = new Active_object();
    dispatcher.start();

Ask the active object to do something for you like this:

    dispatcher.dispatch
    (   new Runnable()
        {   public void run(){ System.out.println("hello world"); }
        }
    );

When you're done with the dispatcher, close it:

    dispatcher.close();
Variations on these themes are also possible. See the article text for more details.

(c) 1999, Allen I. Holub.

This code may not be distributed by yourself except in binary form, incorporated into a Java .class file. You may use this code freely for personal purposes, but you may not incorporate it into any commercial product without my express written permission.

@author Allen I. Holub */

    005
    006 public class Active_object extends Thread
    007 {
    008     private Blocking_queue requests = new Blocking_queue();
    009

/****************************************************************** Create an active object. The object is a Daemon thread that waits for dispatch() requests, then executes them in the order that they were enqueued. Since the thread is a daemon, it will not keep the process alive. */

    010
    011     public Active_object() { setDaemon( true ); }
    012

/****************************************************************** Do not call this method. This method is public only because it's an override of a public method in the Thread base class. I'd rather it were private, but Java doesn't permit an override to have a more-restrictive access privilege than the template method in the base class. run() encapsulates the event loop. */

    013     public void run()
    014     {   try
    015         {   Runnable the_request;
    016             while( (the_request = (Runnable)(requests.dequeue())) != null )
    017             {;
    018                 the_request = null;
    019                 yield();            // give other threads a chance to run
    020             }
    021         }
    022         catch( InterruptedException e )
    023         {   // Thread was interrupted while blocked on a dequeue.
    024             // Treat it as a close request and ignore it.
    025         }
    026     }
    027

/***************************************************************** Cause an operation to be performed on the current active object's event-handler thread. Operations are executed serially in the order received.

@param operation A Runnable "command" object that encapsulates the operation to perform. If operation is null the active object will shut itself down when that request is dequeued (in its turn). Any operations posted after a null request is dispatched are not executed.

@throws Blocking_queue.Closed If you attempt to dispatch on a closed object. */

    028
    029     public final void dispatch( Runnable operation )
    030     {   requests.enqueue( operation );
    031     }
    032

/***************************************************************** Close the active object (render it unwilling to accept new "dispatch" requests). All pending requests are executed, but new ones are not accepted. This method returns immediately; the Active_object may not have finished processing the pending requests when it returns. You can block until the pending requests are handled by sending the Active_object object a join() message:

    Active_object dispatcher = new Active_object();
    //...
    dispatcher.close();  // cause it to reject new requests
    dispatcher.join();   // wait for existing requests to be processed

You can also cause the Active_object to terminate gracefully (without blocking) by calling dispatch(null). Any requests that are "dispatched" after a dispatch(null) is issued are silently ignored, however.

Attempts to close a closed queue are silently ignored. **/

    033
    034     public final void close()
    035     {   requests.enqueue_final_item( null );
    036     }
    037 }

Detangling console output

A more realistic example of using an Active_object is in the Console class in Listing 3. The main problem with console-based I/O in multithreaded applications is that the strings that are being printed by different threads tend to get mixed up with one another, particularly if these strings are being printed one piece at a time rather than as single, monolithic units. The active-object pattern can be used to solve this problem by creating a console task whose job is to write to the console. Rather than calling System.out.println() (or equivalent) directly, you send the output to the console task. The task keeps buffers for every thread that's using it, and flushes the buffer to the actual console when a newline is encountered. Since the console task is single threaded, strings created by separate threads won't be mixed together.
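Before looking at Holub's Console itself, here's a hedged, self-contained sketch of the same idea using a modern single-thread executor as the active object (all names below are mine, not the column's): characters are forwarded to one worker thread, which buffers them per originating thread and emits only whole lines.

```java
import java.util.*;
import java.util.concurrent.*;

public class LineAtomizer
{
    // Touched only on the worker thread, so it needs no synchronization:
    private final Map<Thread, StringBuilder> buffers = new HashMap<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final List<String> lines =
                        Collections.synchronizedList( new ArrayList<String>() );

    public void write( final char c )
    {   final Thread client = Thread.currentThread();   // captured on the caller's thread
        worker.execute( () ->
        {   StringBuilder buf =
                buffers.computeIfAbsent( client, k -> new StringBuilder() );
            if( c == '\n' )
            {   lines.add( buf.toString() );    // flush the whole line at once
                buffers.remove( client );       // discard the per-thread buffer
            }
            else
                buf.append( c );
        });
    }

    public List<String> close() throws InterruptedException
    {   worker.shutdown();
        worker.awaitTermination( 5, TimeUnit.SECONDS );
        return lines;
    }

    public static void main( String[] args ) throws InterruptedException
    {   final LineAtomizer console = new LineAtomizer();

        Runnable writer = () ->     // write this thread's name, one char at a time
        {   for( char c : (Thread.currentThread().getName() + "\n").toCharArray() )
            {   console.write( c );
                try { Thread.sleep( 1 ); } catch( InterruptedException e ){}
            }
        };
        Thread a = new Thread( writer, "aaaa" );
        Thread b = new Thread( writer, "bbbb" );
        a.start(); b.start();
        a.join();  b.join();
        System.out.println( console.close() );  // two intact lines, in either order
    }
}
```

Even though the two writer threads interleave their character-by-character writes, each completed line comes out intact, because a single worker thread owns all the buffers.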

My implementation of the console task is in Listing 3. The Console class is a singleton, so you can't create one with new; use Console.out() to get a reference to the actual Console object. I've chosen to implement the Console as an extension of OutputStream. This way, you can wrap the single Console object in any of the standard wrappers (such as DataOutputStream) to add functionality to it. For example, use

    DataOutputStream data_writer = new DataOutputStream( Console.out() );

or:

    PrintWriter writer = new PrintWriter( Console.out() );

Looking at the code, out() creates the object (Listing 3, line 24) using the double-check locking strategy discussed in the April 1999 column. I've also used the JDK_11_unloading_bug_fix class discussed in the same column. The interesting code -- at least for this month -- concerns the active object: dispatcher (Listing 3, line 16).

The only method that's doing real work is write(int) (Listing 3, line 47), which creates a Handler command object and passes it to the dispatcher for execution. The run() method of the Handler class (Listing 3, line 63) is called by the dispatcher, in its turn, to handle a single-character write request.

If the character isn't a newline, it's buffered up in a Vector that's associated with the current thread (it's the value component of a Map that uses a reference to the thread that issued the request as a key). Note that I can't call Thread.currentThread() in the Handler's run() method, because I would get a reference to the active object's thread, not the thread that's issuing the write request. The current-thread reference is determined on line 49, when I dispatch the request. (The write() method runs on the thread that requests the write operation; the Handler's run() method runs on the active object's thread at some later time.)

If the character passed to run through the Handler is a newline, the else clause on line 73 prints all the buffered characters to the console (along with the newline) and then destroys the buffer. That is, the users Map contains buffers only for those threads that are in the process of assembling a line. Once the line is flushed, the buffer is discarded.

I've also implemented the OutputStream's flush() (Listing 3, line 87) and close() (Listing 3, line 94) methods. Note that flush() flushes the partially assembled buffers for all threads to standard output.

The Test class (Listing 3, line 110) encapsulates a small test routine that creates two threads, each of which prints a message with random sleeps inserted between each character-write operation to make sure that the write requests won't be jumbled up.

Finally, a couple of style notes: First, nothing at all is synchronized (and nothing needs to be), because all write requests are executed serially on a single thread (the active object's event-loop thread). Second, whenever possible I've written code in terms of the most abstract class or interface available. For example, even though the list of actively writing threads is maintained in a HashMap, the actual reference (users [Listing 3, line 18]) is declared as a Map reference. This is just good object-oriented programming, but many Java programmers don't do it. By using the most abstract class possible, I can replace the HashMap with a different data structure (such as a TreeMap) at any time simply by changing the single new invocation on line 29. The rest of the code adapts automatically.

Listing 3: /src/com/holub/asynch/

    001
    002 package com.holub.asynch;
    003 import*;
    004 import java.util.*;
    005 import com.holub.asynch.Active_object;
    006 import com.holub.asynch.Mutex;
    007 import com.holub.asynch.JDK_11_unloading_bug_fix;
    008

/*********************************************************************** This file presents a console-output task that demonstrates how to use the Active_object class. The Console is an OutputStream that multiple threads can use to write to the console. Unlike a normal PrintStream, the current class guarantees that lines print intact. (Characters from one line will not be inserted into another line.)

(c) 1999, Allen I. Holub. This code may not be distributed by yourself except in binary form, incorporated into a java .class file. You may use this code freely for personal purposes, but you may not incorporate it into any commercial product without my express written permission.

@author Allen I. Holub */

    009
    010 public class Console extends OutputStream
    011 {
    012     static
    013     {   new JDK_11_unloading_bug_fix( Console.class );
    014     }
    015
    016     private static Active_object dispatcher  = null;
    017     private static Console       the_console = null;
    018     private static Map           users       = null;
    019

/****************************************************************** A private constructor makes it impossible to create a Console using new. Use Console.out() to get a reference to the Console. */

020 021 private Console(){} 022

/****************************************************************** The console is a "singleton" -- only one object is permitted to exist. The Console has a private constructor, so you cannot manufacture one with new. Get a reference to the one-and-only instance of the Console by calling Console.out().

@return a thread-safe OutputStream that you can wrap with any of the standard decorators. This output stream buffers characters on a per-thread basis until a newline, sent by that thread, is encountered. The Console object then sends the entire line to the standard output as a single unit. */

023
024     public static final Console out()
025     {   if( the_console == null )
026         {   synchronized( OutputStream.class )
027             {   if( the_console == null )
028                 {   the_console = new Console();
029                     users       = new HashMap();
030                     dispatcher  = new Active_object();
031                     dispatcher.start();
032                 }
033             }
034         }
035         return the_console;
036     }
037

/******************************************************************* Shut down the Console in an orderly way. The Console uses a daemon thread to do its work, so it's not essential to shut it down explicitly, but you can call shut_down() to kill the thread in situations where you know that no more output is expected. Any characters that have been buffered, but not yet sent to the console, will be lost. You can actually call out() after shut_down(), but it's inefficient to do so. */

038
039     public static void shut_down()
040     {   dispatcher.close();
041         dispatcher  = null;
042         the_console = null;
043         users       = null;
044     }
045

/******************************************************************* This method overrides the OutputStream write(int) function. Use the inherited functions for all other OutputStream functionality. For a given thread, no output at all is flushed until a newline is encountered. (This behavior is accomplished using a hashtable, indexed by thread object, that contains a buffer of accumulated characters.) The entire line is flushed as a unit when the newline is encountered. Once the Console is closed (see close), any requests to write characters are silently ignored. */

046
047     public void write( final int character ) throws IOException
048     {   if( character != 0 )
049             dispatcher.dispatch( new Handler(character, Thread.currentThread()) );
050     }
051

/******************************************************************* This class defines the request object that's sent to the Active_object. All the real work is done here. */

052
053     private class Handler implements Runnable
054     {
055         private int    character;
056         private Object key;
057
058         Handler( int character, Object key )
059         {   this.character = character;
060             this.key       = key;
061         }
062
063         public void run()
064         {   List buffer = (List)( users.get(key) );
065
066             if( character != '\n' )
067             {   if( buffer == null )    // first time this thread made a request
068                 {   buffer = new Vector();
069                     users.put( key, buffer );
070                 }
071                 buffer.add( new int[]{ character } );
072             }
073             else
074             {   if( buffer != null )
075                 {   for( Iterator i = buffer.iterator(); i.hasNext() ; )
076                     {   int c = ( (int[])( ) )[0];
077                         System.out.print( (char)c );
078                     }
079                     users.remove( key );
080                 }
081                 System.out.print( '\n' );
082             }
083         }
084     }
085

/******************************************************************* This method overrides the OutputStream flush() method. All partially-buffered lines are printed. A newline is added automatically to the end of each text string. This method does not block. **/

086
087     public void flush() throws IOException
088     {   Set keys = users.keySet();
089         for( Iterator i = keys.iterator(); i.hasNext(); )
090             dispatcher.dispatch( new Handler('\n', ) );
091     }
092

/******************************************************************* This method overrides the OutputStream close() method. Output is flushed (see flush). Subsequent output requests are silently ignored. **/

093
094     public void close() throws IOException
095     {   flush();
096         dispatcher.close();    // blocks until everything stops.
097     }
098

/******************************************************************* A convenience method, this method provides a simple way to print a string without having to wrap the Console.out() stream in a DataOutputStream. **/

099
100     public void println( final String s )
101     {   try
102         {   for( int i = 0; i < s.length(); ++i )
103                 write( s.charAt(i) );
104             write( '\n' );
105         }
106         catch( IOException e ){ /*ignore it*/ }
107     }
108

/******************************************************************* A test class that prints two messages in parallel on two threads, with random sleeps between each character. */

109
110     static public class Test extends Thread
111     {
112         private String message;
113         private DataOutputStream data =
114                         new DataOutputStream( Console.out() );
115
116         public Test( String message )
117         {   this.message = message;
118         }
119
120         public void run()
121         {   try
122             {   Random indeterminate_time = new Random();
123                 for( int count = 2; --count >= 0 ; )
124                 {   for( int i = 0; i < message.length(); ++i )
125                     {   Console.out().write( message.charAt(i) );
126                         sleep( Math.abs(indeterminate_time.nextInt()) % 20 );
127                     }
128
129                     Console.out().println( "(" + count + ")" );
130                     sleep( Math.abs(indeterminate_time.nextInt()) % 20 );
131
132                     data.writeChars( "[" + count + "]" );
133                     sleep( Math.abs(indeterminate_time.nextInt()) % 20 );
134
135                     Console.out().write('\n');
136                 }
137             }
138             catch( Exception e )
139             {   Console.out().println( e.toString() );
140             }
141         }
142
143         static public void main( String[] args ) throws Exception
144         {
145             Thread t1 = new Test( "THIS MESSAGE IS FROM THREAD ONE" );
146             Thread t2 = new Test( "this message is from thread two" );
147
148             t1.start();
149             t2.start();
150
151             t1.join();    // Wait for everything to get enqueued
152             t2.join();
153
154             Console.out().close();    // wait for everything to be printed
155         }
156     }
157 }
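For readers joining the series here: the Active_object that the Console dispatches to was developed in an earlier installment. Its essential behavior can be sketched in a few lines. This is a simplified, hypothetical version of my own devising, not the column's actual class (I've kept only the dispatch(), close(), and start() operations the Console relies on): requests are queued and executed serially on a single background thread, which is why Handler.run() needs no synchronization.

```java
import java.util.*;

public class MiniActiveObject extends Thread
{
    private final LinkedList requests = new LinkedList();  // pending Runnables
    private boolean closed = false;

    // Enqueue a request for asynchronous, serialized execution.
    public synchronized void dispatch( Runnable request )
    {   if( !closed )
        {   requests.addLast( request );
            notify();
        }
    }

    // The event loop: drain the queue, one request at a time.
    public void run()
    {   try
        {   while( true )
            {   Runnable request;
                synchronized( this )
                {   while( requests.isEmpty() )
                    {   if( closed )
                            return;            // queue drained; shut down
                        wait();                // sleep until dispatch()
                    }
                    request = (Runnable) requests.removeFirst();
                }
      ;              // execute outside the lock
            }
        }
        catch( InterruptedException e ){ /* treat as shutdown */ }
    }

    // Refuse new requests; the thread exits after draining the queue.
    public synchronized void close()
    {   closed = true;
        notify();
    }

    public static void main( String[] args ) throws InterruptedException
    {   MiniActiveObject dispatcher = new MiniActiveObject();
        dispatcher.start();

        final int[] sum = new int[1];          // mutated only on the event loop
        for( int i = 1; i <= 3; ++i )
        {   final int value = i;
            dispatcher.dispatch( new Runnable()
            {   public void run(){ sum[0] += value; }
            } );
        }
        dispatcher.close();
        dispatcher.join();                     // join() makes sum[0] visible here
        System.out.println( sum[0] );          // prints: 6
    }
}
```

Because all three requests run on the one event-loop thread, they can share sum without any locking of their own; the only synchronization in the system guards the request queue itself.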

Wrapping things up, books, and JavaOne

So, that's it for threads. The nine parts of this series, taken together, give you a pretty good introduction to real Java threading (as compared to the simplified version found in most books). I've covered everything from thread architectures, to common problems in multithreaded systems, to atomic-level synchronization classes, to the architectural approaches to threading discussed this month. The tools I've developed along the way provide a good foundation for a multithreaded toolbox that can make your life as a Java thread programmer much easier.

For those of you who would like to see all the material I've been discussing in one place, an expanded version of this threading series will turn into the book Taming Java Threads, to be published by Apress (see Resources).

I'll also be speaking about many of the thread-related topics I've discussed in this series at the upcoming JavaOne Worldwide Java Developer Conference on Tuesday, June 15, from 2:45 to 5:00 p.m. in Moscone Center's Hall E.

And now for something completely different...

Next month I plan to change the subject entirely, turning the discussion to object-oriented ways to implement user interfaces. Most of the RAD tools for Java programming (such as Symantec's Café, IBM's Visual Age, Microsoft's J++, and so on) produce code that isn't in the least bit object-oriented. Using these RAD tools cuts you off from many of the benefits of object-oriented design (like fast debugging and ease of maintenance). The tool-generated code is much harder to debug and maintain than that of a carefully crafted object-oriented system, so these tools can actually add time to the overall development cycle.

Java itself is so easy to program that by the time you've hacked up the machine-generated code to make it maintainable, you may as well have written it correctly to begin with. And if you don't take steps to make the machine-generated systems maintainable, you will lose enormous amounts of time down the road when the code has to be updated. To paraphrase Fred Brooks, author of the classic book The Mythical Man-Month, there is no silver bullet that kills the werewolf of development time. Next month I'll explain this further by describing exactly what an object-oriented approach to UI design would look like. Subsequent columns will present a series of classes that will help you build true object-oriented interfaces in Java.

Learn more about this topic