All other metrics are useless

When it comes to queues, whether they’re implemented with JMS, database tables (which is what Ruby’s Delayed::Job uses for a queue), or even Amazon’s SQS, the most common metric used to evaluate the state of a queue is its length. In essence, one derives an efficiency metric from how many messages are residing in a queue at any given time. If there are just a few messages, the queue is operating efficiently. If there are numerous messages, things are inefficient and alarms must be sounded.

But what if you’re in a consistently busy environment with extreme bursts where queues have the tendency to rapidly fill up? If you have sufficient workers already running to handle that burst, do you need to fire up more?

You can fire up more workers, but doing so might cost you. That is, you might have to provision new worker instances, such as Heroku worker dynos or AWS EC2 instances, which cost real money. Those instances can also take a few moments to start, and by the time they’re operational, the burst of activity may be over and the queue back to normal: the workers that were already available handled the load adequately.

It turns out that the queue’s length was a lagging indicator. You spun up unneeded resources. False alarm!

If you already have sufficient capacity to handle the influx of messages on a queue, then monitoring a queue’s length isn’t too helpful. In fact, it’s a misleading metric and can cause you to take unneeded actions.

Consequently, a queue’s length is not indicative of a system’s efficiency when there are already sufficient workers present. Rather, the metric that means something in a high-capacity environment is how long a message resides in a queue. That is an actionable metric: if messages are stuck in a queue waiting to be processed, then you need more processors!
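To make the distinction concrete, here is a minimal sketch of a scaling decision driven by wait time rather than length. It is not part of Moo; the class name, method names, and numbers are made up for illustration, and the queue is modeled as nothing more than a list of enqueue time stamps.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration: decide whether to scale from wait time, not queue length.
class WaitTimeScalingSketch {

    // Age in milliseconds of the oldest waiting message, or 0 if the queue is empty.
    // The deque holds enqueue time stamps, oldest first.
    static long oldestWaitMillis(Deque<Long> enqueueTimes, long nowMillis) {
        Long oldest = enqueueTimes.peekFirst();
        return oldest == null ? 0L : nowMillis - oldest;
    }

    // More workers are needed only when messages are actually stuck waiting.
    static boolean needsMoreWorkers(Deque<Long> enqueueTimes, long nowMillis, long maxWaitMillis) {
        return oldestWaitMillis(enqueueTimes, nowMillis) > maxWaitMillis;
    }

    public static void main(String[] args) {
        long now = 100_000L;

        // A burst: 5,000 messages, but all of them arrived only 200 ms ago.
        Deque<Long> burst = new ArrayDeque<>();
        for (int i = 0; i < 5_000; i++) {
            burst.addLast(now - 200);
        }

        // A short queue whose oldest message has been waiting for 90 seconds.
        Deque<Long> stuck = new ArrayDeque<>();
        stuck.addLast(now - 90_000);
        stuck.addLast(now - 60_000);
        stuck.addLast(now - 30_000);

        System.out.println(needsMoreWorkers(burst, now, 1_000)); // false
        System.out.println(needsMoreWorkers(stuck, now, 1_000)); // true
    }
}
```

A length-based alarm would fire for the 5,000-message burst and stay quiet for the three stuck messages, which is exactly backwards.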

Moo over queue length and let queue wait time in

By default, Amazon’s SQS doesn’t provide the ability to query how long a message has been residing in a queue. Therefore, I wrote Moo.

Moo provides an interface for clients to obtain and take action on the message time in queue metric. This is done by augmenting an SQS message with a time stamp. That time stamp is then checked when a message is popped off an SQS queue. If the difference between the current time and the time stamp exceeds a threshold, a callback is invoked.
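The general technique can be sketched in a few lines. This is an illustration only, not Moo’s actual code or wire format: the class, the `|` separator, and the nested callback interface are all assumptions, and the callback here runs synchronously to keep the example short.

```java
// Hypothetical sketch of time-stamping a message and checking the stamp on receive.
class TimestampedMessageSketch {

    // Stand-in for a "time in queue" callback; the name is assumed for this sketch.
    interface WaitTimeCallback {
        void onThresholdExceeded(long waitMillis);
    }

    // Prefix the payload with the send time so the receiver can compute time in queue.
    static String wrap(String payload, long sentAtMillis) {
        return sentAtMillis + "|" + payload;
    }

    // Strip the time stamp, invoke the callback if the message waited longer than
    // the threshold, and return the original payload.
    static String unwrap(String wrapped, long nowMillis, long thresholdMillis,
                         WaitTimeCallback callback) {
        // The time stamp is all digits, so the first '|' always ends it,
        // even if the payload itself contains '|'.
        int separator = wrapped.indexOf('|');
        long sentAt = Long.parseLong(wrapped.substring(0, separator));
        long waited = nowMillis - sentAt;
        if (waited > thresholdMillis) {
            callback.onThresholdExceeded(waited);
        }
        return wrapped.substring(separator + 1);
    }
}
```

A sender would call `wrap(payload, System.currentTimeMillis())` before putting the message on the queue, and a receiver would call `unwrap` with its own clock reading. Note that this relies on the sender’s and receiver’s clocks being reasonably in sync.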

Users of Moo will find its usage similar to Ahoy!, which is an asynchronous, callback-oriented facade on top of AWS’s Java SDK. In fact, Moo uses Ahoy! underneath, with the added feature of attaching a “maximum time in queue” asynchronous callback.

Moo supports multiple time in queue thresholds and setting a maximum time in queue threshold is done like so:

Adding a maximum threshold for time in queue
<code class='java'>// adds a 1 second max threshold
sqs.addQueueWaitTimeCallback(1000, new QueueWaitTimeCallback() {
  public void onThresholdExceeded(long waitTime) {
    // waitTime is the actual time in queue
    // do something... like fire off a web hook, etc
  }
});
</code>

Note that the addQueueWaitTimeCallback method takes a maximum time in queue value, in milliseconds, and an accompanying QueueWaitTimeCallback implementation. The onThresholdExceeded method will be invoked asynchronously during a message receive if the maximum threshold value is exceeded; what’s more, onThresholdExceeded receives the actual queue wait time as its parameter.

Show me the Moo

To fire up an instance of Moo, you have a number of options, including configuring an instance of AWS’s AmazonSQS or just passing along a key, secret, and queue name like so:

Creating an SQS instance from a key, secret, and queue name
<code class='java'>SQS sqs = new SQS(System.getProperty("key"), System.getProperty("secret"), System.getProperty("queue"));
</code>

Next, you can attach zero or more QueueWaitTimeCallback instances like so:

Adding a 10 minute threshold for time in queue
<code class='java'>// 600000 ms = 10 minutes
sqs.addQueueWaitTimeCallback(600000, new QueueWaitTimeCallback() {
  public void onThresholdExceeded(long actualWaitTime) {
    // do something -- fire off SNS message?
  }
});
</code>

In this case, I’ve added a callback to be invoked if messages are in a queue longer than 10 minutes. Note that these QueueWaitTimeCallback callbacks are fired by the queue reader instance; accordingly, a callback can, for example, fire up more instances of that same reader to help drain the queue.

Here’s a sample JSON document that you might want to throw onto an SQS queue:

A sample JSON message
<code class='java'>{ "employees":[
      { "firstName":"John", "lastName":"Doe" },
      { "firstName":"Anna", "lastName":"Smith" },
      { "firstName":"Peter", "lastName":"Jones" }
]}
</code>

Sending and receiving this message work exactly as they do in Ahoy!. For example, to send a message, just pass along a String to the send method:

Sending a message
<code class='java'>// json is a String holding the document above
sqs.send(json, new SendCallback() {
  public void onSend(String messageId) {
    // messageId is from SQS
  }
});
</code>

Note that the SendCallback argument to the send method is optional.

Messages are received via the receive method, which takes a mandatory ReceiveCallback. This callback will be invoked asynchronously for each message received off a queue, and each invocation receives the message that was placed on the queue along with the message’s SQS id.

Receiving a message
<code class='java'>sqs.receive(new ReceiveCallback() {
  public void onReceive(String messageId, String message) {
    // do something w/the message -- in this case it's JSON
  }
});
</code>

If, upon receiving a message, Moo notices that the message has been waiting in a queue longer than the maximum wait time threshold configured for a QueueWaitTimeCallback, Moo will invoke that callback. Moo can invoke more than one callback for a single message; thus, you can set up a chain of thresholds that takes escalating actions as wait times increase.
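How such a chain of thresholds might behave can be sketched as follows. This is an assumption-laden illustration, not Moo’s implementation: the class and interface names are hypothetical, the callbacks run synchronously, and for brevity it keeps only one callback per threshold value.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: several wait time thresholds, each with its own action.
class ThresholdChainSketch {

    // Stand-in for a "time in queue" callback; the name is assumed for this sketch.
    interface WaitTimeCallback {
        void onThresholdExceeded(long waitMillis);
    }

    // Thresholds in milliseconds, kept sorted from smallest to largest.
    private final TreeMap<Long, WaitTimeCallback> callbacks = new TreeMap<>();

    // Registering a second callback for the same threshold replaces the first.
    void addCallback(long thresholdMillis, WaitTimeCallback callback) {
        callbacks.put(thresholdMillis, callback);
    }

    // Invoke every callback whose threshold is strictly below the observed wait,
    // smallest threshold first, and return how many were invoked.
    int check(long waitMillis) {
        Map<Long, WaitTimeCallback> exceeded = callbacks.headMap(waitMillis, false);
        for (WaitTimeCallback callback : exceeded.values()) {
            callback.onThresholdExceeded(waitMillis);
        }
        return exceeded.size();
    }
}
```

With a 1 second callback that logs a warning and a 10 minute callback that starts another worker, a message that waited 5 seconds triggers only the warning, while one that waited 11 minutes triggers both.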

Remember, a queue’s length is usually a lagging indicator. The metric that actually means something is how long a message resides in a queue. That’s an actionable metric and Moo gives you the ability to do something about it! Can you dig it?
