Open source Java projects: Spring Batch

Reading and writing CSV files with Spring Batch and MySQL


Assuming we have a hypothetical ProductFilterItemProcessor, we could wire the two processors together as follows:


<bean id="productFilterProcessor" class="com.geekcap.javaworld.springbatchexample.simple.processor.ProductFilterItemProcessor" />

<bean id="productProcessor" class="com.geekcap.javaworld.springbatchexample.simple.processor.ProductItemProcessor" />

<bean id="productCompositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor">
	<property name="delegates">
		<list>
			<ref bean="productFilterProcessor" />
			<ref bean="productProcessor" />
		</list>
	</property>
</bean>
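The filter processor itself isn't shown in the article, so here is a minimal sketch of what it might look like. In Spring Batch, returning null from ItemProcessor.process() filters the item out of the chunk, so a filter processor simply returns null for unwanted items. To keep the sketch self-contained, the ItemProcessor contract and a bare-bones Product class are stubbed locally; the real class would implement org.springframework.batch.item.ItemProcessor<Product, Product>, and the filtering rule (drop out-of-stock products) is an assumption for illustration.

```java
public class ProductFilterSketch {

    // Minimal stand-in for the example's Product domain object
    static class Product {
        private final String id;
        private final int quantity;

        Product(String id, int quantity) {
            this.id = id;
            this.quantity = quantity;
        }

        String getId() { return id; }
        int getQuantity() { return quantity; }
    }

    // Same shape as org.springframework.batch.item.ItemProcessor<I, O>,
    // stubbed locally so this sketch compiles without Spring on the classpath
    interface ItemProcessor<I, O> {
        O process(I item) throws Exception;
    }

    // Hypothetical filter: drop products that have no stock on hand.
    // Returning null tells Spring Batch to filter the item out of the chunk.
    static class ProductFilterItemProcessor implements ItemProcessor<Product, Product> {
        @Override
        public Product process(Product product) {
            return product.getQuantity() > 0 ? product : null; // null == filtered
        }
    }

    public static void main(String[] args) throws Exception {
        ItemProcessor<Product, Product> filter = new ProductFilterItemProcessor();
        System.out.println(filter.process(new Product("PR-1", 5)) != null); // kept
        System.out.println(filter.process(new Product("PR-2", 0)) != null); // filtered
    }
}
```

Because the composite processor runs its delegates in list order, the filter sits first so that filtered items never reach the ProductItemProcessor.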

Then modify the job configuration to reference the composite processor, like so:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productCompositeProcessor" writer="productWriter" commit-interval="5" />
            </tasklet>
        </step>
    </job>

Tasklets

Chunking is a very good strategy for dividing work into, well, chunks: read items one by one, process them, and then write them out in a chunk. But what if you have a linear operation that only needs to be performed once? For this you can build a tasklet. A tasklet can do whatever you need it to do! For example, it could download a file from an FTP site, decompress or decrypt a file, or invoke a web service to determine whether the file processing has been approved by an executive. Here's the basic process to build a tasklet:

  1. Define a class that implements org.springframework.batch.core.step.tasklet.Tasklet.
  2. Implement the execute() method.
  3. Return the appropriate org.springframework.batch.repeat.RepeatStatus value: CONTINUABLE or FINISHED.
  4. Define your bean in the applicationContext.xml file.
  5. Create a step that has a tasklet that references your bean.

Listing 8 shows the contents of a new tasklet that archives our input file by copying it to an archive directory.

Listing 8. ArchiveProductImportFileTasklet.java


package com.geekcap.javaworld.springbatchexample.simple.tasklet;

import org.apache.commons.io.FileUtils;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

import java.io.File;

/**
 * A tasklet that archives the input file
 */
public class ArchiveProductImportFileTasklet implements Tasklet
{
    private String inputFile;

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception
    {
        // Make our destination directory and copy our input file to it
        File archiveDir = new File( "archive" );
        FileUtils.forceMkdir( archiveDir );
        FileUtils.copyFileToDirectory( new File( inputFile ), archiveDir );

        // We're done...
        return RepeatStatus.FINISHED;
    }

    public String getInputFile() {
        return inputFile;
    }

    public void setInputFile(String inputFile) {
        this.inputFile = inputFile;
    }
}

The ArchiveProductImportFileTasklet class implements the Tasklet interface and provides an implementation of the execute() method. It uses the Apache Commons I/O FileUtils class to create a new archive directory and then copies the input file to it.

As far as the bean definition goes, the following bean is added to the applicationContext.xml file:


    <bean id="archiveFileTasklet" class="com.geekcap.javaworld.springbatchexample.simple.tasklet.ArchiveProductImportFileTasklet" scope="step">
        <property name="inputFile" value="#{jobParameters['inputFile']}" />
    </bean>

Note that we pass the inputFile job parameter to the bean and that the bean has step scope, which delays its creation until the step executes, so that the jobParameters are available for late binding.
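Since the bean reads inputFile from the jobParameters, the parameter has to be supplied when the job is launched. As a sketch (the classpath entries and CSV path are placeholders for your environment), the job could be launched with Spring Batch's CommandLineJobRunner, which takes the job configuration, the job name, and any job parameters as key=value pairs:

```shell
# Launch simpleFileImportJob, supplying inputFile as a job parameter.
# The classpath and the CSV path below are placeholders.
java -cp "target/classes:target/dependency/*" \
    org.springframework.batch.core.launch.support.CommandLineJobRunner \
    file-import-job.xml simpleFileImportJob \
    inputFile=/path/to/products.csv
```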

Listing 9 shows the updated Job.

Listing 9. file-import-job.xml


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">


    <!-- Import our beans -->
    <import resource="classpath:/applicationContext.xml" />

    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep" next="archiveFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" />
            </tasklet>
        </step>
        <step id="archiveFileStep">
            <tasklet ref="archiveFileTasklet" />
        </step>
    </job>

</beans>

Listing 9 adds a new step to the file import job named archiveFileStep and then configures it to be the "next" step after the importFileStep. The "next" parameter allows you to control the flow of steps to orchestrate your job. And while it is beyond the scope of this article, note that you can define special decision steps to cause the job to branch based on the completion status of a task. The archiveFileStep contains a single tasklet that references the bean that we created above.
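While decision steps are beyond the scope of this article, a short sketch shows their shape: a decider bean chooses the next step based on the prior step's outcome. The FileApprovalDecider class and the APPROVED/REJECTED statuses below are hypothetical; your class would implement org.springframework.batch.core.job.flow.JobExecutionDecider.

```xml
<!-- Hypothetical decider bean -->
<bean id="fileApprovalDecider" class="com.geekcap.javaworld.springbatchexample.simple.decider.FileApprovalDecider" />

<job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="importFileStep" next="approvalDecision">
        <tasklet>
            <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" />
        </tasklet>
    </step>
    <decision id="approvalDecision" decider="fileApprovalDecider">
        <next on="APPROVED" to="archiveFileStep" />
        <end on="REJECTED" />
    </decision>
    <step id="archiveFileStep">
        <tasklet ref="archiveFileTasklet" />
    </step>
</job>
```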

Resiliency

Spring Batch job resiliency gives you three tools:

  1. Skip: If a single element in your processing is incorrect, such as an improperly formatted line in your CSV file, then you have the option to skip that object and continue processing the next one.
  2. Retry: If an error occurs that is likely to be resolved by retrying the operation a few milliseconds later, then you have the option to ask Spring Batch to retry that element. For example, you might be updating a record in the database, but another query has that item locked. Chances are that the lock will be released shortly and a retry might succeed.
  3. Restart: If the job is configured to store its state in a database and it fails, then you have the option to restart that job instance and continue where you left off.

While I won't go through the details of each resiliency feature, I did want to summarize the options available.

Skipping Items

Sometimes you might want to skip either invalid records from a reader or exceptions that occur during processing or writing. To do so, you can specify two things:

  • Define a skip-limit on your chunk element to tell Spring how many items can be skipped before the job fails (you might handle a few invalid records, but if you have too many then the input data might be invalid).
  • Define a list of skippable-exception-classes that trigger the record to be skipped; you can define both include elements for exceptions that will be skipped and exclude elements for exceptions that will not be skipped (used in the case when you want to skip exceptions in a hierarchy, but exclude one or more of its sub-classes).

For example:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" skip-limit="10">
                    <skippable-exception-classes>
                        <include class="org.springframework.batch.item.file.FlatFileParseException" />
                    </skippable-exception-classes>
                </chunk>
            </tasklet>
        </step>
    </job>

In this case, any record that causes a FlatFileParseException to be thrown will be skipped. If more than 10 records are skipped, the job fails.

Retrying Items

In other circumstances, exceptions may occur at times when retries are possible, such as failures due to database locks. Retries are implemented very similarly to skips:

  • Define a retry-limit on your chunk element to tell Spring how many times an item can be retried before it is considered failed. Once an item has exhausted its retries, it fails the job, unless you combine retries with skips.
  • Define a list of retryable-exception-classes that trigger the record to be replayed; you can define both include elements for exceptions that will be retried and exclude elements for exceptions that will not be re-tried.

For example:


    <job id="simpleFileImportJob" xmlns="http://www.springframework.org/schema/batch">
        <step id="importFileStep">
            <tasklet>
                <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" retry-limit="5">
                    <retryable-exception-classes>
                        <include class="org.springframework.dao.OptimisticLockingFailureException" />
                    </retryable-exception-classes>
                </chunk>
            </tasklet>
        </step>
    </job>

You can combine retries with skips by listing the same exception class in both the retryable and skippable lists. If an exception exhausts its retry limit of 5 and is also in the skippable list, the record is skipped and processing continues; if it is not in the skippable list, the job fails after the fifth retry.
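As a sketch of that combination, the same exception class appears in both lists: OptimisticLockingFailureException is retried up to five times, and an item that still fails is then skipped (counting against the skip limit) rather than failing the job.

```xml
<chunk reader="productReader" processor="productProcessor" writer="productWriter"
       commit-interval="5" retry-limit="5" skip-limit="10">
    <retryable-exception-classes>
        <include class="org.springframework.dao.OptimisticLockingFailureException" />
    </retryable-exception-classes>
    <skippable-exception-classes>
        <include class="org.springframework.dao.OptimisticLockingFailureException" />
    </skippable-exception-classes>
</chunk>
```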

Restarting jobs

Finally, for jobs that do fail, you have the option to restart them and have them pick up exactly where they left off. To do this, you start a job instance using the same job parameters; Spring Batch finds the existing job instance in its database and continues from where it stopped. You also have the option to refuse to allow restarts, and you can control the number of times a step in a job can be restarted (after some number of attempts you might want to just give up).
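Declaratively, restart behavior is controlled with a couple of attributes; as a sketch, restartable="false" on the job element refuses restarts outright, while start-limit on a tasklet caps how many times its step may be started:

```xml
<job id="simpleFileImportJob" restartable="true" xmlns="http://www.springframework.org/schema/batch">
    <step id="importFileStep">
        <!-- start-limit caps how many times this step may be started across restarts -->
        <tasklet start-limit="3">
            <chunk reader="productReader" processor="productProcessor" writer="productWriter" commit-interval="5" />
        </tasklet>
    </step>
</job>
```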

In conclusion

Some business problems are best solved using batch processing and Spring Batch provides a framework for implementing batch jobs. Spring Batch defines a chunking paradigm with three phases: read, process, and write, as well as support for reading from and writing to common resources. This installment in the Open source Java projects series has explored what Spring Batch does and how to use it.

We started by building a simple job to import products from a CSV file into a database, then extended that job by adding a processor to manage product quantities. Finally, we wrote a separate tasklet to archive the input file. While not part of the example, Spring Batch's resiliency features are important, so I quickly reviewed three resiliency tools that Spring Batch provides: skipping records, retrying records, and restarting batch jobs.

This article has only scratched the surface of Spring Batch's capabilities, but I hope it has given you enough to start with to build your own Spring Batch jobs.
