Newsletter sign-up
View all newsletters

Enterprise Java Newsletter
Stay up to date on the latest tutorials and Java community news posted on JavaWorld

Sponsored Links

Optimize with a SATA RAID Storage Solution
Range of capacities as low as $1250 per TB. Ideal if you currently rely on servers/disks/JBODs

Book excerpt: Executing tasks in threads

When creating threads to perform tasks, look to the Executor framework

  • Print
  • Feedback

Page 5 of 5

To address the issue of execution service lifecycle, the ExecutorService interface extends Executor, adding a number of methods for lifecycle management (as well as some convenience methods for task submission). The lifecycle management methods of ExecutorService are shown in Listing 7.

Listing 7. Lifecycle methods in ExecutorService

                        interface ExecutorService extends Executor {
   void shutdown();
   List<Runnable> shutdownNow();
   boolean isShutdown();
   boolean isTerminated();
   boolean awaitTermination(long timeout, TimeUnit unit)
      throws InterruptedException;
   // ... additional convenience methods for task submission
}
                   


The lifecycle implied by ExecutorService has three states�running, shutting down, and terminated. ExecutorServices are initially created in the running state. The shutdown method initiates a graceful shutdown: no new tasks are accepted but previously submitted tasks are allowed to complete�including those that have not yet begun execution. The shutdownNow method initiates an abrupt shutdown: it attempts to cancel outstanding tasks and does not start any tasks that are queued but not begun.

Tasks submitted to an ExecutorService after it has been shut down are handled by the rejected execution handler, which might silently discard the task or might cause execute to throw the unchecked RejectedExecutionException. Once all tasks have completed, the ExecutorService transitions to the terminated state. You can wait for an ExecutorService to reach the terminated state with awaitTermination, or poll for whether it has yet terminated with isTerminated. It is common to follow shutdown immediately by awaitTermination, creating the effect of synchronously shutting down the ExecutorService.

The LifecycleWebServer in Listing 8 extends our Web server with lifecycle support. It can be shut down in two ways: programmatically by calling stop, and through a client request by sending the Web server a specially formatted HTTP request.

Listing 8. Web server with shutdown support

                        class LifecycleWebServer {
   private final ExecutorService exec = ...;
   public void start() throws IOException {
      ServerSocket socket = new ServerSocket(80);
      while (!exec.isShutdown()) {
         try {
            final Socket conn = socket.accept();
            exec.execute(new Runnable() {
               public void run() { handleRequest(conn); }
            });
         } catch (RejectedExecutionException e) {
            if (!exec.isShutdown())
               log("task submission rejected", e);
         }
      }
   }
   public void stop() { exec.shutdown(); }
   void handleRequest(Socket connection) {
      Request req = readRequest(connection);
      if (isShutdownRequest(req))
         stop();
      else
         dispatchRequest(req);
   }
}
                   


Delayed and periodic tasks

The Timer facility manages the execution of deferred ("run this task in 100 ms") and periodic ("run this task every 10 ms") tasks. However, Timer has some drawbacks, and ScheduledThreadPoolExecutor should be thought of as its replacement. (Timer does have support for scheduling based on absolute, not relative time, so that tasks can be sensitive to changes in the system clock; ScheduledThreadPoolExecutor supports only relative time.) You can construct a ScheduledThreadPoolExecutor through its constructor or through the newScheduledThreadPool factory.

A Timer creates only a single thread for executing timer tasks. If a timer task takes too long to run, the timing accuracy of other TimerTasks can suffer. If a recurring TimerTask is scheduled to run every 10 ms and another TimerTask takes 40 ms to run, the recurring task either (depending on whether it was scheduled at fixed rate or fixed delay) gets called four times in rapid succession after the long-running task completes, or "misses" four invocations completely. Scheduled thread pools address this limitation by letting you provide multiple threads for executing deferred and periodic tasks.

Another problem with Timer is that it behaves poorly if a TimerTask throws an unchecked exception. The Timer thread doesn't catch the exception, so an unchecked exception thrown from a TimerTask terminates the timer thread. Timer also doesn't resurrect the thread in this situation; instead, it erroneously assumes the entire Timer was cancelled. In this case, TimerTasks that are already scheduled but not yet executed are never run, and new tasks cannot be scheduled.

