
Open source Java projects: Spring Batch

Reading and writing CSV files with Spring Batch and MySQL

Implementing a batch process to handle gigabytes of data is a veritable tsunami of a task, but you can take it down a chunk with the help of Spring Batch. This popular Spring module has been engineered to handle the details of batch processing for all sorts of files. Get started with Spring Batch by building a simple job that imports products from a CSV file into a MySQL database, then explore the module's batch processing capabilities with single and multiple processors and one or more helpful tasklets. Finally, get a quick overview of Spring Batch's resiliency tools for skipping records, retrying records, and restarting batch jobs.

If you've ever had to implement a batch process to pass hundreds of thousands of data elements between Java enterprise systems, then you know what a load of work that is. Your batch processing system needs to be able to handle huge amounts of data, survive the failure of individual records without crashing the entire process, and manage interruptions and restarts without re-doing work that's already been done.

For the uninitiated, here are some scenarios that require batch processing, and where using Spring Batch could potentially save you countless hours:

  • You receive a file that is missing some information, so you parse through the file, call a service to retrieve the missing information, and write the file out for another batch process to handle.
  • When an error occurs in your environment, you write the failed message to your database. You have a process that looks for failed messages every 15 minutes and replays ones that you've identified as replayable.
  • You have a workflow in which you expect other systems to call certain services after certain events are received. If those other systems do not call your services, then you automatically clean up your data after a couple of days so that the business processes do not fail.
  • You receive a file every day that contains employee updates and you need to create artifacts for new employees.
  • You have services that can be used to customize orders. Every night you run a batch process that constructs manifest files and sends them to your fulfillment vendors.

Jobs and chunks: The Spring Batch paradigm

Spring Batch has a lot of moving parts, but let's start by looking at the core processing that you'll do in a batch job. You can think about the work in a job as following three distinct steps:

  1. Reading
  2. Processing
  3. Writing

For example, you might open a data file in CSV format, perform some processing on the data in the file, and then write the data to a database. In Spring Batch, you would configure a reader to read one line of the file at a time and pass each line to your processor; Spring Batch then collects the processed items into "chunks" and sends those records to a writer, which inserts them into the database. You can see the cycle in Figure 1.

Figure 1. The basic logic of Spring Batch processing

Spring Batch greatly simplifies batch processing by providing implementations of readers and writers for common sources like CSV files, XML files, databases, JSON records contained in a file, and even JMS. It's also fairly simple to build custom readers and writers if you need to.
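To give you a feel for the contract that readers satisfy, here's a minimal sketch of a custom reader written against the ItemReader interface. The class is hypothetical (Spring Batch ships a similar ListItemReader); the key point is that Spring Batch calls read() once per item and treats a null return value as the end of the input:

import java.util.Iterator;
import java.util.List;

import org.springframework.batch.item.ItemReader;

/**
 * A minimal custom reader that serves items from an in-memory list
 */
public class InMemoryItemReader<T> implements ItemReader<T>
{
    private final Iterator<T> iterator;

    public InMemoryItemReader( List<T> items )
    {
        this.iterator = items.iterator();
    }

    @Override
    public T read()
    {
        // Return the next item, or null to signal that the input is exhausted
        return iterator.hasNext() ? iterator.next() : null;
    }
}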

To get started, let's look at the process to configure a file reader to read a CSV file, map its contents to an object, and insert the resulting objects into a database.

Reading and processing a CSV file

Spring Batch's built-in reader, org.springframework.batch.item.file.FlatFileItemReader, parses a file into individual lines. It requires a resource that references the flat file, the number of lines to skip at the beginning of the file (typically just the file headers), and a line mapper that converts an individual line into an object. The line mapper, in turn, requires a line tokenizer that divides a line into its constituent fields, and a field set mapper that builds an object from the set of field values. The configuration for FlatFileItemReader is shown below:



    <bean id="productReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">

        <!-- <property name="resource" value="file:./sample.csv" /> -->
        <property name="resource" value="file:#{jobParameters['inputFile']}" />

        <property name="linesToSkip" value="1" />

        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="id,name,description,quantity" />
                    </bean>
                </property>

                <property name="fieldSetMapper">
                    <bean class="com.geekcap.javaworld.springbatchexample.simple.reader.ProductFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

Let's look at these components. First, Figure 2 shows a diagram of the relationship between them.

Figure 2. Components of FlatFileItemReader

Resources: The resource property defines the file to read. The commented-out resource shows a hard-coded path: sample.csv in the directory from which the batch job is run. The more interesting entry is the inputFile job parameter: job parameters allow you to pass values to the job at runtime. In the case of the import file, it's a very important parameter to resolve at runtime rather than at build time. (It would be pretty boring to import the same file over and over again!)

Lines to skip: The linesToSkip property tells the file reader how many leading lines in the file to skip. Quite often CSV files will contain header information, such as column names, in the first line of a file, so in this example we tell the file reader to skip the first line.

Line mapper: The lineMapper is responsible for converting individual lines of a file into objects. It depends on two components:

  • lineTokenizer defines how to break the line up into tokens. In our case we list the names of the columns in the CSV file.
  • fieldSetMapper builds an object from field values. In our case we build a Product object from the id, name, description, and quantity fields.

Note that Spring Batch provides the infrastructure for us, but we're still responsible for the logic in the field set mapper. Listing 1 shows the source code for the Product object, which is the object we're building.

Listing 1. Product.java


package com.geekcap.javaworld.springbatchexample.simple.model;

/**
 * Simple POJO to represent a product
 */
public class Product
{
    private int id;
    private String name;
    private String description;
    private int quantity;

    public Product() {
    }

    public Product(int id, String name, String description, int quantity) {
        this.id = id;
        this.name = name;
        this.description = description;
        this.quantity = quantity;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int quantity) {
        this.quantity = quantity;
    }
}

The Product class is a simple POJO that wraps our four fields. Listing 2 shows the source code for the ProductFieldSetMapper class.

Listing 2. ProductFieldSetMapper.java


package com.geekcap.javaworld.springbatchexample.simple.reader;

import com.geekcap.javaworld.springbatchexample.simple.model.Product;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

/**
 * Builds a Product from a row in the CSV file (as a set of fields)
 */
public class ProductFieldSetMapper implements FieldSetMapper<Product>
{
    @Override
    public Product mapFieldSet(FieldSet fieldSet) throws BindException {
        Product product = new Product();
        product.setId( fieldSet.readInt( "id" ) );
        product.setName( fieldSet.readString( "name" ) );
        product.setDescription( fieldSet.readString( "description" ) );
        product.setQuantity( fieldSet.readInt( "quantity" ) );
        return product;
    }
}

The ProductFieldSetMapper class implements the FieldSetMapper interface, which defines a single method: mapFieldSet(). Once the line mapper has parsed the line into its individual fields, it builds a FieldSet containing the named fields and passes that to the mapFieldSet() method. This method is responsible for building an object to represent that row in the CSV file. In our case, we build a Product instance by invoking the various read methods on the FieldSet.

Writing to the database

After we've read the file and have a set of Products, the next step is to write them to the database. Technically we could wire in a processing step that does something to the data, but for now let's just write the data to the database. Listing 3 shows the source code for the ProductItemWriter class.

Listing 3. ProductItemWriter.java


package com.geekcap.javaworld.springbatchexample.simple.writer;

import com.geekcap.javaworld.springbatchexample.simple.model.Product;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

/**
 * Writes products to a database
 */
public class ProductItemWriter implements ItemWriter<Product>
{
    private static final String GET_PRODUCT = "select * from PRODUCT where id = ?";
    private static final String INSERT_PRODUCT = "insert into PRODUCT (id,name,description,quantity) values (?,?,?,?)";
    private static final String UPDATE_PRODUCT = "update PRODUCT set name = ?, description = ?,quantity = ? where id = ?";

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public void write(List<? extends Product> products) throws Exception
    {
        for( Product product : products )
        {
            List<Product> productList = jdbcTemplate.query(GET_PRODUCT, new Object[] {product.getId()}, new RowMapper<Product>() {
                @Override
                public Product mapRow( ResultSet resultSet, int rowNum ) throws SQLException {
                    Product p = new Product();
                    p.setId( resultSet.getInt( 1 ) );
                    p.setName( resultSet.getString( 2 ) );
                    p.setDescription( resultSet.getString( 3 ) );
                    p.setQuantity( resultSet.getInt( 4 ) );
                    return p;
                }
            });

            if( productList.size() > 0 )
            {
                jdbcTemplate.update( UPDATE_PRODUCT, product.getName(), product.getDescription(), product.getQuantity(), product.getId() );
            }
            else
            {
                jdbcTemplate.update( INSERT_PRODUCT, product.getId(), product.getName(), product.getDescription(), product.getQuantity() );
            }
        }
    }
}

The ProductItemWriter class implements the ItemWriter interface and its single method: write(). The write() method accepts a list of Products. Spring Batch implements its writers using a "chunking" strategy, which means that while reads are performed one item at a time, writes are chunked together into groups. In the job configuration, which is defined below, you have full control over the number of items that are chunked together into a single write (through the commit-interval). In this example, the write() method does the following:

  1. It executes an SQL SELECT statement to retrieve the Product with the specified id.
  2. If the SELECT returns an item, then write() executes an UPDATE to refresh the database record with the new values.
  3. If the SELECT does not return an item then write() performs an INSERT to add the product to the database.

The ProductItemWriter class uses Spring's JdbcTemplate class, which is defined in the applicationContext.xml file below and automatically wired into the ProductItemWriter class. If you haven't used the JdbcTemplate class, it is an implementation of the Gang of Four Template Method pattern for interacting with databases through the JDBC API: the template manages connections, statements, and cleanup, while you supply the SQL and any row-mapping logic. The code should be pretty self-explanatory, but if you need more information, check out the Spring JdbcTemplate javadoc.
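As a tiny illustration of that division of labor, here is a hypothetical helper method written against the same autowired jdbcTemplate field used in Listing 3 (the SQL is just an example):

// The template acquires the connection, creates and executes the statement,
// and cleans up; we supply only the SQL and the expected result type
public int countProducts()
{
    return jdbcTemplate.queryForObject( "select count(*) from PRODUCT", Integer.class );
}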

Wiring it together in the application context file

Thus far we have built a Product domain object, a ProductFieldSetMapper that converts a line in the CSV file into an object, and a ProductItemWriter that writes objects to the database. Now we need to configure Spring Batch to wire all of these together. Listing 4 shows the source code for the applicationContext.xml file, which defines our beans.

Listing 4. applicationContext.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
                http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd">


    <context:annotation-config />

    <!-- Component scan to find all Spring components -->
    <context:component-scan base-package="com.geekcap.javaworld.springbatchexample" />


    <!-- Data source - connect to a MySQL instance running on the local machine -->
    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/spring_batch_example"/>
        <property name="username" value="sbe"/>
        <property name="password" value="sbe"/>
    </bean>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- Create job-meta tables automatically -->
    <jdbc:initialize-database data-source="dataSource">
        <jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
        <jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
    </jdbc:initialize-database>


    <!-- Job Repository: used to persist the state of the batch job -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager" />
    </bean>


    <!-- Job Launcher: creates the job and the job state before launching it -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>


    <!-- Reader bean for our simple CSV example -->
    <bean id="productReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">

        <!-- <property name="resource" value="file:./sample.csv" /> -->
        <property name="resource" value="file:#{jobParameters['inputFile']}" />


        <!-- Skip the first line of the file because this is the header that defines the fields -->
        <property name="linesToSkip" value="1" />

        <!-- Defines how we map lines to objects -->
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

                <!-- The lineTokenizer divides individual lines up into units of work -->
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">

                        <!-- Names of the CSV columns -->
                        <property name="names" value="id,name,description,quantity" />
                    </bean>
                </property>

                <!-- The fieldSetMapper maps a line in the file to a Product object -->
                <property name="fieldSetMapper">
                    <bean class="com.geekcap.javaworld.springbatchexample.simple.reader.ProductFieldSetMapper" />
                </property>
            </bean>
        </property>
    </bean>

    <bean id="productWriter" class="com.geekcap.javaworld.springbatchexample.simple.writer.ProductItemWriter" />

</beans>

Note that separating our job configuration from our application/environment configuration enables us to move a job from one environment to another without redefining the job. The following beans are defined in Listing 4:

  • dataSource: The sample application connects to MySQL, so the data source is configured to connect to a MySQL database named spring_batch_example running on the localhost (see below for setup instructions).
  • transactionManager: The Spring transaction manager is used to manage MySQL transactions.
  • jdbcTemplate: This class provides an implementation of the template design pattern for interacting with JDBC connections. It's a helper class to simplify our database integration. In a production application we would probably opt to use an ORM tool like Hibernate behind a service layer, but I want to keep the example as simple as possible.
  • jobRepository: The MapJobRepositoryFactoryBean is the Spring Batch component that manages the state of a job. Note that, as its name suggests, this factory creates an in-memory (map-based) job repository, so job state does not survive across JVM runs; for restartable production jobs you would instead use the database-backed JobRepositoryFactoryBean, pointed at the dataSource.
  • jobLauncher: This is the component that launches and manages the workflow of a Spring Batch job.
  • productReader: This bean performs the read operation in our job.
  • productWriter: This bean writes the Product instances to the database.

Note that the jdbc:initialize-database node points to two Spring Batch scripts that create the database tables that support a running batch job. These scripts are located in the Spring Batch core JAR file (which Maven imports automatically) at the specified paths. The JAR file contains scripts for various database vendors, including MySQL, Oracle, SQL Server, and more. These scripts create the schema that a database-backed job repository uses while running jobs. In this example the configuration drops and then re-creates the tables on every run, which is fine for a demo; in a production environment you would extract the SQL files and create the tables yourself, so that they (and your job history) persist across runs.

Defining the job

Listing 5 shows the file-import-job.xml file, which defines the actual job.

Listing 5. file-import-job.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">


    <!-- Import our beans -->
    <import resource="classpath:/applicationContext.xml" />

    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" writer="productWriter" commit-interval="5" />
            </tasklet>
        </step>
    </job>

</beans>

Note that a job contains one or more steps; a step can contain at most one tasklet; and a tasklet can contain at most one chunk, as shown graphically in Figure 3.

Figure 3. Jobs, steps, tasklets, and chunks

In our example, the simpleFileImportJob contains a single step named importFileStep. The importFileStep contains an unnamed tasklet that contains a chunk. The chunk is configured with references to our productReader and productWriter and defines a commit-interval of 5, which means that the writer receives records in groups of five: the step reads five products using the productReader and then passes them to the productWriter to be written out in a single transaction. This cycle repeats until all of the data is exhausted.

Listing 5 also imports the applicationContext.xml file, which contains all of our beans. Jobs are typically defined in separate files; this is because the job launcher requires a job file and a job name when it is executed. Everything could be defined in one file, but it would quickly become unwieldy, so as a convention, the job is defined in one file and it imports all dependent files.

Finally, you may notice that the XML namespace (xmlns) is defined inside the job node. We do this so that we do not need to preface each and every node with "batch:." Defining the namespace at the node level affects both the node that defines it and all child nodes.

Build the project

Listing 6 shows the contents of the POM file that builds this sample project.

Listing 6. pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.geekcap.javaworld</groupId>
  <artifactId>spring-batch-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spring-batch-example</name>
  <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>3.2.1.RELEASE</spring.version>
        <spring.batch.version>2.2.1.RELEASE</spring.batch.version>
        <java.version>1.6</java.version>
    </properties>

    <dependencies>
        <!-- Spring Dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-infrastructure</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>

        <!-- Apache DBCP-->
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.4</version>
        </dependency>

        <!-- MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>

        <!-- Apache Commons I/O (required by the file-archiving tasklet in Listing 8) -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>


        <!-- Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <finalName>spring-batch-example</finalName>
    </build>


</project>

The POM file imports Spring's context, core, beans, and JDBC packages, and then imports the Spring Batch core and infrastructure packages. These dependencies set up Spring and Spring Batch. It also imports the Apache DBCP dependency, so that we can set up a database connection pool, the MySQL driver, and Commons I/O, which the file-archiving tasklet built later in this article uses. The plug-in section configures the build to use Java 1.6 and to copy all runtime dependencies to the lib directory during the install phase. Use the following command to build the project:

mvn clean install

Connecting Spring Batch to a database

Your job is all set up but you'll need to connect Spring Batch to a database if you want to run it in a production environment. Spring Batch maintains a set of tables that it uses to record the current state of jobs and the records that have been processed. This way, if a job does need to be restarted, it can continue from where it left off.
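For example, once a job has run against a database-backed job repository, you can inspect its status directly; the table and column names below come from Spring Batch's MySQL schema script (the same script referenced in Listing 4):

SELECT JOB_EXECUTION_ID, STATUS, START_TIME, END_TIME
FROM BATCH_JOB_EXECUTION;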

You can connect Spring Batch to any database that you like, but for the purposes of this demo we'll use MySQL. Please download MySQL to follow the examples. The community version is free and will meet your needs. Review the installation instructions for your operating system to get an environment up and running.

Once you have MySQL set up you'll need to create the database and a user that has permissions to interact with that database. From the command-line, launch mysql from MySQL's bin directory and execute the following commands (note that you may need to execute mysql as root or using sudo, depending on your operating system):


create database spring_batch_example;
create user 'sbe'@'localhost' identified by 'sbe';
grant all on spring_batch_example.* to 'sbe'@'localhost';

The first line creates a new database named spring_batch_example, which will maintain your products. The second line creates a user named sbe (for Spring Batch Example) with the password sbe. The last line grants all permissions on the spring_batch_example database to the sbe user.

Next, create the PRODUCT table with the following command:


CREATE TABLE PRODUCT (
	ID INT NOT NULL,
	NAME VARCHAR(128) NOT NULL,
	DESCRIPTION VARCHAR(128),
	QUANTITY INT,
	PRIMARY KEY(ID)
);

Now create a file named sample.csv in your project's target directory with the following data:


id,name,description,quantity
1,Product One,This is product 1, 10
2,Product Two,This is product 2, 20
3,Product Three,This is product 3, 30
4,Product Four,This is product 4, 20
5,Product Five,This is product 5, 10
6,Product Six,This is product 6, 50
7,Product Seven,This is product 7, 80
8,Product Eight,This is product 8, 90

The batch job can be launched with this:


java -cp spring-batch-example.jar:./lib/* org.springframework.batch.core.launch.support.CommandLineJobRunner classpath:/jobs/file-import-job.xml simpleFileImportJob inputFile=sample.csv

The CommandLineJobRunner class is a Spring Batch class that executes a job. It requires the name of the XML file that contains the job, the name of the job to execute, and optionally any job parameters that you want to send to it. Because the file-import-job.xml file is inside the JAR file, it can be accessed as follows: classpath:/jobs/file-import-job.xml. We want to execute the job named simpleFileImportJob and pass it a single job parameter named inputFile with the value sample.csv.

This should yield output similar to the following:


Nov 12, 2013 4:09:17 PM org.springframework.context.support.AbstractApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@6b4da8f4: startup date [Tue Nov 12 16:09:17 EST 2013]; root of context hierarchy
Nov 12, 2013 4:09:17 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [jobs/file-import-job.xml]
Nov 12, 2013 4:09:18 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [applicationContext.xml]
Nov 12, 2013 4:09:19 PM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'simpleFileImportJob': replacing [Generic bean: class [org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.batch.core.configuration.xml.JobParserJobFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
Nov 12, 2013 4:09:19 PM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'productReader': replacing [Generic bean: class [org.springframework.batch.item.file.FlatFileItemReader]; scope=step; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [applicationContext.xml]] with [Root bean: class [org.springframework.aop.scope.ScopedProxyFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in BeanDefinition defined in class path resource [applicationContext.xml]]
Nov 12, 2013 4:09:19 PM org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6aba4211: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,dataSource,transactionManager,jdbcTemplate,jobRepository,jobLauncher,productReader,productWriter,org.springframework.batch.core.scope.internalStepScope,org.springframework.beans.factory.config.CustomEditorConfigurer,org.springframework.batch.core.configuration.xml.CoreNamespacePostProcessor,importFileStep,simpleFileImportJob,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor,scopedTarget.productReader]; root of factory hierarchy
Nov 12, 2013 4:09:19 PM org.springframework.batch.core.launch.support.SimpleJobLauncher afterPropertiesSet
INFO: No TaskExecutor has been set, defaulting to synchronous executor.
Nov 12, 2013 4:09:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=simpleFileImportJob]] launched with the following parameters: [{inputFile=sample.csv}]
Nov 12, 2013 4:09:22 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [importFileStep]
Nov 12, 2013 4:09:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=simpleFileImportJob]] completed with the following parameters: [{inputFile=sample.csv}] and the following status: [COMPLETED]
Nov 12, 2013 4:09:22 PM org.springframework.context.support.AbstractApplicationContext doClose
INFO: Closing org.springframework.context.support.ClassPathXmlApplicationContext@6b4da8f4: startup date [Tue Nov 12 16:09:17 EST 2013]; root of context hierarchy
Nov 12, 2013 4:09:22 PM org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons
INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6aba4211: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,dataSource,transactionManager,jdbcTemplate,jobRepository,jobLauncher,productReader,productWriter,org.springframework.batch.core.scope.internalStepScope,org.springframework.beans.factory.config.CustomEditorConfigurer,org.springframework.batch.core.configuration.xml.CoreNamespacePostProcessor,importFileStep,simpleFileImportJob,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor,scopedTarget.productReader]; root of factory hierarchy

Verify that the PRODUCT table in the database contains eight rows and that they have the correct values.
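Incidentally, CommandLineJobRunner is not the only way to start a job. If you would rather launch the job from your own code, a minimal sketch looks like the following. The JobRunner class is hypothetical; the jobLauncher and simpleFileImportJob beans are the ones defined in our configuration files:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JobRunner
{
    public static void main( String[] args ) throws Exception
    {
        // Loading the job file also pulls in applicationContext.xml via its import
        ApplicationContext context = new ClassPathXmlApplicationContext( "jobs/file-import-job.xml" );

        JobLauncher jobLauncher = context.getBean( JobLauncher.class );
        Job job = context.getBean( "simpleFileImportJob", Job.class );

        // Job parameters identify the job instance, so pass the input file here
        JobParameters jobParameters = new JobParametersBuilder()
                .addString( "inputFile", "sample.csv" )
                .toJobParameters();

        JobExecution execution = jobLauncher.run( job, jobParameters );
        System.out.println( "Job finished with status: " + execution.getStatus() );
    }
}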

Batch processing with Spring Batch

At this point, the example reads data from a CSV file and imports it into the database. While this is useful, it's likely that you'll sometimes want to transform or filter your data before inserting it into the database. In this section we'll build a simple processor that, rather than overwriting the quantity of a product, retrieves the existing record from the database and adds the quantity in the CSV file to it before passing the product to the writer.

Listing 7 shows the source code for the ProductItemProcessor class.

Listing 7. ProductItemProcessor.java


package com.geekcap.javaworld.springbatchexample.simple.processor;

import com.geekcap.javaworld.springbatchexample.simple.model.Product;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

/**
 * Processor that finds existing products and updates a product quantity appropriately
 */
public class ProductItemProcessor implements ItemProcessor<Product,Product>
{
    private static final String GET_PRODUCT = "select * from PRODUCT where id = ?";
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public Product process(Product product) throws Exception
    {
        // Retrieve the product from the database
        List<Product> productList = jdbcTemplate.query(GET_PRODUCT, new Object[] {product.getId()}, new RowMapper<Product>() {
            @Override
            public Product mapRow( ResultSet resultSet, int rowNum ) throws SQLException {
                Product p = new Product();
                p.setId( resultSet.getInt( 1 ) );
                p.setName( resultSet.getString( 2 ) );
                p.setDescription( resultSet.getString( 3 ) );
                p.setQuantity( resultSet.getInt( 4 ) );
                return p;
            }
        });

        if( productList.size() > 0 )
        {
            // Add the new quantity to the existing quantity
            Product existingProduct = productList.get( 0 );
            product.setQuantity( existingProduct.getQuantity() + product.getQuantity() );
        }

        // Return the (possibly) updated product
        return product;
    }
}

Item processors implement the ItemProcessor<I,O> interface, where I is the type of object sent to the processor and O is the type of object the processor returns. In this example we pass in a Product and return a Product. ItemProcessor defines a single method, process(), in which we execute a SELECT query to retrieve the Product with the specified id from the database. If the Product is found, process() adds the existing Product's quantity to the incoming quantity.

This processor doesn't do any filtering, but if the process() method returned null then Spring Batch would omit this item from the list to be sent to the writer.

Wiring this into the job is quite simple. First, add a new bean to the applicationContext.xml file:


<bean id="productProcessor" class="com.geekcap.javaworld.springbatchexample.simple.processor.ProductItemProcessor" />

Next, reference it in the chunk as the processor:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" />
            </tasklet>
        </step>
    </job>

Build and execute the job and you should see the product quantity in the database increasing each time you run this batch job.

Building multiple processors

We've defined a single processor, but at some point you might want to build several fine-grained item processors and execute them successively in the same chunk. For example, you might have a filter that skips items that don't exist in the database (a sketch of such a filter follows the steps below) and a processor that correctly manages the item quantity. If this is the case then you can use Spring Batch's CompositeItemProcessor. The process is as follows:

  1. Build the processor classes
  2. Define the processor beans in your applicationContext.xml file
  3. Define a bean of type org.springframework.batch.item.support.CompositeItemProcessor and set its delegates property to the list of processor beans that you want executed
  4. Define the chunk's processor to reference the CompositeItemProcessor
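Before wiring everything together, here is a minimal sketch of the filtering processor. The class is hypothetical and the non-positive-quantity rule is just an illustration; the important part is that returning null from process() tells Spring Batch to drop the item:

package com.geekcap.javaworld.springbatchexample.simple.processor;

import com.geekcap.javaworld.springbatchexample.simple.model.Product;
import org.springframework.batch.item.ItemProcessor;

/**
 * Hypothetical processor that filters out invalid products
 */
public class ProductFilterItemProcessor implements ItemProcessor<Product,Product>
{
    @Override
    public Product process( Product product ) throws Exception
    {
        // Returning null drops the item: it is never passed to the writer
        if( product.getQuantity() <= 0 )
        {
            return null;
        }
        return product;
    }
}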

Given the hypothetical ProductFilterItemProcessor sketched above, we could wire the composite processor as follows:


<bean id="productFilterProcessor" class="com.geekcap.javaworld.springbatchexample.simple.processor.ProductFilterItemProcessor" />

<bean id="productProcessor" class="com.geekcap.javaworld.springbatchexample.simple.processor.ProductItemProcessor" />

<bean id="productCompositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
	<property name="delegates">
		<list>
			<ref bean="productFilterProcessor" />
			<ref bean="productProcessor" />
		</list>
	</property>
</bean>

Then simply modify the job configuration, like so:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productCompositeProcessor" writer="productWriter" commit-interval="5" />
            </tasklet>
        </step>
    </job>

Tasklets

Chunking is a very good strategy for dividing work into, well, chunks: read items one by one, process them, and then write them out in a chunk. But what if you have a linear operation that only needs to be performed once? For this you can build a tasklet. A tasklet can do whatever you need it to do! For example, it could download a file from an FTP site, decompress or decrypt a file, or invoke a web service to determine whether the file processing has been approved by an executive. Here's the basic process to build a tasklet:

  1. Define a class that implements org.springframework.batch.core.step.tasklet.Tasklet.
  2. Implement the execute() method.
  3. Return the appropriate org.springframework.batch.repeat.RepeatStatus value: CONTINUABLE or FINISHED.
  4. Define your bean in the applicationContext.xml file.
  5. Create a step that has a tasklet that references your bean.

Listing 8 shows the contents of a new tasklet that archives our input file by copying it to an archive directory.

Listing 8. ArchiveProductImportFileTasklet.java


package com.geekcap.javaworld.springbatchexample.simple.tasklet;

import org.apache.commons.io.FileUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import java.io.File;

/**
 * A tasklet that archives the input file
 */
public class ArchiveProductImportFileTasklet implements Tasklet
{
    private String inputFile;

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception
    {
        // Make our destination directory and copy our input file to it
        File archiveDir = new File( "archive" );
        FileUtils.forceMkdir( archiveDir );
        FileUtils.copyFileToDirectory( new File( inputFile ), archiveDir );

        // We're done...
        return RepeatStatus.FINISHED;
    }

    public String getInputFile() {
        return inputFile;
    }

    public void setInputFile(String inputFile) {
        this.inputFile = inputFile;
    }
}

The ArchiveProductImportFileTasklet class implements the Tasklet interface and provides an implementation of the execute() method. It uses the Apache Commons I/O FileUtils class to create a new archive directory and then copies the input file to it.

As far as the bean definition goes, the following bean is added to the applicationContext.xml file:


    <bean id="archiveFileTasklet" class="com.geekcap.javaworld.springbatchexample.simple.tasklet.ArchiveProductImportFileTasklet" scope="step">
        <property name="inputFile" value="#{jobParameters['inputFile']}" />
    </bean>

Note that we pass the inputFile job parameter to the bean and that the bean has step scope, which ensures that the jobParameters expression is resolved when the step runs rather than when the application context starts up.

Listing 9 shows the updated Job.

Listing 9. file-import-job.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">


    <!-- Import our beans -->
    <import resource="classpath:/applicationContext.xml" />

    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep" next="archiveFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" />
            </tasklet>
        </step>
        <step id="archiveFileStep">
            <tasklet ref="archiveFileTasklet" />
        </step>
    </job>

</beans>

Listing 9 adds a new step named archiveFileStep to the file-import job and configures it to be the "next" step after the importFileStep. The "next" attribute allows you to control the flow of steps to orchestrate your job. And while it is beyond the scope of this article, note that you can define special decision steps to make the job branch based on the exit status of a step. The archiveFileStep contains a single tasklet that references the bean that we created above.

Resiliency

For job resiliency, Spring Batch gives you three tools:

  1. Skip: If a single element in your processing is incorrect, such as an improperly formatted line in your CSV file, then you have the option to skip that object and continue processing the next one.
  2. Retry: If an error occurs that is likely to be resolved by retrying a few milliseconds later, then you have the option to ask Spring Batch to retry that element. For example, you might be updating a record in the database while another transaction has that record locked. Chances are that the lock will be released shortly, and a retry might succeed.
  3. Restart: If the job is configured to store its state in a database and it fails, then you have the option to restart that job instance and continue where you left off.

While I won't go through the details of each resiliency feature, I did want to summarize the options available.

Skipping Items

Sometimes you might want to skip either invalid records from a reader or exceptions that occur during processing or writing. To do so, you can specify two things:

  • Define a skip-limit on your chunk element to tell Spring how many items can be skipped before the job fails (you might handle a few invalid records, but if you have too many then the input data might be invalid).
  • Define a list of skippable-exception-classes that trigger the record to be skipped; you can define both include elements for exceptions that will be skipped and exclude elements for exceptions that will not be skipped (used in the case when you want to skip exceptions in a hierarchy, but exclude one or more of its sub-classes).

For example:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" skip-limit="10">
			<skippable-exception-classes>
				<include class="org.springframework.batch.item.file.FlatFileParseException" />
			</skippable-exception-classes>
		</chunk>
            </tasklet>
        </step>
    </job>

In this case, any record that causes a FlatFileParseException to be thrown will be skipped. If more than 10 records are skipped then the job fails.

Retrying Items

In other circumstances, exceptions may occur at times when retries are possible, such as failures due to database locks. Retries are implemented very similarly to skips:

  • Define a retry-limit on your chunk element to tell Spring how many times an item can be retried before it is considered failed. Once an item has exhausted its retries, it fails the job, unless you combine retries with skips.
  • Define a list of retryable-exception-classes that trigger the record to be retried; you can define both include elements for exceptions that will be retried and exclude elements for exceptions that will not be retried.

For example:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" retry-limit="5">
			<retryable-exception-classes>
				<include class="org.springframework.dao.OptimisticLockingFailureException" />
			</retryable-exception-classes>
		</chunk>
            </tasklet>
        </step>
    </job>

And you can combine retries and skips by adding the retryable exception to the skippable list as well. So if an exception triggers five retries and, after those five retries are exhausted, the exception is also in the skippable list, then the record will be skipped; if it is not in the skippable list, it will fail the entire job.
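For example, the following chunk (a sketch reusing our beans from above) retries an optimistic-locking failure up to five times and, once the retries are exhausted, skips the offending record, up to ten skips:

                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" retry-limit="5" skip-limit="10">
                    <retryable-exception-classes>
                        <include class="org.springframework.dao.OptimisticLockingFailureException" />
                    </retryable-exception-classes>
                    <skippable-exception-classes>
                        <include class="org.springframework.dao.OptimisticLockingFailureException" />
                    </skippable-exception-classes>
                </chunk>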

Restarting jobs

Finally, for jobs that do fail, you have the option to restart them and have them pick up exactly where they left off. In order to do this you start a job instance using the same job parameters; Spring Batch will find the failed instance in the database and continue from the last successfully committed point. You do have the option to refuse to allow restarts, and you can control the number of times that a step in a job can be restarted (after some number of attempts you might want to just give up).
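For example, assuming a database-backed job repository (see the earlier note about MapJobRepositoryFactoryBean), if the sample.csv import failed partway through, re-running the exact same command with the same inputFile parameter would resume that job instance rather than start a new one:

java -cp spring-batch-example.jar:./lib/* org.springframework.batch.core.launch.support.CommandLineJobRunner classpath:/jobs/file-import-job.xml simpleFileImportJob inputFile=sample.csv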

In conclusion

Some business problems are best solved using batch processing, and Spring Batch provides a framework for implementing batch jobs. Spring Batch defines a chunking paradigm with three phases (read, process, and write) as well as support for reading from and writing to common resources. This installment in the Open source Java projects series has explored what Spring Batch does and how to use it.

We started by building a simple job to import products from a CSV file into a database, then extended that job by adding a processor to manage product quantities. Finally, we wrote a separate tasklet to archive the input file. While not part of the example, Spring Batch's resiliency features are important, so I quickly reviewed three resiliency tools that Spring Batch provides: skipping records, retrying records, and restarting batch jobs.

This article has only scratched the surface of Spring Batch's capabilities, but I hope it has given you enough to start with to build your own Spring Batch jobs.