Open source Java projects: Spring Batch

Reading and writing CSV files with Spring Batch and MySQL

1 2 3 4 Page 3
Page 3 of 4

Figure 3. Jobs, steps, tasklets, and chunks

osjp spring batch fig3

In our example, the simpleFileImportJob contains a single step named importFileStep. The importFileStep contains an unnamed tasklet that contains a chunk. The chunk is configured with a reference to our productReader and productWriter. It defines a commit-interval of 5, which means that it will send the writer five records at once. The step will read five products using the productReader and then pass those products to the productWriter to be written out. This chuck repeats until all of the data is exhausted.

Listing 5 also imports the applicationContext.xml file, which contains all of our beans. Jobs are typically defined in separate files; this is because the job launcher requires a job file and a job name when it is executed. Everything could be defined in one file, but it would quickly become unwieldy, so as a convention, the job is defined in one file and it imports all dependent files.

Finally, you may notice that the XML namespace (xmlns) is defined inside the job node. We do this so that we do not need to preface each and every node with "batch:." Defining the namespace at the node level affects both the node that defines it and all child nodes.

Build the project

Listing 6 shows the contents of the POM file that builds this sample project.

Listing 6. pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.geekcap.javaworld</groupId>
  <artifactId>spring-batch-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spring-batch-example</name>
  <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>3.2.1.RELEASE</spring.version>
        <spring.batch.version>2.2.1.RELEASE</spring.batch.version>
        <java.version>1.6</java.version>
    </properties>

    <dependencies>
        <!-- Spring Dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-infrastructure</artifactId>
            <version>${spring.batch.version}</version>
        </dependency>

        <!-- Apache DBCP-->
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.4</version>
        </dependency>

        <!-- MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>


        <!-- Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy</id>
                        <phase>install</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <finalName>spring-batch-example</finalName>
    </build>


</project>

The POM file imports Spring's context, core, beans, and JDBC packages, and then it imports the Spring Batch core and infrastructure packages. These dependencies set up Spring and Spring Batch. It also imports the Apache DBCP dependency to allow us to set up a database connection pool and the MySQL driver. The plug-in section defines the build to use Java 1.6 and configures the build to copy all dependencies to the lib directory. Use the following command to build the project:

mvn clean install

Connecting Spring Batch to a database

Your job is all set up but you'll need to connect Spring Batch to a database if you want to run it in a production environment. Spring Batch maintains a set of tables that it uses to record the current state of jobs and the records that have been processed. This way, if a job does need to be restarted, it can continue from where it left off.

You can connect Spring Batch to any database that you like, but for the purposes of this demo we'll use MySQL. Please download MySQL to follow the examples. The community version is free and will meet your needs. Review the installation instructions for your operating system to get an environment up and running.

Once you have MySQL set up you'll need to create the database and a user that has permissions to interact with that database. From the command-line, launch mysql from MySQL's bin directory and execute the following commands (note that you may need to execute mysql as root or using sudo, depending on your operating system):


create database spring_batch_example;
create user 'sbe'@'localhost' identified by 'sbe';
grant all on spring_batch_example.* to 'sbe'@'localhost';

The first line creates a new database named spring_batch_example, which will maintain your products. The second line creates a user named sbe (for Spring Batch Example) with the password sbe. The last line grants all permissions on the spring_batch_example database to the sbe user.

Next, create the PRODUCT table with the following command:


CREATE TABLE PRODUCT (
	ID INT NOT NULL,
	NAME VARCHAR(128) NOT NULL,
	DESCRIPTION VARCHAR(128),
	QUANTITY INT,
	PRIMARY KEY(ID)
);

Now create a file named sample.csv in your project's target directory with the following data:


id,name,description,quantity
1,Product One,This is product 1, 10
2,Product Two,This is product 2, 20
3,Product Three,This is product 3, 30
4,Product Four,This is product 4, 20
5,Product Five,This is product 5, 10
6,Product Six,This is product 6, 50
7,Product Seven,This is product 7, 80
8,Product Eight,This is product 8, 90

The batch job can be launched with this:


