Listen to heartbeats using JMS

Extend heartbeats using JMS publish and subscribe


If the publisher is independent, these methods do nothing. If the publisher is dependent, however, you must override them to implement the dependency. The publisher setup calls the setDependentParameters() method to pass the parameters the publisher needs to execute its dependency. Subsequently, as a precondition to publishing each heartbeat, the publisher executes the dependency defined in the fireDependency() method. This method doesn't throw any exceptions, but it does return a Boolean indicating the dependency's success or failure. The heartbeat publishes only if the dependency succeeds. You'll see a dependent publisher example later in this article.

In contrast, for an independent publisher, setDependentParameters() does nothing, while fireDependency() always returns a true value:

public void setDependentParameters(Object[] parameters)
   throws InvalidDependentParameters{
}
public boolean fireDependency() {
  return true;
}
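
To make the contract concrete, here is a minimal sketch of how a publish loop might consult fireDependency() before each heartbeat. It is not the article's HeartbeatPublisher: the class names SketchPublisher and GatedPublisher, the publishOnce() method, and the resourceAvailable flag are hypothetical, and a counter stands in for the JMS publish call.

```java
// Hypothetical stand-in for the publisher; JMS publishing is replaced by a counter.
class SketchPublisher {
    int sent = 0;

    // Independent default: the dependency always succeeds.
    public boolean fireDependency() {
        return true;
    }

    // One iteration of the publish loop: publish only if the dependency holds.
    public boolean publishOnce() {
        if (!fireDependency()) {
            return false; // heartbeat withheld
        }
        sent++; // a real publisher would send the JMS message here
        return true;
    }
}

// Hypothetical dependent publisher whose dependency can be switched off.
class GatedPublisher extends SketchPublisher {
    boolean resourceAvailable = true;

    @Override
    public boolean fireDependency() {
        return resourceAvailable;
    }
}
```

With this shape, a subclass changes only the precondition; the loop that decides whether to publish stays in the base class.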

HeartbeatSubscriber

The HeartbeatSubscriber implementation listens on the predefined topic for heartbeat messages. If it does not receive a heartbeat within the defined period, the heartbeat session fails. The implementation must define actions for each event the subscriber can fire:

public void fireHeartbeatSubscriberStarted()
public void fireHeartbeatSubscriberStopped()
public void fireHeartbeatFailed()
public void fireHeartbeatReceived()
public void fireHeartbeatResumed()
public void fireHeartbeatSubscriberException(Exception erx)

The subscriber implementation serves two main functions. First, the main event loop resembles the publisher in that you simply implement the run() method. In the subscriber's case, the loop checks that a heartbeat has been received every period or fails the session. In our implementation, the thread sleeps for the period, wakes up, determines whether or not it has received a heartbeat, and acts accordingly. The subscriber has a tick -- a Boolean that indicates if a heartbeat has been received. So, during one period, the thread sets the tick to a false value, then sleeps. When it awakes, if the tick is true, it knows a heartbeat has been received in the interim:

1.   public void run() {
2.     try {
3.       setTick(false);
4.       topicSubscriber.setMessageListener(this);
5.       topicConnection.start();
6.       fireHeartbeatSubscriberStarted();
7.       while(run) {
8.         this.sleep(period);
9.         if(run && process) {
10.          if(!isTick()) { // Heartbeat failed
11.            failTime = System.currentTimeMillis();
12.            state = true;
13.            fTickCount++;
14.            fireHeartbeatFailed();
15.          } else {    // Heartbeat received
16.            setTick(false);
17.            state = false;
18.          }
19.        }
20.      }
21.      fireHeartbeatSubscriberStopped();
22.      shutdown();
23.     } catch (Exception erx) {
24.      if(run)fireHeartbeatSubscriberException(erx);
25.      else fireHeartbeatSubscriberStopped();
26.      shutdown();
27.    }
28.  }

When the thread starts, the code sets the tick to false in line 3. Line 4 registers a message listener, and line 5 starts the topic connection, which begins the message flow. In line 6, the subscriber's listeners receive notification of the subscriber startup. The thread sleeps for the defined period in line 8; when it awakes, if the subscriber has not been paused or stopped (line 9), line 10's code checks the tick. If a heartbeat has not been received, line 11's code notes the failure time, line 12's code sets the failure state, line 13's code increments the failure count, and, lastly, line 14's code notifies the subscriber's listeners of the failure. If the tick is true, line 16's code sets it to false, and line 17's code sets the failure state to false.

At some point, the thread exits the run loop ending at line 20. The subscriber then notifies the listeners in line 21 and cleans up resources in line 22. If an exception is thrown in the loop while the subscriber is still running, line 24's code notifies listeners about the exception; if the subscriber has already been stopped, line 25's code reports the stop instead. Line 26's code then cleans up resources. Although the run loop checks the tick, the onMessage() method -- automatically invoked when the topic receives a JMS message -- sets it:

1.   public void onMessage(Message message) {
2.     try {
3.       if(message instanceof TextMessage) {
4.         if(((TextMessage)message).getText().equals(topicName + " HEARTBEAT")) {
5.           setTick(true);
6.           if(state) {  // Recovering from heartbeat failure
7.             state = false;
8.             resumeTime = System.currentTimeMillis() - failTime;
9.             fireHeartbeatResumed();
10.            fTickCount=0;
11.          }
12.          fireHeartbeatReceived();
13.        }
14.      }
15.    } catch (Exception erx) {
16.      setTick(false);
17.      fireHeartbeatSubscriberException(erx);
18.    }
19.  }

The JMS provider automatically invokes the onMessage() method in a separate thread when the subscriber receives a message from the JMS server on the designated topic. Lines 3 and 4 determine if the message is a valid heartbeat. In this case, the message type and the text's content must match what the publisher sent (see lines 3 and 4 of the run() method in the HeartbeatPublisher).

If the message is a valid heartbeat, line 5 sets the tick to true. If the subscriber was in a failed state when the heartbeat arrived (line 6), the heartbeat has resumed. A resumed heartbeat is an important event, because it can trigger special actions that undo the effects of a failure. In that case, line 7's code clears the failure state, line 8's code records the resume time (computed in this code as the time elapsed since the failure), and line 9's code notifies the subscriber's listeners of the resumption. Line 10's code sets the failure count back to 0. Line 12's code then notifies the listeners of a received heartbeat.

If the onMessage() method throws an exception, the code in lines 16 and 17 sets the tick to false and notifies the listeners of the exception.
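
The interplay between run() and onMessage() described above can be modeled without JMS or threads' sleep calls. The following is a simplified sketch, not the article's class: TickMonitor, onHeartbeat(), checkPeriod(), and getFailCount() are hypothetical names, checkPeriod() stands for what the run loop does each time it wakes, and onHeartbeat() stands for what onMessage() does on a valid heartbeat.

```java
// Hypothetical, JMS-free model of the subscriber's tick logic.
class TickMonitor {
    private boolean tick = false;    // set when a heartbeat arrives
    private boolean failed = false;  // corresponds to "state" in the listing
    private int failCount = 0;       // corresponds to "fTickCount"

    // Called by the messaging thread when a valid heartbeat arrives.
    // Returns the number of missed periods if this heartbeat ends a
    // failure, or -1 if the session was healthy.
    public synchronized int onHeartbeat() {
        tick = true;
        if (failed) {
            failed = false;
            int missed = failCount;
            failCount = 0;
            return missed;
        }
        return -1;
    }

    // Called once per period by the monitoring thread after it wakes.
    // Returns true if a heartbeat arrived during the period.
    public synchronized boolean checkPeriod() {
        if (tick) {
            tick = false;   // re-arm for the next period
            failed = false;
            return true;
        }
        failed = true;
        failCount++;
        return false;
    }

    public synchronized int getFailCount() {
        return failCount;
    }
}
```

Both methods are synchronized here because the two callers run on different threads; that is a design choice of this sketch, and the listing above does not show how setTick() and isTick() handle visibility between threads.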

Note: The heartbeat's payload can contain any runtime data or diagnostic information useful to the subscriber.

Heartbeat event listeners

Classes interested in the individual publishers' and subscribers' activities implement the heartbeat event listener interfaces IHeartbeatSubscriberListener and IHeartbeatPublisherListener. Figure 6's UML diagram describes the interfaces and two corresponding basic implementations.

Figure 6. The heartbeat event listeners' UML diagram.

IHeartbeatSubscriberListener

The subscriber listener defines the following methods, which the subscriber fires at the appropriate event:

  • public void notifyHeartbeatFailedEvent(int ticks): Fired when the heartbeat fails. The number of consecutive failures is passed as an argument to the implementing method. The failure count resets to 0 when the heartbeat resumes.
  • public void notifyHeartbeatReceived(): Fired when a heartbeat is received.
  • public void notifyHeartbeatSubscriberStopped(): Fired when the subscriber stops.
  • public void notifyHeartbeatSubscriberStarted(): Fired when the subscriber starts.
  • public void notifyHeartbeatSubscriberException(Exception erx,HeartbeatBase heartbeat): Fired when the subscriber throws an exception. Both the exception and the heartbeat base that threw it are passed as arguments to the implementing method. The base lets the listener take action on the subscriber after an exception.
  • public void notifyHeartbeatResumed(long time, int ticks): Fired when a heartbeat session resumes. Both the time of the session resumption and the number of failures that occurred before the resumption are passed as arguments to the implementing method. For example, if the period is 5 seconds, and the session failed at 1:32:20, then resumed at 1:32:39, the resume time is 1:32:39 and the number of elapsed failures is three.
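
As an illustration of how a listener might use the failure count and the resume event together, here is a small sketch that raises an alarm only after several consecutive failures and clears it on resumption. The FailureEvents interface is a hypothetical two-method subset of IHeartbeatSubscriberListener, defined here only so the example compiles on its own; EscalatingListener and its threshold are likewise assumptions.

```java
// Hypothetical two-method subset of the article's IHeartbeatSubscriberListener.
interface FailureEvents {
    void notifyHeartbeatFailedEvent(int ticks);
    void notifyHeartbeatResumed(long time, int ticks);
}

// Raises an alarm once the consecutive-failure count reaches a threshold
// and clears it when the session resumes.
class EscalatingListener implements FailureEvents {
    private final int threshold;
    private boolean alarmRaised = false;

    EscalatingListener(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void notifyHeartbeatFailedEvent(int ticks) {
        // A single missed heartbeat may be noise; escalate only on a run of them.
        if (ticks >= threshold) {
            alarmRaised = true;
        }
    }

    @Override
    public void notifyHeartbeatResumed(long time, int ticks) {
        alarmRaised = false;
    }

    boolean isAlarmRaised() {
        return alarmRaised;
    }
}
```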

IHeartbeatPublisherListener

IHeartbeatPublisherListener defines the following methods, which the publisher fires at the appropriate events:

  • public void notifyHeartbeatPublisherStopped(): Fired when the publisher stops.
  • public void notifyHeartbeatPublisherStarted(): Fired when the publisher starts.
  • public void notifyHeartbeatPublisherHeartbeatSent(): Fired when the publisher publishes a heartbeat.
  • public void notifyHeartbeatPublisherException(Exception erx, HeartbeatBase heartbeat): Fired when the publisher throws an exception. Both the exception and the heartbeat base that threw it are passed as arguments to the implementing method. The base allows the listener to take action on the publisher after an exception.

The sample listener implementations merely print the event and any passed values to System.out. Here's an example from the subscriber:

    public void notifyHeartbeatFailedEvent(int ticks)
    {
        print("Heartbeat Session Failed "+ ticks + " times.");
    }

As a quick and dirty test, both the HeartbeatPublisher and the HeartbeatSubscriber have main(String args[]) methods. The publisher's listener terminates the JVM after 10 published heartbeats. I implemented the sample listeners described above in this test to display the applicable heartbeat events.

The main() method's parameters include:

  • JNDI (Java Naming and Directory Interface) URL (e.g., ldap://myserver:389/o=nickman.com)
  • JNDI principal (e.g., cn=Directory Manager)
  • JNDI credentials (e.g., secret)
  • JNDI ContextFactory name (e.g., com.sun.jndi.ldap.LdapCtxFactory)
  • JNDI authentication type (e.g., simple)
  • JNDI namespace for JMS connection factory (e.g., cn=TCF,ou=jms)
  • Heartbeat topic name
  • Period in milliseconds (e.g., 1000)
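
The first five parameters correspond to standard JNDI environment properties. The sketch below shows one way they could be mapped onto the javax.naming.Context constants; it assumes the arguments arrive in the order listed above, and the JndiArgs class and its method names are hypothetical rather than taken from the article's source.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper that interprets the eight command-line arguments.
class JndiArgs {
    // Maps the first five arguments onto standard JNDI environment keys.
    static Hashtable<String, String> toEnvironment(String[] args) {
        if (args.length < 8) {
            throw new IllegalArgumentException(
                "Expected 8 arguments, got " + args.length);
        }
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.PROVIDER_URL, args[0]);
        env.put(Context.SECURITY_PRINCIPAL, args[1]);
        env.put(Context.SECURITY_CREDENTIALS, args[2]);
        env.put(Context.INITIAL_CONTEXT_FACTORY, args[3]);
        env.put(Context.SECURITY_AUTHENTICATION, args[4]);
        return env;
    }

    // The last argument is the heartbeat period in milliseconds.
    static long period(String[] args) {
        return Long.parseLong(args[7]);
    }
}
```

The resulting table could be passed to new InitialContext(env), after which the connection factory name (args[5]) would be looked up in that context.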

If you run both a publisher and a subscriber, you'll produce the following output:

Publisher output

java com.nickman.jms.heartbeats.HeartbeatPublisher
   smqp://localhost:4001 admin secret
   com.swiftmq.jndi.InitialContextFactoryImpl simple
   TopicConnectionFactory MY.WORLD 1000
[Mon Jan 21 21:45:22 EST 2002] Publisher Started
[Mon Jan 21 21:45:23 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:24 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:25 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:26 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:27 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:28 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:29 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:30 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:31 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:32 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:33 EST 2002] Publisher Sent Heartbeat
[Mon Jan 21 21:45:33 EST 2002] Publisher Stopped

Subscriber output

java com.nickman.jms.heartbeats.HeartbeatSubscriber
   smqp://localhost:4001 admin secret
   com.swiftmq.jndi.InitialContextFactoryImpl simple
   TopicConnectionFactory MY.WORLD 2000
[Mon Jan 21 21:45:25 EST 2002] Heartbeat Subscriber Started
[Mon Jan 21 21:45:25 EST 2002] Heartbeat Session Resumed at Mon Jan 21 21:45:25 EST 2002 and  0 ticks.
[Mon Jan 21 21:45:25 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:26 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:27 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:28 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:29 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:30 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:31 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:32 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:33 EST 2002] Heartbeat Received
[Mon Jan 21 21:45:37 EST 2002] Heartbeat Session Failed 1 times.
.
.
[Mon Jan 21 21:46:55 EST 2002] Heartbeat Session Failed 40 times.

Dependent publishers

The DependentHeartbeatPublisher class implements a dependency in which the heartbeat does not publish unless the dependency is satisfied. In this example, the dependency is a connection to a JDBC (Java Database Connectivity) database.

To create a dependent publisher, simply extend HeartbeatPublisher and override the setDependentParameters() and fireDependency() methods. You must also implement the constructors, but they simply pass through. Figure 7's UML diagram summarizes DependentHeartbeatPublisher's structure.

Figure 7. The dependent heartbeat publisher's UML diagram.

The setDependentParameters() method must supply the object with the data necessary for it to accomplish the dependency test. In this example implementation, you simply pass in the JDBC parameters to connect to a database:

  public void setDependentParameters(Object[] parameters) throws
    InvalidDependentParameters {
    // Expect exactly four values: driver class, URL, user, password
    if(parameters.length != 4) throw new InvalidDependentParameters
       ("Invalid Number Of Parameters:" + parameters.length
       +  ". Should be 4.");
    try {
      jdbcDriverClass = (String)parameters[0];
      jdbcURL = (String)parameters[1];
      jdbcUser = (String)parameters[2];
      jdbcPassword = (String)parameters[3];
    } catch (Exception erx) {
      // A non-String element causes a ClassCastException
      throw new InvalidDependentParameters(
          "Exception Occurred Processing Parameters:" + erx);
    }
  }

Now the run loop fires the following dependency test:

  public boolean fireDependency() {
    java.sql.Connection conn = null;
    try {
      // Load the driver, then try to open a connection
      Class.forName(jdbcDriverClass);
      if(jdbcUser==null) conn = DriverManager.getConnection(jdbcURL);
      else conn = DriverManager.getConnection(jdbcURL, jdbcUser, jdbcPassword);
      return !conn.isClosed();
    } catch (Exception erx) {
      // Report the failure to listeners instead of throwing
      this.fireHeartbeatPublisherException(new
          PublisherDependencyException(erx));
      return false;
    } finally {
      // conn is still null if the connection attempt failed
      if(conn != null) {
        try { conn.close(); } catch (Exception erx) {}
      }
    }
  }

It's neither elegant nor efficient, but I hope it makes the dependent implementation clear. Note that the method does not throw an exception but fires an exception condition to the publisher listener.
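
For comparison, the same kind of check can be written more compactly on newer Java versions. This is a sketch under assumptions, not the article's code: it relies on try-with-resources (Java 7 and later) and on Connection.isValid(int) (JDBC 4.0), it assumes the driver registers itself so Class.forName() is not needed, and the JdbcDependencyCheck class and isDatabaseReachable() method are hypothetical names. Unlike fireDependency(), it does not notify any listener on failure.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical standalone version of the JDBC dependency test.
class JdbcDependencyCheck {
    // Returns true only if a connection can be opened and validated
    // within the given timeout; any SQLException counts as failure.
    static boolean isDatabaseReachable(String url, String user,
                                       String password, int timeoutSeconds) {
        try (Connection conn = (user == null)
                ? DriverManager.getConnection(url)
                : DriverManager.getConnection(url, user, password)) {
            // isValid() asks the driver to confirm the connection is usable
            return conn.isValid(timeoutSeconds);
        } catch (SQLException e) {
            return false;
        }
        // try-with-resources closes the connection automatically
    }
}
```

Opening a fresh connection for every heartbeat remains expensive; a pooled connection validated on each period would be a natural refinement.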

To implement another quick and dirty test, the main() method in DependentHeartbeatPublisher looks as follows:
