Listen to heartbeats using JMS

Extend heartbeats using JMS publish and subscribe

Heartbeats, a common software construct, verify the continual operation of a specific component or service. A targeted service periodically broadcasts a signal across its environment. As long as client services detect the targeted service's heartbeat signals, they can assume the system is working normally; if the critical heartbeat signal stops, each component knows how to react.

As a heartbeat example, consider a backend market-price publisher that publishes real-time prices to other services on the network, as Figure 1 shows. Because a pause in price publishing could indicate either a legitimate market pause or a system problem resulting in stale price information, the other services need reliable status information to tell the two apart.

Figure 1. Messaging architecture scenario

To guard against acting on stale prices, the publisher periodically issues a heartbeat signal to the client services, keeping them informed of its status. If the heartbeat signal stops, the system's customer inquiry module might warn customers that prices may be stale and that updates should resume shortly. Meanwhile, the application's risk management system would suspend other transactions until the publisher resumes publishing current prices.
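To make the scenario concrete, here is a minimal sketch of how a client might react to heartbeat events: it flags prices as stale on the first failure, refuses new transactions while they are stale, and clears the flag when heartbeats resume. PriceFeedGuard and its methods are hypothetical names, not part of the heartbeat library described below.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical client-side reaction to the price publisher's heartbeat.
class PriceFeedGuard {
    // True while the publisher's heartbeat is missing.
    private final AtomicBoolean stale = new AtomicBoolean(false);

    // Called when the subscriber detects a missed heartbeat.
    void onHeartbeatFailed() {
        // Warn only on the transition into the stale state, not on every failure.
        if (stale.compareAndSet(false, true)) {
            System.out.println("WARNING: prices may be stale; updates should resume shortly");
        }
    }

    // Called when heartbeats arrive again after one or more failures.
    void onHeartbeatResumed() {
        stale.set(false);
    }

    // A risk-management component could refuse new transactions while prices are stale.
    boolean acceptTransaction() {
        return !stale.get();
    }
}
```

The compare-and-set keeps the warning to a single message per outage even if several failure events arrive.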

Background

I recently used a similar setup with a series of interdependent components such that, if one failed, the others received notification. Because the system employed JMS (Java Message Service) as a core component, I used JMS messaging as a way to transmit heartbeats between components.

However, at the project's start I didn't realize how critical the interdependencies were, so each relationship was supported by its own proprietary heartbeat, implemented differently each time. Figure 2 shows the result.

Figure 2. The initial heartbeat implementation

All right, I exaggerate a bit. However, that would have been the result had I not implemented a different system.

In many systems, particularly high-availability operating systems, developers implement heartbeats using multicast -- suitable for broadcasting an "I'm still here!" signal, but not necessarily for targeting a specific process or consumer.

However, a JMS-based heartbeat system, as Figure 3 shows, proves ideal for periodically determining whether the overall system -- the network, the JMS server, the JVM, and so on -- remains in operation. Moreover, with a JMS-based solution, you can test the availability of other resources (see the "Dependent Publishers" section below).

Figure 3. The publish/subscribe heartbeat architecture

With my JMS-based solution, each critical component publishes a heartbeat to an exclusive known topic, and any interested client components receive the heartbeats by subscribing to that topic. Further, the publisher need not know the subscribers' nature or number. By choosing a JMS-based approach, my solution became simple, portable, reusable, and scalable.
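The decoupling a topic provides can be illustrated without a JMS provider. The following sketch is an in-memory stand-in for a provider's topics, not JMS itself; InMemoryTopics and its methods are hypothetical. The publisher names only the topic, so adding or removing subscribers never touches publisher code.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a JMS provider's topics.
class InMemoryTopics {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Any number of clients may subscribe to a component's heartbeat topic.
    void subscribe(String topic, Consumer<String> listener) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // The publisher names only the topic; it neither knows nor cares who listens.
    void publish(String topic, String message) {
        List<Consumer<String>> listeners = subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
    }
}
```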

The heartbeat class library's functional components

To keep things simple and to promote reuse, I've supplied several base components.

Heartbeat publishers

A heartbeat publisher transmits a heartbeat message to a predetermined topic every NP seconds. NP, known as the publisher period, is the interval between published heartbeats. The publisher has the following components:

  • A connected TopicConnection, TopicSession, Topic, and TopicPublisher.
  • A thread that sleeps for NP seconds, awakens, publishes a message, then sleeps again.
  • An exception handler that generates an appropriate alert if the message publication fails.
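The three publisher components above map naturally onto the standard java.util.concurrent API. The following sketch swaps the sleeping thread for a ScheduledExecutorService and replaces the JMS TopicPublisher with a plain Consumer<String>; HeartbeatTicker and its members are hypothetical names. publishOnce() performs a single iteration, so the logic can be exercised without waiting on a timer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical publisher: a scheduled task stands in for the sleeping thread,
// and a Consumer<String> stands in for the JMS TopicPublisher.
class HeartbeatTicker {
    private final String topicName;
    private final Consumer<String> sink;           // where heartbeats are sent
    private final Consumer<Exception> onFailure;   // the "exception handler" component
    private ScheduledExecutorService scheduler;

    HeartbeatTicker(String topicName, Consumer<String> sink, Consumer<Exception> onFailure) {
        this.topicName = topicName;
        this.sink = sink;
        this.onFailure = onFailure;
    }

    // One publication; a failure is reported rather than propagated.
    void publishOnce() {
        try {
            sink.accept(topicName + " HEARTBEAT");
        } catch (RuntimeException e) {
            onFailure.accept(e);
        }
    }

    // Publish every periodMillis (the publisher period NP, in milliseconds).
    void start(long periodMillis) {
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::publishOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

Catching inside publishOnce() matters with this design: if a periodic task throws, ScheduledExecutorService suppresses all of its later executions, which would silently stop the heartbeat.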

The architecture features two heartbeat-publisher categories:

  • Independent publishers: Publish messages every NP seconds as long as the assigned thread can run and a good connection to the JMS provider exists. Such a heartbeat has only basic requirements, but subscribers can infer little from it: only that the publisher's JVM is still running and reasonably healthy.
  • Dependent publishers: Execute a system test, and, if successful, publish a heartbeat. Test examples include connecting to a database, making an IIOP (Internet Inter-ORB Protocol) connection, testing for a file's existence, or receiving a heartbeat receipt from another message system, such as TIBCO or MQSeries. Subscribers can make specific inferences from heartbeats they receive from dependent publishers.
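The difference between the two categories reduces to whether a test guards each publication. A minimal sketch, assuming the test is modeled as a java.util.function.BooleanSupplier; GatedHeartbeat is a hypothetical name, and the file-existence check mirrors one of the example tests above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.BooleanSupplier;

// Hypothetical gate that decides whether this period's heartbeat is published.
class GatedHeartbeat {
    private final BooleanSupplier dependency;

    GatedHeartbeat(BooleanSupplier dependency) {
        this.dependency = dependency;
    }

    // Independent publisher: the test always passes.
    static GatedHeartbeat independent() {
        return new GatedHeartbeat(() -> true);
    }

    // Dependent publisher: publish only while the given file exists.
    static GatedHeartbeat dependsOnFile(Path file) {
        return new GatedHeartbeat(() -> Files.exists(file));
    }

    // The message to publish this period, or empty to skip it.
    Optional<String> nextHeartbeat(String topicName) {
        return dependency.getAsBoolean() ? Optional.of(topicName + " HEARTBEAT") : Optional.empty();
    }
}
```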

Heartbeat subscribers

A heartbeat subscriber listens for heartbeats on a predetermined topic. If a heartbeat message does not arrive within each n-second period, the subscriber fires a failure event to its registered listeners.
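The subscriber's timeout rule can be expressed as a small state machine checked once per period. In this sketch the caller supplies the current time, which keeps the logic deterministic; FailureDetector and its methods are hypothetical. A real subscriber would presumably call heartbeatReceived() from its JMS MessageListener and check() from a timer thread.

```java
// Hypothetical timeout logic for a heartbeat subscriber (times in milliseconds).
class FailureDetector {
    private final long periodMillis;
    private long periodStart;   // when the current waiting period began
    private int failures;       // consecutive periods without a heartbeat

    FailureDetector(long periodMillis, long startedAt) {
        this.periodMillis = periodMillis;
        this.periodStart = startedAt;
    }

    // Record a heartbeat; returns true if it ends a run of failures (a "resumed" event).
    synchronized boolean heartbeatReceived(long now) {
        boolean resumed = failures > 0;
        periodStart = now;
        failures = 0;
        return resumed;
    }

    // Poll from a timer; returns the new failure count if the period expired, else 0.
    synchronized int check(long now) {
        if (now - periodStart < periodMillis) {
            return 0;
        }
        failures++;
        periodStart = now;      // start timing the next period
        return failures;
    }
}
```

Returning the consecutive-failure count lets the caller pass it straight to a failed-event listener, which can then decide how often to act.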

Heartbeat event listeners

The heartbeat publisher and subscriber both possess event listener interfaces. You can register the objects that implement the listener interfaces with each component. The objects receive notification when a specific event occurs. You can design the event listeners to execute any action upon receiving notification of a heartbeat event. Actions could range from "do nothing" to "turn on a display light" to "send an email."

The listeners are alerted to the following events:

Publisher listeners

  • Start: Raised when the publisher first starts sending heartbeats. Start events represent a good place to initialize any resources whose lifetimes link to the heartbeat session.
  • Stop: Raised when the publisher stops sending heartbeats. Accordingly, stop events serve as good clean-up places following a heartbeat session.
  • Sent: Raised every time a heartbeat publishes. I usually do not implement any code to respond to the sent event, except perhaps a high-verbosity log statement. You, however, may find it useful.
  • Exception: Raised whenever the publisher throws an exception. The exception is passed to the event handler so it can deal with the error appropriately.
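The four publisher events could be expressed as a listener interface like the one below. The interface and method names are hypothetical (the article's own interfaces appear only in the UML diagram); default no-op methods, available since Java 8, let a listener override just the events it cares about, which suits the common case of ignoring the sent event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface mirroring the four publisher events.
interface HeartbeatPublisherListener {
    default void publisherStarted() {}
    default void publisherStopped() {}
    default void heartbeatSent() {}
    default void publisherException(Exception e) {}
}

// Example listener that records lifecycle events, e.g. for logging.
class RecordingPublisherListener implements HeartbeatPublisherListener {
    final List<String> events = new ArrayList<>();

    @Override public void publisherStarted() { events.add("start"); }
    @Override public void publisherStopped() { events.add("stop"); }
    @Override public void publisherException(Exception e) { events.add("exception: " + e.getMessage()); }
    // heartbeatSent() keeps the no-op default.
}
```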

Subscriber listeners

  • Start: Raised when the subscriber starts and expects heartbeats from the publisher. Once this event occurs, the subscriber expects heartbeats and raises failure events if they do not arrive on schedule.
  • Stop: Raised when the subscriber stops and no longer expects heartbeats from the publisher. The subscriber issues no further events.
  • Received: Raised each time the subscriber receives a heartbeat from the publisher. Conceptually, the subscriber constantly verges on declaring a failure, and each heartbeat's receipt pulls it back from the edge. However, the event handler typically does nothing, since the heartbeat subscriber implementation detects failures for you.
  • Failed: Raised if the subscriber's period expires and the subscriber has not received a heartbeat. For example, if the period is 5 seconds and 5 seconds elapse without a heartbeat, the failed event handler might display a warning message. The failed event handler also receives the number of failures since the last successfully received heartbeat, so it can choose to run its failure-handling code only on the first failure (or, for example, on every 10th).
  • Resumed: After one or more failures, a heartbeat's receipt raises both a generic received event and a more specialized resumed event. The resumed event indicates that the subscriber was previously in a failed state but has now resumed its heartbeat session. The event handler may need to restart activity it stopped on the first failure; because you usually don't want to run that code every time a heartbeat arrives, it belongs in the resumed handler rather than the received handler.
  • Exception: Raised whenever a subscriber throws an exception. The exception is passed to the event handler so it can deal with the error appropriately. I typically implement a javax.jms.ExceptionListener in the subscriber; when the JMS connection throws an exception, the subscriber simply passes it to the subscriber listener.
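The failure count passed with the failed event makes throttling straightforward. Below is a sketch of a subscriber listener that alerts on the first failure and on every 10th after that, and restarts suspended work only on the resumed event; the interface and class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface mirroring the subscriber events.
interface HeartbeatSubscriberListener {
    default void subscriberStarted() {}
    default void subscriberStopped() {}
    default void heartbeatReceived() {}
    default void heartbeatFailed(int failuresSinceLastHeartbeat) {}
    default void heartbeatResumed() {}
    default void subscriberException(Exception e) {}
}

// Alerts on failures 1, 10, 20, ... and resumes work once per recovery.
class ThrottledAlertListener implements HeartbeatSubscriberListener {
    final List<String> alerts = new ArrayList<>();
    boolean workSuspended;

    @Override
    public void heartbeatFailed(int failures) {
        if (failures == 1) {
            workSuspended = true;            // stop dependent activity on the first failure
            alerts.add("heartbeat lost");
        } else if (failures % 10 == 0) {
            alerts.add("still no heartbeat after " + failures + " periods");
        }
    }

    @Override
    public void heartbeatResumed() {
        workSuspended = false;               // runs once per recovery, not per heartbeat
        alerts.add("heartbeat resumed");
    }
}
```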

Figure 4 shows how our heartbeat architecture now looks.

Figure 4. The JMS heartbeat architecture summary

Implementation review

In our implementation, the IHeartbeatPublisher and IHeartbeatSubscriber interfaces define the basic behavior. Because the JMS-specific subscriber and publisher share so much functionality, I built most of the utility code into the abstract HeartbeatBase class. Consequently, the concrete HeartbeatPublisher and HeartbeatSubscriber classes both extend HeartbeatBase and implement their respective interfaces, as Figure 5 shows.

Figure 5. The JMS heartbeat implementation's UML (Unified Modeling Language) diagram.

Let's review the highlights from these classes.

HeartbeatBase

The HeartbeatBase provides the following constructs:

Constructors

I've implemented the constructors for both the subscriber and the publisher here because both need:

  • A topic connection.
  • A topic name: The publisher publishes heartbeats to this topic and the subscriber subscribes to it.
  • A period: A publisher publishes a heartbeat on a frequency defined as the period, expressed in milliseconds (ms). A subscriber expects to receive a heartbeat every <period> ms, or else the heartbeat fails.

The two constructors accept a TopicConnectionFactory and a TopicConnection, respectively. Both initialize the shared JMS resources, then invoke the abstract init() method, which each concrete subclass implements to initialize its own JMS resources:

public HeartbeatBase(TopicConnectionFactory topicConnectionFactory,
   String topicName, long period, String message ) throws
   HeartbeatInitializationException
public HeartbeatBase( TopicConnection topicConnection, String topicName,
   long period, String message ) throws HeartbeatInitializationException
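The constructor-then-init() arrangement is the Template Method pattern. Below is a stripped-down sketch with the JMS types and the message parameter omitted so it compiles on its own; BaseSketch and PublisherSketch are hypothetical names.

```java
// Hypothetical skeleton of a base-class constructor that calls an abstract init().
abstract class BaseSketch {
    protected final String topicName;
    protected final long period;     // milliseconds

    BaseSketch(String topicName, long period) {
        if (period <= 0) throw new IllegalArgumentException("period must be positive");
        this.topicName = topicName;
        this.period = period;
        // Shared JMS resources (session, topic) would be created here.
        init();                      // subclass-specific setup, e.g. create a publisher
    }

    protected abstract void init();
}

class PublisherSketch extends BaseSketch {
    String initializedFor;           // no explicit initializer on purpose; see the note below

    PublisherSketch(String topicName, long period) {
        super(topicName, period);
    }

    @Override
    protected void init() {
        initializedFor = topicName;  // base-class fields are already assigned here
    }
}
```

One Java pitfall applies to this design: init() runs before the subclass's own field initializers, so a subclass field declared with an explicit initializer (even `= null`) would silently overwrite whatever init() stored in it.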

Listener registration

The HeartbeatBase registers and unregisters subscriber or publisher listeners. The concrete classes are responsible for firing events to the registered listeners:

public void registerListener(IHeartbeatEventListener listener)
   throws InvalidEventListener
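A plausible registration scheme stores listeners in a CopyOnWriteArrayList, so the event-firing thread can iterate while other threads register or unregister. InvalidEventListener appears in the signature above but its definition is not shown; making it an unchecked exception thrown for null or duplicate listeners is an assumption, and the one-method IHeartbeatEventListener here is a simplified stand-in, not the article's interface.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Assumed definition: the article names this exception but does not show it.
class InvalidEventListener extends IllegalArgumentException {
    InvalidEventListener(String message) { super(message); }
}

// Simplified, hypothetical stand-in for the listener interface.
interface IHeartbeatEventListener {
    void onEvent(String event);
}

class ListenerRegistry {
    private final List<IHeartbeatEventListener> listeners = new CopyOnWriteArrayList<>();

    public void registerListener(IHeartbeatEventListener listener) throws InvalidEventListener {
        if (listener == null) throw new InvalidEventListener("listener must not be null");
        if (listeners.contains(listener)) throw new InvalidEventListener("listener already registered");
        listeners.add(listener);
    }

    public void unregisterListener(IHeartbeatEventListener listener) {
        listeners.remove(listener);
    }

    // A concrete publisher or subscriber would call this to fire an event.
    void fire(String event) {
        for (IHeartbeatEventListener l : listeners) {
            l.onEvent(event);
        }
    }
}
```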
public void unregisterListener(IHeartbeatEventListener listener)

Base destruction

All JMS resources defined in the base class are closed.

Thread pooling

The base class maintains a thread pool that invokes heartbeat event listeners asynchronously.
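Dispatching events through a pool keeps a slow listener (one that sends an email, say) from delaying the heartbeat timing loop. A minimal sketch using ExecutorService; AsyncEventDispatcher is a hypothetical name, and the pool size of two is an arbitrary choice since the article does not specify one.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical dispatcher: each listener call runs on a pooled thread,
// so the heartbeat loop never waits on listener code.
class AsyncEventDispatcher {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void fire(String event) {
        for (Consumer<String> l : listeners) {
            pool.execute(() -> l.accept(event));
        }
    }

    // Stop accepting events and wait up to timeoutMillis for queued ones to finish.
    boolean shutdown(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With more than one pool thread, a listener may see events out of order; a single-thread executor would preserve ordering at the cost of concurrency.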

Heartbeat process control

The HeartbeatBase determines when to stop, start, pause, or resume heartbeat events.

public void startHeartbeatProcess()
public void stopHeartbeatProcess()
public void setMessagesPaused(boolean working)
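These three methods plausibly reduce to two flags read by the heartbeat loop, corresponding to the run and process variables tested in the publisher's run() method. The sketch declares them volatile so a change made on one thread is visible to the loop thread; ProcessControl is hypothetical, and treating setMessagesPaused(true) as "pause" is an assumption based on the method name.

```java
// Hypothetical control flags for the heartbeat loop.
class ProcessControl {
    // volatile: the loop thread must see changes made by other threads.
    private volatile boolean run;             // false ends the loop
    private volatile boolean process = true;  // false skips publishing but keeps looping

    public void startHeartbeatProcess() { run = true; }
    public void stopHeartbeatProcess()  { run = false; }

    // Assumption: true pauses messages, false resumes them.
    public void setMessagesPaused(boolean paused) { process = !paused; }

    boolean shouldContinue() { return run; }
    boolean shouldPublish()  { return run && process; }
}
```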

Register shutdown hook

The base class defines the regShutdownHook() abstract method. Its concrete implementation does nothing, but you can optionally register a shutdown hook through the addShutdownHook() method that JDK 1.3 added to java.lang.Runtime. The hook is a thread that runs when the JVM begins shutting down. Some JMS implementations complain if resources are not closed, and closed in the right order, before the JVM shuts down; a shutdown hook lets you ensure that all resources are cleaned up properly. I have had mixed results with shutdown hooks, with behavior differing by JVM version and platform. You can see the shutdown hook implementation in the code, where it is disabled.
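Registering a hook takes one call to Runtime.addShutdownHook(), which accepts an unstarted Thread. In this sketch the JMS objects are modeled as AutoCloseable and closed in reverse order of creation (publisher, then session, then connection); ResourceCloser is hypothetical, and the reverse-order policy is an assumption about what "the right order" means for a given provider.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: closes tracked resources in reverse order of registration.
class ResourceCloser {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();
    private boolean closed;

    // Track resources in creation order: connection, session, publisher.
    synchronized void track(AutoCloseable resource) {
        resources.push(resource);
    }

    // Idempotent, so an explicit shutdown followed by the hook cannot double-close.
    synchronized void closeAll() {
        if (closed) return;
        closed = true;
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                System.err.println("close failed: " + e);  // keep closing the rest
            }
        }
    }

    // Runs closeAll() when the JVM begins its shutdown sequence.
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::closeAll, "heartbeat-shutdown"));
    }
}
```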

HeartbeatPublisher

The HeartbeatPublisher implementation publishes a heartbeat message every period. The publication may depend on a predefined test executed before every publication. The implementation must define actions for each event a publisher can fire:

public void fireHeartbeatPublisherException(Exception erx)
public void fireHeartbeatPublisherStarted()
public void fireHeartbeatPublisherStopped()
public void fireHeartbeatSent()

Because HeartbeatBase extends java.lang.Thread, I implemented the publisher's main event loop in a public void run() method:

1. public void run() {
2.     try {
3.       TextMessage tmessage  = topicSession.createTextMessage();
4.       tmessage.setText(topicName + " HEARTBEAT");
5.       fireHeartbeatPublisherStarted();
6.       while(run) {
7.         Thread.sleep(period);
8.         topicPublisher.setTimeToLive(timeToLive);
9.         if(process){
10.          if(fireDependency()) {
11.            topicPublisher.publish(tmessage);
12.            fireHeartbeatSent();
13.          }
14.        }
15.      }
16.      fireHeartbeatPublisherStopped();
17.      shutdown();
18.    } catch (Exception erx) {
19.      if(run) fireHeartbeatPublisherException(erx);
20.      else fireHeartbeatPublisherStopped();
21.      shutdown();
22.    }
23.  }

Line 4 creates a recognizable message. When the publisher starts, it informs the listeners (line 5), then enters a loop. Each iteration sleeps for the publisher period; if line 9 determines that messages are not paused, the dependency check runs (line 10). If the check returns true, the heartbeat is published (line 11) and the publisher listeners are informed (line 12). When the loop ends, the publisher notifies the listeners (line 16) and shuts down its resources (line 17). If an exception is thrown while the publisher is still running, the listeners receive an exception event (line 19); if the publisher was already stopping, they receive a stop event instead (line 20). Either way, line 21 releases the resources.

The publisher interface also defines the following methods:

public void setDependentParameters(Object[] parameters) throws InvalidDependentParameters
public boolean fireDependency()
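As a sketch of one possible dependency test, the class below interprets the parameters as a host name and port and reports success if a TCP connection opens within a timeout, in the spirit of the connection tests listed under dependent publishers. The parameter layout, the one-second timeout, and the unchecked InvalidDependentParameters definition are all assumptions; the article does not show its implementation.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Assumed definition; the article names this exception without showing it.
class InvalidDependentParameters extends IllegalArgumentException {
    InvalidDependentParameters(String message) { super(message); }
}

// Hypothetical dependency: "the service at host:port accepts TCP connections".
class TcpDependency {
    private static final int TIMEOUT_MS = 1000;
    private String host;
    private int port;

    // Expects {String host, Integer port}.
    public void setDependentParameters(Object[] parameters) throws InvalidDependentParameters {
        if (parameters == null || parameters.length != 2
                || !(parameters[0] instanceof String) || !(parameters[1] instanceof Integer)) {
            throw new InvalidDependentParameters("expected {String host, Integer port}");
        }
        int candidate = (Integer) parameters[1];
        if (candidate < 1 || candidate > 65535) {
            throw new InvalidDependentParameters("port out of range: " + candidate);
        }
        host = (String) parameters[0];
        port = candidate;
    }

    // True only if a connection opens before the timeout, so the heartbeat may be published.
    public boolean fireDependency() {
        if (host == null) return false;   // not configured yet
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), TIMEOUT_MS);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```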