Open source Java projects: Spring Integration

Develop a robust message-passing architecture with Spring Integration

Get an overview of Spring Integration's out-of-the-box event-driven messaging architecture, which you can use to coordinate messages, channels, adapters, and gateways. Then practice using ActiveMQ and JMS in a Spring Integration solution, followed by a short introduction to stitching together multiple application workflows for both lightweight and heavyweight payloads.

Spring Integration is an enterprise integration framework that provides out-of-the-box implementation of the patterns in the now-classic Enterprise Integration Patterns book. Building on Spring's Inversion of Control design pattern, Spring Integration abstracts message sources and destinations and uses message passing and message manipulation to integrate various components within the application environment. Applications built with Spring Integration are able to send messages between components, either across a message bus to another server in your environment or even to another class within the same virtual machine.

I'll get you started with Spring Integration in this second of three Open source Java projects installments focusing on Spring projects. I'll start with an overview of the core components of Spring Integration's support for event-driven architecture (EDA), then develop a simple example to familiarize you with how Spring Integration works. I'll conclude by demonstrating a more complex scenario that integrates components across an ActiveMQ message bus via JMS.


Created by Steven Haines for JavaWorld. March 2014.
 

Spring Integration's event-driven architecture

Event-driven architecture is one of the most powerful and successful patterns used for enterprise integration, and is the main focus of examples in this article. In an event-driven architecture, a system publishes events as they happen. Components within a given system listen for specific events, or types of events, occurring within that system. When an event of interest occurs, the components are alerted and can respond as necessary.

Event-driven architecture affords a high degree of loose coupling and enhances system scalability because message producers don't need to know anything about their consumers. This makes integrating a new component with an existing or legacy system relatively easy: existing systems publish events and new components are configured to listen for those events. Because all interactions in an event-driven architecture are asynchronous, components can process messages on their own time. If load increases substantially, a component may take longer to process a message, but it will eventually happen.

While an application may slow down, it should never go down.

Spring Integration's support for event-driven architecture rests on three core components:

  • Messages are objects sent from one component to another.
  • Channels are the means by which messages are sent, they can be synchronous or asynchronous.
  • Adapters route the output from one channel to the input of another one.

Figure 1 illustrates the relationship between messages, channels, and adapters in Spring Integration.

Figure 1. Messages, channels, and adapters

jw osjp spring integration fig1

Note that when Component 1 sends a message to the specified channel, the adapter routes it to Component 2. Essentially, the adapter says that any message sent to that channel should be directed to Component 2.

Hello, Spring Integration!

No Java technology introduction would be complete without a "Hello World" example. In this case, we'll use Spring Integration to put together a small program that routes a text message from one component to another. This exercise will make the workings of Spring Integration's messages, channels, and adapters clearer. (See the latest Javadoc for Spring Integration for more detailed information about each component.)

First, Listing 1 shows the contents of an applicationContext.xml file, which is the glue holding together our three application components.

Listing 1. applicationContext.xml


<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:context="http://www.springframework.org/schema/context"
             xmlns="http://www.springframework.org/schema/integration"
             xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                                 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                                 http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <!-- A Spring Integration channel -->
    <channel id="helloWorldChannel" />

    <!-- A Spring Integration adapter that routes messages sent to the helloWorldChannel to the bean named "helloServiceImpl"'s hello() method -->
    <service-activator input-channel="helloWorldChannel" ref="helloServiceImpl" method="hello" />

</beans:beans>

Note that the <beans> node defines the schemas and namespaces employed by the XML file. The default context (xmlns) is defined to be http://www.springframework.org/schema/integration, which means that we do not need to prefix the channel or service-activator nodes. (Later we'll switch back to using the beans default context, at which point we'll need to give our Spring Integration nodes their own prefix. I just wanted to make the XML easier to read in the beginning.)

