Programming Java threads in the real world, Part 9

More threads in an object-oriented world: Synchronous dispatchers, active objects, detangling console I/O

1 2 3 4 Page 3
Page 3 of 4
013  
014  
015  
016  
017  
018  
019  
020  
021  
022  
023  
024  
025  
026  
027  
public void run()
    {   try
        {   Runnable the_request;
            while( (the_request = (Runnable)(requests.dequeue())) != null )
            {   the_request.run();
                the_request  = null;
                yield();    // give other threads a chance to run
            }
        }
        catch( InterruptedException e )
        {   // Thread was interrupted while blocked on a dequeue,
            // Treat it as a close request and ignore it.
        }
    }
    /*****************************************************************
     |    

Cause an operation to be performed on the current active object's event-handler thread. Operations are executed serially in the order received.

@param operation A Runnable "command" object that encapsulates the operation to perform. If operation is null the active object will shut itself down when that request is dequeued (in its turn). Any operations posted after a null request is dispatched are not executed.

@throws Blocking_queue.Closed

If you attempt to dispatch on a closed object.

     **/
028  
029  
030  
031  
032  
public final void dispatch( Runnable operation )
    {   requests.enqueue( operation );
    }
    /*****************************************************************
     |    

Close the active object (render it unwilling to accept new "dispatch" requests). All pending requests are executed, but new ones are not accepted. This method returns immediately. Before the pending requests have been processed

Active_object

shuts down. You can block until the pending requests are handled by sending the

Active_object

object a

join()

message:

   Active_object dispatcher = new Active_object();
    //...
    dispatcher.close(); // cause it to reject new requests
    dispatcher.join();  // wait for existing request to be processed

You can also cause the Active_object to terminate gracefully (without blocking) by calling dispatch(null). Any requests that are "dispatched" after a dispatch(null) is issued are silently ignored, however.

Attempts to close a closed queue are silently ignored.

     **/
033  
034  
035  
036  
037  
public final void close()
    {   requests.enqueue_final_item( null );
    }
}

Detangling console output

A more realistic example of using an Active_object is in the Console class in Listing 3. The main problem with console-based I/O in multithreaded applications is that the strings that are being printed by different threads tend to get mixed up with one another, particularly if these strings are being printed one piece at a time rather than as single, monolithic units. The active-object pattern can be used to solve this problem by creating a console task whose job is to write to the console. Rather than calling System.out.println() (or equivalent) directly, you send the output to the console task. The task keeps buffers for every thread that's using it, and flushes the buffer to the actual console when a newline is encountered. Since the console task is single threaded, strings created by separate threads won't be mixed together.

My implementation of the console task is in Listing 3. The Console class is a singleton, so you can't create one with new; use Console.out() to get a reference to the actual Console object. I've chosen to implement the Console as an extension of java.io.OutputStream. This way, you can wrap the single Console object in any of the standard java.io wrappers (such as DataOutputStream) to add functionality to it. For example, use

    DataOutputStream data_writer = new DataOutputStream( Console.out() );

or

    PrintWriter writer = new PrintWriter( Console.out() );

Looking at the code, out() creates the object (Listing 3, line 24) using the double-check locking strategy discussed in the April 1999 column. I've also used the JDK_11_unloading_bug_fix class discussed in the same column. The interesting code -- at least for this month -- concerns the active object: dispatcher (Listing 3, line 16).

The only method that's doing real work is write(int) (Listing 3, line 47), which creates a Handler command object and passes it to the dispatcher for execution. The run() method of the Handler class (Listing 3, line 63) is called by the dispatcher, in its turn, to handle a single-character write request.

