Ahoy there callbacks!

Because it’s my bag, I like JavaScript. In fact, I’ve grown to love JavaScritp’s asynchronous callback oriented style of programming. Consequently, when I find myself in a non-JavaScript environment, say, like Java, I tend to miss using callbacks.

The good news is that you can emulate asynchronous callbacks in Java. In fact, I did just that recently with a library I’ve dubbed Ahoy!, which is an asynchronous SQS adapter for AWS’s Java SQS library.

For the uninitiated, SQS is a cloud based messaging platform – with SQS you can create queues and put messages onto those queues, which can then be read – later or immediately by some other process or the same exact process. All of this leverages Amazon’s massively redundant architecture to offer extremely high availability in the face of concurrent access.

Asynchronous callbacks in Java can be achieved with two features: anonymous classes (containing one method) and Java’s java.util.concurrent package.

Because Java doesn’t allow you to pass functions (or methods) easily as a parameter, to simulate a callback, you can create an interface that contains one method, which basically mimics a function. In the case of Ahoy, there are two interfaces: MessageSendCallback and MessageReceivedCallback – both have one method: onSend and onReceive respectively. Accordingly, Ahoy!”s primary class, dubbed SQSAdapter exposes two simple methods: send and receive and both take their related callback interface.

The most straightforward callback to understand is the receive method. As you can imagine, receive is intended to handle behavior when a message is received off of a particular queue. Thus, the receive method is defined as follows:

SQSAdapter’s receive method
<span class='line-number'>1</span>
<code class='java'><span class='line'><span class="kd">public</span> <span class="kt">void</span> <span class="nf">receive</span><span class="o">(</span><span class="kd">final</span> <span class="n">MessageReceivedCallback</span> <span class="n">callback</span><span class="o">)</span> <span class="o">{}</span>
</span></code>

The MessageReceivedCallback interface looks like this:

The MessageReceivedCallback interface
<span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<code class='java'><span class='line'><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MessageReceivedCallback</span> <span class="o">{</span>
</span><span class='line'>    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onReceive</span><span class="o">(</span><span class="n">String</span> <span class="n">messageId</span><span class="o">,</span> <span class="n">String</span> <span class="n">message</span><span class="o">);</span>
</span><span class='line'><span class="o">}</span>
</span></code>

Note, the onReceive method takes a message id (which is particular to SQS) and the message itself – which in the case of SQS is always a String (that String can hold anything you want, keep in mind: JSON, XML, byte sequence, etc).

Thus, clients of Ahoy! provide the intended behavior for a message when it is received. This behavior could be to write something to a database, generate another message and send it on another queue, you name it.

Now the interesting part is the implementation of Ahoy!’s receive method. To achieve asynchronocity, I employed Java’s java.util.concurrent package, which sadly, seems to be under appreciated.

