Open source Java projects: Vert.x

Enterprise messaging and integration with Vert.x

1 2 3 Page 2
Page 2 of 3

Listing 3. Maven POM to build the web server

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.geekcap</groupId>
    <artifactId>vertx-examples</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>vertx-examples</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-core</artifactId>
                <version>2.0.0-final</version>
        </dependency>
        <dependency>
          <groupId>io.vertx</groupId>
          <artifactId>vertx-platform</artifactId>
          <version>2.0.0-final</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

The two dependencies added to the POM file are vertx-core and vertx-lang-java, which are needed to develop Vert.x applications in Java. To build this application, execute the following Maven command:

mvn clean install

This will yield a file named vertx-examples-1.0-SNAPSHOT.jar that will need to be in your CLASSPATH in order to launch your verticle. This sample application serves up web resources found in the webroot directory relative to where the application is launched. You'll therefore need to create a webroot directory and build some resources to serve from it. To launch this verticle, execute the vertx application in Vert.x's bin directory, like so:

$VERTX_HOME/bin/vertx run com.geekcap.vertxexamples.Server -cp vertx-examples-1.0-SNAPSHOT.jar

The vertx command accepts several options including the one we're using, which is run. The run option requires a verticle called main, which is just the name of the class that extends Verticle and contains a start() method and an optional set of arguments. In this case we set the CLASSPATH using the -cp argument and passing in the JAR file that we just created. The server will start without outputting anything to the screen, but you can point your browser to the URL: http://localhost:8080.

For my example I created a simple HTML file that says "Hello, Vert.x," named it index.html and placed it in my webroot directory. Here is my output:

$ curl http://localhost:8080

<html>
<head><title>Hello, Vert.x</title></head>
<body>
<p>Hello, Vert.x</p>
</body>
</html>

Messaging with Vert.x

One of the most important features of Vert.x is its event bus. The Vert.x event bus allows verticles, potentially written in different programming languages, to communicate with one another using either point-to-point messaging or publish/subscribe messaging. In this section you'll get a feel for how to integrate functionality from different verticles using both approaches.

Before we begin, why would you want to use messaging over a more traditional event-based programming model? For one thing, messaging supports the integration of applications and components written in different programming languages. It also enables loose coupling, which means that you can write multiple task-focused pieces of code rather than a single, complex program. Finally, asynchronous communication between verticles increases system scalability. Asynchronous communication enables you to define the system capacity as it evolves. Messages might back up as your system load increases, but they will eventually be processed. Vert.x's support for a distributed event bus also gives you the option to start up additional verticles to handle increased load.

In order to set up a Vert.x messaging system, you need to gain access to the event bus. Start by executing the eventBus() method on a vertx class instance:

EventBus eb = vertx.eventBus();    

Once you're connected to an EventBus you can publish messages in one of two ways:

  • publish() publishes a message to an address using publish/subscribe messaging, meaning that every subscriber to a given address will receive the published message. Addresses are just Strings, so you want to choose something meaningful, but in the end what matters is that both publisher and subscriber are configured to use the same string. If you are familiar with Java Message System (JMS), publish() acts similarly to publishing a message to a topic.
  • send() sends a message to an address using point-to-point messaging, meaning that only one subscriber will receive the message. If there are multiple subscribers to the address then Vert.x will use a round-robin algorithm to send the message. The benefit of using a round-robin algorithm is scalability: if you do not have enough resources on a single Vert.x instance to support your load, then you can simply start additional Vert.x instances and register them as listeners to the specified address. In JMS terms, issuing send() is similar to publishing a message to a queue.

Publish/subscribe vs point-to-point messaging

In a publish/subscribe messaging model, a publisher sends a message to a topic that is broadcast out to all subscribers. Using publish/subscribe over point-to-point messaging in an event-driven architecture means that a component is only responsible for publishing events as they occur. The publisher does not need to be aware of its subscribers in order to broadcast to them. Figure 2 is a flow diagram of a typical Vert.x publish/subscribe messaging architecture.

Figure 2. Publish/subscribe messaging

In point-to-point messaging, a message is sent from a publisher directly to a consumer via a queue. Point-to-point messaging is a good choice when you want a message consumed exactly once, or when two components want to communicate with each other asynchronously. A point-to-point messaging architecture is shown in Figure 3.

Figure 3. Point-to-point messaging

We'll use Vert.x to explore both messaging systems in the next sections.

Publish/subscribe messaging example

Listing 4 updates my original Server class (from Listing 1) in a few ways. First, it deploys a new verticle called AuditVerticle (which is defined in Listing 5) by invoking the deployVerticle() method on the container instance. The container instance, which is defined as part of the parent Verticle class, provides access to the container in which a verticle runs; therefore, it is the appropriate place to deploy new verticles.

Listing 4. Server.java extended for point-to-point messaging

package com.geekcap.vertxexamples;

import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.platform.Verticle;