If the character isn't a newline, it's buffered up in a Vector that's associated with the current thread (it's the value component of a Map that uses a reference to the thread that issued the request as a key). Note that I can't call Thread.currentThread() in the Handler's run() method because I would get a reference to the active object's thread, not the thread that's issuing the write request. The current-thread reference is determined on line 49 when I dispatch the request. (The write() method runs on the thread that requests the write operation; Handler.run() runs on the active object's thread at some later time.)

If the character passed to run through the Handler is a newline, the else clause on line 73 prints all the buffered characters to the console (along with the newline) and then destroys the buffer. That is, the users Map contains buffers only for those threads that are in the process of assembling a line. Once the line is flushed, the buffer is discarded.

I've also implemented the OutputStream's flush() (Listing 3, line 87) and close() (Listing 3, line 94) methods. Note that flush() flushes the partially assembled buffers for all threads to standard output.

The Test class (Listing 3, line 110) encapsulates a small test routine that creates two threads, each of which prints a message with random sleeps inserted between each character-write operation to make sure that the write requests won't be jumbled up.

Finally, a couple of style notes: First, nothing at all is synchronized (and nothing needs to be), because all write requests are executed serially on a single thread (the active object's event-loop thread). Second, whenever possible I've written code in terms of the most abstract class or interface available. For example, even though the list of actively writing threads is maintained in a HashMap, the actual reference (users [Listing 3, line 18]) is declared as a Map reference. This is just good object-oriented programming, but many Java programmers don't do it. By using the most abstract class possible, I can replace the HashMap with a different data structure (such as a TreeMap) at any time simply by changing the single new invocation on line 29. The rest of the code adapts automatically.

Listing 3: /src/com/holub/asynch/Console.java
001
002
003
004
005
006
007
008
package com.holub.asynch;
import java.io.*;
import java.util.*;
import com.holub.asynch.Active_object;
import com.holub.asynch.Mutex;
import com.holub.asynch.JDK_11_unloading_bug_fix;
/***********************************************************************
 |    

This file presents a console-output task that demonstrates how to use the Active_object class. The Console is an OutputStream that multiple threads can use to write to the console. Unlike a normal printstream(), the current class guarantees that lines print intact. (Characters from one line will not be inserted into another line.)

(c) 1999, Allen I. Holub.
This code may not be distributed by yourself except in binary form, incorporated into a java .class file. You may use this code freely for personal purposes, but you may not incorporate it into any commercial product without my express written permission.

@author Allen

I. Holub

 */
009
010
011
012
013
014
015
016
017
018
019
public class Console extends OutputStream
{
private Console()
    {   new JDK_11_unloading_bug_fix( Console.class );
    }
private static  Active_object   dispatcher  = null;
private static  Console         the_console = null;
private static  Map             users       = null;
    /******************************************************************
     |    A private constructor makes it impossible to create a Console using new. Use System.out() to get a reference to the Console.
     */
020
021
022
private Console(){}
    /******************************************************************
     |    

The console is a "singleton" -- only one object is permitted to exist. The Console has a private constructor, so you cannot manufacture one with new. Get a reference to the one-and-only instance of the Console by calling Console.out().

@return a

thread-safe

OutputStream

that you can wrap with any of the standard java.io decorators. This output stream buffers characters on a per-thread basis until a newline, sent by that thread, is encountered. The Console object then sends the entire line to the standard output as a single unit.

     */
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
public static final Console out()
    {   if( the_console == null )
        {   synchronized( OutputStream.class )
            {   if( the_console == null )
                {   the_console = new Console();
                   users       = new HashMap();
                    dispatcher  = new Active_object();
                    dispatcher.start();
                }
            }
        }
        return the_console;
    }
    /*******************************************************************
     |    Shut down the Console in an orderly way. The Console uses a daemon thread to do its work, so it's not essential to shut it down explicitly, but you can call shut_down() to kill the thread in situations where you know that no more output is expected. Any characters that have been buffered, but not yet sent to the console, will be lost. You can actually call out() after shut_down(), but it's inefficient to do so.
     */
038
039
040
041
042
043
044
045
public static void shut_down()
    {   dispatcher.close();
        dispatcher  = null;
        the_console = null;
        users       = null;
    }
    /*******************************************************************
     |    This method overrides the OutputStream write(int) function. Use the inherited functions for all other OutputStream functionality. For a given thread, no output at all is flushed until a newline is encountered. (This behavior is accomplished using a hashtable, indexed by thread object, that contains a buffer of accumulated characters.) The entire line is flushed as a unit when the newline is encountered. Once the Console is closed, (see close), any requests to write characters are silently ignored.
     */
046
047
048
049
050
051
public void write(final int character) throws IOException
    {   if( character != 0 )
dispatcher.dispatch( new Handler(character, Thread.currentThread()) );
    }
    /*******************************************************************
     |    This class defines the request object that's sent to the Active_object. All the real work is done here.
     */
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
private class Handler implements Runnable
    {
private int     character;
private Object  key;
        Handler( int character, Object key )
        {   this.character = character;
            this.key       = key;
        }
public void run()
        {   List buffer = (List)( users.get(key) );
            if( character != '\n' )
            {   if( buffer == null ) // first time this thread made request
                {   buffer = new Vector();
                    users.put( key, buffer );
                }
                buffer.add( new int[]{ character } );
            }
else
            {   if( buffer != null )
                {   for( Iterator i = ((List)buffer).iterator(); i.hasNext() ; )
                    {   int c = (  (int[])( i.next() )   )[0];
                        System.out.print( (char)c );
                    }
                    users.remove( key );
                }
                System.out.print( '\n' );
            }
        }
    }
    /*******************************************************************
     |    This method overrides the OutputStream flush() method. All partially-buffered lines are printed. A newline is added automatically to the end of each text string. This method does not block.
     **/
086
087
088
089
090
091
092
public void flush() throws IOException
    {   Set keys = users.keySet();
        for( Iterator i = keys.iterator(); i.hasNext(); )
            dispatcher.dispatch( new Handler('\n', i.next()) );
    }
    /*******************************************************************
     |    This method overrides the OutputStream close() method. Output is flushed (see flush). Subsequent output requests are silently ignored.
     **/
093
094
095
096
097
098
public void close() throws IOException
    {   flush();
        dispatcher.close();     // blocks until everything stops.
    }
    /*******************************************************************
     |    A convenience method, this method provides a simple way to print a string without having to wrap the Console.out() stream in a DataOutputStream.
     **/
099
100
101
102
103
104
105
106
107
108
public void println( final String s )
    {   try
        {   for( int i = 0; i < s.length(); ++i )
                write( s.charAt(i) );
            write( '\n' );
        }
        catch( IOException e ){ /*ignore it*/ }
    }
    /*******************************************************************
     |    A test class that prints two messages in parallel on two threads, with random sleeps between each character.
     */
1 2 3 4 Page 3
Page 3 of 4