Java concurrency with thread gates

Manage concurrent thread execution in complex business applications

  • waitPosition: This field tracks the generation index of waiting threads. It helps ensure fairness, because the threads that are let through are exactly those that arrived since the gate was last opened. Its value is incremented each time a call to open() actually opens the gate (see Listing 5), thus forcing subsequent thread arrivals into a new, younger generation.
  • generationSize: Tracks the size of the current generation of threads waiting for the gate to open. Its value should be reset each time the open() or close() method is invoked.
  • openAttempts: An integer counter indicating the number of attempts that have been made to open the gate since the last time it was closed. In practice, it actually tracks the number of results that have been added to the results buffer since the last time the gate was closed. This should be apparent on examining the result handle implementation com.javaworld.primefinder.ConcurrentPrimeNumberSource, which shows that the open() method is invoked each time a new prime number is added to the results buffer. The value of this field is not only altered by calling open() but is also reset to zero with each call to close().
  • activeSearchThreads: Reader threads should not wait needlessly if there is no prospect of any further results being made available. A key indicator of the prospect of further results is whether or not there are still any active search threads. As such, it is important that the gate keep track of the number of active search threads. If this number is equal to zero, then the await() method will not block, thus allowing reader threads free passage through the gate.
  • open: A binary value that indicates whether the gate is open or closed.
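
To make the bookkeeping concrete, here is a minimal sketch of how these five fields might sit together in a gate class. The field names come from the list above; the class name GateStateSketch, the helper awaitWouldBlock(), the initial closed state, and the bodies of close(), isClosed() and searchThreadCompleted() are assumptions made for illustration, not the article's source. The await() and open() methods are left out because Listings 4 and 5 show them.

```java
// Hypothetical sketch: field names follow the article, but the method bodies
// below are assumptions, not the author's code.
class GateStateSketch {
    private int waitPosition;        // generation index (advanced by open(), Listing 5)
    private int generationSize;      // threads waiting in the current generation
    private int openAttempts;        // open() calls since the last close()
    private int activeSearchThreads; // producer threads still running
    private boolean open;            // true while the gate is open (assumed to start closed)

    GateStateSketch(int searchThreadsCount) {
        this.activeSearchThreads = searchThreadsCount;
    }

    // Closing the gate resets the per-cycle counters, as the field notes describe.
    public synchronized void close() {
        open = false;
        openAttempts = 0;
        generationSize = 0;
    }

    public synchronized boolean isClosed() {
        return !open;
    }

    // Called once by each search thread when it finishes. When the last one
    // finishes, any waiting readers are woken so they can stop waiting.
    public synchronized void searchThreadCompleted() {
        if (activeSearchThreads > 0) {
            activeSearchThreads--;
        }
        if (activeSearchThreads == 0) {
            notifyAll();
        }
    }

    // Illustration only: the part of the await() condition that does not
    // depend on the caller's generation index.
    synchronized boolean awaitWouldBlock() {
        return !open && activeSearchThreads != 0;
    }
}
```

Once every search thread has reported completion, awaitWouldBlock() returns false even though the gate is still closed, which mirrors the free passage described for activeSearchThreads.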

Although the gate implementation's methods are self-explanatory, take some time to examine the await() and open() methods. The code for await() is shown in Listing 4.

Listing 4. A method to force generational wait

public synchronized void await() throws InterruptedException
{
    int generationIndex = waitPosition;
    generationSize++;
    while (!open && generationIndex == waitPosition
        && activeSearchThreads != 0)
        wait();
}

You'll note that this method captures the current wait position (in other words, the generation index) on entry. While the gate is closed and the generation index has not advanced, the calling thread remains in a wait state. This means that any threads arriving since the last time the gate was opened will have to wait until the previous generation of threads has been served and the gate undergoes another close() to open() cycle, or until the producer (search) threads terminate.

(As an exercise, consider how this method can be altered so that the gate does not necessarily have to be closed in order to batch threads into generations. In essence, how would altering the method help implement a scheme where the gate is simply used to batch processing threads? What would an application look like when efficiency demands that requests are serviced in bulk, rather than individually?)

The open() method is illustrated in Listing 5. It should be clear that the generation counter is advanced by 1 each time the gate re-opens.

Listing 5. Implementation of the ThreadGate.open() method

public synchronized void open()
{
    openAttempts++;
        
    if (generationSize<=openAttempts)
    {
        ++waitPosition; // advance to the next generation
        open=true;
        generationSize = 0;
        notifyAll();            
    }
}

You should also note that the method uses notifyAll() instead of notify(), thus ensuring that all the threads in a given generation are given the opportunity to proceed.
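
The threshold test in Listing 5 (generationSize <= openAttempts) means the gate only opens once at least as many open() calls have arrived as there are waiting threads. The sketch below tries this out with a cut-down gate. MiniGate and MiniGateDemo are hypothetical names, the waiting() and isOpen() accessors exist only for the demo, and the activeSearchThreads check is dropped to keep the example short.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Cut-down gate: await() and open() follow Listings 4 and 5, minus the
// activeSearchThreads check. The accessors are demo-only additions.
class MiniGate {
    private int waitPosition, generationSize, openAttempts;
    private boolean open;

    synchronized void await() throws InterruptedException {
        int generationIndex = waitPosition;
        generationSize++;
        while (!open && generationIndex == waitPosition) {
            wait();
        }
    }

    synchronized void open() {
        openAttempts++;
        if (generationSize <= openAttempts) {
            ++waitPosition;
            open = true;
            generationSize = 0;
            notifyAll(); // wake the whole generation, not just one thread
        }
    }

    synchronized int waiting() { return generationSize; }
    synchronized boolean isOpen() { return open; }
}

class MiniGateDemo {
    // Parks `waiters` threads at the gate, calls open() `opens` times, and
    // returns how many of the parked threads got through.
    static int releasedAfter(int waiters, int opens) {
        MiniGate gate = new MiniGate();
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await();
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].setDaemon(true);
            threads[i].start();
        }
        try {
            while (gate.waiting() < waiters) {
                Thread.sleep(5); // wait until every thread is parked in await()
            }
            for (int i = 0; i < opens; i++) {
                gate.open();
            }
            // join(0) waits indefinitely; safe here because an open gate
            // lets every parked thread finish.
            long timeout = gate.isOpen() ? 0 : 100;
            for (Thread t : threads) {
                t.join(timeout);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int count = released.get();
        for (Thread t : threads) {
            t.interrupt(); // clean up any thread still parked at a closed gate
        }
        return count;
    }
}
```

With three parked threads, releasedAfter(3, 3) lets all three through, while releasedAfter(3, 2) releases none because the third open() never arrives.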

The next section outlines the implementation of the PrimeNumberSource interface, which serves as the handle for accessing search results.

PrimeNumberSource implementation

The result handle is encapsulated in the compilation unit com.javaworld.primefinder.ConcurrentPrimeNumberSource; its contents are outlined in Listing 6.

Listing 6. PrimeNumberSource implementation

public class ConcurrentPrimeNumberSource 
        implements PrimeNumberSource
{
    private ThreadGate barrier;
    
    private List<BigInteger> resultsBuffer;
    private int resultsBufferAccessIndex;   
    
    protected ConcurrentPrimeNumberSource(
                int searchThreadsCount)
    
    
    
    public synchronized BigInteger nextPrime() 
    
    //delegates to barrier
    protected void searchThreadCompleted()
    
    //updates results
    protected void addResultToBuffer(
                BigInteger result)
}

As you can see in the code, instances of this class hold a reference to a gate, which is used to control concurrent access to the underlying results buffer -- as encapsulated by the field resultsBuffer.

An integer counter resultsBufferAccessIndex is used as a pointer to the last read buffer location. Note that the field is not atomic, and as such the nextPrime() method is defined as synchronized so as to prevent destructive updates. In Listing 7, you can also see that the gate's close() method is only invoked if the result buffer has been exhausted and the gate is not already closed.

Listing 7. The implementation of the nextPrime() method

public synchronized BigInteger nextPrime() 
{
    BigInteger result = null;
    int resultIndex = resultsBufferAccessIndex++;
    try
    {           
        if (!barrier.isClosed() && 
                (resultIndex == resultsBuffer.size()))
            barrier.close();

        barrier.await(); 
            
        result = resultsBuffer.get(resultIndex);
    }   
    catch (InterruptedException exce)
    {
        ..........................................  
    }       
    catch (IndexOutOfBoundsException exce)
    {
        //search exhausted
        result = null;
    }   

    return result;
}
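
Listing 6 only outlines the two protected methods that search threads call on the handle. Based on the earlier description (open() is invoked each time a prime is added to the results buffer, and searchThreadCompleted() delegates to the gate), their bodies might look roughly like the sketch below. GateCalls, ResultBufferSketch and CountingGate are hypothetical names, and locking on the buffer is an assumption about how the list is protected.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two ThreadGate calls the handle needs here.
interface GateCalls {
    void open();
    void searchThreadCompleted();
}

class ResultBufferSketch {
    private final GateCalls barrier;
    private final List<BigInteger> resultsBuffer = new ArrayList<>();

    ResultBufferSketch(GateCalls barrier) {
        this.barrier = barrier;
    }

    // Store the result first, then give the gate a chance to open so that
    // waiting readers can pick it up.
    protected void addResultToBuffer(BigInteger result) {
        synchronized (resultsBuffer) {
            resultsBuffer.add(result);
        }
        barrier.open();
    }

    // Delegates to the gate, which tracks the number of active search threads.
    protected void searchThreadCompleted() {
        barrier.searchThreadCompleted();
    }

    int bufferedCount() {
        synchronized (resultsBuffer) {
            return resultsBuffer.size();
        }
    }
}

// Test double that records how often the gate was called.
class CountingGate implements GateCalls {
    int opens;
    int completions;

    public synchronized void open() { opens++; }
    public synchronized void searchThreadCompleted() { completions++; }
}
```

Adding the result before calling open() matters: if the order were reversed, a reader released by the gate could look for an entry that has not been stored yet.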

Next, you will examine the PrimeNumberSearcher implementation that returns instances of this handle.

The search executor

The task of partitioning the search space and assigning contiguous blocks to individual threads is performed by the class com.javaworld.primefinder.GatedPrimeNumberSearcher; this is an implementation of the contract com.javaworld.primefinder.PrimeNumberSearcher.

On instantiation, an object of this type determines the number of system processors, and uses this to partition the search space into buckets or blocks of near uniform size, which in turn are assigned to individual tasks. The bucketing information is encapsulated in the com.javaworld.primefinder.PartitionInfo class, which is shown in Listing 8.

Listing 8. Encapsulation of search-space partition information

class PartitionInfo
{
    PartitionInfo(int numberOfBuckets, long bucketSize)
    
    public int getNumberOfBuckets().....
    public long getBucketSize() ........
}

Instances of this class are created by the method in Listing 9, which is defined in the search executor com.javaworld.primefinder.GatedPrimeNumberSearcher.

Listing 9. Method that partitions the search space into blocks

private PartitionInfo getPartitionInfo(long lowerBound,
                                       long upperBound)
{
    PartitionInfo result;
        
    int proposedBucketCount = numberOfProcessors;
    long bucketSize = (upperBound-lowerBound) / 
                       proposedBucketCount;
        
    result = new PartitionInfo(proposedBucketCount,
                               bucketSize);
        
    return result;
}
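
Because Listing 9 uses integer division, a range that does not divide evenly leaves a small remainder at the top of the search space. The sketch below is one possible variation rather than the article's method: it works out explicit block boundaries and lets the last block absorb the remainder. PartitionSketch and partition() are hypothetical names, and the blocks are treated as half-open ranges [lower, upper).

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSketch {
    // Returns {lower, upper} pairs that together cover [lowerBound, upperBound).
    // The last block is stretched to upperBound so no values are skipped.
    static List<long[]> partition(long lowerBound, long upperBound, int buckets) {
        if (buckets < 1 || upperBound <= lowerBound) {
            throw new IllegalArgumentException("Need buckets >= 1 and upperBound > lowerBound");
        }
        long range = upperBound - lowerBound;
        // Never create more blocks than there are values, so bucketSize is at least 1.
        int count = (int) Math.min(buckets, range);
        long bucketSize = range / count;

        List<long[]> blocks = new ArrayList<>();
        long lower = lowerBound;
        for (int i = 0; i < count; i++) {
            long upper = (i == count - 1) ? upperBound : lower + bucketSize;
            blocks.add(new long[] {lower, upper});
            lower = upper;
        }
        return blocks;
    }
}
```

In practice the bucket count would come from Runtime.getRuntime().availableProcessors(), as described above. For example, partition(1, 100, 4) yields the blocks [1, 25), [25, 49), [49, 73) and [73, 100).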

As you can see in Listing 10, instances of PartitionInfo are used to create one or more search threads -- or, more accurately, one or more runnable tasks/targets, as implemented by the class com.javaworld.primefinder.PrimeSearchThread. In addition to information about its search range, each thread also holds a reference to the result handle, thus allowing it to write search results to the handle's internal result buffer as and when they become available.

Listing 10. Implementation of contract findPrimeNumbers()

public PrimeNumberSource findPrimeNumbers(BigInteger aLowerBound, 
                                          BigInteger aUpperBound) 
{
    if (aUpperBound.longValue()<=aLowerBound.longValue())
        throw new IllegalArgumentException(
                "Upperbound must be greater than lowerbound");
    long lowerBound = aLowerBound.longValue(), 
         upperBound = aUpperBound.longValue();
        
    final ConcurrentPrimeNumberSource result;
        
    PartitionInfo partitionInfo =getPartitionInfo(lowerBound,
                                                  upperBound);

    result = new ConcurrentPrimeNumberSource(
                    partitionInfo.getNumberOfBuckets());
        
    Thread searchThread;
    // The first block starts at lowerBound and spans one bucket.
    long shiftingLowerBound = lowerBound,
        shiftingUpperBound = lowerBound + partitionInfo.getBucketSize();     
    while (shiftingUpperBound <= upperBound)
    {
        searchThread =
                new Thread(new PrimeSearchThread(
                    BigInteger.valueOf(shiftingLowerBound),
                    BigInteger.valueOf(shiftingUpperBound),
                    result));
            
        searchThread.start();
            
        shiftingLowerBound = shiftingUpperBound;
        shiftingUpperBound+= partitionInfo.getBucketSize();
    }

    return result;
}
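
The PrimeSearchThread class itself is not listed in this article. A task of that shape might look roughly like the sketch below, which scans a half-open range and reports each hit to the handle. ResultSink, SearchTaskSketch and CollectingSink are hypothetical names, and the use of BigInteger.isProbablePrime() is an assumption; the article does not say which primality test the real class uses.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Hypothetical view of the result handle as seen by a search task.
interface ResultSink {
    void addResultToBuffer(BigInteger result);
    void searchThreadCompleted();
}

class SearchTaskSketch implements Runnable {
    // Probabilistic test: a composite slips through with probability
    // of at most 2^-50.
    private static final int CERTAINTY = 50;

    private final BigInteger lowerBound; // inclusive
    private final BigInteger upperBound; // exclusive
    private final ResultSink sink;

    SearchTaskSketch(BigInteger lowerBound, BigInteger upperBound, ResultSink sink) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.sink = sink;
    }

    @Override
    public void run() {
        try {
            for (BigInteger candidate = lowerBound;
                    candidate.compareTo(upperBound) < 0;
                    candidate = candidate.add(BigInteger.ONE)) {
                if (candidate.isProbablePrime(CERTAINTY)) {
                    sink.addResultToBuffer(candidate);
                }
            }
        } finally {
            // Always report completion, even on failure, so that readers
            // are not left waiting on a producer that has gone away.
            sink.searchThreadCompleted();
        }
    }
}

// Simple sink used to try the task out without a gate.
class CollectingSink implements ResultSink {
    final List<BigInteger> found = new ArrayList<>();
    int completions;

    public synchronized void addResultToBuffer(BigInteger result) { found.add(result); }
    public synchronized void searchThreadCompleted() { completions++; }
}
```

Reporting completion in a finally block is a deliberate choice here: the gate's activeSearchThreads count only reaches zero if every task checks out, whether it finished normally or not.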

Before you can examine the test unit, there remains one application class to mention: the consumer/reader thread implementation  com.javaworld.primefinder.PrimeNumberReader, whose core method is shown in Listing 11.

Listing 11. Prime number consumer

@Override
public void run() 
{
    BigInteger nextVal=null;
        
    do
    {
        nextVal = source.nextPrime();
        if (nextVal!=null)
            System.out.println(nextVal + ", ");
        else break;
    }while (true);
}

In the JUnit 4 test case, several instances of this class are used to simulate the effect of multiple client threads queuing at the thread gate.

A test unit

A JUnit 4 test named com.javaworld.primefinder.PrimeFinderTest is provided in the test folder of the source archive that accompanies this article. The test creates an instance of GatedPrimeNumberSearcher to search for the prime numbers between 1 and a suitable upper bound (e.g., 100, 1 million, 1 thousand trillion); it also creates several reader threads to print the resulting numbers to System.out. You should adjust the upper bound of the search space to suit the capability of your computer, then run the test to watch your thread gate in action!
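
If you want to experiment with the reader side before running the full test, the sketch below shows one way to drive several reader loops against a shared source and check that every value is consumed. It is not the article's PrimeFinderTest. PrimeSource, FixedListSource and ReaderHarness are hypothetical names, and the source here is a fixed list rather than a gated search, so the readers never actually block.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the PrimeNumberSource contract.
interface PrimeSource {
    BigInteger nextPrime(); // returns null once the source is exhausted
}

// Hands out values from a fixed list. Synchronized so that no two
// readers are ever given the same element.
class FixedListSource implements PrimeSource {
    private final List<BigInteger> values;
    private int index;

    FixedListSource(List<BigInteger> values) {
        this.values = values;
    }

    public synchronized BigInteger nextPrime() {
        return index < values.size() ? values.get(index++) : null;
    }
}

class ReaderHarness {
    // Starts readerCount threads that each run the Listing 11 style loop,
    // waits for them to finish, and returns everything they read.
    static Set<BigInteger> drainWithReaders(PrimeSource source, int readerCount) {
        Set<BigInteger> seen = ConcurrentHashMap.newKeySet();
        Thread[] readers = new Thread[readerCount];
        for (int i = 0; i < readerCount; i++) {
            readers[i] = new Thread(() -> {
                BigInteger next;
                while ((next = source.nextPrime()) != null) {
                    seen.add(next);
                }
            });
            readers[i].start();
        }
        for (Thread reader : readers) {
            try {
                reader.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }
}
```

Swapping FixedListSource for the handle returned by findPrimeNumbers() would exercise the gate itself, with readers queuing whenever they outpace the search threads.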

In conclusion

This article has introduced the concept of thread gates and demonstrated how to implement and use a basic gate class. It has provided source examples and companion source code to enable the reader to explore the idea further. For more on the threading issues raised here, you should read the excellent book Java Concurrency in Practice, written by Brian Goetz and others, along with the other material presented in the Resources section below.

Obi Ezechukwu is a quantitative Java and Java EE developer working in the UK financial industry. He specializes in designing and implementing real-time and computationally intensive quantitative financial models. He is currently vice president of bond analytics at Markit and co-founder of the obix-configuration project. He holds a Ph.D. in the field of computational finance from Imperial College London.

Learn more about this topic