The receive method’s implementation with callback being invoked
<span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
<span class='line-number'>5</span>
<span class='line-number'>6</span>
<span class='line-number'>7</span>
<span class='line-number'>8</span>
<span class='line-number'>9</span>
<span class='line-number'>10</span>
<span class='line-number'>11</span>
<span class='line-number'>12</span>
<span class='line-number'>13</span>
<span class='line-number'>14</span>
<code class='java'><span class='line'><span class="kd">private</span> <span class="kt">void</span> <span class="nf">receive</span><span class="o">(</span><span class="kd">final</span> <span class="n">AmazonSQS</span> <span class="n">sqs</span><span class="o">,</span> <span class="kd">final</span> <span class="n">String</span> <span class="n">queueURL</span><span class="o">,</span> <span class="kd">final</span> <span class="n">MessageReceivedCallback</span> <span class="n">callback</span><span class="o">)</span> <span class="o">{</span>
</span><span class='line'>  <span class="n">pool</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="k">new</span> <span class="n">Runnable</span><span class="o">()</span> <span class="o">{</span>
</span><span class='line'>    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">run</span><span class="o">()</span> <span class="o">{</span>
</span><span class='line'>      <span class="kd">final</span> <span class="n">List</span><span class="o"><</span><span class="n">Message</span><span class="o">></span> <span class="n">messages</span> <span class="o">=</span> <span class="n">sqs</span><span class="o">.</span><span class="na">receiveMessage</span><span class="o">(</span>
</span><span class='line'>              <span class="k">new</span> <span class="nf">ReceiveMessageRequest</span><span class="o">(</span><span class="n">queueURL</span><span class="o">).</span><span class="na">withMaxNumberOfMessages</span><span class="o">(</span><span class="mi">10</span><span class="o">).</span><span class="na">withWaitTimeSeconds</span><span class="o">(</span><span class="mi">20</span><span class="o">)).</span><span class="na">getMessages</span><span class="o">();</span>
</span><span class='line'>      <span class="k">if</span> <span class="o">(</span><span class="n">messages</span><span class="o">.</span><span class="na">size</span><span class="o">()</span> <span class="o">></span> <span class="mi">0</span><span class="o">)</span> <span class="o">{</span>
</span><span class='line'>          <span class="k">for</span> <span class="o">(</span><span class="kd">final</span> <span class="n">Message</span> <span class="n">message</span> <span class="o">:</span> <span class="n">messages</span><span class="o">)</span> <span class="o">{</span>
</span><span class='line'>            <span class="n">callback</span><span class="o">.</span><span class="na">onReceive</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">getMessageId</span><span class="o">(),</span> <span class="n">message</span><span class="o">.</span><span class="na">getBody</span><span class="o">());</span>
</span><span class='line'>            <span class="n">sqs</span><span class="o">.</span><span class="na">deleteMessage</span><span class="o">(</span><span class="k">new</span> <span class="n">DeleteMessageRequest</span><span class="o">(</span><span class="n">queueURL</span><span class="o">,</span> <span class="n">message</span><span class="o">.</span><span class="na">getReceiptHandle</span><span class="o">()));</span>
</span><span class='line'>          <span class="o">}</span>
</span><span class='line'>      <span class="o">}</span>
</span><span class='line'>    <span class="o">}</span>
</span><span class='line'>  <span class="o">});</span>
</span><span class='line'><span class="o">}</span>
</span></code>

With a fixed Thread pool, a thread is created, which waits for messages to arrive on a particular queue; when one shows up, the passed in MessageReceivedCalledback is invoked for each message.

For an example of how this works for clients of Ahoy!, here’s a test case that verifies the execution of the callback:

The receive method implemented
<span class='line-number'>1</span>
<span class='line-number'>2</span>
<span class='line-number'>3</span>
<span class='line-number'>4</span>
<span class='line-number'>5</span>
<span class='line-number'>6</span>
<span class='line-number'>7</span>
<span class='line-number'>8</span>
<code class='java'><span class='line'><span class="kd">final</span> <span class="kt">boolean</span><span class="o">[]</span> <span class="n">wasReceived</span> <span class="o">=</span> <span class="o">{</span><span class="kc">false</span><span class="o">};</span>
</span><span class='line'><span class="n">ahoy</span><span class="o">.</span><span class="na">receive</span><span class="o">(</span><span class="k">new</span> <span class="n">MessageReceivedCallback</span><span class="o">()</span> <span class="o">{</span>
</span><span class='line'>  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onReceive</span><span class="o">(</span><span class="n">String</span> <span class="n">messageId</span><span class="o">,</span> <span class="n">String</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
</span><span class='line'>    <span class="n">wasReceived</span><span class="o">[</span><span class="mi">0</span><span class="o">]</span> <span class="o">=</span> <span class="kc">true</span><span class="o">;</span>
</span><span class='line'>    <span class="n">assertNotNull</span><span class="o">(</span><span class="s">"message id was null"</span><span class="o">,</span> <span class="n">messageId</span><span class="o">);</span>
</span><span class='line'>    <span class="n">assertEquals</span><span class="o">(</span><span class="s">"message wasn't "</span> <span class="o">+</span> <span class="n">origMessage</span><span class="o">,</span> <span class="n">origMessage</span><span class="o">,</span> <span class="n">message</span><span class="o">);</span>
</span><span class='line'>  <span class="o">}</span>
</span><span class='line'><span class="o">});</span>
</span></code>

Likewise, sending a message is similar – a new Runnable instance is created, which sends a particular message and invokes the passed in MessageSentCallback’s onSend method, passing in the newly sent messages’s id.

Related:
1 2 Page 1
Notice to our Readers
We're now using social media to take your comments and feedback. Learn more about this here.