The OutOfTime in Listing 9 illustrates how a Timer can become confused in this manner and, as confusion loves company, how the Timer shares its confusion with the next hapless caller that tries to submit a TimerTask. You might expect the program to run for six seconds and exit, but what actually happens is that it terminates after one second with an IllegalStateException whose message text is "Timer already cancelled." ScheduledThreadPoolExecutor deals properly with ill-behaved tasks; there is little reason to use Timer in Java 5.0 or later.

If you need to build your own scheduling service, you may still be able to take advantage of the library by using a DelayQueue, a BlockingQueue implementation that provides the scheduling functionality of ScheduledThreadPoolExecutor. A DelayQueue manages a collection of Delayed objects. A Delayed has a delay time associated with it: DelayQueue lets you take an element only if its delay has expired. Objects are returned from a DelayQueue ordered by the time associated with their delay.

Listing 9. Class illustrating confusing Timer behavior

                        public class OutOfTime {
   public static void main(String[] args) throws Exception {
      Timer timer = new Timer();
      timer.schedule(new ThrowTask(), 1);
      SECONDS.sleep(1);
      timer.schedule(new ThrowTask(), 1);
      SECONDS.sleep(5);
   }
   static class ThrowTask extends TimerTask {
         public void run() { throw new RuntimeException(); }
   }
}
                   


Finding exploitable parallelism

The Executor framework makes it easy to specify an execution policy, but in order to use an Executor, you have to be able to describe your task as a Runnable. In most server applications, there is an obvious task boundary: a single client request. But sometimes good task boundaries are not quite so obvious, as in many desktop applications. There may also be exploitable parallelism within a single client request in server applications, as is sometimes the case in database servers.

In this section we develop several versions of a component that admit varying degrees of concurrency. Our sample component is the page-rendering portion of a browser application, which takes a page of HTML and renders it into an image buffer. To keep it simple, we assume that the HTML consists only of marked up text interspersed with image elements with pre-specified dimensions and URLs.

Example: Sequential page renderer