java -cp spring-batch-example.jar:./lib/* org.springframework.batch.core.launch.support.CommandLineJobRunner classpath:/jobs/file-import-job.xml simpleFileImportJob inputFile=sample.csv

The CommandLineJobRunner class is a Spring Batch class that executes a job. It requires the name of the XML file that contains job, the name of the job to execute, and optionally any job parameters that you want to send to it. Because the file-import-job.xml file is inside the JAR file, it can be accessed as follows: classpath:/jobs/file-import-job.xml. We want to execute the job named simpleFileImportJob and pass a single job parameter named inputFile, with the value of sample.csv.

This should yield output similar to the following:


Nov 12, 2013 4:09:17 PM org.springframework.context.support.AbstractApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@6b4da8f4: startup date [Tue Nov 12 16:09:17 EST 2013]; root of context hierarchy
Nov 12, 2013 4:09:17 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [jobs/file-import-job.xml]
Nov 12, 2013 4:09:18 PM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [applicationContext.xml]
Nov 12, 2013 4:09:19 PM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'simpleFileImportJob': replacing [Generic bean: class [org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] with [Generic bean: class [org.springframework.batch.core.configuration.xml.JobParserJobFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]
Nov 12, 2013 4:09:19 PM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'productReader': replacing [Generic bean: class [org.springframework.batch.item.file.FlatFileItemReader]; scope=step; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=false; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [applicationContext.xml]] with [Root bean: class [org.springframework.aop.scope.ScopedProxyFactoryBean]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in BeanDefinition defined in class path resource [applicationContext.xml]]
Nov 12, 2013 4:09:19 PM org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6aba4211: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,dataSource,transactionManager,jdbcTemplate,jobRepository,jobLauncher,productReader,productWriter,org.springframework.batch.core.scope.internalStepScope,org.springframework.beans.factory.config.CustomEditorConfigurer,org.springframework.batch.core.configuration.xml.CoreNamespacePostProcessor,importFileStep,simpleFileImportJob,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor,scopedTarget.productReader]; root of factory hierarchy
Nov 12, 2013 4:09:19 PM org.springframework.batch.core.launch.support.SimpleJobLauncher afterPropertiesSet
INFO: No TaskExecutor has been set, defaulting to synchronous executor.
Nov 12, 2013 4:09:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=simpleFileImportJob]] launched with the following parameters: [{inputFile=sample.csv}]
Nov 12, 2013 4:09:22 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [importFileStep]
Nov 12, 2013 4:09:22 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=simpleFileImportJob]] completed with the following parameters: [{inputFile=sample.csv}] and the following status: [COMPLETED]
Nov 12, 2013 4:09:22 PM org.springframework.context.support.AbstractApplicationContext doClose
INFO: Closing org.springframework.context.support.ClassPathXmlApplicationContext@6b4da8f4: startup date [Tue Nov 12 16:09:17 EST 2013]; root of context hierarchy
Nov 12, 2013 4:09:22 PM org.springframework.beans.factory.support.DefaultSingletonBeanRegistry destroySingletons
INFO: Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@6aba4211: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,dataSource,transactionManager,jdbcTemplate,jobRepository,jobLauncher,productReader,productWriter,org.springframework.batch.core.scope.internalStepScope,org.springframework.beans.factory.config.CustomEditorConfigurer,org.springframework.batch.core.configuration.xml.CoreNamespacePostProcessor,importFileStep,simpleFileImportJob,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor,scopedTarget.productReader]; root of factory hierarchy

Verify that the PRODUCT table in the database contains eight rows and that they have the correct values.

Batch processing with Spring Batch

At this point, the example reads data from a CSV file and imports it into the database. While this is useful, it's likely that you'll sometimes want to transform or filter your data before inserting it into the database. In this section we'll build a simple processor that, rather than overwriting the quantity of a product, instead retrieves the existing record from the database and then adds the quantity in the CSV file to the product before passing it to the writer.

Listing 7 shows the source code for the ProductItemProcessor class.

Listing 7. ProductItemProcessor.java


package com.geekcap.javaworld.springbatchexample.simple.processor;

import com.geekcap.javaworld.springbatchexample.simple.model.Product;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

/**
 * Processor that finds existing products and updates a product quantity appropriately
 */
public class ProductItemProcessor implements ItemProcessor<Product,Product>
{
    private static final String GET_PRODUCT = "select * from PRODUCT where id = ?";
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    public Product process(Product product) throws Exception
    {
        // Retrieve the product from the database
        List<Product> productList = jdbcTemplate.query(GET_PRODUCT, new Object[] {product.getId()}, new RowMapper<Product>() {
            @Override
            public Product mapRow( ResultSet resultSet, int rowNum ) throws SQLException {
                Product p = new Product();
                p.setId( resultSet.getInt( 1 ) );
                p.setName( resultSet.getString( 2 ) );
                p.setDescription( resultSet.getString( 3 ) );
                p.setQuantity( resultSet.getInt( 4 ) );
                return p;
            }
        });

        if( productList.size() > 0 )
        {
            // Add the new quantity to the existing quantity
            Product existingProduct = productList.get( 0 );
            product.setQuantity( existingProduct.getQuantity() + product.getQuantity() );
        }

        // Return the (possibly) update prduct
        return product;
    }
}

Item processors implement the interface ItemProcessor<I,O>, where I is the type of object sent to the processor and O is the type of object returned by the processor. In this example we pass in a Product and then return a Product. The ItemProcessor defines a single method: process(), in which we execute a SELECT query to retrieve the Product with the specified id from the database. If the Product is found, it will add the existing Product's quantity to the new quantity.

This processor doesn't do any filtering, but if the process() method returned null then Spring Batch would omit this item from the list to be sent to the writer.

Wiring this into the job is quite simple. First, add a new bean to the applicationContext.xml file:


<bean id="productProcessor" class="com.geekcap.javaworld.springbatchexample.simple.processor.ProductItemProcessor" />

Next, reference it in the chunk as the processor:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" />
            </tasklet>
        </step>
    </job>

Build and execute the job and you should see the product quantity in the database increasing each time you run this batch job.

Building multiple processors

We've defined a single processor, but at some point you might want to build several finely-grained item processors and execute all of them successively in the same chunk. For example, you might have a filter to skip over items that don't exist in the database and a processor that correctly manages the item quantity. If this is the case then you can use Spring Batch's CompositeItemProcessor. The process is as follows:

  1. Build the processor classes
  2. Define the processor beans in your applicationContext.xml file
  3. Define a bean of type org.springframework.batch.item.support.CompositeItemProcessor and set its delegates to the list of processor beans that you want to executed
  4. Define the chunk's processor to reference the CompositeItemProcessor
1 2 3 4 Page 3
Page 3 of 4