Programming Java threads in the real world, Part 9
More threads in an object-oriented world: Synchronous dispatchers, active objects, detangling console I/O
By Allen Holub, JavaWorld.com, 06/01/99
dispatcher.dispatch
(   new Runnable()
    {   public void run()
        {   System.out.println("hello world");
        }
    }
);
When you're done with the dispatcher, close it:
dispatcher.close();
Thereafter, any attempt to dispatch() a new request is rejected with an exception toss, though requests that are already waiting to be serviced will still be executed.
Looking at the implementation, the input queue in Figure 5 is an instance of the Blocking_queue class (discussed last month), declared as requests on Listing 2, line 8. The dispatch() (Listing 2, line 29) and close() (Listing 2, line 34) methods are simple wrappers around the equivalent Blocking_queue methods. The enqueued "request" is just a Runnable object, whose run() method is executed on the active object's thread.
Note that I had to add a method to last month's Blocking_queue class to make the close() function work elegantly without having to synchronize. (The source code in the "Articles" section of my Web site has been patched to include the new method. See my bio at the end of this article for a link to my site.) Once a Blocking_queue is closed by calling enqueue_final_item(), any further attempts to enqueue new items are rejected with an exception toss. Moreover, the queue is closed automatically when the final item is dequeued.
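The closing behavior described above can be sketched roughly as follows. This is not the Blocking_queue source from the earlier column: the class name ClosableQueue, the Closed exception, and the method bodies are assumptions made for illustration, and the sketch simply refuses new items once the final item has been enqueued.

```java
import java.util.LinkedList;

// Hypothetical stand-in for the article's Blocking_queue; all names are illustrative.
class ClosableQueue<T> {
    /** Thrown when an item is offered after the final item has been enqueued. */
    static class Closed extends RuntimeException {
        Closed() { super("queue is closed"); }
    }

    private final LinkedList<T> items = new LinkedList<>();
    private boolean rejectNewItems = false;

    /** Add an item, or throw Closed if the final item was already enqueued. */
    public synchronized void enqueue(T item) {
        if (rejectNewItems) throw new Closed();
        items.addLast(item);
        notifyAll();                 // wake any thread blocked in dequeue()
    }

    /** Add one last item (null is allowed); later enqueue() calls are rejected. */
    public synchronized void enqueueFinalItem(T lastItem) {
        enqueue(lastItem);
        rejectNewItems = true;
    }

    /** Block until an item is available, then remove and return it. */
    public synchronized T dequeue() throws InterruptedException {
        while (items.isEmpty()) wait();
        return items.removeFirst();
    }
}
```

Items that were enqueued before the final item remain available to dequeue(), which is what lets pending requests run to completion after a close.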
The only other method in the Active_object class is run() (Listing 2, line 13). (The Active_object class extends Thread, so it implements run() directly.) The run() method sits in a tight loop, dequeueing and executing requests as they come in, and most of the time run() will be blocked waiting to dequeue a request. The loop terminates when the null, enqueued by the close() method, is dequeued. In order to be a good citizen, I yield() after every request is executed to give other threads at my priority level a chance to run.
Notice that none of the methods of Active_object are synchronized. They don't need to be because, with the single exception of run(), all the methods of Active_object are simple wrappers around calls to the Blocking_queue, which is synchronized. Moreover, the Active_object itself is the only thread that dequeues from the queue, so extra synchronization on the dequeue operation is also unnecessary.
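For readers who want to experiment without the com.holub.asynch classes, here is a minimal sketch of the same idea built on java.util.concurrent. It is an approximation, not the Active_object source: the name SimpleActiveObject and the SHUTDOWN sentinel (which stands in for the null final item) are assumptions, and because the closed flag lives outside the queue in this version, dispatch() and close() are synchronized, which the original does not need.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only; not the article's Active_object source.
class SimpleActiveObject extends Thread {
    // Sentinel that plays the role of the null "final item" in the article.
    private static final Runnable SHUTDOWN = () -> { };

    private final BlockingQueue<Runnable> requests = new LinkedBlockingQueue<>();
    private boolean closed = false;   // guarded by this object's monitor

    SimpleActiveObject() {
        setDaemon(true);              // a daemon thread does not keep the JVM alive
    }

    @Override
    public void run() {               // the event loop
        try {
            Runnable request;
            while ((request = requests.take()) != SHUTDOWN) {
                request.run();
                Thread.yield();       // give other threads a chance to run
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for work: treat it as a close request.
        }
    }

    /** Queue an operation for execution on this object's thread. */
    public synchronized void dispatch(Runnable operation) {
        if (closed) throw new IllegalStateException("active object is closed");
        requests.add(operation);
    }

    /** Stop accepting requests; those already queued still run. */
    public synchronized void close() {
        if (closed) return;           // closing twice is harmless
        closed = true;
        requests.add(SHUTDOWN);
    }
}
```

As with the original, a caller that needs to wait for the pending requests can follow close() with join().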
Listing 2: /src/com/holub/asynch/Active_object.java
package com.holub.asynch;
import com.holub.asynch.Blocking_queue;
/***********************************************************************
 * You use a dispatcher to implement the message-queueing and dispatching
 * part of an active object. Create and start up a dispatcher like this:
 *
 *     Active_object dispatcher = new Active_object();
 *     dispatcher.start();
 *
 * Ask the active object to do something for you like this:
 *
 *     dispatcher.dispatch
 *     (   new Runnable()
 *         {   public void run()
 *             {   System.out.println("hello world");
 *             }
 *         }
 *     );
 *
 * When you're done with the dispatcher, close it:
 *
 *     dispatcher.close();
 *
 * Variations on these themes are also possible. See the article text
 * for more details.
 *
 * (c) 1999, Allen I. Holub.
 * This code may not be distributed by yourself except in binary form,
 * incorporated into a Java .class file. You may use this code freely
 * for personal purposes, but you may not incorporate it into any
 * commercial product without my express written permission.
 *
 * @author Allen I. Holub
 */
/******************************************************************
 * Create an active object. The object is a daemon thread that waits
 * for dispatch() requests, then executes them in the order that they
 * were enqueued. Since the thread is a daemon, it will not keep the
 * process alive.
 */
/******************************************************************
 * Do not call this method. This method is public only because it's
 * an override of a public method in the Thread base class. I'd rather
 * it were private, but Java doesn't permit an override to have a
 * more-restrictive access privilege than the template method in the
 * base class. run() encapsulates the event loop.
 */
// Listing 2, lines 013-027
public void run()
{   try
    {   Runnable the_request;
        while( (the_request = (Runnable)(requests.dequeue())) != null )
        {   the_request.run();
            the_request = null;
            yield();    // give other threads a chance to run
        }
    }
    catch( InterruptedException e )
    {   // Thread was interrupted while blocked on a dequeue,
        // Treat it as a close request and ignore it.
    }
}
/*****************************************************************
 * Cause an operation to be performed on the current active object's
 * event-handler thread. Operations are executed serially in the
 * order received.
 *
 * @param operation A Runnable "command" object that encapsulates the
 *        operation to perform. If operation is null, the active
 *        object will shut itself down when that request is dequeued
 *        (in its turn). Any operations posted after a null request
 *        is dispatched are not executed.
 * @throws Blocking_queue.Closed If you attempt to dispatch on a
 *        closed object.
 */
/*****************************************************************
 * Close the active object (render it unwilling to accept new
 * "dispatch" requests). All pending requests are executed, but new
 * ones are not accepted. This method returns immediately, before
 * the pending requests have been processed and before the
 * Active_object shuts down. You can block until the pending
 * requests are handled by sending the Active_object object a join()
 * message:
 *
 *     Active_object dispatcher = new Active_object();
 *     //...
 *     dispatcher.close(); // cause it to reject new requests
 *     dispatcher.join();  // wait for existing requests to be processed
 *
 * You can also cause the Active_object to terminate gracefully
 * (without blocking) by calling dispatch(null). Any requests that
 * are "dispatched" after a dispatch(null) is issued are silently
 * ignored, however.
 *
 * Attempts to close a closed queue are silently ignored.
 */
Detangling console output
A more realistic example of using an Active_object is in the Console class in Listing 3. The main problem with console-based I/O in multithreaded applications is that the strings that are being printed by different threads tend to get mixed up with one another, particularly if these strings are being printed one piece at a time rather than as single, monolithic units. The active-object pattern can be used to solve this problem by creating a console task whose job is to write to the console. Rather than calling System.out.println() (or equivalent) directly, you send the output to the console task. The task keeps a buffer for every thread that's using it, and flushes that thread's buffer to the actual console when a newline is encountered. Since the console task is single threaded, strings created by separate threads won't be mixed together.
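The per-thread buffering idea can be tried out with a small sketch like the one below. It is an illustration under stated assumptions rather than the Console class from Listing 3: LineConsole is a hypothetical name, a single-thread ExecutorService plays the part of the console task, and output goes to a StringBuilder sink (standing in for System.out) so the result is easy to inspect.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; LineConsole and its methods are hypothetical names.
class LineConsole {
    // One worker thread plays the role of the console task.
    private final ExecutorService task = Executors.newSingleThreadExecutor();
    // Touched only by the worker thread, so no locking is needed.
    private final Map<Thread, StringBuilder> buffers = new HashMap<>();
    private final StringBuilder sink;   // stands in for the real console

    LineConsole(StringBuilder sink) { this.sink = sink; }

    /** Called on any thread; the real work happens later on the worker. */
    void write(char c) {
        final Thread caller = Thread.currentThread();  // identify the writer now
        task.execute(() -> {
            StringBuilder line =
                buffers.computeIfAbsent(caller, t -> new StringBuilder());
            line.append(c);
            if (c == '\n') {             // line complete: emit it as one unit
                sink.append(line);
                buffers.remove(caller);
            }
        });
    }

    /** Let the pending writes run, then stop the worker. */
    void close() throws InterruptedException {
        task.shutdown();
        task.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Two threads writing one character at a time through the same LineConsole produce whole lines in the sink, in whichever order the newlines arrive.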
My implementation of the console task is in Listing 3. The Console class is a singleton, so you can't create one with new; use Console.out() to get a reference to the actual Console object. I've chosen to implement the Console as an extension of java.io.OutputStream. This way, you can wrap the single Console object in any of the standard java.io wrappers (such as DataOutputStream) to add functionality to it. For example, use
DataOutputStream data_writer = new DataOutputStream( Console.out() );
or
PrintWriter writer = new PrintWriter( Console.out() );
Looking at the code, out() creates the object (Listing 3, line 24) using the double-check locking strategy discussed in the April 1999 column. I've also used the JDK_11_unloading_bug_fix class discussed in the same column. The interesting code -- at least for this month -- concerns the active object: dispatcher (Listing 3, line 16).
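As a reminder of what double-check locking looks like, here is a minimal sketch. It is not the code from the April 1999 column or from Listing 3: Registry is a hypothetical class, and the sketch uses the form generally regarded as safe on Java 5 and later, where the shared field must be declared volatile.

```java
// Illustrative sketch; Registry is a hypothetical singleton, not the Console class.
class Registry {
    // volatile is what makes the unlocked first check safe under the
    // Java 5+ memory model.
    private static volatile Registry instance;

    private Registry() { }                   // cannot be created with new from outside

    static Registry get() {
        Registry local = instance;           // first check, no lock taken
        if (local == null) {
            synchronized (Registry.class) {
                local = instance;            // second check, under the lock
                if (local == null) {
                    instance = local = new Registry();
                }
            }
        }
        return local;
    }
}
```

The lock is taken only on the rare path where the instance has not been created yet, which is the point of the idiom.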
The only method that's doing real work is write(int) (Listing 3, line 47), which creates a Handler command object and passes it to the dispatcher for execution. The run() method of the Handler class (Listing 3, line 63) is called by the dispatcher, in its turn, to handle a single-character write request.
If the character isn't a newline, it's buffered up in a Vector that's associated with the current thread (it's the value component of a Map that uses a reference to the thread that issued the request as a key). Note that I can't call Thread.currentThread() in the Handler's run() method because I would get a reference to the active object's thread, not the thread that's issuing the write request. The current-thread reference is determined on line 49 when I dispatch the request. (The write() method runs on the thread that requests the write operation; Handler.run() runs on the active object's thread at some later time.)
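The difference between the two threads is easy to observe directly. The following sketch is an assumption-laden illustration (CallerCapture is a hypothetical name, and a single-thread executor stands in for the active object): it records Thread.currentThread() once at dispatch time and once inside the task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch; CallerCapture is a hypothetical name.
class CallerCapture {
    /** Returns { thread seen at dispatch time, thread seen inside the task }. */
    static Thread[] observe() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Evaluated here, on the thread that issues the request.
            final Thread requester = Thread.currentThread();
            // Evaluated later, on the worker's own thread.
            Callable<Thread> whoRunsMe = Thread::currentThread;
            Future<Thread> executing = worker.submit(whoRunsMe);
            return new Thread[] { requester, executing.get() };
        } finally {
            worker.shutdown();
        }
    }
}
```

The two references differ, which is why the requesting thread has to be captured before the request is handed over and carried along inside the command object.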
If the character passed to run() through the Handler is a newline, the else clause on line 73 prints all the buffered characters to the console (along with the newline) and then destroys the buffer. That is, the users Map contains buffers only for those threads that are in the process of assembling a line. Once the line is flushed, the buffer is discarded.
I've also implemented the OutputStream's flush() (Listing 3, line 87) and close() (Listing 3, line 94) methods. Note that flush() flushes the partially assembled buffers for all threads to standard output.
The Test class (Listing 3, line 110) encapsulates a small test routine that creates two threads, each of which prints a message with random sleeps inserted between character-write operations to make sure that the write requests won't be jumbled up.
Finally, a couple of style notes: First, nothing at all is synchronized (and nothing needs to be), because all write requests are executed serially on a single thread (the active object's event-loop thread). Second, whenever possible I've written code in terms of the most abstract class or interface available. For example, even though the list of actively writing threads is maintained in a HashMap, the actual reference (users [Listing 3, line 18]) is declared as a Map reference. This is just good object-oriented programming, but many Java programmers don't do it. By using the most abstract class possible, I can replace the HashMap with a different data structure (such as a TreeMap) at any time simply by changing the single new invocation on line 29. The rest of the code adapts automatically.
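The "program to the interface" point can be shown in a few lines. This is a sketch with hypothetical names (WriterRegistry, append, contents), not code from Listing 3; only the field initializer mentions a concrete class.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch; WriterRegistry is a hypothetical name.
class WriterRegistry {
    // Declared as Map. This initializer is the only place that names a
    // concrete class, so swapping in a java.util.TreeMap changes one line.
    private final Map<String, StringBuilder> users = new HashMap<>();

    /** Append a character to the named writer's buffer, creating it on first use. */
    void append(String writer, char c) {
        users.computeIfAbsent(writer, k -> new StringBuilder()).append(c);
    }

    /** Return what the named writer has buffered so far ("" if nothing). */
    String contents(String writer) {
        StringBuilder b = users.get(writer);
        return b == null ? "" : b.toString();
    }
}
```

Neither append() nor contents() depends on which Map implementation sits behind the field.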
Listing 3: /src/com/holub/asynch/Console.java
// Listing 3, lines 001-008
package com.holub.asynch;
import java.io.*;
import java.util.*;
import com.holub.asynch.Active_object;
import com.holub.asynch.Mutex;
import com.holub.asynch.JDK_11_unloading_bug_fix;
/***********************************************************************
 * This file presents a console-output task that demonstrates how to use
 * the Active_object class. The Console is an OutputStream that multiple
 * threads can use to write to the console. Unlike a normal PrintStream,
 * the current class guarantees that lines print intact. (Characters from
 * one line will not be inserted into another line.)
 *
 * (c) 1999, Allen I. Holub.
 * This code may not be distributed by yourself except in binary form,
 * incorporated into a Java .class file. You may use this code freely
 * for personal purposes, but you may not incorporate it into any
 * commercial product without my express written permission.
 *
 * @author Allen I. Holub
 */
// Listing 3, lines 009-019
/******************************************************************
 * A private constructor makes it impossible to create a Console
 * using new. Use Console.out() to get a reference to the Console.
 */
/******************************************************************
 * The console is a "singleton" -- only one object is permitted to
 * exist. The Console has a private constructor, so you cannot
 * manufacture one with new. Get a reference to the one-and-only
 * instance of the Console by calling Console.out().
 *
 * @return a thread-safe OutputStream that you can wrap with any of
 *         the standard java.io decorators. This output stream
 *         buffers characters on a per-thread basis until a newline,
 *         sent by that thread, is encountered. The Console object
 *         then sends the entire line to the standard output as a
 *         single unit.
 */
// Listing 3, lines 023-037
/*******************************************************************
 * Shut down the Console in an orderly way. The Console uses a daemon
 * thread to do its work, so it's not essential to shut it down
 * explicitly, but you can call shut_down() to kill the thread in
 * situations where you know that no more output is expected. Any
 * characters that have been buffered, but not yet sent to the
 * console, will be lost. You can actually call out() after
 * shut_down(), but it's inefficient to do so.
 */
// Listing 3, lines 038-045
/*******************************************************************
 * This method overrides the OutputStream write(int) function. Use
 * the inherited functions for all other OutputStream functionality.
 * For a given thread, no output at all is flushed until a newline is
 * encountered. (This behavior is accomplished using a hash table,
 * indexed by thread object, that contains a buffer of accumulated
 * characters.) The entire line is flushed as a unit when the newline
 * is encountered. Once the Console is closed (see close()), any
 * requests to write characters are silently ignored.
 */
/*******************************************************************
 * This class defines the request object that's sent to the
 * Active_object. All the real work is done here.
 */
// Listing 3, lines 052-085
private class Handler implements Runnable
{
    private int character;
    private Object key;

    Handler( int character, Object key )
    {   this.character = character;
        this.key = key;
    }

    public void run()
    {   List buffer = (List)( users.get(key) );
        if( character != '\n' )
        {   if( buffer == null ) // first time this thread made request
            {   buffer = new Vector();
                users.put( key, buffer );
            }
            buffer.add( new int[]{ character } );
        }
        else
        {   if( buffer != null )
            {   for( Iterator i = ((List)buffer).iterator(); i.hasNext() ; )
                {   int c = ( (int[])( i.next() ) )[0];
                    System.out.print( (char)c );
                }
                users.remove( key );
            }
            System.out.print( '\n' );
        }
    }
}
/*******************************************************************
 * This method overrides the OutputStream flush() method. All
 * partially-buffered lines are printed. A newline is added
 * automatically to the end of each text string. This method does
 * not block.
 */
// Listing 3, lines 086-092
/*******************************************************************
 * This method overrides the OutputStream close() method. Output is
 * flushed (see flush()). Subsequent output requests are silently
 * ignored.
 */
/*******************************************************************
 * A convenience method, this method provides a simple way to print
 * a string without having to wrap the Console.out() stream in a
 * DataOutputStream.
 */
// Listing 3, lines 099-108
/*******************************************************************
 * A test class that prints two messages in parallel on two threads,
 * with random sleeps between each character.
 */
// Listing 3, lines 109-157
static public class Test extends Thread
{
    private String message;
    private DataOutputStream data =
        new DataOutputStream( Console.out() );

    public Test( String message )
    {   this.message = message;
    }

    public void run()
    {   try
        {   Random indeterminate_time = new Random();
            for(int count = 2; --count >= 0 ;)
            {   for( int i = 0; i < message.length(); ++i )
                {   Console.out().write( message.charAt(i) );
                    // nextInt(20) is never negative, unlike Math.abs(nextInt()) % 20,
                    // which goes negative when nextInt() returns Integer.MIN_VALUE
                    sleep( indeterminate_time.nextInt(20) );
                }
                Console.out().println( "(" + count + ")" );
                sleep( indeterminate_time.nextInt(20) );
                data.writeChars( "[" + count + "]" );
                sleep( indeterminate_time.nextInt(20) );
                Console.out().write('\n');
            }
        }
        catch( Exception e )
        {   Console.out().println( e.toString() );
        }
    }

    static public void main( String[] args ) throws Exception
    {
        Thread t1 = new Test( "THIS MESSAGE IS FROM THREAD ONE" );
        Thread t2 = new Test( "this message is from thread two" );
        t1.start();
        t2.start();
        t1.join(); // Wait for everything to get enqueued
        t2.join();
        Console.out().close(); // wait for everything to be printed
    }
}
}
Wrapping things up, books, and JavaOne
So, that's it for threads. The nine parts of this series, when taken together, give you a pretty good introduction to real Java threading (as compared to the simplified version found in most books). I've covered everything from thread architectures to common problems in multithreaded systems to atomic-level synchronization classes to the architectural approach to threading discussed this month. The tools I've developed along the way provide a good foundation for a multithreaded toolbox that can make your life as a Java thread programmer much easier.
For those of you who would like to see all the material I've been discussing in one place, an expanded version of this threading series will turn into the book Taming Java Threads, to be published by Apress (see Resources).
I'll also be speaking about many of the thread-related topics I've discussed in this series at the upcoming JavaOne Worldwide Java Developer Conference on Tuesday, June 15, from 2:45 p.m. to 5:00 p.m. in Moscone Center's Hall E.
And now for something completely different...
Next month I plan to change the subject entirely, turning the discussion to object-oriented ways to implement user interfaces. Most of the RAD tools for Java programming (such as Symantec's Café, IBM's Visual Age, Microsoft's J++, and so on) produce code that isn't in the least bit object-oriented. Using these RAD tools cuts you off from many of the benefits of object-oriented design (like fast debugging and ease of maintenance). The tool-generated code is much harder to debug and maintain than that of a carefully crafted object-oriented system, and these tools really add time to the overall development cycle.
Java itself is so easy to program that by the time you've hacked up the machine-generated code to make it maintainable, you may as well have written it correctly to begin with. And if you don't take steps to make the machine-generated systems maintainable, you will lose enormous amounts of time down the road as the code has to be updated. To paraphrase Fred Brooks, author of the classic book The Mythical Man-Month, there is no silver bullet that kills the werewolf of development time. Next month I'll explain this further by describing exactly what an object-oriented approach to UI design would look like. Subsequent columns will present a series of classes that will help you build true object-oriented interfaces using Java.