Listing 1 defines three components:

  1. Using component-scan we can annotate our beans in code with annotations like @Service or @Component. When we annotate our beans we'll need to run a component scan so that Spring can find them. The component-scan node takes a base package to scan and will scan the classpath for that package and all subpackages. In our case we're going to define and annotate two beans: HelloService and GreeterService.
  2. The HelloService bean prints "Hello, name" to the standard output. The GreeterService bean sends a name to HelloService. (You'll see these interactions in the code snips below.)
  3. helloWorldChannel is a channel to which our code can send messages.
  4. service-activator is an adapter that says that all messages sent to helloWorldChannel should be forwarded to the helloServiceImpl's hello() method. The default name that Spring chooses for your beans is the name of the class, but note that it starts with a lowercase letter.

Listing 2 shows the contents of the HelloService interface. An interface is not required -- i.e., we could send messages directly to a bean without involving an interface -- but it's customary when using Spring to define interfaces, giving us the flexibility to change an implementation later. (Using an interface in this case will also make unit testing easier.)

Listing 2. HelloService.java


package com.geekcap.springintegrationexample.service;

public interface HelloService
{
    public void hello( String name );
}

The HelloService interface defines a single method: hello(), which accepts a String parameter. Spring is smart enough to look at the method and its parameter signature to perform a conversion of the message to a String value.

Listing 3 shows the HelloServiceImpl class that implements the HelloService interface.

Listing 3. HelloServiceImpl.java


package com.geekcap.springintegrationexample.service;

import org.springframework.stereotype.Service;

@Service
public class HelloServiceImpl implements HelloService
{
    @Override
    public void hello(String name)
    {
        System.out.println( "Hello, " + name );
    }
}

The HelloServiceImpl implements the hello() method by printing "Hello, name" to the standard output. It is annotated with the @Service annotation, so the component-scan defined in the applicationContext.xml file will find it. Notice that the service looks very standard and there's no indication that it will be involved in a Spring Integration action.

Listing 4 shows the contents of GreeterService, which is the interface that our greeters need to implement.

Listing 4. GreeterService.java


package com.geekcap.springintegrationexample.service;

public interface GreeterService
{
    public void greet( String name );
}

Listing 5 shows the implementation of the GreeterService interface.

Listing 5. GreeterServiceImpl.java


package com.geekcap.springintegrationexample.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class GreeterServiceImpl implements GreeterService
{
    @Autowired
    private MessageChannel helloWorldChannel;

    @Override
    public void greet(String name)
    {
        helloWorldChannel.send( MessageBuilder.withPayload( name ).build() );
    }
}

More about the code

The GreeterServiceImpl class is annotated with the @Service annotation, so that Spring will identify it as a service. It has auto-wired into it a MessageChannel named helloWorldChannel. Because the name of the channel matches the one defined in the applicationContext.xml file, Spring will just find it for you. If you wanted to override that name, you could add a @Qualifier annotation to the MessageChannel to give it the name of the channel bean with which you want to communicate. When the GreeterServiceImpl's greet() method is invoked, it creates and sends a message to the helloWorldChannel.

The MessageChannel is an interface that defines two variants of the send() method: one that accepts a timeout and one that does not (which can, depending on the implementation, block indefinitely). The MessageBuilder class, an implementation of the Builder design pattern, helps you build Messages. In this case, we passed MessageBuilder a single String, but it could be used to specify message headers, expiration dates, priority, correlation IDs, reply and error channels, and more. Once we're finished configuring the MessageBuilder, invoking the build() method returns a Message that can be sent to any channel.

Listing 6 shows the source code for a command-line application that pulls all of our code together.

Listing 6. App.java


package com.geekcap.springintegrationexample.main;

import com.geekcap.springintegrationexample.service.GreeterService;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Main entry-point into the test application
 */
public class App
{
    public static void main( String[] args )
    {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "applicationContext.xml" );

        GreeterService greeterService = applicationContext.getBean( "greeterServiceImpl", GreeterService.class );

        greeterService.greet( "Spring Integration!" );
    }
}

The App class loads the applicationContext.xml file from the classpath, which is located in the src/main/resources path so that it will be automatically embedded in the JAR file by Maven. Next, it retrieves the greeterServiceImpl bean from the application context. And finally, it invokes the GreeterService's greet() method.

A Spring Integration

Figure 2 shows the original Spring Integration diagram from Figure 1, retrofitted for the specifics of this example.

Figure 2. Hello, Spring Integration!

jw osjp spring integration fig2

