The thread gate pattern is an effective tool for controlling thread concurrency, but many developers are unfamiliar with it. Just as a traffic light can regulate the behavior of automobiles at an intersection, thread gates can block or allow thread progress based on given factors. Obi Ezechukwu introduces the concept of thread gates, then shows you how to implement them in a multithreaded prime-number generator. Level: Intermediate
Multithreading and code concurrency were once the preserve of elite programmers, but the combination of multicore processing power, complex requirements, and the readily available java.util.concurrent package has changed that. Today, enterprise application developers are expected to be knowledgeable about the various synchronization mechanisms and constructs available in the Java language. The level of expectation is even higher where the problems being solved require non-textbook and highly innovative concurrency constructs. It isn't enough in those situations to understand the Java language's concurrency mechanisms and those included in the standard SDK; you also must be able to use these tools as building blocks for custom-made concurrency controls.
In this article, we'll explore a concurrency pattern that isn't widely discussed, generally called thread gates. Like its real-world counterpart, a gate instance opens or closes, thus allowing or preventing thread progress. It does so based on the truth value of some predicate.
I'll start with an overview of thread gates based on the traffic-flow model, then explain how to set up your development environment for the example application, a multithreaded prime-number generator. The remainder of the article will be hands-on, as you learn about the thread gate pattern by implementing it.
Thread gates: An overview
A good metaphor for thread gates is the traffic light system that operates on many public roads. When the signal is red, cars have to wait at the stop line until the signal changes. On the other hand, cars are free to run past the signal when it is green. There is no limit to the number of times that the signal can go from green to red, given a statistically significant observation timeframe. Traffic lights are designed to enable crossflow of traffic, and are redundant where crossflow does not exist. In a programmer's terms, you can imagine the light as a control that lets bidirectional traffic share a small section of road that might otherwise cause the paths of traffic to intersect in an unsafe manner.
In the same way, thread gates are generally best used for scenarios where one set of threads has to be prevented from proceeding beyond a determined point, whilst another set of threads is active. To put it another way, the competing sets of threads are dependent on the value of some truth predicate, where each distinct value of the predicate strictly triggers one (and only one) set of threads, forcing the others to remain suspended. Note that the emphasis here is on sets or groups of threads rather than individual threads. In essence, the focus is on scenarios where multiple threads share access to an underlying resource, and the threads can be partitioned into sets depending on the actions that they perform in relation to that resource.
A good example of this is in producer-consumer flows, where some threads are responsible for producing data that is consumed by another group of threads; the shared resource is most likely the handoff mechanism (data bus) used by the disparate thread groups; and the truth predicate that determines thread progress is the quantity of data in the handoff mechanism. An in-memory request queue can sometimes fit such a pattern if the data is enqueued as part of a process that is analogous to a production process, and the data is dequeued or consumed by a separate process.
The producer-consumer analogy is a good one for formulating a problem that explores the concept of thread gates. An example problem should be easily recognizable to the vast majority of programmers. In this case, the problem should also be one that lends itself easily to task splitting and parallelization, as the emphasis is on creating a multithreaded solution to it. This article's sample application will fulfill all of these criteria and more.
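Before moving on to the sample application, the basic idea from this overview can be sketched in a few lines of Java. The class below is a hypothetical, minimal gate (the name SimpleGate and its methods are illustrative assumptions, not part of the article's code archive): threads calling await() are suspended while the gate is closed and released together when it opens, much like cars at a traffic light.

```java
/** Hypothetical minimal gate: not the article's ThreadGate, just the bare idea. */
final class SimpleGate {
    private boolean open; // the truth predicate; the gate starts closed

    /** Blocks the calling thread for as long as the gate is closed. */
    synchronized void await() throws InterruptedException {
        while (!open) {   // loop guards against spurious wakeups
            wait();       // releases the monitor while suspended
        }
    }

    /** Opens the gate and releases every waiting thread. */
    synchronized void open() {
        open = true;
        notifyAll();
    }

    /** Closes the gate; later arrivals will block in await(). */
    synchronized void close() {
        open = false;
    }

    synchronized boolean isOpen() {
        return open;
    }
}
```

The predicate here is a single boolean. The gate developed later in this article layers generations and producer tracking on top of the same wait()/notifyAll() mechanics.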
The multithreaded prime-number generator
This article's example application will tackle the age-old problem of finding all the probable prime numbers within a given bound (between one and a million, say). Specifically, the task is to implement a software component that exposes a method for retrieving all the probable prime numbers in a given range (with an upper and lower bound given). Assume that the component's client mandates that the method return a thread-safe handle for accessing the results as soon as it is invoked, and then performs the actual task of finding the prime numbers in the background or asynchronously. Assume also that the handle must provide a blocking method that allows primes to be accessed and returned as soon as possible, such that the client does not have to wait for the search to complete before accessing results. To simplify the task, no restrictions are placed on the order in which the handle returns the results.
Before reading further you should download the code archive that accompanies this article and create a development project within your favorite IDE. The archive contains ten source files, which are organized as follows:
-src\
  -main\
    -java\
      -com\
        -javaworld\
          -primefinder\
            PrimeNumberSearcher.java
            PrimeNumberSource.java
            PrimeUtil.java
            ThreadGate.java
            PrimeSearchThread.java
            PartitionInfo.java
            PrimeNumberReader.java
            GatedPrimeNumberSearcher.java
            ConcurrentPrimeNumberSource.java
-src\
  -test\
    -java\
      -com\
        -javaworld\
          -primefinder\
            PrimeFinderTest.java
PrimeFinderTest.java is a JUnit 4 test case; you will need to use JUnit in order to test out the sample code. In keeping with Maven 2 naming conventions, the application's source files are in src/main/java, and the JUnit 4 test case that demonstrates the solution is in src/test/java. The key contents of these folders will be covered in the listings shown in the following sections.
Finding those primes
The interface com.javaworld.primefinder.PrimeNumberSearcher defines the contract to which the component must conform. The signature of this contract is given by the method findPrimeNumbers(), which is shown in Listing 1.
Listing 1. The search contract
PrimeNumberSource findPrimeNumbers(BigInteger lowerBound,
BigInteger upperBound);
Observe that com.javaworld.primefinder.PrimeNumberSource is the interface to which the result handle must conform. It defines a single method, BigInteger nextPrime(), which, when invoked, should return the next element in the search results buffer. If the results buffer has been exhausted, the implementation must return null to indicate that no further results are available. As already mentioned, the implementation of this method must be thread-safe.
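To make the contract concrete, here is a minimal sketch of the two interfaces as described above, together with a hypothetical client helper (PrimeClient and drain() are illustrative names, not part of the code archive) that reads from a handle until it returns null.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** The result handle: returns the next prime, or null once results are exhausted. */
interface PrimeNumberSource {
    BigInteger nextPrime();
}

/** The search contract from Listing 1. */
interface PrimeNumberSearcher {
    PrimeNumberSource findPrimeNumbers(BigInteger lowerBound, BigInteger upperBound);
}

/** Hypothetical client helper, shown only to illustrate the null-terminated protocol. */
final class PrimeClient {
    /** Collects every result the handle yields, stopping at the first null. */
    static List<BigInteger> drain(PrimeNumberSource source) {
        List<BigInteger> primes = new ArrayList<>();
        BigInteger next;
        while ((next = source.nextPrime()) != null) {
            primes.add(next);
        }
        return primes;
    }
}
```

Because null doubles as the end-of-results marker, a client never needs to ask the handle whether the search is still running; it simply keeps calling nextPrime().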
Note that the emphasis of this task is on finding the probable primes within the search range; this makes it possible to use the nextProbablePrime() method from the java.math.BigInteger class. You can wrap the call to this method in a static method of the utility class com.javaworld.primefinder.PrimeUtil, an excerpt of which is shown in Listing 2.
Listing 2. A utility method for finding the first prime number within a given range
public static BigInteger findFirstPrime(long lowerBound, long upperBound)
{
    BigInteger result;
    BigInteger startPos = BigInteger.valueOf(lowerBound);
    BigInteger nextProbablePrime;
    if (startPos.isProbablePrime(.....)) // some reasonable accuracy
        nextProbablePrime = startPos;
    else
        nextProbablePrime = startPos.nextProbablePrime();
    // the upper bound is exclusive
    if (nextProbablePrime.longValue() >= upperBound)
        result = null;
    else
        result = nextProbablePrime;
    return result;
}
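As a rough illustration of how the same BigInteger calls can walk an entire range, the sketch below collects every probable prime between a lower bound (inclusive) and an upper bound (exclusive). The class name ProbablePrimes and the certainty value of 20 are assumptions made for this example; the article does not state the certainty it uses.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of the article's PrimeUtil class. */
final class ProbablePrimes {
    // Assumed certainty: a composite passes with probability at most 2^-20.
    private static final int CERTAINTY = 20;

    /** Returns the probable primes p with lowerBound <= p < upperBound. */
    static List<BigInteger> inRange(long lowerBound, long upperBound) {
        List<BigInteger> primes = new ArrayList<>();
        BigInteger limit = BigInteger.valueOf(upperBound);
        // Start no lower than 2, the smallest prime.
        BigInteger candidate = BigInteger.valueOf(Math.max(lowerBound, 2));
        if (!candidate.isProbablePrime(CERTAINTY)) {
            candidate = candidate.nextProbablePrime();
        }
        while (candidate.compareTo(limit) < 0) {
            primes.add(candidate);
            // nextProbablePrime() never skips a prime
            candidate = candidate.nextProbablePrime();
        }
        return primes;
    }
}
```

Calling ProbablePrimes.inRange(1, 30) yields the ten primes from 2 through 29. A search thread in the sample application could follow the same loop over its own block, handing each value to the result handle instead of a list.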
Having defined the public interfaces and utilities on which the solution is to be built, you can now move onto the task of detailing it. You'll begin by defining the thread gate implementation, which provides the means of controlling access to the results buffer.
A thread gate implementation
The objective of the thread gate is to ensure that multiple readers can access the prime numbers buffer without preventing the search threads from adding results to it.
A reader thread should be able to pick up any available results from the handle without waiting; but if there are no results, it must queue until at least one is made available. The sample application employs a fair scheme so that results are handed out more or less in the order in which the threads arrive. And yes, I mean it when I say "more or less." The emphasis is not on dispensing results in strict queuing order; rather, threads that request primes around a given interval will all receive their data roughly at the same time -- before the arrival of the next batch of threads.
A real-world analogy that illustrates this scheme is that of the modern coffee shop. The inoffensive way in which coffee orders are "batched" during busy periods is interesting. The attendants at the till take the customer orders and shout them down the line to the baristas manning the espresso machines, who in turn prepare several cups of coffee from the same espresso batch and dispense them to the waiting customers at roughly the same time. Then, of course, they return to the task of making the next batch of coffee for the next batch of customers, and the cycle repeats ad infinitum. By this strange internal scheme, people who arrive at the queue within a given interval tend to get their coffees around the same time -- although not exactly in the order in which the requests were made. Presumably the espresso machines can only prepare so many cups of coffee at any point in time, and it would simply be inefficient or slow to prepare the cups individually.
Returning to the more mundane task of generating and buffering prime numbers, the sample application controls access to the results buffer by a gate that functions in a manner similar to a binary latch: when the gate is open, threads can pass, and when it is closed, threads must queue until it is re-opened. It is cleverer than a latch in the sense that it batches waiting threads into generations, which ensures that threads in the same generation all pass through the gate at the same time; threads in an older generation always take priority over those in younger generations. The gate implementation also has an innovative feature that ensures that results can always be written to the results buffer regardless of how many waiting threads there are. If a generic thread gate is like a traffic light, then this would be more akin to a railway crossing in the sense that the train always gets priority, and for good reason, too.
The gate implementation is contained in the compilation unit com.javaworld.primefinder.ThreadGate, and is detailed in Listing 3.
Listing 3. The gate implementation
package com.javaworld.primefinder;

class ThreadGate
{
    private int waitPosition;
    private int generationSize;
    private int openAttempts;
    private int activeSearchThreads;
    private boolean open;

    public ThreadGate(int searchThreadsCount)
    public synchronized void await() throws InterruptedException
    public synchronized void close() throws InterruptedException
    public synchronized void open()
}
To deconstruct this class, begin by taking a look at the class-level fields, of which there are five:
- waitPosition: This field tracks the generation index of waiting threads. It helps ensure fairness, because the threads that are let through are the last set of threads to arrive, counting from the last time the gate was opened. Its value is incremented each time the open() method is invoked, thus forcing subsequent thread arrivals into a new, younger generation.
- generationSize: Tracks the size of the current generation of threads waiting for the gate to open. Its value should be reset each time the open() or close() method is invoked.
- openAttempts: An integer counter indicating the number of attempts that have been made to open the gate since the last time it was closed. In practice, it actually tracks the number of results that have been added to the results buffer since the last time the gate was closed. This should be apparent on examining the result handle implementation com.javaworld.primefinder.ConcurrentPrimeNumberSource, which shows that the open() method is invoked each time a new prime number is added to the results buffer. The value of this field is not only altered by calling open() but is also reset to zero with each call to close().
- activeSearchThreads: Reader threads should not wait needlessly if there is no prospect of any further results being made available. A key indicator of the prospect of further results is whether or not there are still any active search threads. As such, it is important that the gate keep track of the number of active search threads. If this number is equal to zero, then the await() method will not block, thus allowing reader threads free passage through the gate.
- open: A binary value that indicates whether the gate is open or closed.
Although the gate implementation's methods are self-explanatory, take some time to examine the await() and open() methods. The code for await() is shown in Listing 4.
Listing 4. A method to force generational wait
public synchronized void await() throws InterruptedException
{
    int generationIndex = waitPosition;
    generationSize++;
    while (!open && generationIndex == waitPosition
            && activeSearchThreads != 0)
        wait();
}
You'll note that this method captures the value of the current wait position (the generation index, in other words). Whilst the gate is closed and the generation index has not been advanced, the calling thread will remain in a wait state. This means that any threads arriving since the last time the gate was opened will have to wait until the previous generation of threads has been served and the gate undergoes another close() to open() cycle, or the producer (search) threads terminate.
(As an exercise, consider how this method can be altered so that the gate does not necessarily have to be closed in order to batch threads into generations. In essence, how would altering the method help implement a scheme where the gate is simply used to batch processing threads? What would an application look like when efficiency demands that requests are serviced in bulk, rather than individually?)
The open() method is illustrated in Listing 5. It should be clear that the generation counter is advanced by 1 each time the gate re-opens.
Listing 5. Implementation of the ThreadGate.open() method
public synchronized void open()
{
    openAttempts++;
    if (generationSize <= openAttempts)
    {
        ++waitPosition; // advance to the next generation
        open = true;
        generationSize = 0; // reset the generation size counter
        notifyAll();
    }
}
You should also note that the method uses notifyAll() instead of notify(), thus ensuring that all the threads in a given generation are given the opportunity to proceed.
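To see how the pieces fit together, the following sketch assembles the await() and open() logic from Listings 4 and 5 into one compilable class. The class name ThreadGateSketch is hypothetical, and the bodies of the constructor, close(), isClosed(), and searchThreadCompleted() are assumptions inferred from the field descriptions above; the article's listings do not show them, and the version in the code archive may differ (for instance, this close() does not throw InterruptedException).

```java
/** A sketch of the gate under stated assumptions; not the archive's ThreadGate. */
final class ThreadGateSketch {
    private int waitPosition;        // generation index of waiting threads
    private int generationSize;      // waiters in the current generation
    private int openAttempts;        // open() calls since the last close()
    private int activeSearchThreads; // producers that may still add results
    private boolean open;            // the gate starts closed

    ThreadGateSketch(int searchThreadsCount) {
        this.activeSearchThreads = searchThreadsCount; // assumed initialization
    }

    /** As in Listing 4: wait until the gate opens, the generation advances, or producers finish. */
    synchronized void await() throws InterruptedException {
        int generationIndex = waitPosition;
        generationSize++;
        while (!open && generationIndex == waitPosition
                && activeSearchThreads != 0) {
            wait();
        }
    }

    /** As in Listing 5: open once there is at least one result per waiter. */
    synchronized void open() {
        openAttempts++;
        if (generationSize <= openAttempts) {
            ++waitPosition;
            open = true;
            generationSize = 0;
            notifyAll();
        }
    }

    /** Assumed body: shuts the gate and resets the counters described in the field list. */
    synchronized void close() {
        open = false;
        openAttempts = 0;
        generationSize = 0;
    }

    /** Assumed helper matching the isClosed() call made by the result handle. */
    synchronized boolean isClosed() {
        return !open;
    }

    /** Assumed hook: a producer reports completion; waiters are freed when none remain. */
    synchronized void searchThreadCompleted() {
        if (activeSearchThreads > 0 && --activeSearchThreads == 0) {
            notifyAll();
        }
    }
}
```

Note that every method synchronizes on the gate itself, so the counters never need to be atomic. The notifyAll() in searchThreadCompleted() matters: without it, a reader that began waiting just before the last producer finished would never re-check the activeSearchThreads condition.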
The next section outlines the implementation of the PrimeNumberSource interface, which serves as the handle for accessing search results.
PrimeNumberSource implementation
The result handle is encapsulated in the compilation unit com.javaworld.primefinder.ConcurrentPrimeNumberSource; its contents are outlined in Listing 6.
Listing 6. PrimeNumberSource implementation
public class ConcurrentPrimeNumberSource
    implements PrimeNumberSource
{
    private ThreadGate barrier;
    private List<BigInteger> resultsBuffer;
    private int resultsBufferAccessIndex;

    protected ConcurrentPrimeNumberSource(
        int searchThreadsCount)

    public synchronized BigInteger nextPrime()

    //delegates to barrier
    protected void searchThreadCompleted()

    //updates results
    protected void addResultToBuffer(
        BigInteger result)
}
As you can see in the code, instances of this class hold a reference to a gate, which is used to control concurrent access to the underlying results buffer, as encapsulated by the field resultsBuffer.
An integer counter resultsBufferAccessIndex is used as a pointer to the next buffer location to be read. Note that the field is not atomic, and as such the nextPrime() method is defined as synchronized so as to prevent destructive updates. In Listing 7, you can also see that the gate's close() method is only invoked if the result buffer has been exhausted and the gate is not already closed.
Listing 7. The implementation of the nextPrime() method
public synchronized BigInteger nextPrime()
{
    BigInteger result;
    int resultIndex = resultsBufferAccessIndex++;
    try
    {
        // close the gate only when the buffer has been exhausted
        if (!barrier.isClosed() &&
                (resultIndex == resultsBuffer.size()))
            barrier.close();
        barrier.await();
        result = resultsBuffer.get(resultIndex);
    }
    catch (InterruptedException exce)
    {
        ..........................................
    }
    catch (IndexOutOfBoundsException exce)
    {
        //search exhausted
        result = null;
    }
    return result;
}
Next, you will examine the PrimeNumberSearcher implementation that returns instances of this handle.
The search executor
The task of partitioning the search space and assigning contiguous blocks to individual threads is performed by the class com.javaworld.primefinder.GatedPrimeNumberSearcher; this is an implementation of the contract com.javaworld.primefinder.PrimeNumberSearcher.
On instantiation, an object of this type determines the number of system processors, and uses this to partition the search space into buckets or blocks of near uniform size, which in turn are assigned to individual tasks. The bucketing information is encapsulated in the com.javaworld.primefinder.PartitionInfo class, which is shown in Listing 8.
Listing 8. Encapsulation of search-space partition information
class PartitionInfo
{
    PartitionInfo(int numberOfBuckets, long bucketSize)
    public int getNumberOfBuckets().....
    public long getBucketSize() ........
}
Instances of this class are created by the method in Listing 9, which is defined in the searcher implementation com.javaworld.primefinder.GatedPrimeNumberSearcher.
Listing 9. Method that partitions the search space into blocks
private PartitionInfo getPartitionInfo(long lowerBound,
                                       long upperBound)
{
    PartitionInfo result;
    int proposedBucketCount = numberOfProcessors;
    long bucketSize = (upperBound - lowerBound) /
                      proposedBucketCount;
    result = new PartitionInfo(proposedBucketCount,
                               bucketSize);
    return result;
}
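The arithmetic is easier to follow with concrete numbers. The sketch below (RangePartitioner is a hypothetical name, and letting the last block absorb the remainder is an assumption of this example rather than something Listing 9 states) computes the block boundaries that result from the same integer division.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the partition arithmetic; not part of the code archive. */
final class RangePartitioner {
    /**
     * Splits [lowerBound, upperBound) into bucketCount contiguous blocks,
     * each returned as a {start, end} pair with an exclusive end.
     */
    static List<long[]> partition(long lowerBound, long upperBound, int bucketCount) {
        // Same integer division as Listing 9: the fractional part is discarded.
        long bucketSize = (upperBound - lowerBound) / bucketCount;
        List<long[]> blocks = new ArrayList<>();
        long start = lowerBound;
        for (int i = 0; i < bucketCount; i++) {
            // Assumption: the last block is stretched to cover the remainder.
            long end = (i == bucketCount - 1) ? upperBound : start + bucketSize;
            blocks.add(new long[] {start, end});
            start = end;
        }
        return blocks;
    }
}
```

For a search between 1 and 100 on a four-processor machine, the bucket size is (100 - 1) / 4 = 24, giving the blocks [1, 25), [25, 49), [49, 73), and [73, 100). The final block is three numbers longer than the others because 99 is not evenly divisible by 4.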
As you can see in Listing 10, instances of PartitionInfo are used to create one or more search threads (or, more accurately, one or more runnable tasks/targets), as implemented by the class com.javaworld.primefinder.PrimeSearchThread. In addition to information about its search range, each thread also holds a reference to the result handle, thus allowing it to write search results to the handle's internal result buffer as and when they become available.
Listing 10. Implementation of contract findPrimeNumbers()
public PrimeNumberSource findPrimeNumbers(BigInteger aLowerBound,
                                          BigInteger aUpperBound)
{
    if (aUpperBound.longValue() <= aLowerBound.longValue())
        throw new IllegalArgumentException(
            "Upperbound must be greater than lowerbound");
    long lowerBound = aLowerBound.longValue(),
         upperBound = aUpperBound.longValue();
    PartitionInfo partitionInfo = getPartitionInfo(lowerBound,
                                                   upperBound);
    int bucketCount = partitionInfo.getNumberOfBuckets();
    final ConcurrentPrimeNumberSource result =
        new ConcurrentPrimeNumberSource(bucketCount);
    Thread searchThread;
    long shiftingLowerBound = lowerBound;
    // start exactly one search thread per bucket, so that the thread
    // count matches the count passed to the result handle
    for (int i = 0; i < bucketCount; i++)
    {
        // block bounds are offsets from lowerBound; the last block
        // absorbs the remainder left by the integer division
        long shiftingUpperBound = (i == bucketCount - 1)
            ? upperBound
            : shiftingLowerBound + partitionInfo.getBucketSize();
        searchThread =
            new Thread(new PrimeSearchThread(
                BigInteger.valueOf(shiftingLowerBound),
                BigInteger.valueOf(shiftingUpperBound),
                result));
        searchThread.start();
        shiftingLowerBound = shiftingUpperBound;
    }
    return result;
}
Before you can examine the test unit, there remains one application class to mention: the consumer/reader thread implementation com.javaworld.primefinder.PrimeNumberReader, whose core method is shown in Listing 11.
Listing 11. Prime number consumer
@Override
public void run()
{
    BigInteger nextVal = null;
    do
    {
        nextVal = source.nextPrime();
        if (nextVal != null)
            System.out.println(nextVal + ", ");
        else
            break; // a null result means the search is exhausted
    } while (true);
}
In the JUnit 4 test case, several instances of this class are used to simulate the effect of multiple client threads queuing at the thread gate.
A test unit
A JUnit 4 test named com.javaworld.primefinder.PrimeFinderTest is provided in the test folder of the source archive that accompanies this article. The test creates an instance of a GatedPrimeNumberSearcher to search for the prime numbers between 1 and a suitable upper bound (e.g., 100, 1 million, 1 thousand trillion); it also creates several reader threads to print out the resulting numbers to System.out. You should adjust the upper bound of the search space to suit the capability of your computer, then run the test to watch your thread gate in action!
In conclusion
This article has introduced the concept of thread gates and demonstrated how to implement and use a basic gate class. It has provided source examples and companion source code to enable the reader to explore the idea further. For more on the threading issues raised here, you should read the excellent book Java Concurrency in Practice, written by Brian Goetz and others, along with the other material presented in the Resources section below.
Learn more about this topic
- Download the source code package that accompanies this article.
- Threads and concurrency on JavaWorld:
- "Understanding actor concurrency, Part 1: Actors in Erlang" (Alex Miller, JavaWorld, February 2009) identifies the shortcomings of Java's shared-state concurrency model and introduces Erlang's functional alternative.
- "Building cloud-ready, multicore-friendly applications, Part 1: Design principles" (Guerry Semones, JavaWorld, March 2009) explains why atomicity, statelessness, idempotence, and parallelism are key to distributed architectures.
- "Hyper-threaded Java" (Randall Scarberry, JavaWorld, November 2006) introduces two classes from java.util.concurrent that can be used to speed up time-consuming tasks in multithreaded Java applications.
- See JavaWorld's Threads & Concurrency research center for more recent articles on these topics.
- Java Concurrency In Practice (Brian Goetz with Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes, and Doug Lea, Addison-Wesley, 2006) is an excellent guide to the thread concurrency issues discussed in this article.
- Also see the concurrency and collections sections of the Sun Java Tutorial.