Programming Java threads in the real world, Part 9

More threads in an object-oriented world: Synchronous dispatchers, active objects, detangling console I/O

Page 2 of 4
    public synchronized void remove_all_handlers()
    {   events.clear();
    }
    /**
     * Dispatch the actions "iterations" times. Use -1 for "forever."
     * This function is not synchronized so that the list of events
     * can be modified while the dispatcher is running. The method
     * makes a clone of the event list and then executes from the
     * clone on each iteration through the list of subscribers.
     * Events added to the list will be executed starting with the
     * next iteration.
     */
    public void dispatch( int iterations )
    {
        // Dispatch operations. A simple copy-and-dispatch-from-copy
        // strategy is used here. Eventually, I'll replace this code
        // with a Multicaster.
        if( events.size() > 0 )
            while( iterations==-1 || --iterations >= 0 )
            {
                Object[] snapshot;
                synchronized( this )
                {   snapshot = events.toArray();
                }
                for( int i = 0; i < snapshot.length; ++i )
                {   ((Runnable)snapshot[i]).run();
                    Thread.yield();   // yield() is static; call it on the class
                }
            }
    }
    /**
     * Dispatch actions "iterations" number of times, with an action
     * dispatched every "interval" milliseconds. Note that the last
     * action executed takes up the entire time slot, even if the
     * run() function itself doesn't take "interval" milliseconds to
     * execute. Also note that the timing will be irregular if any
     * run() method executes in more than "interval" milliseconds.
     * If you want a time interval between iterations, but not
     * between the operations performed in a single iteration, just
     * insert a Runnable action that sleeps for a fixed number of
     * milliseconds.
     *
     * @param iterations number of times to loop through the actions,
     *                   executing them. Use -1 to mean "forever."
     * @param interval   An action is executed every "interval"
     *                   milliseconds.
     */
    public void metered_dispatch( int iterations, int interval )
    {
        Alarm timer = new Alarm( interval, Alarm.MULTI_SHOT );
        timer.start();
        while( iterations==-1 || --iterations >= 0 )
        {
            Object[] snapshot;
            synchronized( this )
            {   snapshot = events.toArray();
            }
            for( int i = 0; i < snapshot.length; ++i )
            {   ((Runnable)snapshot[i]).run();
                timer.await();
                timer.start();
            }
        }
        timer.stop();
    }
    static public class Test
    {
        // Execute the test with:
        //  java "com.holub.asynch.Synchronous_dispatcher\$Test"
        //
        public static void main( String[] args )
        {
            Synchronous_dispatcher dispatcher =
                                        new Synchronous_dispatcher();
            dispatcher.add_handler(
                new Runnable()
                {   public void run()
                    {   System.out.print("hello");
                    }
                }
            );
            dispatcher.add_handler(
                new Runnable()
                {   public void run()
                    {   System.out.println(" world");
                    }
                }
            );
            dispatcher.dispatch( 1 );
            dispatcher.metered_dispatch( 2, 1000 );
            //------------------------------------------------
            // Test two tasks, passed to the dispatcher as arrays
            // of chunks. Should print:
            //          Hello (Bonjour) world (monde)
            Runnable[] first_task =
            {   new Runnable(){ public void run(){ System.out.print("Hello"); }},
                new Runnable(){ public void run(){ System.out.print(" World");}}
            };
            Runnable[] second_task =
            {   new Runnable(){ public void run(){ System.out.print(" Bonjour");}},
                new Runnable(){ public void run(){ System.out.print(" Monde"  );}}
            };
            dispatcher = new Synchronous_dispatcher();
            dispatcher.add_handler( first_task  );
            dispatcher.add_handler( second_task );
            dispatcher.dispatch( 1 );
        }
    }
}
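
The copy-and-dispatch-from-copy strategy used by dispatch() can be tried in isolation. What follows is a minimal sketch rather than the listing's code: SnapshotDispatcher, addHandler(), and SnapshotDemo are hypothetical names, and the code uses generics and lambdas that postdate this article. It shows that a handler added while a pass is in progress first runs on the following pass.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the dispatcher in the listing above.
class SnapshotDispatcher {
    private final List<Runnable> handlers = new ArrayList<>();

    public synchronized void addHandler(Runnable handler) {
        handlers.add(handler);
    }

    // Runs every handler once per pass. The lock is held only while the
    // list is copied, so handlers run outside the lock and iterating the
    // copy cannot fail when the original list changes.
    public void dispatch(int iterations) {
        for (int pass = 0; pass < iterations; ++pass) {
            Runnable[] snapshot;
            synchronized (this) {
                snapshot = handlers.toArray(new Runnable[0]);
            }
            for (Runnable handler : snapshot) {
                handler.run();
            }
        }
    }
}

class SnapshotDemo {
    static String run() {
        StringBuilder log = new StringBuilder();
        SnapshotDispatcher dispatcher = new SnapshotDispatcher();
        dispatcher.addHandler(new Runnable() {
            private boolean added = false;
            public void run() {
                log.append("A");
                if (!added) {               // register "B" only once
                    added = true;
                    dispatcher.addHandler(() -> log.append("B"));
                }
            }
        });
        dispatcher.dispatch(2);
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());          // prints "AAB"
    }
}
```

The first pass sees only the original handler; the handler it registers appears in the snapshot taken for the second pass.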

Active objects

The second architecture I'd like to discuss this month is the active object architecture. Though we can thank Greg Lavender and Doug Schmidt for the name (see Resources), the architecture has been around for a while -- the first time I saw it (dinosaur that I am) was in Intel's RMX operating system, circa 1979.

Though none of us will be programming for RMX, Intel's terminology is useful in describing the pattern: RMX (the Real-time Multitasking Executive) was an OS for embedded systems -- think of a multithreaded operating system that supports only a single process that remains resident in core. All threads share a single address space, and there is no virtual memory. Figure 4, below, shows the general architecture.

RMX sees the world as a series of "tasks," not threads. Each task has an input queue and effectively runs on a single thread. You activate a task by sending it an asynchronous message: you literally put a data structure onto the task's input queue. The task starts out idle, waiting for a request to be queued. It wakes up when a message arrives, performs the requested operation, then goes back to sleep. If messages arrive while the task is busy, they remain in the queue until the current operation is complete, and are then dequeued and executed sequentially. Each message carries with it a return address -- a message queue to which the task posts the original message data structure (perhaps modified by the task to contain a completion status or output data) when it completes the requested operation.

Figure 4. The RMX system architecture

You use this system by creating a task for every desired operation. For example, you might create a "file I/O" task whose job was to access a single file. You'd send data to that file by queueing up a write request on that task's input queue. The I/O task would perform the operation, then it would send the data structure representing the write-request to the task whose input queue was listed in the message as the return address. A special "garbage collector" task was included solely to provide a place to send these reply messages to when the originating task wasn't interested in the reply. (The garbage collector simply freed any memory that arrived in its queue.) You would read the file by posting an empty read-request message to the file I/O task's input queue. The file I/O task would then fill that message with data and post it back to the included return address.
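
The request-and-reply flow described above can be approximated in Java. This is only a rough analogy, not RMX code: Message, RmxStyleTask, and RmxDemo are hypothetical names, an in-memory buffer stands in for the file, and java.util.concurrent.LinkedBlockingQueue (which postdates this article) plays the role of the message queues. The message carries its own return address, and the task posts the same object back when it finishes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical message: request data plus a "return address" queue.
class Message {
    final String payload;
    String status = "pending";
    final BlockingQueue<Message> replyTo;

    Message(String payload, BlockingQueue<Message> replyTo) {
        this.payload = payload;
        this.replyTo = replyTo;
    }
}

// Hypothetical "file I/O" task: one thread, one input queue.
class RmxStyleTask extends Thread {
    final BlockingQueue<Message> input = new LinkedBlockingQueue<>();
    private final StringBuilder file = new StringBuilder(); // touched only by this thread

    RmxStyleTask() {
        setDaemon(true);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Message request = input.take();   // idle until a request arrives
                file.append(request.payload);     // no locking: one thread owns 'file'
                request.status = "wrote " + request.payload.length() + " chars";
                request.replyTo.put(request);     // post the same message back
            }
        } catch (InterruptedException e) {
            // Interrupted: fall out of the loop and let the thread end.
        }
    }
}

class RmxDemo {
    static String roundTrip() {
        RmxStyleTask task = new RmxStyleTask();
        task.start();
        BlockingQueue<Message> myQueue = new LinkedBlockingQueue<>();
        try {
            task.input.put(new Message("hello", myQueue));
            return myQueue.take().status;         // wait for the reply
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());          // prints "wrote 5 chars"
    }
}
```

A sender that doesn't care about the reply could pass a queue that some other thread simply drains, which is the job the "garbage collector" task did in RMX.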

The main advantage of the active-object architecture is that the individual operations don't have to be synchronized since they're executing sequentially on the task object's thread. In fact, the only synchronization necessary in the entire system is in the "enqueue" and "dequeue" operations.

Of course, from an object-oriented-design perspective, a "task" is simply an object that can accept nothing but asynchronous messages. There's no requirement that asynchronous requests must execute in parallel. From the perspective of the object that sends the message, as long as the request returns immediately, it's okay for the receiving object to perform the operation whenever it gets around to it.

A general solution

I've adapted the idea of the active object to my own ends, with a general solution to the dispatch problem. Figure 5, below, shows how my implementation works. The main difference from RMX is that, rather than queuing up passive data structures that contain an ID identifying the requested operation, I'm queuing up Runnable objects that actually do the operation. This way, a given active object can handle many requests without knowing what those requests actually do. This notion of passing in an object that encapsulates a request is an example of the Command design pattern (see Resources).

Figure 5. Active objects in Java

The implementation of an active object dispatcher (Listing 2) isn't much more complicated than the picture above. Most of Listing 2, as you will see, is taken up with comments.

Create and start up a dispatcher like this:

    Active_object dispatcher = new Active_object();
    dispatcher.start();

Ask the active object to do something for you like this:

    dispatcher.dispatch
    (   new Runnable()
        {   public void run()
            {   System.out.println("hello world");
            }
        }
    );

When you're done with the dispatcher, close it:

    dispatcher.close();

Thereafter, any attempts to dispatch() a new request will be rejected with an exception toss, though the requests that are waiting to be serviced will be executed.

Looking at the implementation, the input queue in Figure 5 is an instance of the Blocking_queue class (discussed last month), declared as requests (Listing 2, line 8). The dispatch() (Listing 2, line 29) and close() (Listing 2, line 34) methods are simple wrappers around the equivalent Blocking_queue methods. The enqueued "request" is just a Runnable object, whose run() method is executed on the active object's thread.

Note that I had to add a method to last month's Blocking_queue class to make the close() function work elegantly without having to synchronize. (The source code in the "Articles" section of my Web site has been patched to include the new method. See my bio at the end of this article for a link to my site.) Once a Blocking_queue is closed by calling enqueue_final_item(), any further attempts to enqueue new items are rejected with an exception toss. Moreover, the queue is closed automatically when the final item is dequeued.
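
The "final item" behavior can be illustrated with a small stand-alone queue. This is a sketch of the semantics as described in the paragraph above, not the patched Blocking_queue itself: ClosableQueue, enqueueFinalItem(), and ClosableQueueDemo are hypothetical names, and the internals are an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical queue that can be closed by enqueueing a final item.
class ClosableQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();  // note: rejects null items
    private boolean closed = false;

    public synchronized void enqueue(T item) {
        if (closed) {
            throw new IllegalStateException("queue is closed");
        }
        items.addLast(item);
        notifyAll();                       // wake any thread blocked in dequeue()
    }

    // Accepts one last item, then refuses everything that follows.
    public synchronized void enqueueFinalItem(T item) {
        enqueue(item);
        closed = true;
    }

    // Blocks while the queue is open and empty. Items already queued
    // can still be drained after the queue has been closed.
    public synchronized T dequeue() throws InterruptedException {
        while (items.isEmpty()) {
            if (closed) {
                throw new IllegalStateException("queue is closed and drained");
            }
            wait();
        }
        return items.removeFirst();
    }
}

class ClosableQueueDemo {
    static String run() {
        ClosableQueue<String> queue = new ClosableQueue<>();
        StringBuilder log = new StringBuilder();
        queue.enqueue("a");
        queue.enqueueFinalItem("b");
        try {
            queue.enqueue("c");            // too late: the queue is closed
        } catch (IllegalStateException e) {
            log.append("rejected;");
        }
        try {
            log.append(queue.dequeue()).append(queue.dequeue());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());         // prints "rejected;ab"
    }
}
```

Because the closed flag lives inside the queue and is checked under the queue's own lock, a caller such as close() needs no locking of its own.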

The only other method in the Active_object class is run() (Listing 2, line 13). (The Active_object class extends Thread, so it implements run() directly.) The run() method sits in a tight loop, dequeueing and executing requests as they come in; most of the time, run() is blocked waiting to dequeue a request. The loop terminates when the null enqueued by the close() method is dequeued. To be a good citizen, I yield() after every request is executed to give other threads at my priority level a chance to run.

Notice that none of the methods of Active_object are synchronized. They don't need to be because, with the single exception of run(), all the methods of Active_object are simple wrappers around calls to the Blocking_queue, which is synchronized. Moreover, the Active_object itself is the only thread that dequeues from the queue, so extra synchronization on the dequeue operation is also unnecessary.
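
For readers who want to experiment before reading Listing 2, here is a minimal sketch of the same event loop built on java.util.concurrent.LinkedBlockingQueue, which postdates this article. It is not the Active_object class: ActiveObjectSketch, ActiveObjectDemo, and the STOP sentinel are hypothetical, and because LinkedBlockingQueue has no notion of a final item (and rejects null), the sketch has to guard dispatch() and close() with its own lock, which Active_object avoids by delegating to Blocking_queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical active object: requests run one at a time on this thread.
class ActiveObjectSketch extends Thread {
    private static final Runnable STOP = () -> {};   // sentinel standing in for null
    private final BlockingQueue<Runnable> requests = new LinkedBlockingQueue<>();
    private final Object lock = new Object();
    private boolean closed = false;

    ActiveObjectSketch() {
        setDaemon(true);
    }

    public void dispatch(Runnable request) {
        synchronized (lock) {              // keep the check and the add atomic
            if (closed) {
                throw new IllegalStateException("active object is closed");
            }
            requests.add(request);
        }
    }

    public void close() {
        synchronized (lock) {
            closed = true;
            requests.add(STOP);            // queued behind all pending requests
        }
    }

    @Override
    public void run() {
        try {
            Runnable request;
            while ((request = requests.take()) != STOP) {
                request.run();             // executes on the active object's thread
                Thread.yield();
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: let the thread end.
        }
    }
}

class ActiveObjectDemo {
    static String run() {
        StringBuilder log = new StringBuilder();
        ActiveObjectSketch active = new ActiveObjectSketch();
        active.start();
        active.dispatch(() -> log.append("hello"));
        active.dispatch(() -> log.append(" world"));
        active.close();
        try {
            active.join();                 // pending requests finish before the thread exits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            active.dispatch(() -> log.append("!"));
        } catch (IllegalStateException e) {
            log.append(" [rejected]");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());         // prints "hello world [rejected]"
    }
}
```

The two queued requests never need to synchronize with each other, since both run sequentially on the one thread that drains the queue.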

Listing 2: /src/com/holub/asynch/Active_object.java
package com.holub.asynch;
import  com.holub.asynch.Blocking_queue;
/***********************************************************************
 * You use a dispatcher to implement the message-queueing and
 * dispatching part of an active object. Create and start up a
 * dispatcher like this:
 *
 *    Active_object dispatcher = new Active_object();
 *    dispatcher.start();
 *
 * Ask the active object to do something for you like this:
 *
 *    dispatcher.dispatch
 *    (   new Runnable()
 *        {   public void run()
 *            {   System.out.println("hello world");
 *            }
 *        }
 *    );
 *
 * When you're done with the dispatcher, close it:
 *
 *    dispatcher.close();
 *
 * Variations on these themes are also possible. See the article
 * text for more details.
 *
 * (c) 1999, Allen I. Holub.
 *
 * This code may not be distributed by yourself except in binary form,
 * incorporated into a Java .class file. You may use this code freely
 * for personal purposes, but you may not incorporate it into any
 * commercial product without my express written permission.
 *
 * @author Allen I. Holub
 */
public class Active_object extends Thread
{
    private Blocking_queue requests = new Blocking_queue();
    /******************************************************************
     * Create an active object. The object is a Daemon thread that
     * waits for dispatch() requests, then executes them in the order
     * that they were enqueued. Since the thread is a daemon, it will
     * not keep the process alive.
     */
    public Active_object()
    {   setDaemon( true );
    }
    /******************************************************************
     * Do not call this method. This method is public only because
     * it's an override of a public method in the Thread base class.
     * I'd rather it were private, but Java doesn't permit an override
     * to have a more-restrictive access privilege than the template
     * method in the base class. run() encapsulates the event loop.
     */