Here's a summary of the integrated application's flow:

  1. The App class invokes the GreeterService's greet() method, passing it the String "Spring Integration!"
  2. The GreeterService has wired into it a MessageChannel named helloWorldChannel. It uses a MessageBuilder to build a Message that contains the String "Spring Integration!", which it sends to the MessageChannel.
  3. The service-activator has been configured such that any message sent to the helloWorldChannel will be routed to the HelloService's hello() method.
  4. The HelloServiceImpl class's hello() method is invoked and "Hello, Spring Integration!" is printed out to the screen.

Listing 7 shows a Maven pom.xml file that builds this sample application:

Listing 7. Maven POM for Hello, Spring Integration


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.geekcap.javaworld</groupId>
    <artifactId>HelloSpringIntegration</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>HelloSpringIntegration</name>
    <url>http://maven.apache.org</url>

    <properties>
        <spring.version>3.2.1.RELEASE</spring.version>
        <spring.integration.version>2.2.5.RELEASE</spring.integration.version>
        <java.version>1.6</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- Spring Integration -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.geekcap.springintegrationexample.main.App</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <finalName>hello-spring-integration</finalName>
    </build>
</project>

The POM file imports Spring's core, context, and bean dependencies as well as the specific Spring Integration dependency. It defines three plug-ins:

ul>

  • maven-compiler-plugin tells Spring to build at Java version 1.6.
  • maven-jar-plugin tells Spring to include all files in the lib directory in the resultant JAR file's classpath. It also instructs Spring to make the com.geekcap.springintegrationexample.main.App the main-class for this JAR file (meaning that if you execute the file as java -jar then this class will be executed.
  • maven-dependency-plugin tells Maven to copy all of the project's dependencies to the target/lib directory.

You can build this project by executing the command:

mvn clean install

The target directory will then be configured properly to execute the source code as follows:

java -jar hello-spring-integration.jar

Spring's logger outputs a few lines, followed by this program output:

Hello, Spring Integration!

Note that we can configure Spring's logger to be less verbose later by including a log4j.properties file that changes the logging level on Spring components.

Spring Integration with a gateway proxy

Sending messages is good but sometimes you need a response. If you want to execute a method on a service and receive a response, the solution is to employ a gateway proxy. For now, just follow along with these updates, I'll explain them shortly.

First, create a channel and a service-activator. We did this previously (see Listing 1), but this time you'll add a gateway node:

Listing 8. New channel and service-activator


    <!-- A Spring Integration channel for use by our gateway -->
    <channel id="helloWorldWithReplyChannel" />

    <!-- A Spring Integration adapter that routes messages sent to the helloWorldChannel to the bean named "helloServiceImpl"'s getHelloMessage() method -->
    <service-activator input-channel="helloWorldWithReplyChannel" ref="helloServiceImpl" method="getHelloMessage" />

    <!-- Define a gateway that we can use to capture a return value -->
    <gateway id="helloWorldGateway" service-interface="com.geekcap.springintegrationexample.service.HelloService" default-request-channel="helloWorldWithReplyChannel" />	

Notice that the gateway is defined with an interface that it implements (in this case HelloService) and it defines a default request/input channel to use. Also notice that the service-activator calls a new method, getHelloMessage(), instead of hello(). This method just returns "Hello, NAME".

After you define the gateway, you can autowire it into your GreeterService as we did in Listing 5. This time, instead of autowiring the channel, you'll autowire the gateway and note its type as being HelloService, as shown in Listing 9.

Listing 9. Updated GreeterServiceImpl.java


package com.geekcap.springintegrationexample.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class GreeterServiceImpl implements GreeterService
{
    @Autowired
    private MessageChannel helloWorldChannel;

    @Autowired
    private HelloService helloWorldGateway;

    @Override
    public void greet(String name)
    {
        helloWorldChannel.send(MessageBuilder.withPayload(name).build());
    }

    @Override
    public void greet2(String name)
    {
        System.out.println( helloWorldGateway.getHelloMessage( name ) );
    }
}	

The updated GreeterServiceImpl class autowires helloWorldGateway, which is resolved by its name in the applicationContext.xml file, and its type is HelloService. The new greet2() method invokes it as though it is HelloService. From the method's point of view, it is just invoking a HelloService as it would a service bean directly: it has no need to know that Spring Integration is involved in the transaction.

Listing 10 shows the updated App class that invokes the new greet2() method.

Listing 10. Updated App.java


package com.geekcap.springintegrationexample.main;

import com.geekcap.springintegrationexample.service.GreeterService;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Main entry-point into the test application
 */
