Add concurrent processing with message-driven beans

Process a request concurrently in the J2EE framework

A concurrent program is one that can execute multiple tasks simultaneously. Concurrency improves a program's throughput, execution speed, and responsiveness. On a single processor system, concurrent programs efficiently use computer resources by overlapping slow I/O (input/output) operations with computational tasks; on a multiprocessor system, concurrency helps maximize throughput by executing tasks in parallel on different CPUs.

Concurrency can be achieved in multiple ways. In Java, concurrency is easily achieved by multithreading. Threads have lower overhead than independent processes, and Java provides built-in support for them. Thus, concurrent programming is an integral and desirable feature of Java programs.

Many real-world applications find parallel processing either necessary or highly desirable. One such example is an application that displays the best price and availability of an item by searching multiple retailers. This article discusses various implementations of this example application, called Price Buster, in a J2EE (Java 2 Platform, Enterprise Edition) framework and describes how message-driven beans (MDBs) can effectively achieve parallel processing of a single user request.

Need for concurrent processing

To justify the need for concurrent processing in the Price Buster application, let us discuss what the application does, its desirable features, and the best way to implement it in the J2EE framework. Our sample application takes an item name or model number as user input over the Web, invokes a backend process that searches the item's price and availability from multiple retailers, formats the data, and presents it to the user.

This application's ideal implementation is one that can search many retailers in the shortest possible time. Out of the total time required to process one user request, the backend process that connects to multiple retailer systems will spend the maximum time to gather pricing information. Let's assume it takes a minimum of 15 seconds to search an item's details from a retailer. Hence, searching 10 retailers requires a minimum of 150 seconds, assuming the retailers are searched serially, one after the other. From the user's viewpoint, a response time of 150 seconds is completely unacceptable. To ensure an acceptable user response time while searching numerous retailers, the implementing technology must search in parallel rather than serially.

EJB and concurrent processing

Now let us discuss the different implementation options available if the Price Buster application needs to be implemented in the J2EE framework. The typical and most common implementation in a J2EE environment would consist of a Web module (servlets and JavaServer Pages, or JSP) that handles the session and presentation, and a J2EE module with EJB (Enterprise JavaBeans) components containing the business logic to connect and search the retailers' legacy systems, as illustrated in Figure 1. The user invokes the JSP/servlet over the Web, which, in turn, invokes a method on the search EJB component. The search EJB component gets the item's price and availability from three retailers A, B, and C by invoking retailer EJB components, one after the other. The important point to note here is that the retailers are searched sequentially. Because of the serial invocation, a minimum of 45 seconds will pass before the results display to the user.

Figure 1. Typical implementation of Price Buster application in J2EE framework.

In this implementation, the user response time depends on the number of retailers searched and the time each retailer takes to respond. Although this solution is functionally correct, it has a serious design flaw. The response time increases linearly with the number of retailers searched and is a minimum of 300 seconds (5 minutes) for 20 retailers, which is not at all user friendly!
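For concreteness, here is a minimal sketch of the serial search loop inside the search EJB component. RetailerSearcher, lookupRetailerSearcher(), and itemName are hypothetical names used only for this sketch, not part of the actual application code.

/* Hypothetical serial search inside the search EJB component.
   RetailerSearcher stands in for the remote interface of a retailer EJB
   component, and lookupRetailerSearcher() for the JNDI lookup and create
   call; each search() call blocks for roughly 15 seconds. */
String[] retailers = {"A", "B", "C"};
Vector results = new Vector(retailers.length);
for (int i = 0; i < retailers.length; i++) {
    RetailerSearcher searcher = lookupRetailerSearcher(retailers[i]);
    results.add(searcher.search(itemName));  // blocks until retailer i responds
}
// Total time grows linearly: roughly 15 seconds multiplied by the number of retailers.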

To reduce the user response time to an acceptable value and still maintain the ability to search numerous retailers, the application needs to be redesigned to use a technology that allows searching of retailer systems in parallel rather than serially. In other words, we need to add concurrency to the above implementation to get a faster response time. As mentioned earlier, in Java programs, this is easily achieved by multithreading, i.e., assigning independent tasks to multiple threads. This technique is effective when the tasks are I/O intensive instead of CPU intensive, which is the case for the Price Buster application. The retailer EJB component makes an I/O call to the retailer system, which responds in 15 seconds, at a minimum. During this time, the retailer EJB component is not doing any processing, just waiting for the response from the retailer. Hence, this application is a perfect target for a multithreaded implementation.

Here are some ways to add concurrency:

  1. The application's Web module can be modified to spawn multiple threads (let's call them worker threads) inside the JVM hosting the servlets. Each worker thread can invoke the retailer EJB component directly, with one worker thread dedicated per retailer. When invoked, the servlet breaks the input request into multiple tasks, one for each retailer, and assigns them to the worker threads. After assigning the tasks, the servlet thread can wait either for a specified time or until all the worker threads finish processing (a minimal sketch of this approach appears after this list). This solution works and gives the desired results, but it deviates from J2EE's fundamental design philosophy, which encourages application developers to focus on business logic rather than on complex multithreading and synchronization issues.
  2. The search EJB component can be redesigned to spawn multiple threads that invoke the retailer EJB components in parallel. Unfortunately, this is not advisable at all, because the EJB specification restricts the creation of user threads inside an EJB container. Note that the EJB server itself is multithreaded and can handle multiple client requests concurrently. The reason for this restriction is that J2EE is a framework for building highly scalable server-side components and is itself responsible for providing concurrency and other services. This architecture relieves enterprise bean developers of the burden of writing complex multithreaded code. Moreover, the EJB container can manage resource allocation more effectively by controlling thread creation.
  3. Another solution implements the retailer EJB components as MDBs. This allows the search EJB component to assign tasks to multiple retailer MDBs simultaneously by sending multiple message events, which enables the retailer MDBs to process the requests in parallel. Session EJB components cannot be used this way because they are limited to synchronous invocation and must be invoked serially. On the other hand, MDBs can be invoked asynchronously on message events and hence can process requests in parallel.
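As a rough illustration of the first option, the following sketch spawns one worker thread per retailer inside the servlet and waits up to 30 seconds for the results. RetailerClient and itemName are hypothetical names used only for this sketch; a real implementation would also need error reporting and thread reuse.

String[] retailers = {"A", "B", "C"};
final String item = itemName;                 // item name taken from the request
final Vector results = new Vector();          // Vector is synchronized
Thread[] workers = new Thread[retailers.length];

// One worker thread per retailer; each invokes the retailer EJB component.
for (int i = 0; i < retailers.length; i++) {
    final String retailer = retailers[i];
    workers[i] = new Thread(new Runnable() {
        public void run() {
            // RetailerClient is a hypothetical wrapper around the retailer EJB call.
            results.add(RetailerClient.search(retailer, item));
        }
    });
    workers[i].start();
}

// Wait for the workers, but never longer than 30 seconds overall.
long deadline = System.currentTimeMillis() + 30000;
for (int i = 0; i < workers.length; i++) {
    long remaining = deadline - System.currentTimeMillis();
    if (remaining <= 0) {
        break;
    }
    try {
        workers[i].join(remaining);
    } catch (InterruptedException e) {
        break;
    }
}
// Whatever has accumulated in results is then formatted and returned to the user.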

Keeping in mind the EJB specification's limitations and ease of implementation, the best solution is Solution 3, i.e., asynchronous invocation of retailer MDBs. Messaging is the most common and reliable mechanism for asynchronous communication among different system components, and it is the natural choice for J2EE applications because it is well integrated with the EJB framework through JMS (Java Message Service) and MDBs. The rest of the article covers how concurrency can be added to EJB using MDBs.

Concurrency in an EJB environment using message-driven beans

Now I discuss in detail how we can use MDBs to add concurrency to the Price Buster application. An MDB-based solution entails two primary requirements: first, an EJB 2.0-compliant application server that supports message-driven beans; and second, messaging software with a JMS interface, such as IBM MQSeries. It is wise to select a messaging system that is well integrated with the application server being used; the combination of WebSphere and MQSeries is one good example.

Note: If your application server is not EJB 2.0-compliant or does not support MDBs, please refer to my article, "Event-Driven Enterprise JavaBeans," in WebSphere Developer's Journal, which discusses how to develop MDBs in any EJB application server, including EJB 1.x-based servers.

In our implementation, the retailer EJB components deploy as MDBs so they can be invoked on a message event, as shown in Figure 2, instead of exposing their methods through a session EJB interface. Below, I describe this implementation's complete flow.

Figure 2. Add concurrency in a J2EE framework using MDBs.

To start, the user invokes the JSP/servlet over the Web, providing the item name or model number. The servlet, in turn, invokes a method on the search EJB component. The search EJB component constructs three distinct JMS messages from the input request, one for each retailer to be searched, and places them on the request queue. Then the search EJB component enters a wait mode, waiting for the response messages to arrive on the response queue (Step 2 in Figure 2). The arrival of messages in the request queue triggers the invocation of retailer MDBs. One retailer MDB is invoked per message, and all invoked MDBs start processing their messages simultaneously (3). After searching the retailer systems for price and availability information, the retailer MDBs embed the results in JMS messages and put them on the response queue (4). Remember that all the retailer MDBs process in parallel; hence, they all return their results in approximately 15 seconds, the minimum time to search a single retailer. The waiting search EJB component picks up all three messages from the response queue, assembles the result, and sends the data back to the invoking servlet/JSP (5), which, in turn, displays it to the user (6).
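For reference, a retailer MDB in this flow could look roughly like the sketch below. The class name, the searchRetailerSystem() helper, and the JNDI names are assumptions made only for this sketch; the bean simply reads the request, performs the slow search, and sends the result to the response queue, copying the correlation header set by the search EJB component (see the design considerations and sample code later in the article).

/* Minimal sketch of a retailer MDB. The JNDI names, the searchRetailerSystem()
   helper, and the error handling are illustrative assumptions only. */
public class RetailerMessageBean
        implements javax.ejb.MessageDrivenBean, javax.jms.MessageListener {

    private javax.ejb.MessageDrivenContext context;

    public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) {
        this.context = ctx;
    }
    public void ejbCreate() {}
    public void ejbRemove() {}

    public void onMessage(javax.jms.Message message) {
        javax.jms.QueueConnection connection = null;
        try {
            javax.jms.TextMessage request = (javax.jms.TextMessage) message;

            // The slow (roughly 15-second) I/O call into the retailer's system.
            String result = searchRetailerSystem(request.getText());

            // Look up the JMS resources (names are assumptions for this sketch).
            javax.naming.InitialContext jndi = new javax.naming.InitialContext();
            javax.jms.QueueConnectionFactory factory = (javax.jms.QueueConnectionFactory)
                jndi.lookup("java:comp/env/jms/QCF");
            javax.jms.Queue responseQueue = (javax.jms.Queue)
                jndi.lookup("java:comp/env/jms/RESPONSEQ");

            connection = factory.createQueueConnection();
            javax.jms.QueueSession session =
                connection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);

            // Build the response and copy the correlation header set by the
            // search EJB component (see the design considerations below).
            javax.jms.TextMessage response = session.createTextMessage(result);
            response.setStringProperty("KEY", request.getStringProperty("KEY"));
            session.createSender(responseQueue).send(response);
        } catch (Exception e) {
            context.setRollbackOnly(); // or log and discard, depending on the design
        } finally {
            try {
                if (connection != null) connection.close();
            } catch (javax.jms.JMSException ignored) {}
        }
    }

    // Hypothetical call into the retailer's legacy system.
    private String searchRetailerSystem(String requestText) {
        return "price/availability for: " + requestText;
    }
}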

In this implementation, the minimum response time to return to the user depends only on the time required for each retailer to respond, unlike our first implementation, where the response time also depended on the number of retailers searched. Hence, even if the number of retailers increases to 10, the minimum response time will be around 15 seconds. A few milliseconds will be spent assembling and disassembling the messages and interacting with the messaging system. This represents a major improvement over the serial processing time of 150 seconds for 10 retailers.

Design considerations

You should be aware of some design considerations when designing a concurrent application based on MDBs. When multiple users make simultaneous requests to the Price Buster application, multiple instances of the search EJB component will be active at any given time, each one serving one servlet thread. In that case, a mechanism must map the response messages from a group of retailer MDBs to the corresponding invoking search EJB component. You can achieve this in multiple ways:

  • Use a temporary response queue for each incoming request. On invocation, the search EJB component can create a unique temporary queue and pass its name along with the input parameters in the JMS messages placed on the request queue. The retailer MDBs that process a request for a particular search EJB component all use the temporary queue provided in the input as their response queue (a sketch of this approach appears after this list). This approach is clean because it removes any possibility of cross talk between multiple search EJB components; however, the overhead of creating a new temporary queue for each request should be assessed.
  • Use a pool of predefined response queues. This approach resembles the first one, except that the response queues are pooled instead of being created per request. A little extra work is involved: a controller EJB component or a utility class must be developed to manage the assignment of response queues to search EJB components. The controller also must clear any old messages left from a previous assignment before reassigning a queue to another search EJB component. At the end of processing, the search EJB component must return the queue to the controller.
  • Use the JMS selector feature to select only the desired messages from the response queue. For those unfamiliar with JMS selectors, this feature allows a client to specify the messages it is interested in; the receiver gets only those messages whose headers and properties match the selector criteria. In this approach, only one response queue is needed for all requests. The search EJB component creates a unique key and passes it to the retailer MDBs in a message header. Next, the search EJB component creates a JMS selector using the key and waits for the response messages. All retailer MDBs associated with the search EJB component pass the key back in the response message header. This way, the search EJB component receives only messages with the header value set to the unique key. The sample code for the search EJB component below shows how to select only the desired messages using JMS selectors. This approach's disadvantage is that the selector feature tends to slow down when numerous messages accumulate in a queue.
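The sketch below, referenced from the first bullet above, shows how the temporary-queue approach might look inside the search EJB component. It assumes the same queueSession, requestQueue, and itemName variables as the selector-based sample that follows, and uses the standard JMSReplyTo header to tell the retailer MDBs where to respond; on the MDB side, the response would simply be sent to the destination returned by message.getJMSReplyTo().

// Sketch of the temporary-queue approach (assumed variables as in the sample below).
javax.jms.TemporaryQueue replyQueue = queueSession.createTemporaryQueue();

javax.jms.QueueSender sender = queueSession.createSender(requestQueue);
String[] retailers = {"A", "B", "C"};
for (int i = 0; i < retailers.length; i++) {
    javax.jms.TextMessage msg =
        queueSession.createTextMessage("Retailer:" + retailers[i] + ",Item:" + itemName);
    // Advertise the private reply queue through the standard JMSReplyTo header.
    msg.setJMSReplyTo(replyQueue);
    sender.send(msg);
}

// Collect up to three responses from the private reply queue, waiting at most
// 30 seconds in total, then discard the queue.
javax.jms.QueueReceiver receiver = queueSession.createReceiver(replyQueue);
long deadline = System.currentTimeMillis() + 30000;
int expected = retailers.length;
while (expected > 0) {
    long remaining = deadline - System.currentTimeMillis();
    if (remaining <= 0) {
        break; // overall wait time expired
    }
    javax.jms.Message reply = receiver.receive(remaining);
    if (reply == null) {
        break; // wait time expired inside receive()
    }
    // ... assemble the reply into the result ...
    expected--;
}
receiver.close();
replyQueue.delete();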

Sample code for search EJB component

/* Step 1: Create a unique key, using some class, say UniqueKeyCreater. */
String uniqueKey = UniqueKeyCreater.getKey();
String[] retailers = new String[]{"A", "B", "C"};

/* Step 2: Put three messages in the REQUESTQ, one for each retailer.
   Define a header called "KEY" and set its value to uniqueKey.
   Note: The retailer MDBs also have to set the same header/value
   in the response messages. */
QueueSender queueSender = queueSession.createSender(requestQueue);
TextMessage message = queueSession.createTextMessage();
message.setStringProperty("KEY", uniqueKey);
for (int i = 0; i < retailers.length; i++) {
    message.setText("Retailer:" + retailers[i] + ",Item:" + itemName);
    queueSender.send(message);
}

/* Step 3: Now wait for messages in the response queue. Get only those
   messages that have the header KEY with its value set to uniqueKey.
   Use a JMS selector for this purpose. The while loop will break when
   all three responses arrive in RESPONSEQ or 30 seconds are over. Even
   if all three response messages do not arrive in 30 seconds, the code
   will exit the while loop, ensuring a guaranteed response time of
   30 seconds. */
String selector = "KEY = '" + uniqueKey + "'";
QueueReceiver queueReceiver =
    queueSession.createReceiver(responseQueue, selector);
long startTime = System.currentTimeMillis();
int messagesExpected = retailers.length;
long waitTime = 30000; // 30 seconds
Vector responses = new Vector(retailers.length);
while (messagesExpected > 0 && waitTime > 0) {
    Message rcvdMsg = queueReceiver.receive(waitTime);
    // Check whether we got a message; if not, the wait time expired.
    if (rcvdMsg == null) {
        break;
    }
    responses.add(rcvdMsg);
    messagesExpected--;
    waitTime = 30000 - (System.currentTimeMillis() - startTime);
}

Each of these methods has its own advantages and disadvantages, and the best choice varies from application to application. It depends on many factors, such as peak load, the cost of creating temporary queues, JMS provider capabilities, hosting server configuration, and so on. Consider all these factors before finalizing your application's design. For large-volume applications, a combination of the second and third approaches can sometimes be used.

So far, we have seen how concurrency helps to achieve faster response time for Web-based applications. Concurrency has another side too: it does not come free and requires many more resources in a short time span. The peak load requirement for a concurrent application is much higher as compared to a serial one.

Take the case of the Price Buster application: if there are 20 simultaneous users and 10 retailers, there will be 20 retailer EJB components in a non-MDB implementation as compared to 200 retailer MDBs in the MDB-based implementation. Hence, the server hosting the MDB-based solution must have a more powerful configuration compared to a server hosting a non-MDB version. A powerful server capable of running thousands of threads simultaneously does not completely solve the problem. One more factor must be considered: the concurrent application also increases the retailer systems' peak load requirements because, in the MDB-based implementation, many requests are made to the retailer systems in a short time span. To avoid any outages in production, detailed capacity planning along with stress testing should be completed during preproduction.

Guaranteed response time

The MDB-based concurrent system has one more important advantage over the typical implementation: it not only provides a faster response time, but also guarantees a fixed response time. To understand why, let's review some timing numbers. In a sunny-day scenario (when everything works the way you want), the serial, non-MDB implementation returns results for all three retailers in approximately 45 seconds. What happens if one retailer's system starts malfunctioning and its response time increases to 200 seconds? The outcome won't be user friendly because the user response time will be far too long. In such a scenario, we would be better off ignoring the results of the malfunctioning retailer and returning the remaining two results to the user in an acceptable timeframe. Unfortunately, you cannot do that easily in the non-MDB implementation: there is no way to interrupt the synchronous, blocking call from the search EJB component to the problematic retailer EJB component and move on to the next one. A fixed response time wouldn't be an issue if all participating retailers guaranteed one, but that is hard to enforce, and programming with that assumption is not practical.

However, the above problem can be easily solved in the MDB-based implementation: after assigning tasks to the retailer MDBs, the search EJB component can wait either for a specified time period or until it receives responses from all retailer MDBs. Achieve that by invoking the receive(long timeout) method on the queueReceiver object with a specified maximum wait time. Invoking receive() without a wait time blocks until a message arrives in the queue. On the other hand, if a wait time is specified, the method returns either on a message arrival or on the wait time's expiration. To ensure a guaranteed user response time of 30 seconds, set the maximum wait time in the search EJB component to 28-29 seconds. The sample code above shows how to achieve this using the queueReceiver.receive(long timeout) method.

The remaining second or two leaves time for assembling the results and formatting them for display. Even if not all the retailers respond in the specified time, the application returns the data from the others to the user within 30 seconds; the data from a slow-responding retailer can safely be discarded. For a system with numerous retailers, always use a maximum wait time, because it is impossible to guarantee the availability of every retailer system at all times. As a general rule of thumb, always invoke external systems that are not in your control with a timeout.

Implement faster response times

You have seen from the above discussion that concurrency is a must for providing faster response times in large-scale I/O-intensive applications such as search engines. Adding concurrency to these applications in the J2EE framework can be easily achieved by using JMS and MDBs. The two main benefits of an MDB-based solution are:

  1. MDBs enable parallel processing of user requests, which provides a faster response compared to serial invocation
  2. An MDB-based implementation can ensure a guaranteed user response time

I recommend adding concurrency analysis as an integral part of the J2EE application design process.

Amit Poddar is a senior engineer with a large Internet company. He has 10 years of IT experience, with expertise in the development of networking protocols and server-side Java, and has designed and implemented large-scale fault-tolerant systems using IBM's MQSeries and WebSphere.
