Programming Java threads in the real world, Part 6

The Observer pattern and mysteries of the AWTEventMulticaster

1 2 3 4 Page 2
Page 2 of 4

The Hello_world class in Listing 4 shows how the publisher and subscriber classes work together. main() manufactures a Publisher on line 5 and a Subscriber on line 6. The Subscriber implementation is an anonymous inner class whose receive() override prints the String that's passed in from the publisher as the publication argument ("Hello world"). In main(), the subscriber is forced to subscribe to the publication (this is not a democracy), and then the publisher is told to publish the event.

Listing 4 (Hello_world.java): Publishing to the Subscribers
01  
02  
03  
04  
05  
06  
07  
08  
09  
10  
11  
12  
13  
14  
15  
16  
17  
import com.holub.tools.Subscriber;
public class Hello_world
{
   static private Publisher  publisher  = new Publisher();
   static private Subscriber subscriber =
            new Subscriber()
           {   public void receive(Object p)
                {   System.out.println( (String) p );
                }
            };
   static public void main( String[] args )
    {   publisher.subscribe( subscriber );
        publisher.publish();            // Publish "Hello world" events
    }
}

Returning to Listing 3, you'll note that subscribe() and cancel_subscription() are synchronized, but publish() is not. Also note that publish() makes a copy of the subscription_list (on line 20) and notifies the subscribers by traversing the copy. This strategy is the one suggested in the JavaBeans specification for handling notifications in a multithreaded environment, the point being that you don't want to lock up synchronized methods of a publisher class while notifications are in progress. In any event, the new Java 2 Collection classes are (deliberately) not synchronized, so, subscribe() and cancel_subscription() must be synchronized in case one thread is trying to add a subscriber while another is removing one, or if two threads are adding at the same time, and so on. (Yes, I know about the wrappers. I'll talk about them in a moment.)

Next, there is no way to tell how long it will take for a subscriber's receive() method to execute, since it's supplied by whoever implements the subscriber. Consequently, publish() executes for an indeterminate amount of time. If publish() were synchronized, then the entire object would be locked until the notifications completed. Any thread that attempted to add or remove a subscriber, or call any other synchronized method for that matter, would block. So, publish() is not synchronized.

This lack of synchronization means that publish can't use the subscription_list directly, however. Otherwise another thread could come along and add and remove elements while notifications were in progress, perhaps corrupting the Collection used for the subscription_list. The problem is solved simplistically by synchronizing long enough to make a clean copy, then working from the copy.

Even though this notify-from-the-copy strategy is recommended by Sun, it's not ideal. First, what if a subscriber is removed from the list after the copy is made but before the notifications begin? The subscriber will be notified, even though it thinks it has cancelled the subscription. This problem exists in all AWT/Swing listeners, and there is no easy solution -- never assume that you will not be notified simply because you've removed yourself as a listener. To eliminate this problem, I've included a second, synchronized, notification method in the Publisher: publish_blocking() (Listing 3, line 27). Since this version is synchronized, nobody can add or remove listeners while notifications are in process. I don't know if this behavior is better than the nonblocking solution, but it's certainly different and more appropriate in some scenarios. Note that the publish_blocking() doesn't have to copy the subscription_list since it synchronizes access to it -- it can just use an iterator to traverse the list.

The fact that a simple synchronization strategy is used by publish_blocking causes an additional problem in the blocking-notification scenario: the entire Publisher is locked while the subscription_list is being accessed (for the entire period required for notifications). The entire object doesn't have to be locked, however. We're synchronizing only to make access to the synchronization_list thread-safe. What we really need are two locks, one that guards the subscription_list and another that guards any other fields that we might choose to add to the Publisher. This way you couldn't add or remove subscribers while notifications were in progress, but you could call other synchronized methods of the Publisher class without blocking.

Three approaches to introducing a second lock come to mind. First, I could introduce a second lock using the roll-your-own Mutex class discussed in installment three of the current threading series. This solution is implemented in Listing 5.

Listing 5 (Mutex_publisher.java): Introducing a second lock with a Mutex
01  
02  
03  
04  
05  
06  
07  
08  
09  
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  
28  
29  
30  
31  
32  
33  
34  
35  
36  
37  
38  
39  
import java.util.*;
import com.holub.asynch.Mutex;
import com.holub.tools.Subscriber;
public class Mutex_publisher
{
    Collection subscription_list = new LinkedList();   Mutex      guard             = new Mutex();
   public void subscribe( Subscriber subscriber ) throws InterruptedException
    {       guard.acquire(1000);
        subscription_list.add( subscriber );       guard.release();
    }
   synchronized public void cancel_subscription( Subscriber subscriber )
                                            throws InterruptedException
    {       guard.acquire(1000);
        subscription_list.remove( subscriber );       guard.release();
    }
   public void publish( ) throws InterruptedException
    {   Object[] copy;       guard.acquire(1000);
        copy = subscription_list.toArray();       guard.release();
        for( int i = 0; i < copy.length; ++i )
            ((Subscriber) copy[i]).receive("Hello world");
    }
   synchronized public void other_method()
    {   //...
    }
}

A Mutex is really overkill in the current situation, however. A second approach encapsulates my original Publisher (from Listing 3) into a container class. I've done this in Listing 6. Now subscribe and related methods aren't synchronized at all, but chain through to their synchronized cousins in the contained Subscriber(). That is, the lock associated with the contained Publisher object controls access to the subscription_list and the lock associated with the container (Wrapped_publisher) object is used by other synchronized methods. The other_method() method (Listing 6, line 19), which is synchronized, locks the container, not the Publisher.

Listing 6 (Wrapped_publisher.java): Using a container strategy
01  
02  
03  
04  
05  
06  
07  
08  
09  
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
import com.holub.tools.Subscriber;
class Wrapped_publisher
{
    Publisher subscribers = new Publisher();
   public void subscribe( Subscriber subscriber )
    {   subscribers.subscribe( subscriber );
    }
   synchronized public void cancel_subscription( Subscriber subscriber )
    {   subscribers.cancel_subscription( subscriber );
    }
   public void publish( )
    {   subscribers.publish_blocking();
    }
   synchronized public void other_method()
    {                                   // Uses lock associated with "this,"
    }                                   // not the one associated with
                                        // "subscribers."
}

A third approach is to use the synchronized version of the LinkedList class, and then synchronize on the LinkedList itself. I've done that in Listing 7. Rather than creating a simple LinkedList on line 7, I've used Collections.synchronized() to wrap my list in an adapter, all of whose methods are synchronized. Now, I don't have to synchronize subscribe() or cancel_subscription(), and I don't have to synchronize when making the copy in publish(). However, I do have to explicitly synchronize on line 28 when I iterate across the list. This last approach is, I think, the most workable in practice, since it's the simplest to implement.

Listing 7 (Synchronized_list_publisher.java): Locking on a synchronized LinkedList
01  
02  
03  
04  
05  
06  
07  
08  
09  
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  
28  
29  
30  
31  
32  
33  
34  
import java.util.*;
import com.holub.tools.Subscriber;
public class Synchronized_list_publisher
{   Collection subscription_list
                   = Collections.synchronizedList( new LinkedList() );

   public void subscribe( Subscriber subscriber )
    {   subscription_list.add( subscriber );
    }
   public void cancel_subscription( Subscriber subscriber )
    {   subscription_list.remove( subscriber );
    }
   public void publish( )
    {   
        Object[] copy;       copy = subscription_list.toArray();
        for( int i = 0; i < copy.length; ++i )
            ((Subscriber) copy[i]).receive("Hello world");
    }
   public void publish_blocking( )
    {      synchronized( subscription_list )
        {
            for(Iterator i = subscription_list.iterator(); i.hasNext();)
                ((Subscriber) i.next()).receive("Hello world");       }
    }
}

Now let's address the problem of the amount of time required to do the notifications, and the fact that the thread that calls either publish() or publish_blocking() is effectively blocked until all the subscriber's receive() methods have been called. (Don't confuse locking with blocking. A thread can be blocked by attempting to enter a synchronized method that's in use by another thread. Acquiring a lock doesn't cause you to block unless there's contention. A thread is also effectively blocked -- in the sense that it can't do anything else -- while it's executing a lengthy method, whether or not that method is locked.)

The time problem is easily solved by wrapping threads around the publication code in Listing 7. I've done this in Listing 8. Now, the publish() method (Listing 8, line 16) creates a new Thread that notifies the subscribers. The complementary publish_sequentially() method (Listing 8, line 32) keeps the list locked while notifications are in progress, so if N threads publish simultaneously, you'll effectively go through the list N times, one after the other, instead of having N notification threads running in parallel.

Other, more complex solutions are also possible. It really isn't necessary for publish_sequentially() to create multiple threads when it's called while notifications are in progress. The publish_sequentially() method could just ignore any requests that come in while notifications are in progress, for example, or it could remember the fact that the request was made and run through the list a second time when it finished the first traversal.

Listing 8 (Background_publisher.java): Publishing from a thread
01  
02  
03  
04  
05  
06  
07  
08  
09  
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  
28  
29  
30  
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
import java.util.*;
import com.holub.tools.Subscriber;
public class Background_publisher
{
    Collection subscription_list = Collections.synchronizedList( new LinkedList() );
   public void subscribe( Subscriber subscriber )
    {   subscription_list.add( subscriber );
    }
   public void cancel_subscription( Subscriber subscriber )
    {   subscription_list.remove( subscriber );
    }   
   public void publish()
    {       new Thread()
       {   public void run()
            {
                Object[] copy;
                synchronized( subscription_list )
                {   copy = subscription_list.toArray();
                }
                for( int i = 0; i < copy.length; ++i )
                    ((Subscriber)copy[i]).receive("Hello world");           }
        }.start();
    }
   public void publish_sequentially()
    {       new Thread()
        {   public void run()
            {
                synchronized( subscription_list )
                {   for(Iterator i = subscription_list.iterator(); i.hasNext();)
                        ((Subscriber) i.next()).receive("Hello world");
                }           }
        }.start();
    }
}

Mysteries of the AWTEventMulticaster

Though the various blocking-publication functions I've just presented do solve the notification-after-removal problem, they aren't a general solution to the notification problem since any threads that modify the subscriber list are effectively blocked until no notifications are in progress (which could be a long time). The notify-from-the-copy strategy is perfectly viable when this blocking is unacceptable. The only down side is the amount of time that it takes not only to lock the subscription_list but also to actually make the copy. Fortunately, Sun's John Rose and Amy Fowler came up with an elegant solution to the problem: the AWTEventMulticaster. The AWTEventMulticaster class implements all of the AWT/Swing listener interfaces, so you can literally pass an object of this class any message that can be sent to any listener.

1 2 3 4 Page 2
Page 2 of 4