Programming Java threads in the real world, Part 9

More threads in an object-oriented world: Synchronous dispatchers, active objects, detangling console I/O

This month I'm picking up the architectural theme from the May 1999 Java Toolbox column, with a look at two additional architectural solutions to thread-synchronization problems. You'll remember from last month that object-oriented systems are designed in terms of synchronous and asynchronous messages, not in terms of threads of execution. (If you don't remember that, you should read the previous article.) The two strategies I presented for implementing asynchronous messages (the one-thread-per-message and thread-pool techniques) work just fine in many applications, but since multiple threads are running concurrently -- perhaps accessing the same object -- you still have to worry about interthread synchronization. This month I'll look at two additional approaches that can all but eliminate the need for synchronization between messages.

Synchronous dispatching

A synchronous dispatcher, or round-robin scheduler, solves the synchronization problem by simulating multithreading within a single Java thread. Let's start out by considering two tasks that need to be executed in parallel:

Figure 1. Two tasks to be executed in parallel

Each of these tasks naturally divides into four chunks, and each could be executed on its own thread. Let's also imagine that the four chunks have to be atomic (they cannot tolerate interruption while they're executing), but that it's not a problem if the task is preempted between chunks. The only way to get this atomicity in a normal preemptive multitasking environment is to synchronize the operations, with all the concomitant overhead and complexity.

To move from a single task divided into chunks to a synchronous dispatcher, imagine that we can break up the single task into four independent tasks, as illustrated in Figure 2.

Figure 2. A single task broken out into four independent tasks

It isn't too hard to imagine how we could implement the chunks; you could define each chunk as the run() method of a Runnable object, for example, put the objects into an array, and then write a simple scheduler to execute the objects one at a time with a sleep() or yield() between chunks:

    Runnable[] first_task = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do chunk 4 here */ } },
    };
    for( int i = 0; i < first_task.length; ++i )
    {   first_task[i].run();
        Thread.currentThread().yield();
    }

Doug Schmidt coined the term Reactor for this design pattern (see Resources). The Reactor pattern emerged from Schmidt's work on the ACE framework (see Resources), as a way to accumulate various operations that should occur when a given event is triggered. The event handler then executes the for loop. The effect is essentially the same as several threads waiting on a single condition variable that's set true by the event. However, unlike a condition variable, you have control over both the sequence of execution and the moment at which you give up that control.
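
To make the idea concrete, here is a minimal Reactor-style sketch; the class and method names (Event_registry, subscribe(), fire()) are my own inventions for illustration, not part of Schmidt's ACE framework:

    import java.util.*;

    class Event_registry
    {   private Map handlers = new HashMap();    // event name -> List of Runnable

        public synchronized void subscribe( String event, Runnable handler )
        {   List list = (List)( handlers.get(event) );
            if( list == null )
            {   list = new LinkedList();
                handlers.put( event, list );
            }
            list.add( handler );
        }

        // The "event handler": run every accumulated operation, in order,
        // on the caller's thread.
        public synchronized void fire( String event )
        {   List list = (List)( handlers.get(event) );
            if( list != null )
                for( Iterator i = list.iterator(); i.hasNext(); )
                    ((Runnable)i.next()).run();
        }
    }

Each handler runs to completion before the next one starts, which is exactly the single-threaded guarantee that the synchronous dispatcher exploits.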

Now let's imagine that I want to execute two complex tasks concurrently, each broken up into chunks. I do this simply by interleaving the two tasks, as illustrated in Figure 3.

Figure 3. Two tasks interleaved

I can implement this strategy as follows:

    Runnable[] first_task = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do task 1, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 4 here */ } },
    };
    Runnable[] second_task = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do task 2, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 4 here */ } },
    };
    for( int i = 0; i < first_task.length; ++i )
    {   first_task[i].run();
        second_task[i].run();
        Thread.currentThread().yield();
    }

The foregoing example isn't a particularly good general strategy because the loop has to know individually about each array, but I can also merge the two arrays into one large array before entering the loop:

    Runnable[] both_tasks = new Runnable[]
    {
        new Runnable(){ public void run(){  /* do task 1, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 1 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 2 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 3 here */ } },
        new Runnable(){ public void run(){  /* do task 1, chunk 4 here */ } },
        new Runnable(){ public void run(){  /* do task 2, chunk 4 here */ } },
    };
    for( int i = 0; i < both_tasks.length; ++i )
    {   both_tasks[i].run();
        Thread.currentThread().yield();
    }

Note that I've interleaved the operations: task 1, chunk 1 runs before task 2, chunk 1; then task 1, chunk 2 runs, and so forth. What's going on here, at least with respect to the two tasks, is a lot like what you'd see from a preemptive scheduler on a concurrent (one-CPU) system in which each task was executing on its own thread. The big difference is that each chunk has control over when it gets "preempted": you don't lose control until you return from the run() method that implements the chunk. In fact, this behavior is a lot like a cooperative multithreading system (in which you don't give up control until you explicitly yield to another thread). Because of this explicit yielding of control, you don't have to synchronize any of the fields used by either task, provided that you divide up the chunks carefully so that any referenced objects are left in a stable state when you give up control.

Synchronous dispatching is surprisingly useful, especially when speed is of the essence. All the chunks are executing on a single OS-level thread. There is no synchronization overhead at all, and no expensive context swaps into the OS kernel. The main downside is that it's sometimes tricky to break up a single task into neat chunks.

I've solved the problem of implementing a synchronous dispatcher with the Synchronous_dispatcher class in Listing 1. You can use the dispatcher in two ways. First of all, tasks can be added independently and then executed. The following code prints the string "hello world" three times.

    Synchronous_dispatcher dispatcher = new Synchronous_dispatcher();
    dispatcher.add_handler( new Runnable()
                           {   public void run(){ System.out.print("hello "); }
                            }
                          );
    dispatcher.add_handler( new Runnable()
                           {   public void run(){ System.out.print("world\n"); }
                            } 
                          );
    dispatcher.dispatch( 3 );

You can add a time delay between each chunk as follows:

    dispatcher.metered_dispatch( 2, 1000 ); // one second delay between chunks

This call prints the following:

hello <pause> world <pause> hello <pause> world

A second version of add_handler() is provided to add an entire array of Runnable chunks to the dispatcher. The elements of the array are distributed as evenly as possible among the previously added chunks. For example, the following code prints "Hello (Bonjour) world (monde)":

    Runnable[] first_task =
    {   new Runnable(){ public void run(){ System.out.print("Hello"); }},
        new Runnable(){ public void run(){ System.out.print(" world");}}
    };
    Runnable[] second_task =
    {   new Runnable(){ public void run(){ System.out.print(" (Bonjour)");}},
        new Runnable(){ public void run(){ System.out.print(" (monde)"  );}}
    };
    dispatcher = new Synchronous_dispatcher();
    dispatcher.add_handler( first_task  );
    dispatcher.add_handler( second_task );
    dispatcher.dispatch( 1 );

The chunks of a task can share data, if necessary, through the instance variables of the class that contains the array definitions.
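
For example, here's a minimal sketch (mine, not part of Listing 1) in which the chunks share a running total through a field of the enclosing class; no synchronization is needed because the dispatcher runs only one chunk at a time:

    class Summing_task
    {   private int total = 0;   // shared by all chunks, but never accessed concurrently

        public Runnable[] chunks()
        {   return new Runnable[]
            {   new Runnable(){ public void run(){ total += 10; } },
                new Runnable(){ public void run(){ total += 20; } },
                new Runnable(){ public void run(){ System.out.println("total=" + total); } },
            };
        }
    }

You'd pass the array returned by chunks() to the array version of add_handler(), just as in the earlier example.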

Of course, several dispatchers can each run on their own threads, in which case you have a situation much like Sun's "green thread" model, where cooperative and preemptive threads both share the same process.
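
A minimal sketch of what that looks like (assuming the Synchronous_dispatcher of Listing 1): the chunks within each dispatcher cooperate with one another, while the dispatchers themselves are scheduled preemptively against each other by the JVM.

    final Synchronous_dispatcher first  = new Synchronous_dispatcher();
    final Synchronous_dispatcher second = new Synchronous_dispatcher();
    //... add_handler() calls go here ...
    new Thread( new Runnable(){ public void run(){ first.dispatch ( -1 ); } } ).start();
    new Thread( new Runnable(){ public void run(){ second.dispatch( -1 ); } } ).start();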

Looking at the implementation, the individual Runnable chunks are kept in the events LinkedList (Listing 1, line 9). The one-element version of add_handler(Runnable) (Listing 1, line 12) just tacks the new element onto the end of the list. The array version add_handler(Runnable[]) (Listing 1, line 17) is trickier to implement than you might expect. This is because the incoming array could be larger than the existing list (in which case you want the existing list elements dispersed evenly between array elements), or smaller than the existing list (in which case you want the incoming array elements to be dispersed evenly throughout the existing list). I've chosen to make matters easier by first converting the list to an array so that I can work with two similar data structures. I then rebuild the linked list, first copying one or more elements from the larger array into the list, then copying one element from the smaller array, then one or more elements from the larger array, and so forth. An earlier version of this method did the same thing without first doing the list-to-array conversion, but it was both considerably more complex and slower than the current version.

The problem of dispatching is essentially the same problem that I discussed in the context of Observer notifications (in the March 1999 column). I don't want to synchronize dispatch(...) (Listing 1, line 56) because I don't want to disallow the addition of new operations while dispatching is in progress. Here, I've taken the easy way out and copied the list inside the synchronized statement (Listing 1, line 67). A multicaster-based solution, as discussed in the March column, could also work.

The metered_dispatch(...) (Listing 1, line 79) variant on dispatch just uses an Alarm (discussed in February 1999) to dispatch events at a fixed interval.

Listing 1: /src/com/holub/asynch/Synchronous_dispatcher.java
package com.holub.asynch;
import java.util.*;
import com.holub.asynch.Alarm;
/***********************************************************************
 |    A synchronous notification dispatcher executes a sequence of operations
 |    sequentially. This allows two sets of linked operations to be interspersed
 |    and effectively executed in parallel, but without using multiple threads.
 |    This class is built on the JDK 1.2x LinkedList class, which must be
 |    present in the system.
 |
 |    (c) 1999, Allen I. Holub. This code may not be distributed by yourself
 |    except in binary form, incorporated into a Java .class file. You may use
 |    this code freely for personal purposes, but you may not incorporate it
 |    into any commercial product without my express permission in writing.
 |
 |    @author Allen I. Holub
 */
public class Synchronous_dispatcher
{
    private LinkedList events = new LinkedList();   // list of Runnable objects
    /**
     |    Add a new handler to the end of the current list of subscribers.
     */
    public synchronized void add_handler( Runnable handler )
    {   events.add( handler );
    }
    /**
     |    Add several listeners to the dispatcher, distributing them as evenly as possible with respect to the current list.
     */
    public synchronized void add_handler( Runnable[] handlers )
    {
        if( events.size() == 0 )
        {   for( int i=0; i < handlers.length; )
                events.add( handlers[i++] );
        }
        else
        {   Object[] larger  = events.toArray();
            Object[] smaller = handlers;
            if( larger.length < smaller.length )
            {   Object[] tmp = larger;
                larger  = smaller;
                smaller = tmp;
            }
            int distribution = larger.length / smaller.length;
            LinkedList new_list = new LinkedList();
            int large_source = 0;
            int small_source = 0;
            while( small_source < smaller.length )
            {   for( int skip = 0; skip < distribution; ++skip )
                    new_list.add( larger[large_source++] );
                new_list.add( smaller[small_source++] );
            }
            while( large_source < larger.length )   // copy any leftover elements
                new_list.add( larger[large_source++] );
            events = new_list;
        }
    }
    /*******************************************************************
     |    Remove all handlers from the current dispatcher.
     */
    public synchronized void remove_all_handlers()
    {   events.clear();
    }
    /**
     |    Dispatch the actions "iterations" times. Use -1 for "forever." This function is not synchronized so that the list of events can be modified while the dispatcher is running. The method makes a clone of the event list and then executes from the clone on each iteration through the list of subscribers. Events added to the list will be executed starting with the next iteration.
     */
    public void dispatch( int iterations )
    {
        // Dispatch operations. A simple copy-and-dispatch-from-copy
        // strategy is used, here. Eventually, I'll replace this code
        // with a <code>Multicaster</code>.
        if( events.size() > 0 )
            while( iterations==-1 || --iterations >= 0 )
            {
                Object[] snapshot;
                synchronized( this )
                {   snapshot = events.toArray();
                }
                for( int i = 0; i < snapshot.length; ++i )
                {   ((Runnable)snapshot[i]).run();
                    Thread.currentThread().yield();
                }
            }
    }
    /**
     |    Dispatch actions "iterations" number of times, with an action dispatched
     |    every "interval" milliseconds. Note that the last action executed takes up
     |    the entire time slot, even if the run() function itself doesn't take
     |    "interval" milliseconds to execute. Also note that the timing will be
     |    irregular if any run() method executes in more than "interval" milliseconds.
     |    If you want a time interval between iterations, but not between the
     |    operations performed in a single iteration, just insert a Runnable action
     |    that sleeps for a fixed number of milliseconds.
     |
     |    @param iterations number of times to loop through the actions, executing
     |                      them. Use -1 to mean "forever."
     |    @param interval   An action is executed every "interval" milliseconds.
     */
    public void metered_dispatch( int iterations, int interval )
    {
        Alarm timer = new Alarm( interval, Alarm.MULTI_SHOT );
        timer.start();
        while( iterations==-1 || --iterations >= 0 )
        {
            Object[] snapshot;
            synchronized( this )
            {   snapshot = events.toArray();
            }
            for( int i = 0; i < snapshot.length; ++i )
            {   ((Runnable)snapshot[i]).run();
                timer.await();
                timer.start();
            }
        }
        timer.stop();
    }
    static public class Test
    {
        // Execute the test with:
        //  java "com.holub.asynch.Synchronous_dispatcher\$Test"
        //
        public static void main( String[] args )
        {
            Synchronous_dispatcher dispatcher = 
                                        new Synchronous_dispatcher();
            dispatcher.add_handler( 
                new Runnable()
                {   public void run()
                    {   System.out.print("hello");
                    }
                }
            );
            dispatcher.add_handler(
                new Runnable()
                {   public void run()
                    {   System.out.println(" world");
                    }
                }
            );
            dispatcher.dispatch( 1 );
            dispatcher.metered_dispatch( 2, 1000 );
            //------------------------------------------------
            // Test two tasks, passed to the dispatcher as arrays
            // of chunks. Should print:
            //          Hello (Bonjour) world (monde)
            Runnable[] first_task =
            {   new Runnable(){ public void run(){ System.out.print("Hello"); }},
                new Runnable(){ public void run(){ System.out.print(" world");}}
            };
            Runnable[] second_task =
            {   new Runnable(){ public void run(){ System.out.print(" (Bonjour)");}},
                new Runnable(){ public void run(){ System.out.print(" (monde)"  );}}
            };
            dispatcher = new Synchronous_dispatcher();
            dispatcher.add_handler( first_task  );
            dispatcher.add_handler( second_task );
            dispatcher.dispatch( 1 );
        }
    }
}

Active objects

The second architecture I'd like to discuss this month is the active object architecture. Though we can thank Greg Lavender and Doug Schmidt for the name (see Resources), the architecture has been around for a while -- the first time I saw it (dinosaur that I am) was in Intel's RMX operating system, circa 1979.

Though none of us will be programming for RMX, Intel's terminology is useful in describing the pattern: RMX (the Real-time Multitasking Executive) was an OS for embedded systems -- think of a multithreaded operating system that supports only a single process that remains resident in core. All threads share a single address space, and there is no virtual memory. Figure 4, below, shows the general architecture. RMX sees the world as a series of "tasks," not threads. Each task has an input queue. And each task effectively runs on a single thread. You activate a task by sending it an asynchronous message. You literally put a data structure onto the task's input queue. The task starts out idle, waiting for a request to be queued. It wakes up when a message arrives, performs the requested operation, then goes back to sleep. If messages arrive while the task is busy, they remain in the queue until the current operation is complete, and are dequeued and executed sequentially. Each message carries with it a return address -- a message queue to which the task posts the original message data structure (perhaps modified by the task to contain a completion status or output data) when it completes the requested operation.

Figure 4. The RMX system architecture

You use this system by creating a task for every desired operation. For example, you might create a "file I/O" task whose job was to access a single file. You'd send data to that file by queueing up a write request on that task's input queue. The I/O task would perform the operation, then it would send the data structure representing the write-request to the task whose input queue was listed in the message as the return address. A special "garbage collector" task was included solely to provide a place to send these reply messages to when the originating task wasn't interested in the reply. (The garbage collector simply freed any memory that arrived in its queue.) You would read the file by posting an empty read-request message to the file I/O task's input queue. The file I/O task would then fill that message with data and post it back to the included return address.

The main advantage of the active-object architecture is that the individual operations don't have to be synchronized, since they execute sequentially on the task object's thread. In fact, the only synchronization necessary in the entire system is in the "enqueue" and "dequeue" operations.
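
To see how little locking that really is, here's a minimal sketch of the idea in Java (my own illustration -- not RMX's API and not the Active_object class presented below): the only synchronized code is the queue itself, and every request dequeued from it runs sequentially on the task's single thread.

    import java.util.*;

    class Task extends Thread
    {   private LinkedList queue = new LinkedList();      // the task's input queue

        public synchronized void post( Runnable request )  // the "enqueue" operation
        {   queue.addLast( request );
            notify();
        }

        private synchronized Runnable next_request() throws InterruptedException
        {   while( queue.isEmpty() )                        // the "dequeue" operation
                wait();
            return (Runnable)( queue.removeFirst() );
        }

        public void run()
        {   try
            {   while( true )
                    next_request().run();  // requests execute one at a time; no other locks
            }
            catch( InterruptedException e ){ /* interrupted -- treat as shutdown */ }
        }
    }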

Of course, from an object-oriented-design perspective, a "task" is simply an object that can accept nothing but asynchronous messages. There's no requirement that asynchronous requests must execute in parallel. From the perspective of the object that sends the message, as long as the request returns immediately, it's okay for the receiving object to perform the operation whenever it gets around to it.

A general solution

I've adapted the idea of the active object to my own ends, with a general solution to the dispatch problem. Figure 5, below, shows how my implementation works. The main difference from RMX is that, rather than queuing up passive data structures that contain an ID identifying the requested operation, I'm queuing up Runnable objects that actually do the operation. This way, a given active object can handle many requests without knowing what those requests actually do. This notion of passing in an object that encapsulates a request is an example of the Command design pattern (see Resources).

Figure 5. Active objects in Java

The implementation of an active object dispatcher (Listing 2) isn't much more complicated than the picture above. Most of Listing 2, as you will see, is taken up with comments.

Create and start up a dispatcher like this:

    Active_object dispatcher = new Active_object();
    dispatcher.start();

Ask the active object to do something for you like this:

    dispatcher.dispatch
    (   new Runnable()
       {   public void run()
            {   System.out.println("hello world");
            }
        }
    );

When you're done with the dispatcher, close it:

    dispatcher.close();

Thereafter, any attempts to dispatch() a new request will be rejected with an exception toss, though the requests that are waiting to be serviced will be executed.
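
In code, that rejection looks something like the following sketch (the exception type comes from the @throws clause in Listing 2; the empty Runnable is just a placeholder):

    dispatcher.close();
    try
    {   dispatcher.dispatch( new Runnable(){ public void run(){ /* ... */ } } );
    }
    catch( Blocking_queue.Closed e )
    {   // The new request was refused; previously queued requests still run.
    }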

Looking at the implementation, the input queue in Figure 5 is an instance of the Blocking_queue class (discussed last month), declared as requests (Listing 2, line 8). The dispatch() (Listing 2, line 29) and close() (Listing 2, line 34) methods are simple wrappers around the equivalent Blocking_queue methods. The enqueued "request" is just a Runnable object, whose run() method is executed on the active object's thread.

Note that I had to add a method to last month's Blocking_queue class to make the close() function work elegantly without having to synchronize. (The source code in the "Articles" section of my Web site has been patched to include the new method. See my bio at the end of this article for a link to my site.) Once a Blocking_queue is closed by calling enqueue_final_item(), any further attempts to enqueue new items are rejected with an exception toss. Moreover, the queue is closed automatically when the final item is dequeued.

The only other method in the Active_object class is run() (Listing 2, line 13). (Active_object extends Thread, so it implements run() directly.) The run() method sits in a tight loop, dequeueing and executing requests as they come in; most of the time, run() is blocked waiting to dequeue a request. The loop terminates when the null enqueued by the close() method is dequeued. In order to be a good citizen, I yield() after every request is executed to give other threads at my priority level a chance to run.

Notice that none of the methods of Active_object are synchronized. They don't need to be because, with the single exception of run(), all the methods of Active_object are simple wrappers around calls to the Blocking_queue, which is synchronized. Moreover, the Active_object itself is the only thread that dequeues from the queue, so extra synchronization on the dequeue operation is also unnecessary.

Listing 2: /src/com/holub/asynch/Active_object.java
package com.holub.asynch;
import  com.holub.asynch.Blocking_queue;
/***********************************************************************
 |    You use a dispatcher to implement the message-queueing and dispatching
 |    part of an active object. Create and start up a dispatcher like this:
 |
 |        Active_object dispatcher = new Active_object();
 |        dispatcher.start();
 |
 |    Ask the active object to do something for you like this:
 |
 |        dispatcher.dispatch
 |        (   new Runnable()
 |            {   public void run()
 |                {   System.out.println("hello world");
 |                }
 |            }
 |        );
 |
 |    When you're done with the dispatcher, close it:
 |
 |        dispatcher.close();
 |
 |    Variations on these themes are also possible. See the article text for
 |    more details.
 |
 |    (c) 1999, Allen I. Holub. This code may not be distributed by yourself
 |    except in binary form, incorporated into a Java .class file. You may use
 |    this code freely for personal purposes, but you may not incorporate it
 |    into any commercial product without my express written permission.
 |
 |    @author Allen I. Holub
 */
public class Active_object extends Thread
{
    private Blocking_queue requests = new Blocking_queue();
    /******************************************************************
     |    Create an active object. The object is a Daemon thread that waits for dispatch() requests, then executes them in the order that they were enqueued. Since the thread is a daemon, it will not keep the process alive.
     */
    public Active_object()
    {   setDaemon( true );
    }
    /******************************************************************
     |    Do not call this method. This method is public only because it's an override of a public method in the Thread base class. I'd rather it were private, but Java doesn't permit an override to have a more-restrictive access privilege than the template method in the base class. run() encapsulates the event loop.
     */
    public void run()
    {   try
        {   Runnable the_request;
            while( (the_request = (Runnable)(requests.dequeue())) != null )
            {   the_request.run();
                the_request  = null;
                yield();    // give other threads a chance to run
            }
        }
        catch( InterruptedException e )
        {   // Thread was interrupted while blocked on a dequeue,
            // Treat it as a close request and ignore it.
        }
    }
    /*****************************************************************
     |    Cause an operation to be performed on the current active object's
     |    event-handler thread. Operations are executed serially in the order
     |    received.
     |
     |    @param operation A Runnable "command" object that encapsulates the
     |                     operation to perform. If operation is null, the active
     |                     object will shut itself down when that request is
     |                     dequeued (in its turn). Any operations posted after a
     |                     null request is dispatched are not executed.
     |    @throws Blocking_queue.Closed if you attempt to dispatch on a closed
     |                     object.
     **/
    public final void dispatch( Runnable operation )
    {   requests.enqueue( operation );
    }
    /*****************************************************************
     |    Close the active object (render it unwilling to accept new "dispatch"
     |    requests). All pending requests are executed, but new ones are not
     |    accepted. This method returns immediately; it does not block until the
     |    pending requests have been processed and the Active_object shuts down.
     |    You can block until the pending requests are handled by sending the
     |    Active_object object a join() message:
     |
     |        Active_object dispatcher = new Active_object();
     |        //...
     |        dispatcher.close(); // cause it to reject new requests
     |        dispatcher.join();  // wait for existing requests to be processed
     |
     |    You can also cause the Active_object to terminate gracefully (without
     |    blocking) by calling dispatch(null). Any requests that are "dispatched"
     |    after a dispatch(null) is issued are silently ignored, however.
     |
     |    Attempts to close a closed queue are silently ignored.
     **/
    public final void close()
    {   requests.enqueue_final_item( null );
    }
}

Detangling console output

A more realistic example of using an Active_object is in the Console class in Listing 3. The main problem with console-based I/O in multithreaded applications is that the strings that are being printed by different threads tend to get mixed up with one another, particularly if these strings are being printed one piece at a time rather than as single, monolithic units. The active-object pattern can be used to solve this problem by creating a console task whose job is to write to the console. Rather than calling System.out.println() (or equivalent) directly, you send the output to the console task. The task keeps buffers for every thread that's using it, and flushes the buffer to the actual console when a newline is encountered. Since the console task is single threaded, strings created by separate threads won't be mixed together.

My implementation of the console task is in Listing 3. The Console class is a singleton, so you can't create one with new; use Console.out() to get a reference to the actual Console object. I've chosen to implement the Console as an extension of java.io.OutputStream. This way, you can wrap the single Console object in any of the standard java.io wrappers (such as DataOutputStream) to add functionality to it. For example, use

    DataOutputStream data_writer = new DataOutputStream( Console.out() );

or

    PrintWriter writer = new PrintWriter( Console.out() );

Looking at the code, out() creates the object (Listing 3, line 24) using the double-check locking strategy discussed in the April 1999 column. I've also used the JDK_11_unloading_bug_fix class discussed in the same column. The interesting code -- at least for this month -- concerns the active object: dispatcher (Listing 3, line 16).

The only method that's doing real work is write(int) (Listing 3, line 47), which creates a Handler command object and passes it to the dispatcher for execution. The run() method of the Handler class (Listing 3, line 63) is called by the dispatcher, in its turn, to handle a single-character write request.

If the character isn't a newline, it's buffered up in a Vector that's associated with the current thread (it's the value component of a Map that uses a reference to the thread that issued the request as a key). Note that I can't call Thread.currentThread() in the Handler's run() method because I would get a reference to the active object's thread, not the thread that's issuing the write request. The current-thread reference is determined on line 49 when I dispatch the request. (The write() method runs on the thread that requests the write operation; Handler.run() runs on the active object's thread at some later time.)
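
A tiny sketch (not part of Listing 3) shows the distinction: the thread captured at dispatch time is the client thread, while Thread.currentThread() inside run() is the active object's thread.

    Active_object dispatcher = new Active_object();
    dispatcher.start();

    final Thread client = Thread.currentThread();   // captured on the caller's thread

    dispatcher.dispatch( new Runnable()
    {   public void run()
        {   // Executes later, on the dispatcher's thread:
            System.out.println( Thread.currentThread() == client );  // prints "false"
        }
    } );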

If the character passed to run through the Handler is a newline, the else clause on line 73 prints all the buffered characters to the console (along with the newline) and then destroys the buffer. That is, the users Map contains buffers only for those threads that are in the process of assembling a line. Once the line is flushed, the buffer is discarded.

I've also implemented the OutputStream's flush() (Listing 3, line 87) and close() (Listing 3, line 94) methods. Note that flush() flushes the partially assembled buffers for all threads to standard output.

The Test class (Listing 3, line 110) encapsulates a small test routine that creates two threads, each of which prints a message with random sleeps inserted between each character-write operation; the sleeps force the two threads' requests to interleave, demonstrating that the output still won't be jumbled up.

Finally, a couple of style notes: First, nothing at all is synchronized (and nothing needs to be), because all write requests are executed serially on a single thread (the active object's event-loop thread). Second, whenever possible I've written code in terms of the most abstract class or interface available. For example, even though the list of actively writing threads is maintained in a HashMap, the actual reference (users [Listing 3, line 18]) is declared as a Map reference. This is just good object-oriented programming, but many Java programmers don't do it. By using the most abstract class possible, I can replace the HashMap with a different data structure (such as a TreeMap) at any time simply by changing the single new invocation on line 29. The rest of the code adapts automatically.
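
The payoff is a one-line change, something like this hypothetical swap:

    Map users = new HashMap();      // today's implementation choice
    // Map users = new TreeMap();   // tomorrow's -- no other code needs to change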

Listing 3: /src/com/holub/asynch/Console.java
package com.holub.asynch;
import java.io.*;
import java.util.*;
import com.holub.asynch.Active_object;
import com.holub.asynch.Mutex;
import com.holub.asynch.JDK_11_unloading_bug_fix;
/***********************************************************************
 |    This file presents a console-output task that demonstrates how to use the
 |    Active_object class. The Console is an OutputStream that multiple threads
 |    can use to write to the console. Unlike a normal PrintStream, the current
 |    class guarantees that lines print intact. (Characters from one line will
 |    not be inserted into another line.)
 |
 |    (c) 1999, Allen I. Holub. This code may not be distributed by yourself
 |    except in binary form, incorporated into a Java .class file. You may use
 |    this code freely for personal purposes, but you may not incorporate it
 |    into any commercial product without my express written permission.
 |
 |    @author Allen I. Holub
 */
public class Console extends OutputStream
{
    private static  Active_object   dispatcher  = null;
    private static  Console         the_console = null;
    private static  Map             users       = null;
    /******************************************************************
     |    A private constructor makes it impossible to create a Console using new. Use Console.out() to get a reference to the Console.
     */
    private Console()
    {   new JDK_11_unloading_bug_fix( Console.class );
    }
    /******************************************************************
     |    The console is a "singleton" -- only one object is permitted to exist.
     |    The Console has a private constructor, so you cannot manufacture one
     |    with new. Get a reference to the one-and-only instance of the Console
     |    by calling Console.out().
     |
     |    @return a thread-safe OutputStream that you can wrap with any of the
     |            standard java.io decorators. This output stream buffers
     |            characters on a per-thread basis until a newline, sent by that
     |            thread, is encountered. The Console object then sends the
     |            entire line to the standard output as a single unit.
     */
    public static final Console out()
    {   if( the_console == null )
        {   synchronized( OutputStream.class )
            {   if( the_console == null )
                {   the_console = new Console();
                    users       = new HashMap();
                    dispatcher  = new Active_object();
                    dispatcher.start();
                }
            }
        }
        return the_console;
    }
    /*******************************************************************
     |    Shut down the Console in an orderly way. The Console uses a daemon thread to do its work, so it's not essential to shut it down explicitly, but you can call shut_down() to kill the thread in situations where you know that no more output is expected. Any characters that have been buffered, but not yet sent to the console, will be lost. You can actually call out() after shut_down(), but it's inefficient to do so.
     */
    public static void shut_down()
    {   dispatcher.close();
        dispatcher  = null;
        the_console = null;
        users       = null;
    }
    /*******************************************************************
     |    This method overrides the OutputStream write(int) function. Use the inherited functions for all other OutputStream functionality. For a given thread, no output at all is flushed until a newline is encountered. (This behavior is accomplished using a hashtable, indexed by thread object, that contains a buffer of accumulated characters.) The entire line is flushed as a unit when the newline is encountered. Once the Console is closed (see close), any requests to write characters are silently ignored.
     */
    public void write(final int character) throws IOException
    {   if( character != 0 )
            dispatcher.dispatch( new Handler(character, Thread.currentThread()) );
    }
    /*******************************************************************
     |    This class defines the request object that's sent to the Active_object. All the real work is done here.
     */
    private class Handler implements Runnable
    {
        private int     character;
        private Object  key;
        Handler( int character, Object key )
        {   this.character = character;
            this.key       = key;
        }
        public void run()
        {   List buffer = (List)( users.get(key) );
            if( character != '\n' )
            {   if( buffer == null ) // first time this thread made request
                {   buffer = new Vector();
                    users.put( key, buffer );
                }
                buffer.add( new int[]{ character } );
            }
            else
            {   if( buffer != null )
                {   for( Iterator i = ((List)buffer).iterator(); i.hasNext() ; )
                    {   int c = (  (int[])( i.next() )   )[0];
                        System.out.print( (char)c );
                    }
                    users.remove( key );
                }
                System.out.print( '\n' );
            }
        }
    }
    /*******************************************************************
     |    This method overrides the OutputStream flush() method. All partially-buffered lines are printed. A newline is added automatically to the end of each text string. This method does not block.
     **/
    public void flush() throws IOException
    {   Set keys = users.keySet();
        for( Iterator i = keys.iterator(); i.hasNext(); )
            dispatcher.dispatch( new Handler('\n', i.next()) );
    }
    /*******************************************************************
     |    This method overrides the OutputStream close() method. Output is flushed (see flush). Subsequent output requests are silently ignored.
     **/
    public void close() throws IOException
    {   flush();
        dispatcher.close();     // blocks until everything stops.
    }
    /*******************************************************************
     |    A convenience method, this method provides a simple way to print a string without having to wrap the Console.out() stream in a DataOutputStream.
     **/
    public void println( final String s )
    {   try
        {   for( int i = 0; i < s.length(); ++i )
                write( s.charAt(i) );
            write( '\n' );
        }
        catch( IOException e ){ /*ignore it*/ }
    }
    /*******************************************************************
     |    A test class that prints two messages in parallel on two threads, with random sleeps between each character.
     */
    static public class Test extends Thread
    {
        private String message;
        private DataOutputStream data =
                                new DataOutputStream( Console.out() );
        public Test( String message )
        {   this.message = message;
        }
        public void run()
        {   try
            {   Random indeterminate_time = new Random();
                for(int count = 2; --count >= 0 ;)
                {   for( int i = 0; i < message.length(); ++i )
                    {   Console.out().write( message.charAt(i) );
                        sleep( Math.abs(indeterminate_time.nextInt()) % 20 );
                    }
                    Console.out().println( "(" + count + ")" );
                    sleep( Math.abs(indeterminate_time.nextInt()) % 20 );
                    data.writeChars( "[" + count + "]" );
                    sleep( Math.abs(indeterminate_time.nextInt()) % 20 );
                    Console.out().write('\n');
                }
            }
            catch( Exception e )
            {   Console.out().println( e.toString() );
            }
        }
        static public void main( String[] args ) throws Exception
        {
            Thread t1 = new Test( "THIS MESSAGE IS FROM THREAD ONE" );
            Thread t2 = new Test( "this message is from thread two" );
            t1.start();
            t2.start();
            t1.join();  // Wait for everything to get enqueued
            t2.join();
            Console.out().close();  // wait for everything to be printed
        }
    }
}

Wrapping things up, books, and JavaOne

So, that's it for threads. The nine parts of this series, when taken together, give you a pretty good introduction to real Java threading (as compared to the simplified version found in most books). I've covered everything from thread architectures to common problems in multithreaded systems to atomic-level synchronization classes to the architectural approach to threading discussed this month. The tools I've developed along the way provide a good foundation for a multithreaded toolbox that can make your life as a Java thread programmer much easier.

For those of you who would like to see all the material I've been discussing in one place, an expanded version of this threading series will turn into the book Taming Java Threads, to be published by Apress (see Resources).

I'll also be speaking about many of the thread-related topics I've discussed in this series at the upcoming JavaOne Worldwide Java Developer Conference on Tuesday, June 15, from 2:45 p.m. to 5:00 p.m. in Moscone Center's Hall E.

And now for something completely different...

Next month I plan to change the subject entirely, turning the discussion to object-oriented ways to implement user interfaces. Most of the RAD tools for Java programming (such as Symantec's Café, IBM's Visual Age, Microsoft's J++, and so on) produce code that isn't in the least bit object-oriented. Using these RAD tools cuts you off from many of the benefits of object-oriented design (like fast debugging and ease of maintenance). The tool-generated code is much harder to debug and maintain than that of a carefully crafted object-oriented system, and these tools really add time to the overall development cycle.

Java itself is so easy to program that by the time you've hacked up the machine-generated code to make it maintainable, you may as well have written it correctly to begin with. And if you don't take steps to make the machine-generated systems maintainable, you will lose enormous amounts of time down the road as the code has to be updated. To paraphrase Fred Brooks, author of the classic book The Mythical Man-Month, there is no silver bullet that kills the werewolf of development time. Next month I'll explain this further by describing exactly what an object-oriented approach to UI design would look like. Subsequent columns will present a series of classes that will help you build true object-oriented interfaces using Java.
