This is the final part of the blog series! Today we’ll have a quick look at scaling batch jobs via partitioning and multi-threaded steps.
This is the sixth post about the new Java based configuration features in Spring Batch 2.2. Previous posts cover a comparison between the new Java DSL and XML, JobParameters, ExecutionContexts and StepScope, profiles and environments, job inheritance and modular configurations. You can find the JavaConfig code examples on GitHub.
 
Partitioning
I won’t explain partitioning in detail here, just this: with partitioning you need to find a way to partition your data. Each partition of data gets its own StepExecution and will be executed in its own thread. The most important interface here is the Partitioner.
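To make the idea a bit more tangible: Spring Batch's Partitioner returns a map from partition name to ExecutionContext. The following is only a plain-JDK sketch of that idea, not the Spring Batch API itself. The class `PartitionSketch`, the ordinary maps standing in for ExecutionContexts, and the sample file names are all assumptions for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK sketch of the partitioning idea; no Spring Batch types are used.
final class PartitionSketch {

    // Hypothetical stand-in for a Partitioner: one named context per input file.
    static Map<String, Map<String, String>> partition(List<String> fileNames) {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        int index = 0;
        for (String fileName : fileNames) {
            Map<String, String> context = new LinkedHashMap<>();
            // The worker that receives this context looks up "fileName"
            // to find out which file it is responsible for.
            context.put("fileName", fileName);
            partitions.put("partition" + index, context);
            index++;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Sample file names are made up for this example.
        partition(Arrays.asList("partner-1.csv", "partner-2.csv"))
                .forEach((name, context) -> System.out.println(name + " -> " + context));
    }
}
```

Each entry of the returned map corresponds to one StepExecution in the real framework.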
 Of course, when working with different threads, we’ll need a source of those threads, and that’ll be a TaskExecutor. Since that’s a very low level component, we add it to the InfrastructureConfiguration interface:
public interface InfrastructureConfiguration {

    @Bean
    public abstract DataSource dataSource();

    @Bean
    public abstract TaskExecutor taskExecutor();

}
For test environments, an implementation can look like this:
@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {

    @Bean
    public DataSource dataSource(){
        // In-memory HSQL database with the Spring Batch meta data tables and the partner table
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
                .addScript("classpath:schema-partner.sql")
                .setType(EmbeddedDatabaseType.HSQL)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // The queue is unbounded by default, so the pool never grows beyond its core size.
        // Setting the core size as well makes sure four threads are really used.
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

}
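A detail worth knowing when sizing the pool: ThreadPoolTaskExecutor is backed by a java.util.concurrent.ThreadPoolExecutor, which starts threads beyond its core size only once its queue is full. With an unbounded queue (the default of ThreadPoolTaskExecutor, as far as I know), the core pool size decides how many partitions really run in parallel. Here is a small plain-JDK sketch of that behaviour; the class `PoolSizingSketch` and its method are made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Shows how core and max pool size interact when the work queue is unbounded.
final class PoolSizingSketch {

    // Submits four blocking tasks and reports how many threads the pool started.
    static int threadsStarted(int corePoolSize, int maxPoolSize) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>()); // unbounded queue
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the thread busy until we have measured
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("core=1, max=4 -> threads: " + threadsStarted(1, 4));
        System.out.println("core=4, max=4 -> threads: " + threadsStarted(4, 4));
    }
}
```

With a core size of 1 the remaining tasks simply wait in the queue, so a maximum pool size of 4 alone does not give you four worker threads.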
The job I used as an example in the previous blog posts read data from one file and wrote that data to a database. Now we want to read data from more than one file, and we want one partition per file.
 Let’s take a look at the important parts of the job configuration:
    @Bean
    public Job flatfileToDbPartitioningJob(){
        return jobBuilders.get("flatfileToDbPartitioningJob")
                .listener(protocolListener())
                .start(partitionStep())
                .build();
    }

    // Master step: combines the worker step, the partitioner and the TaskExecutor
    @Bean
    public Step partitionStep(){
        return stepBuilders.get("partitionStep")
                .partitioner(flatfileToDbStep())
                .partitioner("flatfileToDbStep", partitioner())
                .taskExecutor(infrastructureConfiguration.taskExecutor())
                .build();
    }

    // Worker step: executed once per partition
    @Bean
    public Step flatfileToDbStep(){
        return stepBuilders.get("flatfileToDbStep")
                .<Partner,Partner>chunk(1)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .listener(logProcessListener())
                .build();
    }

    // Creates one partition for each csv file matching the pattern
    @Bean
    public Partitioner partitioner(){
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        Resource[] resources;
        try {
            resources = resourcePatternResolver.getResources("file:src/test/resources/*.csv");
        } catch (IOException e) {
            throw new RuntimeException("I/O problems when resolving the input file pattern.",e);
        }
        partitioner.setResources(resources);
        return partitioner;
    }
We defined a Partitioner that looks for csv files in a given location and creates a partition for each file. We defined the step just as in the other examples, and then we defined a special partitionStep that combines our standard step, the Partitioner and the TaskExecutor. Finally, the job uses that partitionStep.
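What happens at runtime can be pictured as a simple fan-out and join: one task per partition is handed to the thread pool, and the master step waits for all of them. The following plain-JDK sketch only mimics that flow and is not how Spring Batch implements it internally. The class `PartitionStepSketch`, the in-memory lists standing in for csv files, and the counting "worker" are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Fan-out / join sketch of a partitioned step using only the JDK.
final class PartitionStepSketch {

    // Runs one worker per partition and returns the number of items each worker handled.
    static Map<String, Integer> run(Map<String, List<String>> partitions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Fan out: submit one worker task per partition.
            Map<String, Future<Integer>> running = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> partition : partitions.entrySet()) {
                List<String> lines = partition.getValue();
                // Stand-in for the worker step: count the non-empty lines of "its" file.
                Callable<Integer> worker = () -> {
                    int written = 0;
                    for (String line : lines) {
                        if (!line.isEmpty()) {
                            written++;
                        }
                    }
                    return written;
                };
                running.put(partition.getKey(), pool.submit(worker));
            }
            // Join: the master waits until every partition has finished.
            Map<String, Integer> result = new LinkedHashMap<>();
            for (Map.Entry<String, Future<Integer>> entry : running.entrySet()) {
                result.put(entry.getKey(), entry.getValue().get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        partitions.put("partition0", Arrays.asList("line 1", "line 2"));
        partitions.put("partition1", Arrays.asList("line 3"));
        System.out.println(run(partitions, 4));
    }
}
```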
Multi-threaded step
This is quite a simple way of scaling: it just adds some more threads to the processing of a step. Since reading from a file isn’t suitable for this kind of scaling, we need a new use case: reading from a queue and writing to a log file. We need some more infrastructure for it:
public interface InfrastructureConfiguration {

    @Bean
    public abstract DataSource dataSource();

    @Bean
    public abstract TaskExecutor taskExecutor();

    @Bean
    public abstract ConnectionFactory connectionFactory();

    @Bean
    public abstract Queue queue();

    @Bean
    public abstract JmsTemplate jmsTemplate();

}
We are using ActiveMQ in a test environment:
@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {

    @Bean
    public DataSource dataSource(){
        EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
        return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
                .addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
                .addScript("classpath:schema-partner.sql")
                .setType(EmbeddedDatabaseType.HSQL)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // The queue is unbounded by default, so the pool never grows beyond its core size.
        // Setting the core size as well makes sure four threads are really used.
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory("tcp://localhost:61616");
    }

    @Bean
    public Queue queue() {
        return new ActiveMQQueue("queueName");
    }

    // Embedded ActiveMQ broker listening on the URL used by the ConnectionFactory
    @Bean
    public BrokerService broker() throws Exception{
        BrokerService broker = new BrokerService();
        // configure the broker
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
        jmsTemplate.setDefaultDestination(queue());
        // Stop waiting for a message after 500 ms
        jmsTemplate.setReceiveTimeout(500);
        return jmsTemplate;
    }

}
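A short note on the receive timeout: an ItemReader signals the end of the input by returning null. As I understand it, a JmsTemplate without a timeout would wait for the next message indefinitely, so the step would never finish on an empty queue. The plain-JDK sketch below imitates that contract with a BlockingQueue instead of JMS; the class `ReceiveTimeoutSketch` and its `read` method are hypothetical names for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Imitates a queue reader with a receive timeout using only the JDK.
final class ReceiveTimeoutSketch {

    // Waits up to timeoutMillis for the next message; returns null if none arrives in time.
    static String read(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");

        String item;
        // null means "no more input", which is what ends a chunk-oriented step.
        while ((item = read(queue, 500)) != null) {
            System.out.println("read " + item);
        }
        System.out.println("queue drained, the step would end here");
    }
}
```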
The job configuration is quite simple then:
@Configuration
public class MultiThreadedStepJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilders;

    @Autowired
    private StepBuilderFactory stepBuilders;

    @Autowired
    private InfrastructureConfiguration infrastructureConfiguration;

    @Bean
    public Job multiThreadedStepJob(){
        return jobBuilders.get("multiThreadedStepJob")
                .listener(protocolListener())
                .start(step())
                .build();
    }

    @Bean
    public Step step(){
        return stepBuilders.get("step")
                .<String,String>chunk(1)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                // Process chunks on the threads of the TaskExecutor, at most four at a time
                .taskExecutor(infrastructureConfiguration.taskExecutor())
                .throttleLimit(4)
                .build();
    }

    @Bean
    public JmsItemReader<String> reader(){
        JmsItemReader<String> itemReader = new JmsItemReader<String>();
        itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
        return itemReader;
    }

    @Bean
    public ItemProcessor<String,String> processor(){
        return new LogItemProcessor<String>();
    }

    @Bean
    public ItemWriter<String> writer(){
        return new LogItemWriter<String>();
    }

    @Bean
    public ProtocolListener protocolListener(){
        return new ProtocolListener();
    }

}
The only difference from a job without any scaling is the calls to taskExecutor and throttleLimit in the step definition.
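To picture what those two calls mean at runtime: several threads pull items from the same reader at the same time, so the reader has to be safe for concurrent use. A queue fits well because every message is handed out exactly once, whereas a file reader keeps track of its current position and would get confused. The plain-JDK sketch below only imitates this with a BlockingQueue and a fixed number of worker threads; the class `MultiThreadedStepSketch` and the list standing in for the log file are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Several threads draining one shared queue, using only the JDK.
final class MultiThreadedStepSketch {

    // Drains the messages with the given number of threads and returns everything that was "written".
    static List<String> run(Collection<String> messages, int threads) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<String> written = new CopyOnWriteArrayList<>(); // thread-safe stand-in for the writer
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                String item;
                // poll() is thread-safe, so each message goes to exactly one thread.
                while ((item = queue.poll()) != null) {
                    written.add(item);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return written;
    }

    public static void main(String[] args) {
        // The order of the result depends on thread scheduling.
        System.out.println(run(Arrays.asList("a", "b", "c", "d", "e"), 4));
    }
}
```

Note that the order in which items are written is no longer predictable, which is the price of this kind of scaling.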
Conclusion
Configuring scalability in Spring Batch jobs is easy in Java based configuration. And again, you can see the advantage of having an interface for the infrastructure configuration to easily switch between environments.
I hope this blog series was useful for you, and if there are any questions, don’t hesitate to comment on the blog posts!
Blog author
Tobias Flohre