Java concurrency with thread gates

Manage concurrent thread execution in complex business applications

The thread gate pattern is an effective tool for controlling thread concurrency, but many developers are unfamiliar with it. Just as a traffic light can regulate the behavior of automobiles at an intersection, thread gates can block or allow thread progress based on given factors. Obi Ezechukwu introduces the concept of thread gates, then shows you how to implement them in a multithreaded prime-number generator. Level: Intermediate

Multithreading and code concurrency were once the preserve of elite programmers, but the combination of multicore processing power, complex requirements, and the readily available java.util.concurrent package has changed that. Today, enterprise application developers are expected to be knowledgeable about the various synchronization mechanisms and constructs available in the Java language. The level of expectation is even higher where the problems being solved require non-textbook and highly innovative concurrency constructs. It isn't enough in those situations to understand the Java language's concurrency mechanisms and those included in the standard SDK; you also must be able to use these tools as building blocks for custom-made concurrency controls.

In this article, we'll explore a concurrency pattern that isn't widely discussed, generally called thread gates. Like its real-world counterpart, a gate instance opens or closes, thus allowing or preventing thread progress. It does so based on the truth value of some predicate.

I'll start with an overview of thread gates based on the traffic-flow model, then explain how to set up your development environment for the example application, a multithreaded prime-number generator. The remainder of the article will be hands-on, as you learn about the thread gate pattern by implementing it.

Thread gates: An overview

A good metaphor for thread gates is the traffic light system that operates on many public roads. When the signal is red, cars have to wait at the stop line until the signal changes. On the other hand, cars are free to run past the signal when it is green. There is no limit to the number of times that the signal can go from green to red, given a statistically significant observation timeframe. Traffic lights are designed to enable crossflow of traffic, and are redundant where crossflow does not exist. In a programmer's terms, you can imagine the light as a control that lets bidirectional traffic share a small section of road that might otherwise cause the paths of traffic to intersect in an unsafe manner.

In the same way, thread gates are generally best used for scenarios where one set of threads has to be prevented from proceeding beyond a determined point, whilst another set of threads is active. To put it another way, the competing sets of threads are dependent on the value of some truth predicate, where each distinct value of the predicate strictly triggers one (and only one) set of threads, forcing the others to remain suspended. Note that the emphasis here is on sets or groups of threads rather than individual threads. In essence, the focus is on scenarios where multiple threads share access to an underlying resource, and the threads can be partitioned into sets depending on the actions that they perform in relation to that resource.
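Before turning to the article's own gate, it may help to see the idea in its barest form. The following is a minimal sketch, not the article's implementation: the class name SimpleGate and its isOpen() accessor are assumptions made for illustration, and the "predicate" is reduced to a single boolean flag.

```java
/** Minimal binary gate: threads calling await() block while the gate is closed. */
class SimpleGate {
    private boolean open; // the truth predicate, reduced to a flag

    public synchronized void await() throws InterruptedException {
        while (!open) {   // re-check in a loop to guard against spurious wake-ups
            wait();
        }
    }

    public synchronized void open() {
        open = true;
        notifyAll();      // release every thread queued at the gate
    }

    public synchronized void close() {
        open = false;
    }

    public synchronized boolean isOpen() {
        return open;
    }
}

class SimpleGateDemo {
    public static void main(String[] args) throws InterruptedException {
        SimpleGate gate = new SimpleGate();
        Thread worker = new Thread(() -> {
            try {
                gate.await();
                System.out.println("worker passed the gate");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(100); // the worker is very likely parked at the gate by now
        gate.open();       // let it through
        worker.join();
    }
}
```

The gate developed later in the article adds two things this sketch lacks: waiting threads are grouped into generations, and the gate knows when no producer is left so that readers are never stranded.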

A good example of this is in producer-consumer flows, where some threads are responsible for producing data that is consumed by another group of threads; the shared resource is most likely the handoff mechanism (data bus) used by the disparate thread groups; and the truth predicate that determines thread progress is the quantity of data in the handoff mechanism. An in-memory request queue can sometimes fit such a pattern if the data is enqueued as part of a process that is analogous to a production process, and the data is dequeued or consumed by a separate process.

The producer-consumer analogy is a good one for formulating a problem that explores the concept of thread gates. An example problem should be easily recognizable to the vast majority of programmers. In this case, the problem should also be one that lends itself easily to task splitting and parallelization, as the emphasis is on creating a multithreaded solution to it. This article's sample application will fulfill all of these criteria and more.

The multithreaded prime-number generator

This article's example application will tackle the age-old problem of finding all the probable prime numbers within a given bound (between one and a million, say). Specifically, the task is to implement a software component that exposes a method for retrieving all the probable prime numbers in a given range (with an upper and lower bound given). Assume that the component's client mandates that the method return a thread-safe handle for accessing the results as soon as it is invoked, and then performs the actual task of finding the prime numbers in the background or asynchronously. Assume also that the handle must provide a blocking method that allows primes to be accessed and returned as soon as possible, such that the client does not have to wait for the search to complete before accessing results. To simplify the task, no restrictions are placed on the order in which the handle returns the results.

Before reading further you should download the code archive that accompanies this article and create a development project within your favorite IDE. The archive contains ten source files, which are organized as follows:

-src\
  -main\
    -java\
       -com\
          -javaworld\
             -primefinder\
                  PrimeNumberSearcher.java
                  PrimeNumberSource.java
                  PrimeUtil.java
                  ThreadGate.java
                  PrimeSearchThread.java
                  PartitionInfo.java
                  PrimeNumberReader.java
                  GatedPrimeNumberSearcher.java
                  ConcurrentPrimeNumberSource.java                                    
-src\
 -test\
    -java\
       -com\
          -javaworld\
             -primefinder\
                  PrimeFinderTest.java

PrimeFinderTest.java is a JUnit 4 test case; you will need to use JUnit in order to test out the sample code. In keeping with Maven 2 naming conventions, the application's source files are in src/main/java, and the JUnit 4 test case that demonstrates the solution is in src/test/java. The key contents of these folders will be covered in the listings shown in the following sections.

Finding those primes

The interface com.javaworld.primefinder.PrimeNumberSearcher defines the contract to which the component must conform. The signature of this contract is given by the method findPrimeNumbers(), which is shown in Listing 1.

Listing 1. The search contract

PrimeNumberSource  findPrimeNumbers(BigInteger lowerBound,
                                    BigInteger upperBound);

Observe that com.javaworld.primefinder.PrimeNumberSource is the interface to which the result handle must conform. It defines a single method, BigInteger nextPrime(), which, when invoked, should return the next element in the search results buffer. If the results buffer has been exhausted, the implementation must return null to indicate that no further results are available. As already mentioned, the implementation of this method must be thread-safe.
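The two contracts can be sketched roughly as follows. The interface and method names come from the text above; FixedPrimeNumberSource and SourceDrain are hypothetical helpers added only to show the null-terminated read loop, and they are not part of the article's archive.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

interface PrimeNumberSource {
    /** Returns the next buffered result, or null once the results are exhausted. */
    BigInteger nextPrime();
}

interface PrimeNumberSearcher {
    PrimeNumberSource findPrimeNumbers(BigInteger lowerBound, BigInteger upperBound);
}

/** Hypothetical stand-in: serves a pre-computed list through the handle contract. */
class FixedPrimeNumberSource implements PrimeNumberSource {
    private final Iterator<BigInteger> iterator;

    FixedPrimeNumberSource(List<BigInteger> primes) {
        this.iterator = new ArrayList<>(primes).iterator(); // defensive copy
    }

    @Override
    public synchronized BigInteger nextPrime() {
        return iterator.hasNext() ? iterator.next() : null;
    }
}

class SourceDrain {
    /** Reads from the handle until it signals exhaustion with null. */
    static List<BigInteger> drain(PrimeNumberSource source) {
        List<BigInteger> out = new ArrayList<>();
        for (BigInteger p = source.nextPrime(); p != null; p = source.nextPrime()) {
            out.add(p);
        }
        return out;
    }
}
```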

Note that the emphasis of this task is on finding the probable primes within the search range; this makes it possible to use the nextProbablePrime() method from the java.math.BigInteger class. You can wrap the call to this method in a static method of the utility class com.javaworld.primefinder.PrimeUtil, an excerpt of which is shown in Listing 2.

Listing 2. A utility method for finding the first prime number within a given range

public static BigInteger findFirstPrime(long lowerBound, long upperBound) 
{
    BigInteger result;
    BigInteger startPos = BigInteger.valueOf(lowerBound);    
    BigInteger nextProbablePrime;
   
    if (startPos.isProbablePrime(.....)) // some reasonable accuracy
        nextProbablePrime = startPos;
    else nextProbablePrime = startPos.nextProbablePrime();
       
    if (nextProbablePrime.longValue() >= upperBound)
        result = null;
    else result = nextProbablePrime;
       
    return result;
}
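Repeated calls to nextProbablePrime() are enough to walk an entire range. The sketch below assumes a half-open range (lower bound inclusive, upper bound exclusive, as the >= test in Listing 2 suggests) and an arbitrary certainty of 20; the class and method names are hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

class ProbablePrimeWalk {
    /** Collects the probable primes p with lowerBound <= p < upperBound. */
    static List<BigInteger> probablePrimes(long lowerBound, long upperBound) {
        List<BigInteger> primes = new ArrayList<>();
        BigInteger upper = BigInteger.valueOf(upperBound);

        // Start no lower than 2; nextProbablePrime() rejects negative values.
        BigInteger candidate = BigInteger.valueOf(Math.max(lowerBound, 2));

        // Certainty 20 is an assumed value, not one taken from the article.
        if (!candidate.isProbablePrime(20)) {
            candidate = candidate.nextProbablePrime();
        }
        while (candidate.compareTo(upper) < 0) {
            primes.add(candidate);
            candidate = candidate.nextProbablePrime();
        }
        return primes;
    }
}
```

For example, ProbablePrimeWalk.probablePrimes(1, 30) yields the ten primes from 2 through 29.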

Having defined the public interfaces and utilities on which the solution is to be built, you can now move onto the task of detailing it. You'll begin by defining the thread gate implementation, which provides the means of controlling access to the results buffer.

A thread gate implementation

The objective of the thread gate is to ensure that multiple readers can access the prime numbers buffer without preventing the search threads from adding results to it.

A reader thread should be able to pick up any available results from the handle without waiting; but if there are no results, it must queue until at least one is made available. The sample application employs a fair scheme so that results are handed out more or less in the order in which the threads arrive. And yes, I mean it when I say "more or less." The emphasis is not on dispensing results in strict queuing order; rather, threads that request primes around a given interval will all receive their data roughly at the same time -- before the arrival of the next batch of threads.

A real-world analogy that illustrates this scheme is that of the modern coffee shop. The inoffensive way in which coffee orders are "batched" during busy periods is interesting. The attendants at the till take the customer orders and shout them down the line to the baristas manning the espresso machines, who in turn prepare several cups of coffee from the same espresso batch and dispense them to the waiting customers at roughly the same time. Then, of course, they return to the task of making the next batch of coffee for the next batch of customers, and the cycle repeats ad infinitum. By this strange internal scheme, people who arrive at the queue within a given interval tend to get their coffees around the same time -- although not exactly in the order in which the requests were made. Presumably the espresso machines can only prepare so many cups of coffee at any point in time, and it would simply be inefficient or slow to prepare the cups individually.

Returning to the more mundane task of generating and buffering prime numbers, the sample application controls access to the results buffer by a gate that functions in a manner similar to a binary latch: when the gate is open, threads can pass, and when it is closed, threads must queue until it is re-opened. It is more clever than a latch in the sense that it batches waiting threads into generations, which ensures that threads in the same generation all pass through the gate at the same time; threads in an older generation always take priority over those in younger generations. The gate implementation also has an innovative feature that ensures that results can always be written to the results buffer regardless of how many waiting threads there are. If a generic thread gate is like a traffic light, then this would be more akin to a railway crossing in the sense that the train always gets priority -- and for good reason, too.

The gate implementation is contained in the compilation unit com.javaworld.primefinder.ThreadGate, and is detailed in Listing 3.

Listing 3. The gate implementation

package com.javaworld.primefinder;

class ThreadGate
{
    private int waitPosition;
    private int generationSize;
    private int openAttempts;
    private int activeSearchThreads;
    private boolean open;
    
    public ThreadGate(int searchThreadsCount)
        
    public synchronized void await() throws InterruptedException

    public synchronized void close() throws InterruptedException
    
    public synchronized void open()
    
}

To deconstruct this class, begin by taking a look at the class-level fields, of which there are five:

  • waitPosition: This field tracks the generation index of waiting threads. It helps ensure fairness, because the threads that are let through are the last set of threads to arrive -- counting from the last time the gate was opened. Its value is incremented each time the open() method is invoked, thus forcing subsequent thread arrivals into a new, younger generation.
  • generationSize: Tracks the size of the current generation of threads waiting for the gate to open. Its value should be reset each time the open() or close() method is invoked.
  • openAttempts: An integer counter indicating the number of attempts that have been made to open the gate since the last time it was closed. In practice, it actually tracks the number of results that have been added to the results buffer since the last time the gate was closed. This should be apparent on examining the result handle implementation com.javaworld.primefinder.ConcurrentPrimeNumberSource, which shows that the open() method is invoked each time a new prime number is added to the results buffer. The value of this field is not only altered by calling open() but is also reset to zero with each call to close().
  • activeSearchThreads: Reader threads should not wait needlessly if there is no prospect of any further results being made available. A key indicator of the prospect of further results is whether or not there are still any active search threads. As such, it is important that the gate keep track of the number of active search threads. If this number is equal to zero, then the await() method will not block, thus allowing reader threads free passage through the gate.
  • open: A binary value that indicates whether the gate is open or closed.

Although the gate implementation's methods are self-explanatory, take some time to examine the await() and open() methods. The code for await() is shown in Listing 4.

Listing 4. A method to force generational wait

public synchronized void await() throws InterruptedException
{
    int generationIndex = waitPosition;
    generationSize++;
    while (!open && generationIndex == waitPosition
        && activeSearchThreads != 0)
        wait();
}

You'll note that this method captures the value of the current wait position -- the generation index, in other words. Whilst the gate is closed and the generation index has not been advanced, the calling thread will remain in a wait state. This means that any threads arriving since the last time the gate was opened will have to wait until the previous generation of threads has been served and the gate undergoes another close() to open() cycle, or the producer (search) threads terminate.

(As an exercise, consider how this method can be altered so that the gate does not necessarily have to be closed in order to batch threads into generations. In essence, how would altering the method help implement a scheme where the gate is simply used to batch processing threads? What would an application look like when efficiency demands that requests are serviced in bulk, rather than individually?)

The open() method is illustrated in Listing 5. It should be clear that the generation counter is advanced by 1 each time the gate re-opens.

Listing 5. Implementation of the ThreadGate.open() method

public synchronized void open()
{
    openAttempts++;
        
    if (generationSize<=openAttempts)
    {
        ++waitPosition; // advance the generation index
        open=true;
        generationSize = 0;
        notifyAll();            
    }
}

You should also note that the method uses notifyAll() instead of notify(), thus ensuring that all the threads in a given generation are given the opportunity to proceed.
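Listings 4 and 5 can be dropped into a complete class along the following lines. This is a sketch under stated assumptions rather than the code from the archive: the constructor body, the body of close(), the isClosed() accessor, and the searchThreadCompleted() hook (a hypothetical name) are inferred from the field descriptions above.

```java
/** Sketch of the generational gate; members marked "assumed" are not from the article. */
class ThreadGateSketch {
    private int waitPosition;        // generation index of the waiting threads
    private int generationSize;      // threads queued in the current generation
    private int openAttempts;        // open() calls since the last close()
    private int activeSearchThreads; // producers that may still add results
    private boolean open;

    // Assumed: the constructor only records how many search threads will run.
    ThreadGateSketch(int searchThreadsCount) {
        this.activeSearchThreads = searchThreadsCount;
    }

    // As in Listing 4.
    public synchronized void await() throws InterruptedException {
        int generationIndex = waitPosition;
        generationSize++;
        while (!open && generationIndex == waitPosition
                && activeSearchThreads != 0) {
            wait();
        }
    }

    // Assumed body: closing resets both counters, as the field list describes.
    public synchronized void close() throws InterruptedException {
        open = false;
        openAttempts = 0;
        generationSize = 0;
    }

    // As in Listing 5.
    public synchronized void open() {
        openAttempts++;
        if (generationSize <= openAttempts) {
            ++waitPosition; // advance the generation index
            open = true;
            generationSize = 0;
            notifyAll();
        }
    }

    // Assumed accessor, matching the isClosed() call made by the result handle.
    public synchronized boolean isClosed() {
        return !open;
    }

    // Assumed hook (hypothetical name): called once by each finishing search thread.
    public synchronized void searchThreadCompleted() {
        if (activeSearchThreads > 0) {
            activeSearchThreads--;
        }
        if (activeSearchThreads == 0) {
            notifyAll(); // nothing more will arrive, so release any waiting readers
        }
    }
}
```

The final notifyAll() matters: without it, a reader that began waiting before the last search thread finished would never re-evaluate the activeSearchThreads condition.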

The next section outlines the implementation of the PrimeNumberSource interface, which serves as the handle for accessing search results.

PrimeNumberSource implementation

The result handle is encapsulated in the compilation unit com.javaworld.primefinder.ConcurrentPrimeNumberSource; its contents are outlined in Listing 6.

Listing 6. PrimeNumberSource implementation

public class ConcurrentPrimeNumberSource 
        implements PrimeNumberSource
{
    private ThreadGate barrier;
    
    private List<BigInteger> resultsBuffer;
    private int resultsBufferAccessIndex;   
    
    protected ConcurrentPrimeNumberSource(
                int searchThreadsCount)
    
    
    
    public synchronized BigInteger nextPrime() 
    
    //delegates to barrier
    protected void searchThreadCompleted()
    
    //updates results
    protected void addResultToBuffer(
                BigInteger result)
}

As you can see in the code, instances of this class hold a reference to a gate, which is used to control concurrent access to the underlying results buffer -- as encapsulated by the field resultsBuffer.

An integer counter resultsBufferAccessIndex is used as a pointer to the last read buffer location. Note that the field is not atomic, and as such the nextPrime() method is defined as synchronized so as to prevent destructive updates. In Listing 7, you can also see that the gate's close() method is only invoked if the result buffer has been exhausted and the gate is not already closed.

Listing 7. The implementation of the nextPrime() method

public synchronized BigInteger nextPrime() 
{
    BigInteger result;
    int resultIndex = resultsBufferAccessIndex++;
    try
    {           
        if (!barrier.isClosed() && 
                (resultIndex == resultsBuffer.size()))
            barrier.close();

            
        barrier.await(); 
            
        result = resultsBuffer.get(resultIndex);
    }   
    catch (InterruptedException exce)
    {
        ..........................................  
    }       
    catch (IndexOutOfBoundsException exce)
    {
        //search exhausted
        result = null;
    }

    return result;
}
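The article does not list the producer side of the handle, so the following sketch fills it in under assumptions. The nested Gate is a deliberately simplified stand-in for ThreadGate (no generations), producerDone() is a hypothetical name, and the use of CopyOnWriteArrayList is an illustrative choice. One point is worth noticing: because nextPrime() holds the handle's monitor while it waits at the gate, addResultToBuffer() must not synchronize on the handle, or writers could be locked out by a waiting reader.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ResultHandleSketch {
    /** Simplified stand-in for the article's ThreadGate (no generations). */
    static final class Gate {
        private boolean open;
        private int activeProducers;

        Gate(int producers) { activeProducers = producers; }

        synchronized void await() throws InterruptedException {
            while (!open && activeProducers != 0) wait();
        }
        synchronized void open() { open = true; notifyAll(); }
        synchronized void close() { open = false; }
        synchronized boolean isClosed() { return !open; }
        synchronized void producerDone() {
            if (activeProducers > 0) activeProducers--;
            if (activeProducers == 0) notifyAll();
        }
    }

    private final Gate barrier;
    // Thread-safe list, so writers never need the handle's own monitor.
    private final List<BigInteger> resultsBuffer = new CopyOnWriteArrayList<>();
    private int resultsBufferAccessIndex;

    ResultHandleSketch(int searchThreadsCount) {
        barrier = new Gate(searchThreadsCount);
    }

    public synchronized BigInteger nextPrime() {
        int resultIndex = resultsBufferAccessIndex++;
        try {
            if (!barrier.isClosed() && resultIndex == resultsBuffer.size()) {
                barrier.close(); // buffer exhausted: make this reader wait
            }
            barrier.await();
            return resultsBuffer.get(resultIndex);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // assumed policy: restore flag, give up
            return null;
        } catch (IndexOutOfBoundsException e) {
            return null; // search exhausted
        }
    }

    // Deliberately not synchronized on the handle (see the note above).
    void addResultToBuffer(BigInteger result) {
        resultsBuffer.add(result);
        barrier.open(); // one open attempt per result
    }

    void searchThreadCompleted() {
        barrier.producerDone();
    }
}
```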

Next, you will examine the PrimeNumberSearcher implementation that returns instances of this handle.

The search executor

The task of partitioning the search space and assigning contiguous blocks to individual threads is performed by the class com.javaworld.primefinder.GatedPrimeNumberSearcher; this is an implementation of the contract com.javaworld.primefinder.PrimeNumberSearcher.

On instantiation, an object of this type determines the number of system processors, and uses this to partition the search space into buckets or blocks of near uniform size, which in turn are assigned to individual tasks. The bucketing information is encapsulated in the com.javaworld.primefinder.PartitionInfo class, which is shown in Listing 8.

Listing 8. Encapsulation of search-space partition information

class PartitionInfo
{
    PartitionInfo(int numberOfBuckets, long bucketSize)
    
    public int getNumberOfBuckets().....
    public long getBucketSize() ........
}

Instances of this class are created by the method in Listing 9, which is defined in the searcher implementation com.javaworld.primefinder.GatedPrimeNumberSearcher.

Listing 9. Method that partitions the search space into blocks

private PartitionInfo getPartitionInfo(long lowerBound,
                                       long upperBound)
{
    PartitionInfo result;
        
    int proposedBucketCount = numberOfProcessors;
    long bucketSize = (upperBound-lowerBound) / 
                       proposedBucketCount;
        
    result = new PartitionInfo(proposedBucketCount,
                               bucketSize);
        
    return result;
}
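Integer division usually leaves a remainder, so it is worth seeing how the block boundaries fall. The sketch below is one possible way to turn a bucket count into concrete ranges; it is not the article's code. It assumes half-open ranges, folds the remainder into the last block, and never creates more blocks than there are numbers to search. RangePartitioner and its partition() method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class RangePartitioner {
    /**
     * Splits [lowerBound, upperBound) into contiguous half-open blocks.
     * Each long[] holds {startInclusive, endExclusive}.
     */
    static List<long[]> partition(long lowerBound, long upperBound, int buckets) {
        if (upperBound <= lowerBound || buckets < 1) {
            throw new IllegalArgumentException("empty range or no buckets");
        }
        long span = upperBound - lowerBound;
        // Never more blocks than numbers, otherwise the block size would be zero.
        int count = (int) Math.min(buckets, span);
        long bucketSize = span / count;

        List<long[]> ranges = new ArrayList<>();
        long start = lowerBound;
        for (int i = 0; i < count; i++) {
            // The last block absorbs whatever the integer division left over.
            long end = (i == count - 1) ? upperBound : start + bucketSize;
            ranges.add(new long[] {start, end});
            start = end;
        }
        return ranges;
    }
}
```

With a range of 1 to 100 and four buckets, the block size is 24 and the blocks are [1, 25), [25, 49), [49, 73) and [73, 100).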

As you can see in Listing 10, instances of PartitionInfo are used to create one or more search threads -- or, more accurately, one or more runnable tasks/targets, as implemented by the class com.javaworld.primefinder.PrimeSearchThread. In addition to information about its search range, each thread also holds a reference to the result handle, thus allowing it to write search results to the handle's internal result buffer as and when they become available.

Listing 10. Implementation of contract findPrimeNumbers()

public PrimeNumberSource findPrimeNumbers(BigInteger aLowerBound, 
                                          BigInteger aUpperBound) 
{
    if (aUpperBound.longValue()<=aLowerBound.longValue())
        throw new IllegalArgumentException(
                "Upperbound must be greater than lowerbound");
    long lowerBound = aLowerBound.longValue(), 
         upperBound = aUpperBound.longValue();
        
    final ConcurrentPrimeNumberSource result;
        
    PartitionInfo partitionInfo =getPartitionInfo(lowerBound,
                                                  upperBound);

    result = new ConcurrentPrimeNumberSource(
                    partitionInfo.getNumberOfBuckets());
        
    Thread searchThread;
    // the first block starts at lowerBound, so its end is offset from there
    long shiftingLowerBound = lowerBound,
        shiftingUpperBound = lowerBound + partitionInfo.getBucketSize();
    while (shiftingUpperBound <= upperBound)
    {
        searchThread =
                new Thread(new PrimeSearchThread(
                    BigInteger.valueOf(shiftingLowerBound),
                    BigInteger.valueOf(shiftingUpperBound),
                    result));
            
        searchThread.start();
            
        shiftingLowerBound = shiftingUpperBound;
        shiftingUpperBound+= partitionInfo.getBucketSize();
    }

    return result;
}
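PrimeSearchThread itself is not listed in this article, so here is a rough sketch of what such a task might look like. Everything in it is an assumption: the class name RangeSearchTask, the use of a Consumer as the result sink and a Runnable as the completion callback (standing in for the handle's addResultToBuffer() and searchThreadCompleted() methods), the half-open range, and the certainty value of 20.

```java
import java.math.BigInteger;
import java.util.function.Consumer;

/** Hypothetical search task: reports every probable prime in [lowerBound, upperBound). */
class RangeSearchTask implements Runnable {
    private final BigInteger lowerBound; // inclusive
    private final BigInteger upperBound; // exclusive
    private final Consumer<BigInteger> sink; // e.g. the handle's addResultToBuffer
    private final Runnable onCompleted;      // e.g. the handle's searchThreadCompleted

    RangeSearchTask(BigInteger lowerBound, BigInteger upperBound,
                    Consumer<BigInteger> sink, Runnable onCompleted) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.sink = sink;
        this.onCompleted = onCompleted;
    }

    @Override
    public void run() {
        try {
            // Start no lower than 2; nextProbablePrime() rejects negative values.
            BigInteger candidate = lowerBound.max(BigInteger.valueOf(2));
            if (!candidate.isProbablePrime(20)) { // assumed certainty
                candidate = candidate.nextProbablePrime();
            }
            while (candidate.compareTo(upperBound) < 0) {
                sink.accept(candidate);
                candidate = candidate.nextProbablePrime();
            }
        } finally {
            // Always signal completion, even on failure, so that readers
            // waiting at the gate are not left blocked forever.
            onCompleted.run();
        }
    }
}
```

The try/finally is the design point here: the gate only stops blocking readers once its count of active search threads reaches zero, so a task that dies without reporting completion would strand them.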

Before you can examine the test unit, there remains one application class to mention: the consumer/reader thread implementation com.javaworld.primefinder.PrimeNumberReader, whose core method is shown in Listing 11.

Listing 11. Prime number consumer

@Override
public void run() 
{
    BigInteger nextVal=null;
        
    do
    {
        nextVal = source.nextPrime();
        if (nextVal!=null)
            System.out.println(nextVal + ", ");
        else break;
    }while (true);
}

In the JUnit 4 test case, several instances of this class are used to simulate the effect of multiple client threads queuing at the thread gate.

A test unit

A JUnit 4 test named com.javaworld.primefinder.PrimeFinderTest is provided in the test folder of the source archive that accompanies this article. The test creates an instance of a GatedPrimeNumberSearcher to search for the prime numbers between 1 and a suitable upper bound (e.g., 100, 1 million, 1 thousand trillion); it also creates several reader threads to print out the resulting numbers to System.out. You should adjust the upper bound of the search space to suit the capability of your computer, then run the test to watch your thread gate in action!

In conclusion

This article has introduced the concept of thread gates and demonstrated how to implement and use a basic gate class. It has provided source examples and companion source code to enable the reader to explore the idea further. For more on the threading issues raised here, you should read the excellent book Java Concurrency in Practice, written by Brian Goetz and others, along with the other material presented in the Resources section below.

Obi Ezechukwu is a quantitative Java and Java EE developer working in the UK financial industry. He specializes in designing and implementing real-time and computationally intensive quantitative financial models. He is currently vice president of bond analytics at Markit and co-founder of the obix-configuration project. He holds a Ph.D. in the field of computational finance from Imperial College London.
