As you can guess, the next piece of code updates the sequence in the second database.
The last if statement throws a runtime exception when the code is run with the boolean flag set to "true". This simulates a failure so that we can
test whether the transaction manager successfully rolls back the global transaction.
Let us look at the Spring configuration file to see how we configured our DAO classes and the datasources:
<bean id="dsProps" class="java.util.Properties">
<constructor-arg>
<props>
<prop key="user">root</prop>
<prop key="password">murali</prop>
<prop key="DYNAMIC_CLASS">com.findonnet.service.transaction.jboss.jdbc.Mysql</prop>
</props>
</constructor-arg>
</bean>
<bean id="dataSource1" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName">
<value>com.arjuna.ats.jdbc.TransactionalDriver</value>
</property>
<property name="url" value="jdbc:arjuna:mysql://127.0.0.1:3306/mydb1"/>
<property name="connectionProperties">
<ref bean="dsProps"/>
</property>
</bean>
<bean id="dataSource2" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName">
<value>com.arjuna.ats.jdbc.TransactionalDriver</value>
</property>
<property name="url" value="jdbc:arjuna:mysql://127.0.0.1:3306/mydb2"/>
<property name="connectionProperties">
<ref bean="dsProps"/>
</property>
</bean>
<bean id="sequenceDAO" class="com.findonnet.persistence.MessageSequenceDAO">
<property name="dataSource">
<ref bean="dataSource1"/>
</property>
</bean>
<bean id="sequenceDAO2" class="com.findonnet.persistence.MessageSequenceDAO">
<property name="dataSource">
<ref bean="dataSource2"/>
</property>
</bean>
The above beans define the two datasources for the two databases. To use JBossTS's TransactionalDriver, we need to register the database either through JNDI bindings or through dynamic class instantiation. We will be using the latter, which requires us to implement the DynamicClass interface. The bean definition for dsProps shows that we are going to use the com.findonnet.service.transaction.jboss.jdbc.Mysql class, which we wrote and which implements the DynamicClass interface. We had to do this because JBossTS doesn't provide an out-of-the-box wrapper for the MySQL database. In addition, the code relies on the JDBC URL starting with "jdbc:arjuna"; otherwise the JBossTS code throws errors.
The implementation of the DynamicClass interface is very simple: all we have to do is implement the getDataSource() and shutdownDataSource() methods. The getDataSource() method returns an appropriate XADataSource object, which in our case is com.mysql.jdbc.jdbc2.optional.MysqlXADataSource.
Please note that both datasources are configured to use Spring's DriverManagerDataSource, which doesn't use any connection pooling and is not recommended for production use. The alternative is to use a pooled datasource that can handle XA datasource pooling.
It's now time for us to look at providing transactional semantics to our handleEvent() method in our EventHandler class:
<bean id="eventHandlerTarget" class="com.findonnet.messaging.EventHandler"></bean>
<bean id="eventHandler" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
<property name="transactionManager"><ref bean="transactionManager" /></property>
<property name="target"><ref bean="eventHandlerTarget"/></property>
<property name="transactionAttributes">
<props>
<prop key="handle*">PROPAGATION_REQUIRED,-Exception</prop>
</props>
</property>
</bean>
Here we define the TransactionProxyFactoryBean and pass in the transactionManager reference, which we will see later, along with the transaction attributes "PROPAGATION_REQUIRED,-Exception". These are applied to the target bean eventHandlerTarget, and only to methods whose names start with "handle" (notice the handle*). In simple words, we are asking the Spring framework to apply the transaction attributes "PROPAGATION_REQUIRED,-Exception" to every invocation on the target object of a method whose name starts with "handle". Behind the scenes, the Spring framework creates a CGLIB-based proxy, which intercepts all calls on the EventHandler to methods whose names start with "handle". If any Exception is thrown within the method call, the current transaction is rolled back; that is what "-Exception" means. This demonstrates how easy it is to provide transactional support declaratively using Spring.
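To make the interception idea concrete, here is a minimal sketch that uses only the JDK. It is not how Spring implements TransactionProxyFactoryBean: it uses a JDK dynamic proxy rather than CGLIB (so it needs an interface), and RecordingTx, EventProcessor, PlainProcessor and TxProxySketch are hypothetical names invented for this illustration. RecordingTx merely records begin, commit and rollback calls in place of a real transaction manager.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a transaction manager: it only records calls.
class RecordingTx {
    final List<String> events = new ArrayList<>();
    void begin()    { events.add("begin"); }
    void commit()   { events.add("commit"); }
    void rollback() { events.add("rollback"); }
}

// Hypothetical business interface. A JDK proxy needs one; CGLIB does not.
interface EventProcessor {
    void handleEvent(boolean fail);
    String describe();
}

class PlainProcessor implements EventProcessor {
    public void handleEvent(boolean fail) {
        if (fail) {
            throw new RuntimeException("Simulating rollback");
        }
    }
    public String describe() {
        return "plain processor";
    }
}

class TxProxySketch {
    // Methods named handle* run between begin and commit; a failure rolls back.
    static EventProcessor wrap(EventProcessor target, RecordingTx tx) {
        InvocationHandler interceptor = (proxy, method, args) -> {
            if (!method.getName().startsWith("handle")) {
                return call(target, method, args);   // not matched: no transaction
            }
            tx.begin();
            try {
                Object result = call(target, method, args);
                tx.commit();
                return result;
            } catch (Throwable failure) {
                tx.rollback();
                throw failure;
            }
        };
        return (EventProcessor) Proxy.newProxyInstance(
                EventProcessor.class.getClassLoader(),
                new Class<?>[] {EventProcessor.class},
                interceptor);
    }

    // Unwraps the reflection wrapper so callers see the original exception.
    private static Object call(Object target, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) {
        RecordingTx tx = new RecordingTx();
        EventProcessor processor = wrap(new PlainProcessor(), tx);
        processor.handleEvent(false);
        try {
            processor.handleEvent(true);
        } catch (RuntimeException expected) {
            // the proxy has already rolled back
        }
        processor.describe();
        System.out.println(tx.events);   // [begin, commit, begin, rollback]
    }
}
```

The call to describe() leaves no trace in the event list because its name does not match the "handle" prefix, which mirrors the effect of the handle* key in the transactionAttributes property.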
Now let us look at how we can wire up Spring's JtaTransactionManager to use our choice of JTA implementation. The eventHandler bean defined above uses the transactionManager attribute, which refers to the Spring JtaTransactionManager as shown below:
<bean id="jbossTransactionManager"
class="com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple">
</bean>
<bean id="jbossUserTransaction"
class="com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple"/>
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="jbossTransactionManager" />
</property>
<property name="userTransaction">
<ref bean="jbossUserTransaction" />
</property>
</bean>
As shown above in the configuration, we are wiring up the Spring-provided JtaTransactionManager class to use com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple as the TransactionManager implementation and com.arjuna.ats.internal.jta.transaction.arjunacore.UserTransactionImple as the UserTransaction implementation.
That's all there is to it. We have just enabled XA transactions using Spring and JBossTS, with MySQL datasources acting as XA resources. Please note that at the time of writing there are some known issues with the MySQL XA implementation (see the Resources section).
To run this use case please look at the JbossSender task in the ant build file. The project download is provided in the Resources section.
Figure 3: UseCase2 updates a database and sends a JMS message in a global transaction.

This use case uses the same code we used for use case 1. The code updates the database mydb1 and then sends a message to a message queue in a global transaction, as shown in Figure 3 above. Both the Atomikos and Bitronix configurations will be dealt with, in that order.
The POJO is the same one we used for use case 1, the EventHandler class, and the relevant code looks as follows:
public void handleEvent(boolean fail) throws Exception {
MessageSequenceDAO dao = (MessageSequenceDAO) springContext.getBean("sequenceDAO");
int value = 13;
String app = "spring";
String appKey = "execution";
int upCnt = dao.updateSequence(value, app, appKey);
log.debug(" sql updCnt->" + upCnt);
...
if (springContext.containsBean("appSenderTemplate")) {
this.setJmsTemplate((JmsTemplate) springContext.getBean("appSenderTemplate"));
this.getJmsTemplate().convertAndSend("Testing 123456");
log.debug("Sent message successfully");
}
if (fail) {
throw new RuntimeException("Simulating Rollback by throwing Exception !!");
}
}
The first snippet in the code above updates the msgseq table in the database mydb1 with the sequence number.
The next piece of code uses the appSenderTemplate bean, which is configured to use the JmsTemplate class from Spring. This template is used to send JMS messages to the message provider. We will see how it is defined in the configuration file in the following segment.
An external event, in this case the MainApp class, will invoke the method handleEvent shown in Figure 3 above.
The last if statement simulates a runtime exception to test whether the transaction manager successfully rolls back the global transaction.
Let us look at the JNDI bean definitions for our messaging needs in the Spring configuration file:
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">
org.apache.activemq.jndi.ActiveMQInitialContextFactory
</prop>
</props>
</property>
</bean>
<bean id="appJmsDestination"
class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate">
<ref bean="jndiTemplate"/>
</property>
<property name="jndiName" value="test.q1"/>
</bean>
<bean id="appSenderTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref bean="queueConnectionFactoryBean"/>
</property>
<property name="defaultDestination">
<ref bean="appJmsDestination"/>
</property>
<property name="messageTimestampEnabled" value="false"/>
<property name="messageIdEnabled" value="false"/>
<!-- sessionTransacted should be true only for Atomikos -->
<property name="sessionTransacted" value="true"/>
</bean>
Here we specify a JndiTemplate that Spring uses to do a JNDI lookup of the destination defined by the appJmsDestination bean. The appJmsDestination bean is wired into the appSenderTemplate (JmsTemplate) bean as shown above. The bean definitions also show that appSenderTemplate is wired to use the queueConnectionFactoryBean, which we will see later. For Atomikos, the sessionTransacted property should be set to "true", which the Spring framework advises against, and the literature on JMS seems to support that viewpoint. This is a hack that is needed only for the Atomikos implementation. If the property is set to "false", you will notice heuristic exceptions thrown during the 2PC protocol. These are mainly attributed to the prepare() call not responding on the JMS resource; eventually Atomikos decides to roll back, resulting in a heuristic exception.
The messageTimestampEnabled and messageIdEnabled attributes are set to "false" so that timestamps and message IDs are not generated, since we are not going to use them anyway. This reduces the overhead on the JMS provider and improves performance.
Let us look at the Spring configuration beans for Atomikos:
<bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
<constructor-arg>
<value>tcp://localhost:61616</value>
</constructor-arg>
</bean>
<bean id="queueConnectionFactoryBean"
class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
<property name="resourceName">
<value>Execution_Q</value>
</property>
<property name="xaQueueConnectionFactory">
<ref bean="xaFactory" />
</property>
</bean>
<bean id="atomikosTransactionManager"
class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
<property name="forceShutdown"><value>true</value></property>
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/>
<bean id="transactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager">
<ref bean="atomikosTransactionManager" />
</property>
<property name="userTransaction">
<ref bean="atomikosUserTransaction" />
</property>
</bean>
The xaFactory definition shows that the underlying XA connection factory is the org.apache.activemq.ActiveMQXAConnectionFactory class. The transactionManager bean is wired up to use the atomikosTransactionManager bean and the atomikosUserTransaction bean.
Let us now look at the datasource definition for Atomikos:
<bean id="dataSource" class="com.atomikos.jdbc.SimpleDataSourceBean" init-method="init" destroy-method="close">
<property name="uniqueResourceName"><value>Mysql</value></property>
<property name="xaDataSourceClassName">
<value>com.mysql.jdbc.jdbc2.optional.MysqlXADataSource</value>
</property>
<property name="xaDataSourceProperties">
<value>URL=jdbc:mysql://127.0.0.1:3306/mydb1?user=root&amp;password=murali</value>
</property>
<property name="exclusiveConnectionMode"><value>true</value></property>
</bean>
Atomikos provides a generic wrapper class, which makes it easy to pass in the xaDataSourceClassName; in our case this is com.mysql.jdbc.jdbc2.optional.MysqlXADataSource. The same applies to the JDBC URL. The exclusiveConnectionMode is set to "true" to make sure that the connection in the current transaction is not shared. Atomikos provides connection pooling out of the box, and one can set the pool size using the connectionPoolSize attribute.
Let us now look at the relevant bean definitions for Bitronix:
<bean id="ConnectionFactory" factory-bean="ConnectionFactoryBean" factory-method="createResource" />
<bean id="dataSourceBean1" class="bitronix.tm.resource.jdbc.DataSourceBean">
<property name="className" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />
<property name="uniqueName" value="mysql" />
<property name="poolSize" value="2" />
<property name="driverProperties">
<props>
<prop key="user">root</prop>
<prop key="password">murali</prop>
<prop key="databaseName">mydb1</prop>
</props>
</property>
</bean>
<bean id="Db1DataSource" factory-bean="dataSourceBean1" factory-method="createResource" />
<bean id="BitronixTransactionManager" factory-method="getTransactionManager"
class="bitronix.tm.TransactionManagerServices" depends-on="btmConfig,ConnectionFactory" destroy-method="shutdown" />
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="BitronixTransactionManager" />
<property name="userTransaction" ref="BitronixTransactionManager" />
</bean>
<bean id="btmConfig" factory-method="getConfiguration" class="bitronix.tm.TransactionManagerServices">
<property name="serverId" value="spring-btm-sender" />
</bean>
The appSenderTemplate bean defined for Atomikos can be reused, with the sole exception of the sessionTransacted value. For Bitronix this is set to "false", which is the default for the JmsTemplate in Spring anyway, so one can even omit this attribute.
The datasource bean definition shown above looks similar to the one for Atomikos. The main difference is that the bean is created using an instance factory method rather than a static factory method. In this case, the Db1DataSource bean is created using the factory-bean dataSourceBean1, and the factory method specified is createResource.
The transactionManager bean refers to BitronixTransactionManager for both the transactionManager attribute and the userTransaction attribute.
To run this use case, please look at the AtomikosSender task and the BitronixSender in the ant build file, provided as part of the project download.
The sequence diagram for this use case, which is by no means a comprehensive one, is shown below:
Figure 5: Sequence diagram, which illustrates the process flow for use case 2.

Figure 4: UseCase3 consumes a JMS message and updates a database in a global transaction.

This use case differs from the previous ones: all it does is define a POJO to handle the messages received from a messaging provider. It also updates the database within the same transaction, as shown in Figure 4 above.
The relevant code for our MessageHandler class looks as follows:
public void handleOrder(String msg) {
log.debug("Received message->: " + msg);
MessageSequenceDAO dao = (MessageSequenceDAO) MainApp.springCtx.getBean("sequenceDAO");
String app = "spring";
String appKey = "allocation";
int upCnt = dao.updateSequence(value++, app, appKey);
log.debug("Update SUCCESS!! Val: " + value + " updateCnt->"+ upCnt);
if (fail)
throw new RuntimeException("Rollback TESTING!!");
}
As you can see, the code just updates the database mydb2. The MessageHandler is just a POJO with a handleOrder() method. Using Spring, we are going to transform it into a message-driven POJO (analogous to an MDB in a JEE server). To accomplish this we will use the MessageListenerAdapter class, which delegates the message handling to the target listener methods via reflection. This is a very convenient feature, which enables simple POJOs to be converted to message-driven POJOs (MDPs). Our MDP now supports distributed transactions.
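The reflective delegation described above can be sketched with plain JDK reflection. This is only an illustration of the idea, not Spring's MessageListenerAdapter: OrderHandler and ReflectiveListenerSketch are hypothetical names, and the sketch assumes that the message payload has already been extracted into a plain object such as a String.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical POJO: no JMS or Spring types appear in its signature.
class OrderHandler {
    String lastMessage;

    public void handleOrder(String msg) {
        lastMessage = msg;
    }
}

class ReflectiveListenerSketch {
    private final Object delegate;
    private final String listenerMethod;

    ReflectiveListenerSketch(Object delegate, String listenerMethod) {
        this.delegate = delegate;
        this.listenerMethod = listenerMethod;
    }

    // Receives an already-extracted payload and forwards it to the delegate
    // method whose name and parameter type match.
    void onMessage(Object payload) {
        try {
            Method method = delegate.getClass().getMethod(listenerMethod, payload.getClass());
            method.invoke(delegate, payload);
        } catch (InvocationTargetException e) {
            // Surface the delegate's own exception so a caller could roll back.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No usable method " + listenerMethod, e);
        }
    }

    public static void main(String[] args) {
        OrderHandler handler = new OrderHandler();
        ReflectiveListenerSketch listener = new ReflectiveListenerSketch(handler, "handleOrder");
        listener.onMessage("order-42");
        System.out.println(handler.lastMessage);   // order-42
    }
}
```

Because the delegate is looked up by method name at run time, the handler class stays free of messaging interfaces, which is what keeps it a POJO.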
It's time for us to look at the configuration for more clarity:
<bean id="msgHandler" class="com.findonnet.messaging.MessageHandler"/>
<bean id="msgListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="msgHandler"/>
<property name="defaultListenerMethod" value="handleOrder"/>
</bean>
The above configuration shows that the msgListener bean delegates calls to the msgHandler bean. Using the defaultListenerMethod attribute, we have also specified handleOrder() as the method to invoke when a message arrives from the message provider. Let us now look at the message listener, which listens to the destination on the message provider:
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
destroy-method="close">
<property name="concurrentConsumers" value="1"/>
<property name="connectionFactory" ref="queueConnectionFactoryBean" />
<property name="destination" ref="appJmsDestination"/>
<property name="messageListener" ref="msgListener"/>
<property name="transactionManager" ref="transactionManager"/>
<property name="sessionTransacted" value="false"/>
<property name="receiveTimeout" value="5000"/>
<property name="recoveryInterval" value="6000"/>
<property name="autoStartup" value="false"/>
</bean>
The listenerContainer in this case uses the DefaultMessageListenerContainer (DMLC) provided by Spring. The concurrentConsumers attribute is set to "1", which indicates that we will have only one consumer on the queue. This attribute is mainly used for draining queues concurrently by spawning multiple listeners (threads), and is very useful when you have a fast producer and a slow consumer and the ordering of the messages is not important. Spring 2.0.3 and above support dynamically adjusting the number of listeners based on load, using the maxConcurrentConsumers attribute. The recoveryInterval attribute is used for recovery and is useful when a messaging provider is down and we want to reconnect without bringing the application down. This feature, however, runs in an infinite loop and keeps retrying for as long as the application is running, which you may not want. Also, one has to be careful to dispose of the DMLC properly, since background threads might still be trying to receive messages from the message provider even after the JVM is shut down. This issue has been fixed as of Spring 2.0.4. As mentioned before, the sessionTransacted attribute should be set to "true" for Atomikos only. Apart from that, the same bean definition for listenerContainer applies to both Atomikos and Bitronix.
Please note that the transactionManager attribute points to the bean definitions that were defined above (for use case 2); we are simply reusing them.
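The effect of running several concurrent consumers against one queue can be sketched with the JDK's concurrency utilities. This is a rough analogy and not the DMLC itself: ConcurrentConsumersSketch is a hypothetical class, an in-memory BlockingQueue stands in for the JMS destination, and each consumer simply stops after one receive times out, whereas a real listener container keeps polling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentConsumersSketch {
    // Drains the queue with several consumer threads and returns the number
    // of messages handled. Each consumer stops once a receive times out.
    static int drain(BlockingQueue<String> queue, int concurrentConsumers, long receiveTimeoutMillis) {
        AtomicInteger handled = new AtomicInteger();
        ExecutorService consumers = Executors.newFixedThreadPool(concurrentConsumers);
        for (int i = 0; i < concurrentConsumers; i++) {
            consumers.execute(() -> {
                try {
                    // poll with a timeout plays the role of receiveTimeout
                    while (queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS) != null) {
                        handled.incrementAndGet();   // message ordering is not preserved
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        consumers.shutdown();   // accept no new work; lets the threads end cleanly
        try {
            consumers.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 100; i++) {
            queue.add("msg-" + i);
        }
        System.out.println(drain(queue, 4, 200));   // 100
    }
}
```

The explicit shutdown and awaitTermination calls illustrate why proper disposal matters: without them, the consumer threads would keep the process alive after the main thread has finished.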
That's all there is to it. We have just implemented our MDP, which receives the message and updates the database in a single global transaction.
To run this use case, please look at the AtomikosConsumer task and the BitronixConsumer in the ant build file, provided as part of the project download.
To intercept the calls between the Spring framework and the JTA code, an Interceptor class has been used and woven into the runtime libraries of the JTA implementation jar files. It uses AspectJ, and the code snippet is shown below:
pointcut xaCalls() : call(* XAResource.*(..))
|| call(* TransactionManager.*(..))
|| call(* UserTransaction.*(..))
;
Object around() : xaCalls() {
log.debug("XA CALL -> This: " + thisJoinPoint.getThis());
log.debug(" -> Target: " + thisJoinPoint.getTarget());
log.debug(" -> Signature: " + thisJoinPoint.getSignature());
Object[] args = thisJoinPoint.getArgs();
StringBuffer str = new StringBuffer(" ");
for(int i=0; i< args.length; i++) {
str.append(" [" + i + "] = " + args[i]);
}
log.debug(str);
Object obj = proceed();
log.debug("XA CALL RETURNS-> " + obj);
return obj;
}
The above code defines a pointcut on all calls made to any JTA-related code. It also defines an around advice, which logs the arguments being passed and the method return values. This comes in handy when we are trying to trace and debug issues with JTA implementations. The ant build.xml file in the project (see Resources) contains tasks to weave the aspect against the JTA implementations. Another option is to use the MaintainJ plugin for Eclipse, which provides the same from the comfort of an IDE and even generates the sequence diagram for the process flow.
Distributed transaction processing is a very complex topic, and one should look for implementations where transaction recovery is robust and which meet all the ACID (Atomicity, Consistency, Isolation and Durability) criteria that the user or application expects. What we tested in this article was pre-2PC exceptions (remember the RuntimeException we were throwing to test rollbacks?). Applications should thoroughly test JTA implementations for failures during the two-phase commit process, as these are the most crucial and troublesome.
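As a reminder of what happens during the two phases, here is a deliberately simplified coordinator sketch. It is not taken from any of the JTA implementations discussed here: Participant, LoggingParticipant and TwoPhaseCommitSketch are hypothetical names, and real XA resources implement javax.transaction.xa.XAResource, which works with Xid arguments and has more states than a yes or no vote.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified participant in a global transaction.
interface Participant {
    boolean prepare();   // phase 1: true is a "yes" vote
    void commit();       // phase 2
    void rollback();
}

class LoggingParticipant implements Participant {
    private final String name;
    private final boolean voteYes;
    private final List<String> log;

    LoggingParticipant(String name, boolean voteYes, List<String> log) {
        this.name = name;
        this.voteYes = voteYes;
        this.log = log;
    }

    public boolean prepare() { log.add(name + ":prepare"); return voteYes; }
    public void commit()     { log.add(name + ":commit"); }
    public void rollback()   { log.add(name + ":rollback"); }
}

class TwoPhaseCommitSketch {
    // Returns true if the global transaction committed, false if it rolled back.
    static boolean complete(List<Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                // One "no" vote aborts everyone. For simplicity, all
                // participants are told to roll back, including the "no" voter.
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        // Everyone voted yes. A crash from here on is the hard case: a real
        // coordinator must have logged its decision durably to recover.
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Participant> participants = Arrays.asList(
                new LoggingParticipant("db", true, log),
                new LoggingParticipant("jms", false, log));
        System.out.println(complete(participants));   // false
        System.out.println(log);   // [db:prepare, jms:prepare, db:rollback, jms:rollback]
    }
}
```

The sketch covers only the clean "no" vote. The failures that deserve the most testing are the ones it leaves out, such as a coordinator or resource crashing between the prepare and commit calls.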
All the JTA implementations we looked at provide recovery test cases, which make it easy to test the implementation itself as well as the participating XA resources. Please note that using XA may turn out to be a significant performance concern, especially when transaction volumes are large. One should also look for support for 2PC optimizations such as "last resource commit", which might fit applications where only one of the participating resources cannot or need not support 2PC. Care should be taken about the XA features supported, and the restrictions imposed, if any, by the vendors of the database or the message provider. For example, MySQL doesn't support the suspend() and resume() operations, seems to have some restrictions on using XA, and in some situations might even leave the data in an inconsistent state. To learn more about XA, Principles of Transaction Processing is a very good book that covers the 2PC failure conditions and optimizations in great detail. Mike Spille's blog (see the Resources section) is another good resource; it focuses on XA within the JTA context and provides a wealth of information, especially on failures during 2PC.
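The "last resource commit" optimization mentioned above can be sketched as follows. This is a simplified illustration under our own assumptions, not the algorithm of any particular JTA provider: XaCapable, OnePhaseOnly, RecordingXa, FixedOutcomeLocal and LastResourceCommitSketch are hypothetical names. The idea is that all 2PC-capable resources are prepared first, and the outcome of the single one-phase resource then decides the fate of the rest.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical resource types, invented for this sketch.
interface XaCapable {
    boolean prepare();
    void commit();
    void rollback();
}

interface OnePhaseOnly {
    boolean commit();    // true if the local commit succeeded
    void rollback();
}

class RecordingXa implements XaCapable {
    final List<String> log = new ArrayList<>();
    public boolean prepare() { log.add("prepare"); return true; }
    public void commit()     { log.add("commit"); }
    public void rollback()   { log.add("rollback"); }
}

class FixedOutcomeLocal implements OnePhaseOnly {
    private final boolean succeed;
    FixedOutcomeLocal(boolean succeed) { this.succeed = succeed; }
    public boolean commit() { return succeed; }
    public void rollback()  { }
}

class LastResourceCommitSketch {
    static boolean complete(List<XaCapable> xaResources, OnePhaseOnly last) {
        // Phase 1 applies only to the resources that support it.
        for (XaCapable r : xaResources) {
            if (!r.prepare()) {
                for (XaCapable q : xaResources) {
                    q.rollback();
                }
                last.rollback();
                return false;
            }
        }
        // The one-phase resource goes last: its outcome is the decision.
        if (!last.commit()) {
            for (XaCapable r : xaResources) {
                r.rollback();
            }
            return false;
        }
        for (XaCapable r : xaResources) {
            r.commit();
        }
        return true;
    }

    public static void main(String[] args) {
        RecordingXa xa = new RecordingXa();
        List<XaCapable> resources = new ArrayList<>();
        resources.add(xa);
        System.out.println(complete(resources, new FixedOutcomeLocal(false)));   // false
        System.out.println(xa.log);   // [prepare, rollback]
    }
}
```

Only one resource can take the "last" slot, which is why the optimization fits the case where only one participant lacks 2PC support.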
When using the Spring framework for sending and receiving JMS messages, one should be wary of using the JmsTemplate and the DefaultMessageListenerContainer in a non-J(2)EE environment. With JmsTemplate, a new JMS connection is created for every message that is sent. The same is true of the DefaultMessageListenerContainer when receiving messages. Creating a heavyweight JMS connection is very resource-intensive, and the application may not scale well under heavy load. One option is to look for connection/session pooling support, either from the JMS provider or from third-party libraries. Another option is to use the SingleConnectionFactory from Spring, which makes it easy to configure a single connection that can be reused. Sessions are still created for every message being sent, but this may not be a real overhead since JMS sessions are lightweight. The same applies when messages are being received, whether or not they are transactional.
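The single-connection idea can be illustrated without any JMS classes. The sketch below is an analogy only and not Spring's SingleConnectionFactory: SharedConnectionSketch is a hypothetical class, and a java.util.function.Supplier stands in for the connection factory so that we can count how many "connections" are actually created.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Wraps any "connection factory" (modeled as a Supplier) so that the expensive
// connection is created once and then handed out to every caller.
class SharedConnectionSketch<C> implements Supplier<C> {
    private final Supplier<C> target;
    private C shared;

    SharedConnectionSketch(Supplier<C> target) {
        this.target = target;
    }

    @Override
    public synchronized C get() {
        if (shared == null) {
            shared = target.get();   // created lazily, on first use only
        }
        return shared;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // Hypothetical expensive factory: counts how many connections it opens.
        Supplier<String> expensive = () -> "connection-" + created.incrementAndGet();
        Supplier<String> reusing = new SharedConnectionSketch<>(expensive);
        for (int i = 0; i < 1000; i++) {
            reusing.get();   // one "send" per iteration
        }
        System.out.println(created.get());   // 1
    }
}
```

Calling the expensive supplier directly in the same loop would create 1000 connections, which is the behavior the paragraph above warns about.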
In this article we saw how the Spring framework can be integrated with JTA implementations to provide distributed transactions, and how it can cater to the needs of an application that requires distributed transactions without a full-blown JEE server. The article also showcased how the Spring framework provides POJO-based solutions and declarative transaction management with minimal intrusion while promoting good design practices. The use cases also demonstrated how Spring provides a rich transaction management abstraction, enabling us to switch easily between different JTA providers.
Murali Kosaraju works as a technical architect at Wachovia Bank in Charlotte, North Carolina. He holds a master's degree in Systems and Information Engineering, and his interests include messaging, service-oriented architectures (SOA), Web Services, JEE and .NET-centric applications. He currently lives with his wife Vidya and son Vishal in South Carolina.
Read more about Enterprise Java in JavaWorld's Enterprise Java section.