public class Server extends Verticle {
    public void start() {

        // Create our dependent verticles
        container.deployVerticle("com.geekcap.vertxexamples.AuditVerticle");

        // Create an HTTP Server that serves files
        vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
            public void handle(HttpServerRequest req) {
                Logger logger = container.logger();

                if (logger.isDebugEnabled()) {
                    logger.debug("Received a request for resource: " + req.path());
                }
                logger.fatal("Where are my logs!?!?");
                logger.info("Here is an info message");

                // Serve up our files
                String file = req.path().equals("/") ? "index.html" : req.path();
                req.response().sendFile("webroot/" + file);

                // Let's tell the world (via the event bus) that we received a request
                EventBus eb = vertx.eventBus();
                eb.publish( "com.geekcap.vertxexamples.Server.announcements", "We received a request for resource: " + req.path() );

            }
        }).listen(8080);
    }
}

Listing 4 executes deployVerticle() to deploy the AuditVerticle. The deployVerticle() method deploys a standard Verticle to the container, which maintains its own event-loop. After handling an incoming HTTP request (as shown in Listing 1), Listing 4 publishes a message to the event bus. First, it obtains access to the event bus through the vertx instance variable, then it executes the eventBus() method. Once it has the EventBus object it invokes its publish() method, which is the gateway to publishing messages in a publish/subscribe fashion.

Messaging for loose coupling

For the past three years I have worked in an event-driven architecture, and I have found that publish/subscribe messaging, sometimes called topics, is a great way to loosely couple systems. Message publishers do not need to know anything about their subscribers, so new subscribers can be added at any time without affecting the publisher.

The publish() method accepts a destination, which in this case is "com.geekcap.vertxexamples.Server.announcements". Recall that the naming of this destination is arbitrary, but prefacing the type of notification (announcement in this example) with the fully-qualified name of the class makes it clear where the message came from.

Listing 5 shows the source code for the AuditVerticle class.

Listing 5. AuditVerticle.java

package com.geekcap.vertxexamples;

import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.platform.Verticle;

public class AuditVerticle extends Verticle {

    @Override
    public void start() {
        // Let's register ourselves as a listener to Server notifications
        EventBus eb = vertx.eventBus();
        Handler<Message> auditHandler = new Handler<Message>() {
            @Override
            public void handle(Message message) {
                Logger logger = container.logger();
                logger.info( "AuditVerticle here, someone requested resource: " + message.body() );
            }
        };
        eb.registerHandler( "com.geekcap.vertxexamples.Server.announcements", auditHandler );
    }
}

The AuditVerticle in Listing 5 acts much like a reporting engine: it listens for "announcements" from the Server class and then writes those out as informational log messages. When something of interest happens in the Server class, it can publish it to its announcements topic and different subscribers can do different things, such as logging the message or inserting it in a Hadoop cluster for later analysis.

Listing 5 then creates a Handler in-line instance (an anonymous inner class is created and assigned to a variable without creating the class in a separate file) that logs the message. Next, it registers a handler to the "com.geekcap.vertxexamples.Server.announcements" address by invoking the EventBus's registerHandler() method. Now, any time that the Server class publishes a message to this destination, the AuditHandler's handle() method will be invoked.

Point-to-point messaging example

Point-to-point messaging is used either when you want a message to be processed only by a single consumer or as a mechanism for components to communicate with one another asynchronously. In this section I demonstrate the latter by creating a new class that relies on a worker verticle to do its work for it, and then that worker verticle communicates the result back to the Server2.

Listing 6 shows the source code for the Server2 class.

Listing 6. Server2.java

package com.geekcap.vertxexamples;

import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.deploy.Verticle;

import java.util.concurrent.ConcurrentMap;

public class Server2 extends Verticle {
    public void start() {

        // Create our dependent verticles
        container.deployWorkerVerticle("com.geekcap.vertxexamples.MyWorkerVerticle");

        // Start a server that handles things with point-to-point messaging
        vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
            @Override
            public void handle(final HttpServerRequest req) {

                // Set a shared variable
                ConcurrentMap<String, String> map = vertx.sharedData().getMap("mymap");
                map.put("mykey", "myvalue");

                // Let's send a message to a worker verticle and wait for it to respond
                EventBus eb = vertx.eventBus();
                eb.send("request.worker", req.path, new Handler<Message<String>>() {
                    @Override
                    public void handle(Message<String> message) {
                        Logger logger = container.getLogger();
                        logger.info( "Received a reply from our worker: " + message.body );
                        req.response.headers().put("Content-Length", Integer.toString(message.body.length()));
                        req.response.write(message.body);
                    }
                });
            }
        }).listen(8080);
    }
}

The Server2 class starts by deploying a worker verticle. Worker verticles are different from standard verticles in that they do not contain an event look and are expected to be triggered by an event bus message. Worker verticles are deployed by obtaining access to the Vert.x container and invoking its deployWorkerVerticle() method.

Next, the Server2 obtains access to the EventBus, again by invoking the eventBus() method on the vertx instance variable. This time the Server2 invokes the send() method, which is the gateway to sending messages in a point-to-point fashion. In this case it sends the request path to a destination named "request.worker". The first parameter to the send() method is the destination, the second parameter is the data to send to the destination, and an optional third parameter is a Handler that can be called back by the recipient of the message.

1 2 3 Page 2
Page 2 of 3