public class App
{
    public static void main( String[] args )
    {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext( "applicationContext.xml" );

        GreeterService greeterService = applicationContext.getBean( "greeterServiceImpl", GreeterService.class );

        greeterService.greet( "Spring Integration!" );

        greeterService.greet2( "Spring Integration (with response)!");
    }
}	

The output from executing this code is shown below, with the Spring logging omitted:


Hello, Spring Integration!
Hello, Spring Integration (with response)!	

Refactoring recap

So what's just happened? Here's a summary of the refactored application's behavior:

  1. The App class invokes the GreeterService's greet2() method
  2. The greet2() method invokes the getHelloMessage() method on what it thinks is a HelloService (although it's actually a gateway).
  3. The gateway implements the methods defined in the HelloService class to route the requests through the helloWorldWithReplyChannel channel.
  4. The service-activator is configured so that any messages sent to the helloWorldWithReplyChannel are routed to the helloServiceImpl's getHelloMessage() method.
  5. The helloServiceImpl constructs its response and returns it.
  6. The service-activator looks for one of two things to handle the response: an output-channel defined in the service-activator itself, or a reply channel defined in the Message's header. The gateway automatically creates a temporary anonymous, point-to-point reply channel that it listens on and adds that channel to the replyHeader of the message.
  7. The gateway receives the response, as a Message, via the reply channel and converts it to the appropriate return value defined by the service.
  8. Finally, the gateway returns the response back to the caller (the GreeterServiceImpl in this case).

Spring Integration with JMS and ActiveMQ

Thus far we have abstracted a service call by passing a message. We have also hidden a service call behind a gateway and a channel. Conceptually, these two activities should give you enough information to understand how Spring Integration works. Next we'll take that knowledge and extend it into a more realistic enterprise scenario, by creating channels that communicate with a JMS (Java Message Service) topic running in ActiveMQ. An open-source messaging broker that supports the JMS APIs, ActiveMQ is written in Java and is available at no cost under an Apache license.

JMS defines two types of message passing: topics and queues. Topics operate in a publish-subscribe fashion while queues operate in a point-to-point fashion. The publish-subscribe paradigm means that when a message producer publishes a message, zero or more consumers will receive those messages. The point-to-point paradigm means that when a message producer publishes a message there will be exactly one consumer. Queues are great for asynchronously communicating between two components, but topics really promote the power of event-driven architecture.

The point of an EDA application is to decouple a message producer from its consumers, which is what publish-subscribe does. Using topics, a component notifies the world of changes or updates, but only consumers who have subscribed to that topic will receive its messages. The producer does not know who its consumers are and it does not care a perfect example of loose coupling!

In order to demonstrate publish-subscribe messaging in an enterprise system, we'll build two components:

  • PublisherService: A component that publishes a message to a topic.
  • MessageListener: A component that subscribes to that topic and receives its message.

In order toPOST messages to a RESTful service (as a way of publishing a message to a topic) we'll also need to build infrastructure around the PublisherService. Figure 3 shows the interaction of the various components, which we'll look at in detail below.

Figure 3. JMS Spring Integration example

jw osjp spring integration fig3

Here's the flow of what's going on in Figure 3

  • A REST client publishes a message to the MessageController, which is a Spring MVC Controller
  • The MessageController invokes the PublishService's send() method to send the message to the topicChannel
  • A JMS outbound channel adapter is configured to route messages sent to the topicChannel to the topic.myTopic destination
  • The JMS configuration is defined in a ConnectionFactory
  • A JMS Message-Driven Channel Adapter is configured to listen to the topic.myTopic topic and send those messages to the listenerChannel
  • A service-activator is configured to route messages sent to the listenerChannel to the messageListenerImpl's processMessage() method
  • The messageListenerImpl class receives the message and processes it (prints it out to the screen)

The configuration for this application is shown in Listing 11.

Listing 11. springintegrationexample-servlet.xml (Application Context)


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:oxm="http://www.springframework.org/schema/oxm"
       xmlns:int-jme="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
                http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
                http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm-3.0.xsd">


    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.springintegrationexample" />

    <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="order" value="1" />
        <property name="messageConverters">
            <list>
                <!-- Default converters -->
                <bean class="org.springframework.http.converter.StringHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.FormHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.ByteArrayHttpMessageConverter" />
                <bean class="org.springframework.http.converter.xml.SourceHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.BufferedImageHttpMessageConverter"/>
                <bean class="org.springframework.http.converter.json.MappingJacksonHttpMessageConverter" />
            </list>
        </property>
    </bean>

    <!-- Define a channel to communicate out to a JMS Destination -->
    <int:channel id="topicChannel"/>

    <!-- Define the ActiveMQ connection factory -->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--
        Define an adaptor that route topicChannel messages to the myTopic topic; the outbound-channel-adapter
        automagically finds the configured connectionFactory bean (by naming convention)
      -->
    <int-jms:outbound-channel-adapter channel="topicChannel"
                                      destination-name="topic.myTopic"
                                      pub-sub-domain="true" />

    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="listenerChannel"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

</beans>	

The beginning of Listing 11 sets up the AnnotationMethodHandlerAdapter for Spring MVC, which is beyond the scope for this article. What you really need to know is that the message converters render items returned by the Spring Controllers. The important sections for this example are the following:

  • The connectionFactory defines the connection parameters for ActiveMQ. I installed ActiveMQ and left its default configuration, which is running on my localhost and listening on port 61616.
  • The topicChannel defines a channel that will be used to publish messages (see below).
  • The outbound-channel-adapter is defined in the Spring Integration JMS namespace and it wires messages sent to the topicChannel to be published to the topic.myTopic destination, and it is configured as a topic (setting pub-sub-domain to true means that this is a topic, setting it false would make it a queue); note that the outbound-channel-adapter finds the ActiveMQ configuration via the bean with the "connectionFactory" name.
  • The listenerChannel defines a channel that will be used for consuming messages.
  • The message-driven-channel-adapter defines an adapter that listens for messages on the topic.myTopic topic and routes those to the listenerChannel.
  • The service-activator routes messages sent to the listenerChannel to the messageListenerImpl's processMessage() method.

Publish: MessageController.java

You can download the source code for this example to access all of the files. I will just review just the highlights. First, Listing 12 shows the contents of the MessageController, which is where message publishing starts.

Listing 12. MessageController.java


package com.geekcap.springintegrationexample.web;

import com.geekcap.springintegrationexample.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.servlet.http.HttpServletResponse;

@Controller
public class MessageController
{
    @Autowired
    private PublishService publishService;

    @RequestMapping( value = "/message", method = RequestMethod.POST )
    @ResponseBody
    public void postMessage( @RequestBody com.geekcap.springintegrationexample.model.Message message, HttpServletResponse response )
    {
        // Publish the message
        publishService.send( message );

        // Set the status to 201 because we created a new message
        response.setStatus( HttpStatus.CREATED.value() );
    }

}
	

The MessageController is a Spring MVC controller that implements a RESTful web service that handles POSTs to the /message resource. It is expecting a JSON object, which we'll review later, and Spring MVC automatically converts that JSON into a com.geekcap.springintegrationexample.model.Message object. The MessageController has wired into it a PublishService and it invokes its send() method, passing it the Message that it received from the caller.

Listing 13 shows the the PublishServiceImpl class. (Note that the PublishService interface is not shown but can be found in the article's source code.)

Listing 13. PublishServiceImpl.java


package com.geekcap.springintegrationexample.service;

import com.geekcap.springintegrationexample.model.Message;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class PublishServiceImpl implements PublishService
{
    private static final Logger logger = Logger.getLogger( PublishServiceImpl.class );

    @Autowired
    private MessageChannel topicChannel;

    @Override
    public void send( Message message )
    {
        logger.info( "Sending message to message channel: " + message );
        topicChannel.send( MessageBuilder.withPayload( message.toString() ).build() );
    }
}

The PublishServiceImpl class has wired into it the topicChannel, which we created in our application context. It sends it a String representation of the Message, using the MessageBuilder class that we looked at earlier. And that is all that is required to publish the message to the ActiveMQ topic! At this point, Spring integration uses the outbound-channel-adapter to look up the connectionFactory, find the topic.myTopic destination (as a topic), and send it the message.

Subscribe: MessageListenerImpl

On the other side of the operation, Listing 14 shows the source code for the MessageListenerImpl class (the MessageListener interface is not shown, but can be found in the source code that accompanies this article.

Listing 14. MessageListenerImpl.java


package com.geekcap.springintegrationexample.listener;

import org.apache.log4j.Logger;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;

@Service
public class MessageListenerImpl
{
    private static final Logger logger = Logger.getLogger( MessageListenerImpl.class );

    public void processMessage( String message )
    {
        logger.info( "Received message: " + message );
        System.out.println( "MessageListener::::::Received message: " + message );
    }
}

The MessageListenerImpl class is a service that defines a single method: processMessage(String). The message-driven-channel-adapter defined in the application context listens for messages published to the topic.myTopic topic and routes them to the listenerChannel. The service-activator routes messages sent to the listenerChannel to the messageListenerImpl's processMessage(String) method. As you can see, the MessageListenerImpl service has no idea that it is responding to JMS messages, all of the details are handled in the application context configuration.

Build and run the application

The POM file for this project is shown in Listing 15.

Listing 15. pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.geekcap</groupId>
    <artifactId>spring-integration-example</artifactId>
    <packaging>war</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>spring-integration-example Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <spring.version>3.2.1.RELEASE</spring.version>
        <spring.integration.version>2.2.5.RELEASE</spring.integration.version>
        <servlet-api.version>2.5</servlet-api.version>
        <java.version>1.6</java.version>
        <jackson.version>1.9.12</jackson.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>


    <dependencies>
        <!-- Spring Dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>${servlet-api.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Spring Integration-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jms</artifactId>
            <version>${spring.integration.version}</version>
        </dependency>



        <!-- Logging: Log4J -->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.5.8</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Include Jackson so that we can render JSON responses -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-jaxrs</artifactId>
            <version>1.6.1</version>
        </dependency>

        <!-- Include ActiveMQ -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

        <!-- JUnit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
        <finalName>spring-integration-example</finalName>
    </build>
</project>

Application dependencies

Note that this example includes the following dependencies:

  • Core Spring
  • Spring MVC
  • Spring JMS
  • Spring Integration
  • Spring Integration JMS

You'll need to add additional dependencies based on the components that you are integrating. Some of those will have different configuration options, but they all boil down to channels and adapters.

Deploying with Tomcat and ActiveMQ

Next, use the following command to build the application:

mvn clean install

You can download Tomcat from the Apache Tomcat website. After you download it, decompress it locally and start it by executing the startup.sh or startup.bat file from the Tomcat bin directory. Copy the resultant WAR file to Tomcat's webapps directory in order to deploy it to Tomcat. You can download ActiveMQ from the Apache ActiveMQ website. After you download it, decompress it locally and start it by executing the following command from its bin directory:


./activemq start

Or, on Windows:


activemq start

Note that you can stop ActiveMQ by executing activemq stop and you can shut down Tomcat by executing shutdown.sh or shutdown.bat. I would also recommend starting ActiveMQ first so that the sample application can connect to the topic.myTopic topic and start listening for messages.

Stitching applications together with Spring Integration

You have learned how messages, channels, adapters, and gateways work together to abstract message consumers from message producers and you've seen how to integrate ActiveMQ and JMS into a Spring Integration example. The real power in using Spring Integration is when you start thinking about building modular and reusable services and then stitching together application workflows using Spring Integration. For example, an event-driven application could publish a message with either a lightweight payload (that identified the system that generated the event and the ID(s) of the affected resources) or a heavyweight payload (that contained the entire contents of the modified resource).

The benefit to using a lightweight payload is that if the resource changes several times, you do not need to be worried about the content of the heavyweight payload: you go back to the system of record, which is the source of truth, and ask it for the latest version of the affected resource. The drawback, of course, is that every time a system generates an event, all listeners are going to call that system back, which may add too much load. The compromise, therefore, is that the system can generate a heavyweight payload and forbid listeners from calling it back. This means that if the system generates multiple events for the same resource that some of the payloads may be out-of-date and the burden of reconciling events falls on the listeners.

We could configure Spring Integration to handle both scenarios by wiring together inbound and outbound channels and modular components. For a concrete example, let's consider the listener defined in the previous example. Here is the Spring Integration configuration for this listener:


    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="listenerChannel"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

In theory, the payload already exists and is passed to the messageListenerImpl's processMessage() method. But what if it didn't? What if the message sent to the topic.myTopic destination was only the ID of the changed resource and it was up to us to call the system back and retrieve the payload? Would we have to rewrite our logic? Luckily, we wouldn't. We can define another bean that retrieves our payload for us, route the message to that bean, and route the response from that bean to the messageListenerImpl bean.

Listing 16 shows the source code for a new RetrievePayloadImpl class.

Listing 16. RetrievePayloadImpl.java


package com.geekcap.springintegrationexample.service;

import org.springframework.stereotype.Service;

@Service
public class RetrievePayloadServiceImpl implements RetrievePayloadService
{
    @Override
    public String getPayload(String id)
    {
        // Go back to the SOR and retrieve the payload for the specified id ...
        return "Payload for " + id;
    }
}

This class would theoretically go back to the system of record (SOR) and retrieve the payload for the component with the specified ID and return that payload. In this case it just returns the String: "Payload for ..."

Now let's wire this new service in between the message-driven-channel and the messageListenerImpl, shown in listing 17.

Listing 17. Updated applicationContext.xml file with the RetrievePayloadService


    <!-- Create a channel for a listener that will consume messages-->
    <int:channel id="listenerChannel" />

    <int-jms:message-driven-channel-adapter id="messageDrivenAdapter"
                                            channel="getPayloadChannel"
                                            destination-name="topic.myTopic"
                                            pub-sub-domain="true" />

    <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage" />

    <int:channel id="getPayloadChannel" />

    <int:service-activator input-channel="getPayloadChannel" output-channel="listenerChannel" ref="retrievePayloadServiceImpl" method="getPayload" />

Instead of routing the message-driven-channel-adapter directly to the listenerChannel, we instead route it to the getPayloadChannel. The getPayloadChannel invokes the retrievePayloadServiceImpl bean's getPayload() method and routes its output to the listenerChannel, which sends the augmented payload to the messageListenerImpl's processMessage() method. We were able to leave the MessageListenerImpl class completely alone and route the message through another service, all through configuration.

Note that if the server exposes a RESTful interface then the RetrievePayloadService could be replaced entirely by an HTTP Outbound Gateway. For example, listing 18 shows the configuration from a project that I built that extracts the "ResourceLink" from a Message object and uses it as part of an HTTP request.

Listing 18. Using an HTTP Outbound Gateway to callback to a service


    <int:channel id="createEntityChannel" />
    <int-http:outbound-gateway request-channel="createGuestChannel"
                               url="http://localhost:8080{link}"
                               http-method="GET"
                               expected-response-type="com.mycompany.model.Entity"
                               reply-channel="transformEntityChannel" >
        <int-http:uri-variable name="link" expression="payload.getResourceLink()" />
    </int-http:outbound-gateway>

Listing 18 is only one part of a much larger applicationContext.xml file, but it illustrates how a message that is sent to the createEntityChannel is routed to an HTTP Outbound Gateway to retrieve a com.mycompany.model.Entity from http://localhost:8080/link and then is passed to the transformEntityChannel for the next step in the orchestration. Spring Integration provides a rich set of adapters and gateways that allow you to focus on your business objective and not the plumbing code required to call services, publish message, read from topics, and so forth.

In conclusion

Spring Integration helps solve enterprise integration issues by implementing the design patterns defined in the Enterprise Integration Patterns book. This includes an asynchronous messaging paradigm that abstracts message producers from message consumers. Rather than invoking a method directly, Spring Integration has you send a message to a channel. An adapter or gateway manages that channel and routes the message to the appropriate destination, whether that destination is another service running in the same virtual machine or a service running in another data center that is connected by an enterprise service bus.

In this installment of Open source Java projects I defined the terms message, channel, adapter, and gateway. I then demonstrated how to use Spring Integration to pass a message from one component to another, how to handle responses, and how to integrate components together using JMS and ActiveMQ as a message bus. Finally, I demonstrated the process of wiring applications together by writing modular components and defining channels to control the flow of messages.

This introductory article has only scratched the surface of what you can do with Spring Integration. JMS is one type of adapter, but Spring Integration support others; for instance you could use Spring integration to pass messages by email, file systems, web service calls, tweeting, and more. The key is to think in terms of routing messages to the correct components until you arrive at your desired destination.

Join the discussion
Be the first to comment on this article. Our Commenting Policies