Java concurrency with thread gates

Manage concurrent thread execution in complex business applications

Returning to the more mundane task of generating and buffering prime numbers, the simple application controls access to the results buffer by a gate that functions in a manner similar to a binary latch: when the gate is open, threads can pass, and when it is closed, threads must queue until it is re-opened. It is more clever than a latch in the sense that it batches waiting threads into generations, which ensures that threads in the same generation all pass through the gate at the same time; threads in an older generation always take priority over those in younger generations. The gate implementation also has an innovative feature that ensures that results can always be written to the results buffer regardless of how many waiting threads there are. If a generic thread gate is like a traffic light, then this would be more akin to a railway crossing in the sense that the train always gets priority -- and for good reason, too.
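For comparison, a plain binary latch can be sketched with the standard java.util.concurrent.CountDownLatch initialised to a count of one. The class and method names below are illustrative only and are not part of the article's application; the sketch simply shows the "closed until opened" behaviour that the gate builds on.

```java
import java.util.concurrent.CountDownLatch;

class BinaryLatchDemo {
    // Returns true if a thread blocked on the latch passes once it is opened.
    static boolean waiterPassesAfterOpen() {
        CountDownLatch latch = new CountDownLatch(1); // count of 1 = closed
        boolean[] passed = {false};
        Thread waiter = new Thread(() -> {
            try {
                latch.await();      // blocks while the count is above zero
                passed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        latch.countDown();          // open the latch: the count drops to zero
        try {
            waiter.join();          // join() makes the waiter's write visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed[0];
    }

    public static void main(String[] args) {
        System.out.println("waiter passed: " + waiterPassesAfterOpen());
    }
}
```

A CountDownLatch cannot be closed again once its count reaches zero, which is one reason the application needs a purpose-built gate with both open() and close() operations.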

The gate implementation is contained in the compilation unit com.javaworld.primefinder.ThreadGate, and is detailed in Listing 3.

Listing 3. The gate implementation

package com.javaworld.primefinder;

class ThreadGate
{
    private int waitPosition;
    private int generationSize;
    private int openAttempts;
    private int activeSearchThreads;
    private boolean open;
    
    public ThreadGate(int searchThreadsCount)
        
    public synchronized void await() throws InterruptedException

    public synchronized void close() throws InterruptedException
    
    public synchronized void open()
    
}

To deconstruct this class, begin by taking a look at the class-level fields, of which there are five:

  • waitPosition: This field tracks the generation index of waiting threads. It helps ensure fairness, because the threads that are let through together are those that arrived since the last time the gate was opened. Its value is incremented each time a call to open() actually opens the gate, thus forcing subsequent thread arrivals into a new, younger generation.
  • generationSize: Tracks the size of the current generation of threads waiting for the gate to open. Its value should be reset each time the open() or close() method is invoked.
  • openAttempts: An integer counter indicating the number of attempts that have been made to open the gate since the last time it was closed. In practice, it actually tracks the number of results that have been added to the results buffer since the last time the gate was closed. This should be apparent on examining the result handle implementation com.javaworld.primefinder.ConcurrentPrimeNumberSource, which shows that the open() method is invoked each time a new prime number is added to the results buffer. The value of this field is not only altered by calling open() but is also reset to zero with each call to close().
  • activeSearchThreads: Reader threads should not wait needlessly if there is no prospect of any further results being made available. A key indicator of the prospect of further results is whether or not there are still any active search threads. As such, it is important that the gate keep track of the number of active search threads. If this number is equal to zero, then the await() method will not block, thus allowing reader threads free passage through the gate.
  • open: A binary value that indicates whether the gate is open or closed.
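The role of activeSearchThreads can be illustrated with a cut-down sketch. It assumes that the gate starts closed and that the gate exposes a method for reporting a finished search thread; the names SearchCompletionSketch, searchThreadCompleted() and waiting() are hypothetical, and the openAttempts field is left out because it plays no part in this scenario.

```java
class SearchCompletionSketch {
    private int waitPosition;
    private int generationSize;
    private int activeSearchThreads;
    private boolean open; // assumption: the gate starts closed

    SearchCompletionSketch(int searchThreadsCount) {
        activeSearchThreads = searchThreadsCount;
    }

    // Same wait condition as the article's await() method.
    synchronized void await() throws InterruptedException {
        int generationIndex = waitPosition;
        generationSize++;
        while (!open && generationIndex == waitPosition
                && activeSearchThreads != 0) {
            wait();
        }
    }

    // Hypothetical: called by a search thread when it has finished.
    synchronized void searchThreadCompleted() {
        if (activeSearchThreads > 0) {
            activeSearchThreads--;
        }
        notifyAll(); // wake readers so they can re-check the condition
    }

    // Hypothetical accessor so the demo can tell when the reader is parked.
    synchronized int waiting() {
        return generationSize;
    }

    // Returns true if a parked reader is released when the last search ends.
    static boolean readerReleasedWhenSearchEnds() {
        SearchCompletionSketch gate = new SearchCompletionSketch(1);
        Thread reader = new Thread(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();
        try {
            while (gate.waiting() == 0) {
                Thread.sleep(1);          // wait until the reader is parked
            }
            gate.searchThreadCompleted(); // no active search threads remain
            reader.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !reader.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("reader released: " + readerReleasedWhenSearchEnds());
    }
}
```

The gate is never opened in this sketch; the reader gets through only because the count of active search threads has dropped to zero.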

Although the gate implementation's methods are self-explanatory, take some time to examine the await() and open() methods. The code for await() is shown in Listing 4.

Listing 4. A method to force generational wait

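public synchronized void await() throws InterruptedException
{
    // Remember which generation this thread arrived in.
    int generationIndex = waitPosition;
    generationSize++;
    // Wait while the gate is closed, this generation has not yet been
    // released, and at least one search thread is still running.
    while (!open && generationIndex == waitPosition
        && activeSearchThreads != 0)
        wait();
}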

You'll note that this method captures the value of the current wait position -- the generation index, in other words. While the gate is closed, the generation index has not been advanced, and at least one search thread is still active, the calling thread remains in a wait state. This means that any threads arriving since the last time the gate was opened will have to wait until the previous generation of threads has been served and the gate undergoes another close() to open() cycle, or the producer (search) threads terminate.
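One consequence of capturing the generation index is that a waiting thread is not stranded if the gate is opened and then closed again before that thread has had a chance to run. The sketch below demonstrates this under two assumptions that the article does not confirm: the gate starts closed, and close() is a simple non-blocking reset. The class GenerationDemo and the waiting() accessor are hypothetical.

```java
class GenerationDemo {
    static class Gate {
        private int waitPosition;
        private int generationSize;
        private int openAttempts;
        private int activeSearchThreads;
        private boolean open; // assumption: the gate starts closed

        Gate(int searchThreadsCount) {
            activeSearchThreads = searchThreadsCount;
        }

        synchronized void await() throws InterruptedException {
            int generationIndex = waitPosition;
            generationSize++;
            while (!open && generationIndex == waitPosition
                    && activeSearchThreads != 0) {
                wait();
            }
        }

        synchronized void open() {
            openAttempts++;
            if (generationSize <= openAttempts) {
                ++waitPosition;
                open = true;
                generationSize = 0;
                notifyAll();
            }
        }

        // Assumed, simplified close(): the article's version may differ.
        synchronized void close() {
            open = false;
            openAttempts = 0;
            generationSize = 0;
        }

        // Hypothetical accessor used only by the demo.
        synchronized int waiting() {
            return generationSize;
        }
    }

    // Returns true if the reader still passes after an open/close pair.
    static boolean waiterSurvivesQuickReclose() {
        Gate gate = new Gate(1);
        Thread reader = new Thread(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();
        try {
            while (gate.waiting() == 0) {
                Thread.sleep(1);      // wait until the reader is parked
            }
            synchronized (gate) {     // the reader cannot wake inside this block
                gate.open();
                gate.close();
            }
            reader.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !reader.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("reader passed: " + waiterSurvivesQuickReclose());
    }
}
```

When the reader finally wakes, the gate is closed again, but its captured generation index no longer matches waitPosition, so it leaves the loop. A gate that tested only the open flag would leave that reader blocked.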

(As an exercise, consider how this method can be altered so that the gate does not necessarily have to be closed in order to batch threads into generations. In essence, how would altering the method help implement a scheme where the gate is simply used to batch processing threads? What would an application look like when efficiency demands that requests are serviced in bulk, rather than individually?)

The open() method is illustrated in Listing 5. It should be clear that the generation counter is advanced by 1 each time the gate re-opens.

Listing 5. Implementation of the ThreadGate.open() method

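public synchronized void open()
{
    openAttempts++;

    // Open only once there have been at least as many attempts
    // as there are threads waiting in the current generation.
    if (generationSize <= openAttempts)
    {
        ++waitPosition; // advance to the next generation
        open = true;
        generationSize = 0;
        notifyAll();
    }
}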

You should also note that the method uses notifyAll() instead of notify(), thus ensuring that all the threads in a given generation are given the opportunity to proceed.
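The following sketch exercises both points: the gate stays closed until open() has been called at least as many times as there are waiting threads, and notifyAll() then releases the whole generation at once. It assumes the gate starts closed; the class OpenThresholdDemo and the accessors waiting() and isOpen() are hypothetical additions for observation only.

```java
import java.util.ArrayList;
import java.util.List;

class OpenThresholdDemo {
    static class Gate {
        private int waitPosition;
        private int generationSize;
        private int openAttempts;
        private int activeSearchThreads;
        private boolean open; // assumption: the gate starts closed

        Gate(int searchThreadsCount) {
            activeSearchThreads = searchThreadsCount;
        }

        synchronized void await() throws InterruptedException {
            int generationIndex = waitPosition;
            generationSize++;
            while (!open && generationIndex == waitPosition
                    && activeSearchThreads != 0) {
                wait();
            }
        }

        synchronized void open() {
            openAttempts++;
            if (generationSize <= openAttempts) {
                ++waitPosition;
                open = true;
                generationSize = 0;
                notifyAll();
            }
        }

        // Hypothetical accessors used only by the demo.
        synchronized int waiting() { return generationSize; }
        synchronized boolean isOpen() { return open; }
    }

    // Returns {open after 1st call, open after 3rd call, all readers released}.
    static boolean[] run() {
        Gate gate = new Gate(1);
        List<Thread> readers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            readers.add(t);
            t.start();
        }
        boolean[] result = new boolean[3];
        try {
            while (gate.waiting() < 3) {
                Thread.sleep(1);       // wait until all three readers are parked
            }
            gate.open();               // 1 attempt, 3 waiters: stays closed
            result[0] = gate.isOpen();
            gate.open();
            gate.open();               // 3 attempts, 3 waiters: gate opens
            result[1] = gate.isOpen();
            boolean allReleased = true;
            for (Thread t : readers) {
                t.join(5000);
                allReleased &= !t.isAlive();
            }
            result[2] = allReleased;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("open after 1 call: " + r[0]);
        System.out.println("open after 3 calls: " + r[1]);
        System.out.println("all readers released: " + r[2]);
    }
}
```

Had open() used notify() instead, only one of the three parked readers would have been woken by the third call; the others would have stayed in wait() until some later notification.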

The next section outlines the implementation of the PrimeNumberSource interface, which serves as the handle for accessing search results.

PrimeNumberSource implementation

The result handle is encapsulated in the compilation unit com.javaworld.primefinder.ConcurrentPrimeNumberSource; its contents are outlined in Listing 6.

Listing 6. PrimeNumberSource implementation

public class ConcurrentPrimeNumberSource 
        implements PrimeNumberSource
{
    private ThreadGate barrier;
    
    private List<BigInteger> resultsBuffer;
    private int resultsBufferAccessIndex;   
    
    protected ConcurrentPrimeNumberSource(
                int searchThreadsCount)
    
    
    
    public synchronized BigInteger nextPrime() 
    
    //delegates to barrier
    protected void searchThreadCompleted()
    
    //updates results
    protected void addResultToBuffer(
                BigInteger result)
}

As you can see in the code, instances of this class hold a reference to a gate, which is used to control concurrent access to the underlying results buffer -- as encapsulated by the field resultsBuffer.

An integer counter resultsBufferAccessIndex is used as a pointer to the last read buffer location. Note that the field is not atomic, and as such the nextPrime() method is defined as synchronized so as to prevent destructive updates. In Listing 7, you can also see that the gate's close() method is only invoked if the result buffer has been exhausted and the gate is not already closed.
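To see why a plain int index needs the protection of a synchronized method, consider the following stand-alone sketch. It is not the article's nextPrime() implementation: the class IndexedBufferSketch, its next() method, and the pre-filled buffer are all hypothetical, and the gate is left out so that only the index handling is visible.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class IndexedBufferSketch {
    private final List<BigInteger> buffer = new ArrayList<>();
    private int accessIndex; // plain int, guarded by the object's monitor

    IndexedBufferSketch(int size) {
        for (int i = 0; i < size; i++) {
            buffer.add(BigInteger.valueOf(i)); // placeholder values, not primes
        }
    }

    // Returns the next unread element, or null once the buffer is exhausted.
    // The read and the increment happen as one step under the lock.
    synchronized BigInteger next() {
        return accessIndex < buffer.size() ? buffer.get(accessIndex++) : null;
    }

    // Returns true if concurrent readers saw every value exactly once.
    static boolean eachValueReadOnce(int size, int readerCount) {
        IndexedBufferSketch source = new IndexedBufferSketch(size);
        Set<BigInteger> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger reads = new AtomicInteger();
        List<Thread> readers = new ArrayList<>();
        for (int i = 0; i < readerCount; i++) {
            Thread t = new Thread(() -> {
                BigInteger value;
                while ((value = source.next()) != null) {
                    seen.add(value);
                    reads.incrementAndGet();
                }
            });
            readers.add(t);
            t.start();
        }
        for (Thread t : readers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return reads.get() == size && seen.size() == size;
    }

    public static void main(String[] args) {
        System.out.println("each value read once: " + eachValueReadOnce(1000, 4));
    }
}
```

Without the synchronized keyword on next(), two readers could observe the same index value and return the same element, or an increment could be lost.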
