Programming Java threads in the real world, Part 6

The Observer pattern and mysteries of the AWTEventMulticaster

This month's column looks at the Observer pattern from a multithreaded perspective. Observer is one of the more useful design patterns if your goal is to loosen coupling relationships between classes. It's essential, really, for building truly reusable components. It turns out that, as usual, problems arise when threads are on the scene. For example, how do you handle simultaneous notifications on multiple threads? What about modifying the listener list from one thread while notifications are in progress on another? What happens when the notification is sent from one thread to an object that's being used by a second thread? You get the idea. This article will examine answers to these questions, and others.

Implementing Observer in a multithreaded world

The AWT/Swing listeners are an example of a general-purpose design pattern called Observer (in the "Gang of Four book"). This design pattern is often called publisher/subscriber because publishing is a useful metaphor for describing how it works. Objects interested in finding out about some event subscribe to a publication administered by a publisher. The publisher notifies the subscribers that an event occurs by publishing it. To complete the metaphor, the event itself -- passed as an argument in the notification message -- is the publication. If you're familiar with the Observer pattern, you might want to skip to the next section.

The main intent of Observer is to decouple the generator of a message from the receiver of the message. For example, in the following code, the Sender and Receiver classes are tightly coupled. You couldn't even compile Sender if Receiver didn't exist.

class Sender
{   Receiver listener;
    Sender( Receiver listener ){ this.listener = listener; }
    //...
    public void tickle(){ listener.hello(); }
}
class Receiver
{   public void hello(){ System.out.println("Hello world"); }
}
//...
    Sender s = new Sender( new Receiver() );
    s.tickle();

When the Sender is something as generic as a button, this tight coupling presents a problem. Were a button class actually to use this simple-notification strategy, it could only notify one other class that it had been pressed -- not a good strategy for reuse. You can decouple the button from the object to which it's talking by designing the button to talk to objects of an unknown (at least to the button) class through a public interface. The sending class is then coupled to the interface, but not to the classes that implement that interface:

interface Observer
{   public void hello();
}
class Sender2
{   Observer listener;
    Sender2( Observer listener ){ this.listener = listener; }
    //...
    public void tickle(){ listener.hello(); }
}
class Receiver implements Observer
{   public void hello(){ System.out.println("Hello world"); }
}
//...
    Sender2 s = new Sender2( new Receiver() );
    s.tickle();

The main thing to notice is how similar the modified code is to the original code -- the design pattern imposes almost no inconvenience with respect to coding. Writing in terms of an interface makes Sender2 much more flexible than Sender, since it can now talk to any class that implements the Observer interface.

A more-complicated implementation of the Observer pattern would use a multicast model where several observers would be notified when some event occurs rather than just one. Similarly, some mechanism could be added to add and remove observers after the object was created. AWT/Swing's delegation event model, of course, uses the Observer pattern to notify what Sun calls listeners (which are just observers) about various UI-related events. (The remainder of this article assumes some familiarity with the delegation event model.)
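Here is a minimal sketch of that multicast variation. The MulticastDemo and Sender3 classes and the add_observer()/remove_observer() methods are hypothetical names, not part of the listings in this article, and the code assumes a current JDK (generics and lambdas). It deliberately ignores threading; the problems that threads introduce are the subject of the rest of the article.

```java
import java.util.ArrayList;
import java.util.List;

class MulticastDemo
{
    interface Observer
    {   void hello();
    }

    // Hypothetical multicast sender: notifies every registered observer.
    // Not thread safe.
    static class Sender3
    {   private final List<Observer> observers = new ArrayList<Observer>();

        public void add_observer   ( Observer o ){ observers.add( o );    }
        public void remove_observer( Observer o ){ observers.remove( o ); }

        public void tickle()
        {   for( Observer o : observers )
                o.hello();
        }
    }

    public static void main( String[] args )
    {   Sender3  s      = new Sender3();
        Observer first  = () -> System.out.println( "Hello world (first)"  );
        Observer second = () -> System.out.println( "Hello world (second)" );

        s.add_observer( first );
        s.add_observer( second );
        s.tickle();                     // both observers are notified

        s.remove_observer( first );
        s.tickle();                     // only the second observer is notified
    }
}
```

The sender is still coupled only to the Observer interface; the one-to-many relationship is hidden inside the list.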

Observer-side problems: inner-class synchronization

The first thread-related problem with Observer shows up when you implement a listener with an inner class. This example really drives home the fact that race conditions can appear even in situations where you have written no explicit multithreaded code at all because several of the Java packages (most notably AWT/Swing) create threads of their own.

I'll demonstrate. The Status class in Listing 1 does nothing but monitor a status that's modified by the (synchronized) change_status() (Listing 1, line 26). The Status object's UI is a single button, which, when pressed, pops up a message box that reports the current status. The main() method creates the Status object, then changes the status a few times, waiting for a few seconds between each change.

Listing 1 (Status.java): Listener-related race conditions
import javax.swing.*;
import java.awt.event.*;
class Status extends JFrame
{
   private String title    = "Status: idle";
   private String contents = "Nothing happening";
   public Status()
    {
        JButton pop_it_up = new JButton( "Show status" );
        pop_it_up.addActionListener
       (   new ActionListener()
           {   public void actionPerformed( ActionEvent e )
                {   JOptionPane.showMessageDialog( null,
                                contents, title, JOptionPane.INFORMATION_MESSAGE );
                }
            }
        );
        getContentPane().add( pop_it_up );
        pack();
        show();
    }
   public synchronized void change_status( String status, String explanation )
    {   this.title    = "Status: " + status;
        this.contents = explanation;
    }
   public static void main( String[] args )
    {
        Status myself = new Status();
        myself.change_status( "Busy", "I'm busy");
        work();
        myself.change_status( "Done", "I'm done");
        work();
        myself.change_status( "Busy", "I'm busy again");
        work();
        myself.change_status( "Done", "I'm done again");
        work();
        System.exit( 0 );
    }
   private static void work()
    {   try{ Thread.currentThread().sleep(4000); }catch( Exception e ){}
    }
}

So, what's the problem? Everything's synchronized, isn't it? Well, not really. The problem is that, even though the word Thread appears nowhere in Listing 1, this is nonetheless a multithreaded app: there's the main thread (on which main() executes) and there's the AWT/Swing thread that handles GUI events like button presses (as discussed last month). Both threads are running in parallel and are accessing the same Status object. Now imagine the following sequence of events:

  1. main() executes, popping up the main frame and setting the status message.

  2. main() finishes the first piece of work, and sends the message

    myself.change_status( "Done", "I'm done");

    to the main-frame object. This method is synchronized, so it appears safe.

  3. Half-way through the execution of change_status() (after the title is changed, but before the contents are changed) the user hits the "Show status" button. The main thread is preempted, and the AWT thread wakes up to process the button press.

  4. To see what happens next, examine the event-handler setup code on line 13. An anonymous inner-class object handles the button press, and it manufactures the message dialog using the title and contents fields. At this point, however, the title field has been modified, but the contents have not, so the displayed title doesn't jibe with the actual message.

A first (incorrect) attempt to solve the problem might be to synchronize the actionPerformed() method:

    pop_it_up.addActionListener
    (   new ActionListener()
       {   public synchronized void actionPerformed( ActionEvent e )
            {   JOptionPane.showMessageDialog( null,
                            contents, title, JOptionPane.INFORMATION_MESSAGE );
            }
        }
    );

This doesn't work, though. Remember, we have two objects and two monitors. Locking the inner-class object does not affect access to the outer-class object, which contains the two fields that are giving us grief. The only solution is to synchronize on the object that actually contains the fields that the two threads are accessing -- the outer-class object:

    pop_it_up.addActionListener
    (   new ActionListener()
        {   public void actionPerformed( ActionEvent e )
            {   synchronized( Status.this )
                {
                    JOptionPane.showMessageDialog( null,
                            contents, title, JOptionPane.INFORMATION_MESSAGE );
                }
            }
        }
    );

To be safe, all inner-class listeners that access outer-class fields should synchronize on the outer-class object in this way.
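The two-monitors point can be checked without a GUI. The sketch below uses hypothetical names (MonitorDemo, Listener, fire()) and relies on Thread.holdsLock(), which reports whether the calling thread currently owns a given object's monitor. It records what each style of listener actually locks.

```java
class MonitorDemo
{
    interface Listener
    {   void fire();
    }

    // Each flag records whether the outer object's monitor was held
    // while the corresponding listener ran.
    boolean held_in_synchronized_method;
    boolean held_in_synchronized_block;

    // A synchronized method locks the inner-class object only.
    final Listener wrong = new Listener()
    {   public synchronized void fire()
        {   held_in_synchronized_method = Thread.holdsLock( MonitorDemo.this );
        }
    };

    // An explicit block locks the outer-class object, which owns the fields.
    final Listener right = new Listener()
    {   public void fire()
        {   synchronized( MonitorDemo.this )
            {   held_in_synchronized_block = Thread.holdsLock( MonitorDemo.this );
            }
        }
    };

    public static void main( String[] args )
    {   MonitorDemo demo = new MonitorDemo();
        demo.wrong.fire();
        demo.right.fire();
        System.out.println( "synchronized method holds outer lock: "
                                    + demo.held_in_synchronized_method );   // false
        System.out.println( "synchronized( Outer.this ) holds outer lock: "
                                    + demo.held_in_synchronized_block );    // true
    }
}
```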

Notifier-side problems: notifications in a multithreaded world

Flipping the perspective over to that of the notifier, various thread-related problems emerge here, too:

  1. Observers can be added and removed from the current list while notifications are in progress.

  2. Events can occur so fast that several notifications go on at the same time, perhaps using different, but overlapping, observer lists.

  3. Observers are notified even though they have been removed from the list of observers.

Let's start by analyzing the modification-while-notifications-are-in-progress problem, which in some ways is the hardest to solve. AWT listeners can be added or removed at any time, even when notifications are in progress. In fact, a listener can even remove itself from a list of listeners as it services the notification message passed to it. The following code is perfectly legal:

Button some_button;
//...
some_button.addActionListener
(   new ActionListener()
    {   public void actionPerformed( ActionEvent e )
        {   some_button.removeActionListener( this );
            //...
        }
    }
);

I'll describe how to get control over this potentially chaotic situation by looking at various examples (which comprise this month's entries in the world's-most-complicated-way-to-print-"hello world" contest). Listing 2 shows an implementation of the world's simplest observer/subscriber. The publication can be any arbitrary Object. (I don't like having to cast it all the time, but it's the price you pay for flexibility.) If you're interested in receiving notices about something, you implement this interface, providing your own version of receive(), which is called when the event is fired off. In a more complex example, you might extend Subscriber with another interface that adds event-specific methods.

Listing 2 (/src/com/holub/tools/Subscriber.java): A simple Observer
package com.holub.tools;

/**
 * The interface implemented by subscribers to a multicaster. In the
 * multicaster model, a publisher can also be a subscriber, so the
 * Subscriber interface extends EventListener to give you some
 * flexibility. This way the top-level publisher can choose to be a
 * Subscriber (that is, implement Subscriber) or to use a Subscriber
 * (that is, contain a reference to a Multicaster, which is a Subscriber).
 *
 * @see Multicaster
 */
public interface Subscriber
{
    /**
     * The receive message is sent by the publisher (which either "is"
     * or "uses" a multicaster) to all subscribers.
     *
     * @param publication An object sent to the subscribers to give
     *                    them information about what happened.
     */
    void receive( Object publication );
}

Listing 3 shows a publisher implementation that corresponds to the subscriber in Listing 2. It provides methods that let you add a new subscriber, remove a subscriber, and publish news of an event to the subscribers. The publish() method (Listing 3, line 16) just publishes the string ("Hello world") to the subscribers.

Note that the list is copied by converting it to an array on line 20 of Listing 3. Unfortunately, you can't use clone() to copy a generic Collection. Of the two things you can do (make an empty list and explicitly copy the Collection into it or convert the Collection to an array), array conversion seems the more appropriate in the current code: an array is both smaller and easier to assemble than another LinkedList.

I'll come back to this listing in a moment.

Listing 3 (Publisher.java): A simplistic notifier
import java.util.*;
import com.holub.tools.Subscriber;
public class Publisher
{
    Collection subscription_list = new LinkedList();
   synchronized public void subscribe( Subscriber subscriber )
    {   subscription_list.add( subscriber );
    }
   synchronized public void cancel_subscription( Subscriber subscriber )
    {   subscription_list.remove( subscriber );
    }
    //------------------------------------------------------------------
   public void publish( )          // usually called by other publisher
    {                               // methods so would be private
        Object[] copy;  
        synchronized( this )
       {   copy = subscription_list.toArray();
        }
        for( int i = 0; i < copy.length; ++i )
            ((Subscriber) copy[i]).receive("Hello world");
    }
    //------------------------------------------------------------------
   public synchronized void publish_blocking()
    {
        for(Iterator i = subscription_list.iterator(); i.hasNext();)
            ((Subscriber) i.next()).receive("Hello world");
    }
    //...
}

The Hello_world class in Listing 4 shows how the publisher and subscriber classes work together. main() manufactures a Publisher on line 5 and a Subscriber on line 6. The Subscriber implementation is an anonymous inner class whose receive() override prints the String that's passed in from the publisher as the publication argument ("Hello world"). In main(), the subscriber is forced to subscribe to the publication (this is not a democracy), and then the publisher is told to publish the event.

Listing 4 (Hello_world.java): Publishing to the Subscribers
import com.holub.tools.Subscriber;
public class Hello_world
{
   static private Publisher  publisher  = new Publisher();
   static private Subscriber subscriber =
            new Subscriber()
           {   public void receive(Object p)
                {   System.out.println( (String) p );
                }
            };
   static public void main( String[] args )
    {   publisher.subscribe( subscriber );
        publisher.publish();            // Publish "Hello world" events
    }
}

Returning to Listing 3, you'll note that subscribe() and cancel_subscription() are synchronized, but publish() is not. Also note that publish() makes a copy of the subscription_list (on line 20) and notifies the subscribers by traversing the copy. This strategy is the one suggested in the JavaBeans specification for handling notifications in a multithreaded environment, the point being that you don't want to lock up synchronized methods of a publisher class while notifications are in progress. In any event, the new Java 2 Collection classes are (deliberately) not synchronized, so, subscribe() and cancel_subscription() must be synchronized in case one thread is trying to add a subscriber while another is removing one, or if two threads are adding at the same time, and so on. (Yes, I know about the wrappers. I'll talk about them in a moment.)

Next, there is no way to tell how long it will take for a subscriber's receive() method to execute, since it's supplied by whoever implements the subscriber. Consequently, publish() executes for an indeterminate amount of time. If publish() were synchronized, then the entire object would be locked until the notifications completed. Any thread that attempted to add or remove a subscriber, or call any other synchronized method for that matter, would block. So, publish() is not synchronized.

This lack of synchronization means that publish can't use the subscription_list directly, however. Otherwise another thread could come along and add and remove elements while notifications were in progress, perhaps corrupting the Collection used for the subscription_list. The problem is solved simplistically by synchronizing long enough to make a clean copy, then working from the copy.
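A single-threaded sketch is enough to show why the live list is dangerous to traverse. All names here (IterationDemo, Quitter, publish_from_list(), publish_from_copy()) are hypothetical. A subscriber that cancels its own subscription from inside receive() modifies the list under the iterator, and LinkedList's fail-fast iterator aborts the traversal; the copy-based traversal is unaffected. With real threads the damage is less predictable than a tidy exception, so treat this only as an illustration of the hazard.

```java
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedList;

class IterationDemo
{
    interface Subscriber
    {   void receive( Object publication );
    }

    final Collection<Subscriber> subscription_list = new LinkedList<Subscriber>();
    int delivered = 0;

    // A subscriber that cancels its own subscription when notified.
    class Quitter implements Subscriber
    {   public void receive( Object publication )
        {   ++delivered;
            subscription_list.remove( this );
        }
    }

    // Traverses the live list. Returns false if the traversal was aborted
    // because the list changed underneath the iterator.
    boolean publish_from_list()
    {   try
        {   for( Iterator<Subscriber> i = subscription_list.iterator(); i.hasNext(); )
                i.next().receive( "Hello world" );
            return true;
        }
        catch( ConcurrentModificationException e )
        {   return false;
        }
    }

    // Traverses a snapshot, so changes to the list can't disturb the loop.
    void publish_from_copy()
    {   Object[] copy = subscription_list.toArray();
        for( int i = 0; i < copy.length; ++i )
            ((Subscriber) copy[i]).receive( "Hello world" );
    }

    static IterationDemo with_three_quitters()
    {   IterationDemo demo = new IterationDemo();
        for( int i = 0; i < 3; ++i )
            demo.subscription_list.add( demo.new Quitter() );
        return demo;
    }

    public static void main( String[] args )
    {   IterationDemo live = with_three_quitters();
        System.out.println( "live traversal completed: " + live.publish_from_list() );
        System.out.println( "delivered before abort:   " + live.delivered );

        IterationDemo copied = with_three_quitters();
        copied.publish_from_copy();
        System.out.println( "delivered from the copy:  " + copied.delivered );
    }
}
```

Because Java monitors are reentrant, synchronizing the traversal would not by itself prevent this single-threaded case: the notifying thread already owns the lock when the subscriber calls back into the publisher.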

Even though this notify-from-the-copy strategy is recommended by Sun, it's not ideal. First, what if a subscriber is removed from the list after the copy is made but before the notifications begin? The subscriber will be notified, even though it thinks it has cancelled the subscription. This problem exists in all AWT/Swing listeners, and there is no easy solution -- never assume that you will not be notified simply because you've removed yourself as a listener. To eliminate this problem, I've included a second, synchronized, notification method in the Publisher: publish_blocking() (Listing 3, line 27). Since this version is synchronized, nobody can add or remove listeners while notifications are in progress. I don't know if this behavior is better than the nonblocking solution, but it's certainly different and more appropriate in some scenarios. Note that publish_blocking() doesn't have to copy the subscription_list since it synchronizes access to it -- it can just use an iterator to traverse the list.
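The notified-after-removal behavior of the copy strategy can be reproduced deterministically. In this sketch (the class LateNotificationDemo and the "canceller" and "victim" subscribers are hypothetical names), one subscriber cancels another's subscription while a notification pass is already under way. In a real program the cancellation would more likely come from a second thread, but the effect on the subscriber is the same.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

class LateNotificationDemo
{
    interface Subscriber
    {   void receive( Object publication );
    }

    final Collection<Subscriber> subscription_list = new LinkedList<Subscriber>();
    final List<String>           log               = new ArrayList<String>();

    synchronized void subscribe( Subscriber s )
    {   subscription_list.add( s );
    }
    synchronized void cancel_subscription( Subscriber s )
    {   subscription_list.remove( s );
    }

    // Notify-from-the-copy: lock only long enough to take a snapshot.
    void publish()
    {   Object[] copy;
        synchronized( this )
        {   copy = subscription_list.toArray();
        }
        for( int i = 0; i < copy.length; ++i )
            ((Subscriber) copy[i]).receive( "Hello world" );
    }

    static LateNotificationDemo run()
    {   final LateNotificationDemo publisher = new LateNotificationDemo();
        final Subscriber victim    = p -> publisher.log.add( "victim notified" );
        final Subscriber canceller = p ->
        {   publisher.cancel_subscription( victim );
            publisher.log.add( "canceller ran" );
        };
        publisher.subscribe( canceller );
        publisher.subscribe( victim );

        publisher.publish();    // victim is already in the snapshot: still notified
        publisher.publish();    // new snapshot: victim is gone
        return publisher;
    }

    public static void main( String[] args )
    {   System.out.println( run().log );
    }
}
```

The first pass logs the victim's notification after its cancellation; only the second pass, which works from a fresh snapshot, leaves it out.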

The fact that a simple synchronization strategy is used by publish_blocking() causes an additional problem in the blocking-notification scenario: the entire Publisher is locked while the subscription_list is being accessed (for the entire period required for notifications). The entire object doesn't have to be locked, however. We're synchronizing only to make access to the subscription_list thread-safe. What we really need are two locks, one that guards the subscription_list and another that guards any other fields that we might choose to add to the Publisher. This way you couldn't add or remove subscribers while notifications were in progress, but you could call other synchronized methods of the Publisher class without blocking.

Three approaches to introducing a second lock come to mind. First, I could introduce a second lock using the roll-your-own Mutex class discussed in installment three of the current threading series. This solution is implemented in Listing 5.

Listing 5 (Mutex_publisher.java): Introducing a second lock with a Mutex
import java.util.*;
import com.holub.asynch.Mutex;
import com.holub.tools.Subscriber;
public class Mutex_publisher
{
    Collection subscription_list = new LinkedList();
    Mutex      guard             = new Mutex();

    public void subscribe( Subscriber subscriber ) throws InterruptedException
    {   guard.acquire(1000);
        subscription_list.add( subscriber );
        guard.release();
    }
    // Not synchronized: the list is guarded by the Mutex, not by "this."
    public void cancel_subscription( Subscriber subscriber )
                                            throws InterruptedException
    {   guard.acquire(1000);
        subscription_list.remove( subscriber );
        guard.release();
    }
    public void publish( ) throws InterruptedException
    {   Object[] copy;
        guard.acquire(1000);
        copy = subscription_list.toArray();
        guard.release();
        for( int i = 0; i < copy.length; ++i )
            ((Subscriber) copy[i]).receive("Hello world");
    }
   synchronized public void other_method()
    {   //...
    }
}

A Mutex is really overkill in the current situation, however. A second approach encapsulates my original Publisher (from Listing 3) into a container class. I've done this in Listing 6. Now subscribe() and related methods aren't synchronized at all, but chain through to their synchronized cousins in the contained Publisher. That is, the lock associated with the contained Publisher object controls access to the subscription_list and the lock associated with the container (Wrapped_publisher) object is used by other synchronized methods. The other_method() method (Listing 6, line 19), which is synchronized, locks the container, not the Publisher.

Listing 6 (Wrapped_publisher.java): Using a container strategy
import com.holub.tools.Subscriber;
class Wrapped_publisher
{
    Publisher subscribers = new Publisher();
   public void subscribe( Subscriber subscriber )
    {   subscribers.subscribe( subscriber );
    }
    public void cancel_subscription( Subscriber subscriber )
    {   subscribers.cancel_subscription( subscriber );
    }
   public void publish( )
    {   subscribers.publish_blocking();
    }
   synchronized public void other_method()
    {                                   // Uses lock associated with "this,"
    }                                   // not the one associated with
                                        // "subscribers."
}

A third approach is to use the synchronized version of the LinkedList class, and then synchronize on the LinkedList itself. I've done that in Listing 7. Rather than creating a simple LinkedList on line 7, I've used Collections.synchronizedList() to wrap my list in an adapter, all of whose methods are synchronized. Now, I don't have to synchronize subscribe() or cancel_subscription(), and I don't have to synchronize when making the copy in publish(). However, I do have to explicitly synchronize on line 28 when I iterate across the list. This last approach is, I think, the most workable in practice, since it's the simplest to implement.

Listing 7 (Synchronized_list_publisher.java): Locking on a synchronized LinkedList
import java.util.*;
import com.holub.tools.Subscriber;
public class Synchronized_list_publisher
{   Collection subscription_list
                   = Collections.synchronizedList( new LinkedList() );

   public void subscribe( Subscriber subscriber )
    {   subscription_list.add( subscriber );
    }
   public void cancel_subscription( Subscriber subscriber )
    {   subscription_list.remove( subscriber );
    }
    public void publish( )
    {
        Object[] copy;
        copy = subscription_list.toArray();
        for( int i = 0; i < copy.length; ++i )
            ((Subscriber) copy[i]).receive("Hello world");
    }
    public void publish_blocking( )
    {   synchronized( subscription_list )
        {
            for(Iterator i = subscription_list.iterator(); i.hasNext();)
                ((Subscriber) i.next()).receive("Hello world");
        }
    }
}

Now let's address the problem of the amount of time required to do the notifications, and the fact that the thread that calls either publish() or publish_blocking() is effectively blocked until all the subscribers' receive() methods have been called. (Don't confuse locking with blocking. A thread can be blocked by attempting to enter a synchronized method that's in use by another thread. Acquiring a lock doesn't cause you to block unless there's contention. A thread is also effectively blocked -- in the sense that it can't do anything else -- while it's executing a lengthy method, whether or not that method is locked.)

The time problem is easily solved by wrapping threads around the publication code in Listing 7. I've done this in Listing 8. Now, the publish() method (Listing 8, line 16) creates a new Thread that notifies the subscribers. The complementary publish_sequentially() method (Listing 8, line 32) keeps the list locked while notifications are in progress, so if N threads publish simultaneously, you'll effectively go through the list N times, one after the other, instead of having N notification threads running in parallel.

Other, more complex solutions are also possible. It really isn't necessary for publish_sequentially() to create multiple threads when it's called while notifications are in progress. The publish_sequentially() method could just ignore any requests that come in while notifications are in progress, for example, or it could remember the fact that the request was made and run through the list a second time when it finished the first traversal.
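The remember-the-request variation might look something like the following sketch. It is only one possible reading of the idea: the CoalescingDemo class and its notifying and pending flags are hypothetical, and the thread creation is left out to keep the example short. A publish() call that arrives while a traversal is in progress just sets a flag and returns; the traversing caller then makes one more pass before it quits.

```java
import java.util.ArrayList;
import java.util.List;

class CoalescingDemo
{
    interface Subscriber
    {   void receive( Object publication );
    }

    private final List<Subscriber> subscription_list = new ArrayList<Subscriber>();
    private boolean notifying = false;  // a traversal is in progress
    private boolean pending   = false;  // a request arrived during the traversal

    synchronized void subscribe( Subscriber s )
    {   subscription_list.add( s );
    }

    void publish()
    {   synchronized( this )
        {   if( notifying )             // remember the request, but let the
            {   pending = true;         // caller that's already traversing
                return;                 // do the work
            }
            notifying = true;
        }
        try
        {   do
            {   Object[] copy;
                synchronized( this ){ copy = subscription_list.toArray(); }
                for( int i = 0; i < copy.length; ++i )
                    ((Subscriber) copy[i]).receive( "Hello world" );
            }
            while( another_pass_requested() );
        }
        catch( RuntimeException | Error e )     // don't leave the flags stuck
        {   synchronized( this ){ notifying = false; pending = false; }
            throw e;
        }
    }

    // Tests and clears the flags atomically so that no request is lost.
    private synchronized boolean another_pass_requested()
    {   if( pending )
        {   pending = false;
            return true;
        }
        notifying = false;
        return false;
    }

    static int run()
    {   final CoalescingDemo publisher = new CoalescingDemo();
        final int[] delivered = { 0 };
        publisher.subscribe( p ->
        {   if( ++delivered[0] == 1 )   // during the first notification only,
            {   publisher.publish();    // three more requests arrive while the
                publisher.publish();    // traversal is still in progress
                publisher.publish();
            }
        });
        publisher.publish();
        return delivered[0];
    }

    public static void main( String[] args )
    {   System.out.println( "notifications delivered: " + run() );
    }
}
```

The three requests made during the first pass collapse into a single extra pass, so the lone subscriber is notified twice rather than four times.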

Listing 8 (Background_publisher.java): Publishing from a thread
import java.util.*;
import com.holub.tools.Subscriber;
public class Background_publisher
{
    Collection subscription_list = Collections.synchronizedList( new LinkedList() );
   public void subscribe( Subscriber subscriber )
    {   subscription_list.add( subscriber );
    }
   public void cancel_subscription( Subscriber subscriber )
    {   subscription_list.remove( subscriber );
    }   
    public void publish()
    {   new Thread()
        {   public void run()
            {
                Object[] copy;
                synchronized( subscription_list )
                {   copy = subscription_list.toArray();
                }
                for( int i = 0; i < copy.length; ++i )
                    ((Subscriber)copy[i]).receive("Hello world");
            }
        }.start();
    }
    public void publish_sequentially()
    {   new Thread()
        {   public void run()
            {
                synchronized( subscription_list )
                {   for(Iterator i = subscription_list.iterator(); i.hasNext();)
                        ((Subscriber) i.next()).receive("Hello world");
                }
            }
        }.start();
    }
}

Mysteries of the AWTEventMulticaster

Though the various blocking-publication functions I've just presented do solve the notification-after-removal problem, they aren't a general solution to the notification problem since any threads that modify the subscriber list are effectively blocked until no notifications are in progress (which could be a long time). The notify-from-the-copy strategy is perfectly viable when this blocking is unacceptable. The only down side is the amount of time that it takes not only to lock the subscription_list but also to actually make the copy. Fortunately, Sun's John Rose and Amy Fowler came up with an elegant solution to the problem: the AWTEventMulticaster. The AWTEventMulticaster class implements all of the AWT/Swing listener interfaces, so you can literally pass an object of this class any message that can be sent to any listener.

Figure 1 shows how an AWTEventMulticaster is used to construct a list of listeners. A static factory method (add()) is used to create individual multicaster instances -- you can't create them with new. As you can see in Figure 1, this factory method is passed the current head-of-list reference and the listener to add. If either of these is null, it just returns the other, effectively creating a one-element list. If neither argument is null, then an AWTEventMulticaster object, essentially a binary-tree node, is created and initialized so that one child points to the existing list and the other points to the newly added listener. It continues in this way, building a binary tree whose interior nodes are all AWTEventMulticasters and whose leaf nodes are standard AWT or Swing listener objects.

Figure 1. Building a subscription list with a multicaster

Notification is done with a simple, recursive tree-traversal. (Which can be a problem. You'll note in Figure 1 that the tree will most likely degrade into a linked list, and recursive traversal of a linked list can use up a lot of runtime stack.) As I mentioned earlier, the AWTEventMulticaster implements all of the standard listener interfaces, and so has all of their methods (actionPerformed(), mouseClicked(), and so on). The multicaster overrides do nothing but pass the message to their children, using instanceof in the case of the leaf nodes to ensure that the message can be received by the actual object.
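For reference, here is a small sketch of how a component might use the real multicaster. AWTEventMulticaster.add() and AWTEventMulticaster.remove() are the actual static methods (shown here in their ActionListener overloads); the MulticasterDemo class, its fire() method, and its log are hypothetical. The two methods that replace the head-of-list reference are synchronized here as a conservative choice, and the tree itself is never locked.

```java
import java.awt.AWTEventMulticaster;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.ArrayList;
import java.util.List;

class MulticasterDemo
{
    // Head of the listener "list": null, a lone listener, or a multicaster node.
    private ActionListener subscribers = null;
    final List<String> log = new ArrayList<String>();

    synchronized void addActionListener( ActionListener l )
    {   subscribers = AWTEventMulticaster.add( subscribers, l );
    }
    synchronized void removeActionListener( ActionListener l )
    {   subscribers = AWTEventMulticaster.remove( subscribers, l );
    }

    void fire()
    {   ActionListener head;
        synchronized( this ){ head = subscribers; } // copy the reference, not the list
        if( head != null )
            head.actionPerformed(
                new ActionEvent( this, ActionEvent.ACTION_PERFORMED, "fire" ) );
    }

    static MulticasterDemo run()
    {   final MulticasterDemo button = new MulticasterDemo();
        ActionListener first  = e -> button.log.add( "first"  );
        ActionListener second = e -> button.log.add( "second" );

        button.addActionListener( first );
        button.addActionListener( second );
        button.fire();                      // notifies both listeners

        button.removeActionListener( first );
        button.fire();                      // notifies only the second listener
        return button;
    }

    public static void main( String[] args )
    {   System.out.println( run().log );
    }
}
```

Taking a "snapshot" costs one reference copy here, as opposed to the array copy that the Collection-based publishers need.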

Immutable objects and blank finals

The most important feature of the AWTEventMulticaster is not actually evident in the diagram. The multicaster is an immutable object: all of its fields are declared final, but are initialized in the constructor rather than the declaration, like this:

  public class AWTEventMulticaster implements ComponentListener, ...
   {   protected final EventListener a, b;
       protected AWTEventMulticaster(EventListener a, EventListener b)
        {   this.a = a; this.b = b;
        }
        //...
    }

Since everything's final, the object's state can't change once it's created. A Java String is immutable, for example. Immutable objects have a significant advantage over normal objects in multithreaded environments: you never have to synchronize access to them, because they can't change state. Consequently, replacing the Collection in the earlier listings with a multicaster eliminates the synchronization overhead entirely.

A final field that is initialized in the constructor rather than the declaration is called a blank final. You should use them whenever you can, which is unfortunately not as often as you'd like. An oft-reported bug that's been around since the early beta versions of the JDK 1.1 compiler is still there in the Java 2 compiler: The compiler sometimes incorrectly spits out the hard-error "Blank final may not have been initialized," even when the field in question has been initialized. This bug tends to manifest only when you use a lot of inner classes. Because I don't want to corrupt the global or package-level name space with class names that the user couldn't care less about, I usually prefer removing the final to moving the inner class out to the global level.
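As a small illustration of an immutable class built from blank finals, consider the sketch below. The ImmutableDemo and Range names are hypothetical. Every field is final and assigned exactly once in the constructor, and an operation that would ordinarily modify the object returns a new object instead.

```java
class ImmutableDemo
{
    // An immutable value: both fields are blank finals, so the state
    // can't change once the constructor returns.
    static final class Range
    {   private final int low, high;

        Range( int low, int high )
        {   this.low  = low;
            this.high = high;
        }

        int low() { return low;  }
        int high(){ return high; }

        // "Modification" builds a new object; the original is untouched.
        Range widened_by( int amount )
        {   return new Range( low - amount, high + amount );
        }
    }

    public static void main( String[] args )
    {   Range original = new Range( 10, 20 );
        Range wider    = original.widened_by( 5 );

        System.out.println( original.low() + ".." + original.high() );  // 10..20
        System.out.println( wider.low()    + ".." + wider.high()    );  // 5..25
    }
}
```

Any number of threads can read a Range at once without locking. A shared variable that refers to the current Range is a different matter: if one thread replaces that reference while others read it, the reference itself still has to be handed between threads safely.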

Using the multicaster

The multicaster exhibits a lot of very desirable behavior. For example, you can safely add listeners to the list while traversals are in progress because the tree is constructed from the bottom up. The new listeners will not be notified by any of the in-progress notification threads, but adding a listener will not damage the list either.

Listener removal elicits the following question: How do you remove nodes from a tree whose nodes can't be modified? The flip answer is that you build another tree. Figure 2 shows the easy scenario. To effectively delete node C, you create a new root node and make its child pointers reference node D and node C's right subtree. If you overwrite subscribers with new_list -- the normal case -- there will be no more references to the gray-colored nodes, and they will eventually be garbage collected.

Figure 2. Deleting a node from a multicaster: the easy case

Figure 3 shows the more difficult process of deleting a node that's further down in the tree (C again). Here, you have to build a second tree that looks exactly like that part of the original tree that's above the deleted node. As before, the gray colored nodes are subject to garbage collection once subscribers is overwritten (and any traversals positioned in that part of the tree complete).

Figure 3. Deleting a node from a multicaster: the hard case
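Since the figures describe the removal only pictorially, a simplified sketch may help. It is modeled loosely on the way AWTEventMulticaster works and is not the code from any listing in this article; the TreeRemovalDemo class, its Node class, and the add(), remove(), and named() helpers are all hypothetical. The remove() method rebuilds only the nodes on the path from the root down to the deleted subscriber and shares every other subtree with the original list, which is never modified.

```java
import java.util.ArrayList;
import java.util.List;

class TreeRemovalDemo
{
    interface Subscriber
    {   void receive( Object publication );
    }

    // Immutable interior node: forwards the publication to both children.
    static final class Node implements Subscriber
    {   final Subscriber a, b;
        Node( Subscriber a, Subscriber b ){ this.a = a; this.b = b; }

        public void receive( Object publication )
        {   a.receive( publication );
            b.receive( publication );
        }
    }

    static Subscriber add( Subscriber list, Subscriber s )
    {   if( list == null ) return s;
        if( s    == null ) return list;
        return new Node( list, s );
    }

    // Returns a list that doesn't contain "doomed." The original is untouched.
    static Subscriber remove( Subscriber list, Subscriber doomed )
    {   if( list == null || list == doomed ) return null;
        if( !(list instanceof Node) )        return list;   // some other leaf

        Node       node = (Node) list;
        Subscriber a2   = remove( node.a, doomed );
        Subscriber b2   = remove( node.b, doomed );

        if( a2 == node.a && b2 == node.b )  // doomed isn't under this node,
            return node;                    // so share the whole subtree
        return add( a2, b2 );               // otherwise rebuild this level
    }

    static Subscriber named( final String name, final List<String> log )
    {   return p -> log.add( name );
    }

    public static void main( String[] args )
    {   List<String> log = new ArrayList<String>();
        Subscriber a = named( "A", log ), b = named( "B", log ),
                   c = named( "C", log ), d = named( "D", log );

        Subscriber subscribers = add( add( add( a, b ), c ), d );
        Subscriber new_list    = remove( subscribers, c );

        new_list.receive( "Hello world" );      // notifies A, B, and D
        System.out.println( log );
        log.clear();
        subscribers.receive( "Hello world" );   // old tree still notifies all four
        System.out.println( log );
    }
}
```

A traversal that started on the old tree before the removal finishes normally, because none of the nodes it can reach are ever changed.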

The first time I looked at the AWTEventMulticaster source, I thought the code was weird and unnecessarily complex. On reflection, though, I realized that there's a lot of neat stuff here. The more I looked at it, the more I liked it. Multiple threads can happily add and remove nodes without conflicting either with each other or with threads that are traversing the tree, and multiple threads can traverse the tree simultaneously. Moreover, absolutely no synchronization is required to do any of this. The data structure that's used is no larger than the doubly linked list that I used earlier, and the overhead of copying part of the tree when you do deletions is a lot less than the overhead of copying the entire subscriber list every time you publish an event.

So, not being one to look a gift algorithm in the mouth, I wrote up my own general-purpose version of the multicaster, based on the AWTEventMulticaster. It's in Listing 9. It's remarkable how little code there is here. (Ahh, the miracle of recursion!) All the complexity, in fact, is in the removal. (I'll leave it as an exercise to the reader to puzzle through how it works -- opacity is the downside of most recursive algorithms.) This implementation is general purpose in that it can publish any Object to any class that implements the generic Subscriber interface (from Listing 2).

Listing 9 (/src/com/holub/tools/Multicaster.java): A roll-your-own multicaster
package com.holub.tools;
import java.util.EventObject;
import com.holub.tools.Subscriber;
/**
 * The multicaster (modeled after the AWTEventMulticaster) provides an
 * efficient way to manage relatively short lists of subscribers. Each
 * Multicaster object can reference two Subscriber objects, one or both
 * of which can be another multicaster. The top-level multicaster is
 * passed a publish message, which it broadcasts (recursively) to both
 * of the subscribers that it references.
 *
 * <p>The multicaster is an immutable object, so you can't modify it.
 * The add() method, for example, is passed two multicasters and returns
 * a third one that effectively references all the subscribers
 * referenced by the original two. Any notifications that are in
 * progress when the add() is executed will not be affected by the
 * operation, however. It's perfectly acceptable for notifications to be
 * performed on one thread while a second thread is adding or removing
 * members from the Multicaster. The order in which Subscribers are
 * notified is undefined. (It is not necessarily the order of insertion.)
 *
 * @see java.awt.AWTEventMulticaster
 * @see Subscriber
 */
public class Multicaster implements Subscriber
{
    protected final Subscriber a, b;
    protected Multicaster(Subscriber a, Subscriber b)
    {   this.a = a;
        this.b = b;
    }
    /******************************************************************
     * Ask the subscribers of this multicaster to receive the
     * publication. This is the publish operation, as seen from the
     * perspective of a subscriber. Remember, a multicaster is a list of
     * subscribers. Note that the order of traversal should generally be
     * considered undefined. However, if you really need to notify
     * listeners in a known order, and you consistently add nodes as
     * follows:
     * <pre>
     * subscription_list = Multicaster.add( subscription_list, new_node );
     * </pre>
     * (where subscription_list is the head-of-list reference and
     * new_node is the node you're adding), subscribers will be notified
     * in the order they were added. Removing nodes does not affect the
     * order of notification. If you transpose the two arguments in the
     * foregoing code:
     * <pre>
     * subscription_list = Multicaster.add( new_node, subscription_list );
     * </pre>
     * subscribers will be notified in reverse order.
     */
    public void receive( Object publication )
    {   a.receive( publication );
        b.receive( publication );
    }
    /******************************************************************
     * Add a new subscriber to the list. The way that this call is used
     * can impact the order in which subscribers are notified.
     * (See receive().)
     *
     * @param a Typically the head-of-list pointer.
     * @param b Typically the Subscriber you're adding to the list.
     */
    public static Subscriber add(Subscriber a, Subscriber b)
    {   return  (a == null)  ? b :
                (b == null)  ? a : new Multicaster(a, b);
    }
    /******************************************************************
     * Remove the indicated subscriber from the list.
     */
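    public static Subscriber remove(Subscriber list, Subscriber remove_me)
    {
        if( list == remove_me || list == null  )
            return null;    // empty list, or the list is just remove_me
        else if( !(list instanceof Multicaster) )
            return list;    // a lone subscriber that isn't remove_me
        else
            return ((Multicaster)list).remove( remove_me );
    }
    // Returns the subtree rooted at this node with remove_me taken out.
    // Nodes are never modified: this node is returned as is if remove_me
    // isn't below it; otherwise a new node takes its place.
    private Subscriber remove(Subscriber remove_me)
    {
        if (remove_me == a)  return b;  // this node drops out of the tree
        if (remove_me == b)  return a;
        Subscriber a2 = remove( a, remove_me );
        Subscriber b2 = remove( b, remove_me );
        return (a2 == a && b2 == b ) // it's not here
                ? this
                : add(a2, b2)
                ;
    }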
    //================================================================
   public static class Test
   {   private static class Leaf implements Subscriber
        {   String s;
           public Leaf(String s){ this.s = s; }
           public void receive( Object publication )   
            {   System.out.print(s);
            }
        }
       public static void main( String[] args )
        {   Leaf a = new Leaf("A");
            Leaf b = new Leaf("B");
            Leaf c = new Leaf("C");
            Leaf d = new Leaf("D");
            Leaf e = new Leaf("E");
            Subscriber subscription_list = null;
            subscription_list = Multicaster.add( subscription_list, a );
            subscription_list = Multicaster.add( subscription_list, b );
            subscription_list = Multicaster.add( subscription_list, c );
            subscription_list = Multicaster.add( subscription_list, d );
            subscription_list = Multicaster.add( subscription_list, e );
            System.out.print("List is: ");
            subscription_list.receive( null );
            System.out.print("\nRemoving c: ");
            subscription_list = Multicaster.remove( subscription_list, c );
            subscription_list.receive( null );
            System.out.print("\nRemoving a: ");
            subscription_list = Multicaster.remove( subscription_list, a );
            subscription_list.receive( null );
            System.out.print("\nRemoving d: ");
            subscription_list = Multicaster.remove( subscription_list, d );
            subscription_list.receive( null );
            System.out.print("\nRemoving b: ");
            subscription_list = Multicaster.remove( subscription_list, b );
            subscription_list.receive( null );
            System.out.print("\nRemoving e: ");
            subscription_list = Multicaster.remove( subscription_list, e );
            if( subscription_list != null )
                System.out.println("Couldn't remove last node");
            subscription_list = Multicaster.add( a, subscription_list );
            subscription_list = Multicaster.add( b, subscription_list );
            subscription_list = Multicaster.add( c, subscription_list );
            subscription_list = Multicaster.add( d, subscription_list );
            subscription_list = Multicaster.add( e, subscription_list );
            System.out.print("\nShould be: EDCBA: " );
            subscription_list.receive( null );
            System.out.println();
        }
    }
}

The final task is to modify our publisher to use the multicaster. I've done that in Listing 10 (for this month's final entry in the world's-most-complicated-way-to-print-"hello world" contest). This version is built on the one-thread-per-publication model discussed earlier.

Listing 10 (Multicast_publisher.java): Using the multicaster in a publisher
import java.util.*;
import com.holub.tools.Subscriber;
import com.holub.tools.Multicaster;
public class Multicast_publisher
{
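    Subscriber subscription_list; // = null
    private final Object publication_lock = new Object();

    // Adding and removing are read-modify-write operations on the
    // head-of-list reference, so they're synchronized to keep two
    // threads from overwriting each other's changes.
    public synchronized void subscribe( Subscriber subscriber )
    {   subscription_list =
                        Multicaster.add( subscription_list, subscriber);
    }
    public synchronized void cancel_subscription( Subscriber subscriber )
    {   subscription_list =
                    Multicaster.remove( subscription_list, subscriber );
    }
    // Returns the current head of list. The tree it references is
    // immutable, so it can be traversed without holding any lock.
    private synchronized Subscriber current_list()
    {   return subscription_list;
    }
    public void publish()
    {   final Subscriber snapshot = current_list();
        new Thread()
        {   public void run()
            {   if( snapshot != null )
                    snapshot.receive( "Hello world" );
            }
        }.start();
    }
    public void publish_sequentially()
    {   final Subscriber snapshot = current_list();
        new Thread()
        {   public void run()
            {   // Lock on a fixed object: subscription_list can be null
                // and is replaced by every subscribe or cancel.
                synchronized( publication_lock )
                {   if( snapshot != null )
                        snapshot.receive( "Hello world" );
                }
            }
        }.start();
    }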
    //==================================================================
   public static class Test
    {   
       static private Multicast_publisher publisher  =
                                                new Multicast_publisher();
       static private Subscriber subscriber = 
                                new Subscriber()
                               {   public void receive(Object p)
                                    {   System.out.println( (String) p );
                                    }
                                };
       static public void main( String[] args )
        {   publisher.subscribe( subscriber );
            publisher.publish();            // Publish "Hello world" events
        }
    }
}

Wrapping up

So that's it for this month. The Observer pattern is enormously useful when you need to write code that can be used in any context. Since the publisher knows nothing about the subscribers (except that they implement the Subscriber interface), the publisher is truly reusable in the technical OO sense. AWT/Swing obviously leverages Observer in its event model, but I use this design pattern heavily in non-UI situations as well.

Next month I'll present the last of the class-related solutions to multithreading problems: a reader-writer lock that lets you control access to a shared global resource. I'll also discuss Critical Sections and Singletons. Subsequent columns will look at a few architectural solutions to threading problems such as Synchronous Dispatchers and Active Objects.