The simplest approach is to process the HTML document sequentially. As text markup is encountered, render it into the image buffer; as image references are encountered, fetch the image over the network and draw it into the image buffer as well. This is easy to implement and requires touching each element of the input only once (it doesn't even require buffering the document), but is likely to annoy the user, who may have to wait a long time before all the text is rendered.

A less annoying but still sequential approach involves rendering the text elements first, leaving rectangular placeholders for the images, and after completing the initial pass on the document, going back and downloading the images and drawing them into the associated placeholder. This approach is shown in SingleThreadRenderer in Listing 10.

Listing 10. Rendering page elements sequentially

                        public class SingleThreadRenderer {
   void renderPage(CharSequence source) {
      renderText(source);
            List<ImageData> imageData = new ArrayList<ImageData>();
      for (ImageInfo imageInfo : scanForImageInfo(source))
         imageData.add(imageInfo.downloadImage());
      for (ImageData data : imageData)
         renderImage(data);
   }
}
                   


Downloading an image mostly involves waiting for I/O to complete, and during this time the CPU does little work. So the sequential approach may underutilize the CPU, and also makes the user wait longer than necessary to see the finished page. We can achieve better utilization and responsiveness by breaking the problem into independent tasks that can execute concurrently.

Result-bearing tasks: Callable and Future

The Executor framework uses Runnable as its basic task representation. Runnable is a fairly limiting abstraction; run cannot return a value or throw checked exceptions, although it can have side effects such as writing to a log file or placing a result in a shared datastructure.

Many tasks are effectively deferred computations�executing a database query, fetching a resource over the network, or computing a complicated function. For these types of tasks, Callable is a better abstraction: it expects that the main entry point, call, will return a value and anticipates that it might throw an exception. (To express a non-value-returning task with Callable, use Callable<Void>.) Executor includes several utility methods for wrapping other types of tasks, including Runnable and java.security.PrivilegedAction, with a Callable.

Both Runnable and Callable describe abstract computational tasks. Tasks are usually finite: they have a clear starting point and they eventually terminate. The lifecycle of a task executed by an Executor has four phases: created, submitted, started, and completed. Since tasks can take a long time to run, we also want to be able to cancel a task. In the Executor framework, tasks that have been submitted but not yet started can always be cancelled, and tasks that have started can sometimes be cancelled if they are responsive to interruption. Cancelling a task that has already completed has no effect.

The Future represents the lifecycle of a task and provides methods to test whether the task has completed or been cancelled, retrieve its result, and cancel the task. Callable and Future are shown in Listing 11. Implicit in the specification of Future is that task lifecycle can only move forwards, not backwards�just like the ExecutorService lifecycle. Once a task is completed, it stays in that state forever.

The behavior of get varies depending on the task state (not yet started, running, completed). It returns immediately or throws an Exception if the task has already completed, but if not it blocks until the task completes. If the task completes by throwing an exception, get rethrows it wrapped in an ExecutionException; if it was cancelled, get throws CancellationException. If get throws ExecutionException, the underlying exception can be retrieved with getCause.

There are several ways to create a Future to describe a task. The submit methods in ExecutorService all return a Future, so that you can submit a Runnable or a Callable to an executor and get back a Future that can be used to retrieve the result or cancel the task. You can also explicitly instantiate a FutureTask for a given Runnable or Callable. (Because FutureTask implements Runnable, it can be submitted to an Executor for execution or executed directly by calling its run method.)

Listing 11. Callable and Future interfaces

                        public interface Callable {
   V call() throws Exception;
                           
public interface Future { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException, CancellationException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, CancellationException, TimeoutException; }


As of Java 6, ExecutorService implementations can override newTaskFor in AbstractExecutorService to control instantiation of the Future corresponding to a submitted Callable or Runnable. The default implementation just creates a new FutureTask, as shown in Listing 12.

Listing 12. Default implementation of newTaskFor in ThreadPoolExecutor

                        protected   RunnableFuture  newTaskFor(Callable  task) {
   return new FutureTask(task);
}
                   


Submitting a Runnable or Callable to an Executor constitutes a safe publication of the Runnable or Callable from the submitting thread to the thread that will eventually execute the task. Similarly, setting the result value for a Future constitutes a safe publication of the result from the thread in which it was computed to any thread that retrieves it via get.

Example: Page renderer with Future

As a first step towards making the page renderer more concurrent, let's divide it into two tasks, one that renders the text and one that downloads all the images. (Because one task is largely CPU-bound and the other is largely I/O-bound, this approach may yield improvements even on single-CPU systems.)

Both Callable and Future can help us express the interaction between these cooperating tasks. In FutureRenderer in Listing 13, we create a Callable to download all the images, and submit it to an ExecutorService. This returns a Future describing the task's execution; when the main task gets to the point where it needs the images, it waits for the result by calling Future.get. If we're lucky, the results will already be ready by the time we ask; otherwise, at least we got a head start on downloading the images.

Listing 13. Waiting for image download with Future

                        public class FutureRenderer {
   private final ExecutorService executor = ...;
   void renderPage(CharSequence source) {
      final List<ImageInfo> imageInfos = scanForImageInfo(source);
      Callable<List<ImageData>> task =
         new Callable<List<ImageData>>() {
            public List<ImageData> call() {
               List<ImageData> result= new ArrayList<ImageData>();
               for (ImageInfo imageInfo : imageInfos) result.add(imageInfo.downloadImage());
               return result;

} }

Future<List<ImageData>> future = executor.submit(task); renderText(source);

try { List<ImageData> imageData = future.get(); for (ImageData data : imageData) renderImage(data); } catch (InterruptedException e) { // Re-assert the thread's interrupted status Thread.currentThread().interrupt(); // We don't need the result, so cancel the task too future.cancel(true); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }


The state-dependent nature of get means that the caller need not be aware of the state of the task, and the safe publication properties of task submission and result retrieval make this approach thread-safe. The exception handling code surrounding Future.get deals with two possible problems: that the task encountered an Exception, or the thread calling get was interrupted before the results were available.

The FutureRenderer allows the text to be rendered concurrently with downloading the image data. When all the images are downloaded, they are rendered onto the page. This is an improvement in that the user sees a result quickly and it exploits some parallelism, but we can do considerably better. There is no need for users to wait for all the images to be downloaded; they would probably prefer to see individual images drawn as they become available.

Limitations of parallelizing heterogeneous tasks

In the last example, we tried to execute two different types of tasks in parallel� downloading the images and rendering the page. But obtaining significant performance improvements by trying to parallelize sequential heterogeneous tasks can be tricky.

Two people can divide the work of cleaning the dinner dishes fairly effectively: one person washes while the other dries. However, assigning a different type of task to each worker does not scale well; if several more people show up, it is not obvious how they can help without getting in the way or significantly restructuring the division of labor. Without finding finer-grained parallelism among similar tasks, this approach will yield diminishing returns.

A further problem with dividing heterogeneous tasks among multiple workers is that the tasks may have disparate sizes. If you divide tasks A and B between two workers but A takes ten times as long as B, you've only speeded up the total process by 9 percent. Finally, dividing a task among multiple workers always involves some amount of coordination overhead; for the division to be worthwhile, this overhead must be more than compensated by productivity improvements due to parallelism.

The FutureRenderer uses two tasks: one for rendering text and one for downloading the images. If rendering the text is much faster than downloading the images, as is entirely possible, the resulting performance is not much different from the sequential version, but the code is a lot more complicated. And the best we can do with two threads is speed things up by a factor of two. Thus, trying to increase concurrency by parallelizing heterogeneous activities can be a lot of work, and there is a limit to how much additional concurrency you can get out of it.

Note
The real performance payoff of dividing a program's workload into tasks comes when there are a large number of independent, homogeneous tasks that can be processed concurrently.


CompletionService: Executor meets BlockingQueue

If you have a batch of computations to submit to an Executor and you want to retrieve their results as they become available, you could retain the Future associated with each task and repeatedly poll for completion by calling get with a timeout of zero. This is possible, but tedious. Fortunately there is a better way: a completion service.

The CompletionService combines the functionality of an Executor and a BlockingQueue. You can submit Callable tasks to it for execution and use the queuelike methods take and poll to retrieve completed results, packaged as Futures, as they become available. ExecutorCompletionService implements CompletionService, delegating the computation to an Executor.

The implementation of ExecutorCompletionService is quite straightforward. The constructor creates a BlockingQueue to hold the completed results. FutureTask has a done method that is called when the computation completes. When a task is submitted, it is wrapped with a QueueingFuture, a subclass of FutureTask that overrides done to place the result on the BlockingQueue, as shown in Listing 14. The take and poll methods delegate to the BlockingQueue, blocking if results are not yet available.

Listing 14. QueueingFuture class used by ExecutorCompletionService

                        private class QueueingFuture<V> extends FutureTask<V> {
   QueueingFuture(Callable<V> c) { super(c); }
   QueueingFuture(Runnable t, V r) { super(t, r); }

protected void done() { completionQueue.add(this); } }


Example: Page renderer with CompletionService

We can use a CompletionService to improve the performance of the page renderer in two ways: shorter total runtime and improved responsiveness. We can create a separate task for downloading each image and execute them in a thread pool, turning the sequential download into a parallel one: this reduces the amount of time to download all the images. And by fetching results from the CompletionService and rendering each image as soon as it is available, we can give the user a more dynamic and responsive user interface. This implementation is shown in Renderer in Listing 15.

Listing 15. Using CompletionService to render page elements as they become available

                        public class Renderer {
   private final ExecutorService executor;

Renderer(ExecutorService executor) { this.executor = executor; }

void renderPage(CharSequence source) { final List<ImageInfo> info = scanForImageInfo(source); CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor); for (final ImageInfo imageInfo : info) completionService.submit(new Callable<ImageData>() { public ImageData call() { return imageInfo.downloadImage(); } }); renderText(source);

\ try { for (int t = 0, n = info.size(); t < n; t++) { Future<ImageData> f = completionService.take(); ImageData imageData = f.get(); renderImage(imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }


Multiple ExecutorCompletionServices can share a single Executor, so it is perfectly sensible to create an ExecutorCompletionService that is private to a particular computation while sharing a common Executor. When used in this way, a CompletionService acts as a handle for a batch of computations in much the same way that a Future acts as a handle for a single computation. By remembering how many tasks were submitted to the CompletionService and counting how many completed results are retrieved, you can know when all the results for a given batch have been retrieved, even if you use a shared Executor.

Placing time limits on tasks

Sometimes, if an activity does not complete within a certain amount of time, the result is no longer needed and the activity can be abandoned. For example, a Web application may fetch its advertisements from an external ad server, but if the ad is not available within two seconds, it instead displays a default advertisement so that ad unavailability does not undermine the site's responsiveness requirements. Similarly, a portal site may fetch data in parallel from multiple data sources, but may be willing to wait only a certain amount of time for data to be available before rendering the page without it.

The primary challenge in executing tasks within a time budget is making sure that you don't wait longer than the time budget to get an answer or find out that one is not forthcoming. The timed version of Future.get supports this requirement: it returns as soon as the result is ready, but throws TimeoutException if the result is not ready within the timeout period.

A secondary problem when using timed tasks is to stop them when they run out of time, so they do not waste computing resources by continuing to compute a result that will not be used. This can be accomplished by having the task strictly manage its own time budget and abort if it runs out of time, or by cancelling the task if the timeout expires. Again, Future can help; if a timed get completes with a TimeoutException, you can cancel the task through the Future. If the task is written to be cancellable, it can be terminated early so as not to consume excessive resources. This technique is used in Listings 13 and 16.

Listing 16 shows a typical application of a timed Future.get. It generates a composite Webpage that contains the requested content plus an advertisement fetched from an ad server. It submits the ad-fetching task to an executor, computes the rest of the page content, and then waits for the ad until its time budget runs out. (The timeout passed to get is computed by subtracting the current time from the deadline; this may in fact yield a negative number, but all the timed methods in java.util.concurrent treat negative timeouts as zero, so no extra code is needed to deal with this case.) If the get times out, it cancels the ad-fetching task and uses a default advertisement instead. (The true parameter to Future.cancel means that the task thread can be interrupted if the task is currently running.)

Listing 16. Fetching an advertisement with a time budget

                        Page renderPageWithAd() throws InterruptedException {
   long endNanos = System.nanoTime() + TIME_BUDGET;
   Future<Ad> f = exec.submit(new FetchAdTask());
   // Render the page while waiting for the ad
   Page page = renderPageBody();
   Ad ad;
   try {
      // Only wait for the remaining time budget
      long timeLeft = endNanos - System.nanoTime();
      ad = f.get(timeLeft, NANOSECONDS);
   } catch (ExecutionException e) {
      ad = DEFAULT_AD;
   } catch (TimeoutException e) {
      ad = DEFAULT_AD;
      f.cancel(true);
   }
   page.setAd(ad);
   return page;
}
                   


Example: A travel reservations portal

The time-budgeting approach in the previous section can be easily generalized to an arbitrary number of tasks. Consider a travel reservation portal: the user enters travel dates and requirements and the portal fetches and displays bids from a number of airlines, hotels or car rental companies. Depending on the company, fetching a bid might involve invoking a Web service, consulting a database, performing an EDI transaction, or some other mechanism. Rather than have the response time for the page be driven by the slowest response, it may be preferable to present only the information available within a given time budget. For providers that do not respond in time, the page could either omit them completely or display a placeholder such as "Did not hear from Air Java in time."

Fetching a bid from one company is independent of fetching bids from another, so fetching a single bid is a sensible task boundary that allows bid retrieval to proceed concurrently. It would be easy enough to create n tasks, submit them to a thread pool, retain the Futures, and use a timed get to fetch each result sequentially via its Future, but there is an even easierway�invokeAll.

Listing 17 uses the timed version of invokeAll to submit multiple tasks to an ExecutorService and retrieve the results. The invokeAll method takes a collection of tasks and returns a collection of Futures. The two collections have identical structures; invokeAll adds the Futures to the returned collection in the order imposed by the task collection's iterator, thus allowing the caller to associate a Future with the Callable it represents. The timed version of invokeAll will return when all the tasks have completed, the calling thread is interrupted, or the timeout expires. Any tasks that are not complete when the timeout expires are cancelled. On return from invokeAll, each task will have either completed normally or been cancelled; the client code can call get or isCancelled to find out which.

Listing 17. Requesting travel quotes under a time budget

                        private class QuoteTask implements Callable<TravelQuote> {
   private final TravelCompany company;
   private final TravelInfo travelInfo;
   ...
   public TravelQuote call() throws Exception {
      return company.solicitQuote(travelInfo);
   }

public List<TravelQuote> getRankedTravelQuotes(
      TravelInfo travelInfo, Set<TravelCompany> companies,
      Comparator<TravelQuote> ranking, long time, TimeUnit unit)
      throws InterruptedException {
   List<QuoteTask> tasks = new ArrayList<QuoteTask>();
   for (TravelCompany company : companies)
      tasks.add(new QuoteTask(company, travelInfo));

   List<Future<TravelQuote>> futures =
                            exec.invokeAll(tasks, time, unit);List<TravelQuote> quotes =
      new ArrayList<TravelQuote>(tasks.size());
   Iterator<QuoteTask> taskIter = tasks.iterator();
   for (Future<TravelQuote> f : futures) {
      QuoteTask task = taskIter.next();
      try {
         quotes.add(f.get());
      } catch (ExecutionException e) {
         quotes.add(task.getFailureQuote(e.getCause()));
      } catch (CancellationException e) {
         quotes.add(task.getTimeoutQuote(e));
      }
   }

Collections.sort(quotes, ranking); return quotes; }


Summary

Structuring applications around the execution of tasks can simplify development and facilitate concurrency. The Executor framework permits you to decouple task submission from execution policy and supports a rich variety of execution policies; whenever you find yourself creating threads to perform tasks, consider using an Executor instead. To maximize the benefit of decomposing an application into tasks, you must identify sensible task boundaries. In some applications, the obvious task boundaries work well, whereas in others some analysis may be required to uncover finer-grained exploitable parallelism.

About the author

Brian Goetz is a software consultant with twenty years of industry experience and more than 75 articles on Java development. He is one of the primary members of the Java Community Process JSR 166 Expert Group (Concurrency Utilities) and has served on numerous other JCP expert groups. Tim Peierls is the very model of a modern multiprocessor, with BoxPop.biz, recording arts, and goings on theatrical. He is one of the primary members of the Java Community Process JSR 166 Expert Group (Concurrency Utilities), and has served on numerous other JCP expert groups. Joshua Bloch is a principal engineer at Google. Previously he was a distinguished engineer at Sun Microsystems and a senior systems designer at Transarc. He led the design and implementation of numerous Java platform features, including the Java 5.0 language enhancements and the Java Collections Framework. He holds a Ph.D. in computer science from Carnegie-Mellon University and a B.S. in computer science from Columbia University. Joseph Bowbeer is a software architect at Vizrea Corporation, where he specializes in mobile application development for the Java ME platform, but his fascination with concurrent programming began in his days at Apollo Computer. He served on the JCP expert group for JSR-166 (Concurrency Utilities). David Holmes is a senior research scientist at the Cooperative Research Centre for Enterprise Distributed Systems Technology (DSTC Pty Ltd), located in Brisbane, Australia. His work with Java technology has focused on concurrency and synchronization support in the language and virtual machine. He has presented tutorials on concurrency and design at numerous international object-oriented programming conferences. He completed his Ph.D. at Macquarie University, Sydney, in 1999. Doug Lea is one of the foremost experts on object-oriented technology and software reuse. He has been doing collaborative research with Sun Labs for more than five years. Lea is professor of computer science at SUNY Oswego, co-director of the Software Engineering Lab at the New York Center for Advanced Technology in Computer Applications, and adjunct professor of electrical and computer engineering at Syracuse University. In addition, he co-authored the book, Object-Oriented System Development (Addison-Wesley, 1993). He received his B.A., M.A., and Ph.D. from the University of New Hampshire.
  • Print
  • Feedback

Resources