Thursday, January 2, 2014

Spring Batch - Chunk Oriented Processing

This post continues the previous one. In general, batch processing has to handle large volumes of data rather than run simple tasks (as we saw in the last post). Spring Batch supports this with readers, processors and writers. As the names suggest, the reader reads data from the source, the processor converts each source item into an output item, and the writer writes the output to the destination.
That looks simple, so how do we get chunk-oriented processing out of it? Spring Batch takes care of the chunking for us. We only need to decide where to read from (Reader), how to process each item (Processor), and where and how to write the processed data (Writer).

Now we will see the following with an example:
  • Set up a job with steps (this we already know)
  • Configure readers, writers and processors for each step
  • Conditional routing to the next step
  • Read and process items one by one but commit once per chunk
  • Exceptional cases
Spring config XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/batch  http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

 <!-- Transaction Manager -->
 <bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
  
 <!-- jobRepository Declaration -->
 <bean id="jobRepository"
  class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
  <property name="transactionManager" ref="transactionManager" />
 </bean>

 <!-- jobLauncher Declaration -->
 <bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
  <property name="jobRepository" ref="jobRepository" />
 </bean>

 <!-- Readers -->
 <bean id="sucessTestReader" class="com.test.batch.TestReader">
  <property name="count" value="10" />
 </bean>
 
 <bean id="failedTestReader" class="com.test.batch.TestReader">
  <property name="raiseError" value="true" />
 </bean>

 <!-- Processor Bean Declaration -->
 <bean id="testProcessor" class="com.test.batch.TestProcessor" />

 <!-- Writer Bean Declaration -->
 <bean id="testWriter" class="com.test.batch.TestWriter" />
 
 <!-- Tasklet -->
 <bean id="testTasklet" class="com.test.batch.TestTasklet">
  <property name="fail" value="false" />
 </bean>
 <bean id="failTasklet" class="com.test.batch.TestTasklet" >
  <property name="fail" value="true" />
 </bean>

 <!-- Batch Job Declarations -->
 <batch:job id="sucessJob">
  <batch:step id="firstStep" next="secondStep">
   <batch:tasklet>
    <batch:chunk reader="sucessTestReader" processor="testProcessor"
     writer="testWriter" commit-interval="5" />
   </batch:tasklet>
  </batch:step>
  <batch:step id="secondStep">
   <batch:tasklet>
    <batch:chunk reader="sucessTestReader" processor="testProcessor"
     writer="testWriter" commit-interval="2" />
   </batch:tasklet>
  </batch:step>
 </batch:job>

 <batch:job id="anotherJob">
  <batch:step id="failStep">
   <batch:tasklet>
    <batch:chunk reader="failedTestReader" processor="testProcessor"
     writer="testWriter" commit-interval="2" />
   </batch:tasklet>
   <batch:next on="*" to="sucessStep" />
   <batch:next on="FAILED" to="failedStep" />
  </batch:step>
  <batch:step id="sucessStep">
   <batch:tasklet ref="testTasklet" />
  </batch:step>
  <batch:step id="failedStep">
   <batch:tasklet ref="failTasklet" />
  </batch:step>
 </batch:job>
</beans>

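The batch:next elements in failStep above route on the step's exit status, and Spring Batch prefers the most specific matching pattern, so on="FAILED" wins over on="*" even though it is declared second. The following is a minimal plain-Java sketch of that idea, not Spring Batch's actual implementation: the class TransitionSketch and its methods are hypothetical, and the specificity rule is simplified to "a pattern without wildcards beats a pattern with wildcards".

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class TransitionSketch {

    /** Returns true if exitCode matches the pattern ('*' = any run of characters, '?' = exactly one). */
    static boolean matches(String pattern, String exitCode) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), exitCode);
    }

    /**
     * Picks the target step for an exit code. Simplified assumption: a matching pattern
     * without wildcards wins over a matching wildcard pattern, whatever the declaration order.
     * Returns null when no pattern matches.
     */
    static String nextStep(Map<String, String> transitions, String exitCode) {
        String wildcardTarget = null;
        for (Map.Entry<String, String> t : transitions.entrySet()) {
            if (!matches(t.getKey(), exitCode)) {
                continue;
            }
            boolean hasWildcard = t.getKey().contains("*") || t.getKey().contains("?");
            if (!hasWildcard) {
                return t.getValue(); // exact pattern: most specific match
            }
            if (wildcardTarget == null) {
                wildcardTarget = t.getValue(); // remember the first wildcard match as fallback
            }
        }
        return wildcardTarget;
    }

    public static void main(String[] args) {
        // Same order as in the XML: the wildcard is declared first.
        Map<String, String> transitions = new LinkedHashMap<>();
        transitions.put("*", "sucessStep");
        transitions.put("FAILED", "failedStep");

        System.out.println("COMPLETED -> " + nextStep(transitions, "COMPLETED"));
        System.out.println("FAILED    -> " + nextStep(transitions, "FAILED"));
    }
}
```

With the two transitions from anotherJob, COMPLETED is routed to sucessStep and FAILED to failedStep.
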
Explanation:
  • There are two jobs
    • sucessJob - Consists of two steps
      • firstStep - A combination of sucessTestReader, testProcessor and testWriter with a commit-interval of 5. Once the step completes, the job proceeds to secondStep (because the next attribute is set to secondStep). A commit-interval of 5 means that five items are read and processed one by one, but all five are written at once.
      • secondStep - Same as above but with a commit-interval of 2.
    • anotherJob - Consists of three steps
      • failStep - A combination of failedTestReader, testWriter and testProcessor with a commit-interval of 2.
        • Goes to sucessStep when completed properly
        • Goes to failedStep when there is an exception.
      • sucessStep - Job executes this step for all cases except when failed
      • failedStep - Job executes this step if the first step fails or throws an exception.
  • A step can be defined as a Tasklet (simple task) or as a combination of Reader, Processor and Writer (chunk-oriented processing).
  • We can define the next step either
    • As the next attribute of the step tag - The next step is executed only after the current step completes successfully; if the current step fails, the job fails.
    • As separate next tags inside the step - The next step is chosen based on the outcome (exit status) of the current step.
  • Reader, Writer and Processor should implement ItemReader, ItemWriter and ItemProcessor respectively (as shown in the code further down).
    • ItemReader has the method read, which returns an item of generic type I (the item that was read).
    • ItemProcessor has the method process, which takes an I (the item read by the Reader) and returns an O (the item that should be written).
    • ItemWriter has the method write, which takes the O items produced by the Processor.
  • The write method of ItemWriter takes a list as its argument because items are written and committed in batches (the batch size equals the commit-interval, except possibly for the last batch).
  • The reader is called repeatedly within a step until it returns null. Once the reader returns null, the step completes after committing the final batch.
  • The writer is invoked once the reader has read n (commit-interval) items and the processor has processed them, or once the reader has finished reading all the inputs (returned null).
  • When an exception occurs, the items of the current chunk are not written and the step fails. This case needs to be handled explicitly (using failedStep in the above XML).
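The read/process/write cycle described in the bullets above can be sketched in plain Java without Spring. This is only an illustration under simplifying assumptions: the class ChunkLoopSketch and the method runStep are hypothetical, an Iterator stands in for the ItemReader (a null item means "no more input"), a Function stands in for the ItemProcessor, and collecting each chunk into a list stands in for the ItemWriter. Transactions, restart and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkLoopSketch {

    /**
     * Runs a simplified chunk loop and returns the chunks that would be handed to the writer.
     * Each inner list corresponds to one call of ItemWriter.write(...).
     */
    static <I, O> List<List<O>> runStep(Iterator<I> source, Function<I, O> processor, int commitInterval) {
        List<List<O>> written = new ArrayList<>();
        boolean exhausted = false;
        while (!exhausted) {
            // Read phase: collect up to commitInterval items, one by one.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < commitInterval) {
                I item = source.hasNext() ? source.next() : null; // null signals end of input
                if (item == null) {
                    exhausted = true;
                    break;
                }
                inputs.add(item);
            }
            // Process phase: convert every item that was read in this chunk.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                outputs.add(processor.apply(item));
            }
            // Write phase: the whole chunk is written at once; an empty chunk is skipped.
            if (!outputs.isEmpty()) {
                written.add(outputs);
            }
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> inputs = new ArrayList<>();
        for (int i = 1; i <= 9; i++) {
            inputs.add("Input " + i);
        }
        List<List<String>> chunks =
                runStep(inputs.iterator(), s -> s.replace("Input", "Output"), 5);
        for (List<String> chunk : chunks) {
            System.out.println("Writing chunk: " + chunk);
        }
    }
}
```

With nine inputs and a commit-interval of 5, the sketch writes one chunk of five items and a final chunk of four.
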
The code below implements the above jobs: the reader, writer, processor and tasklet, together with the input and output data objects.

Source

Source.java - Which is used as the input - prepared by Reader
package com.test.entity;

public class Source {
 private String input;
        //getters and setters
}
Destination.java - Which is prepared by Processor and used by Writer
package com.test.entity;

public class Destination {
 private String output;
        //getters and setters
}
TestReader.java
package com.test.batch;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import com.test.entity.Source;

public class TestReader implements ItemReader<Source> {
 
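 // read() returns null once internalCount reaches this value
 private int count = 10;
 // When true, read() throws to simulate a failing reader
 private boolean raiseError = false;
 // Static and never reset: shared by every TestReader bean and carried over between steps
 private static int internalCount = 0;

 public Source read() throws Exception, UnexpectedInputException,
   ParseException, NonTransientResourceException {
  
  if(raiseError)
  {
   System.out.println("Exception occured while reading");
   throw new Exception("New Exception");
  }
  internalCount++;
  // With >= the reader hands out count - 1 items (1..9 for count = 10) before signalling the end
  if(internalCount >= count)
  {
   System.out.println("Reached max count "+internalCount);
   return null;
  }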
  Source source = new Source();
  source.setInput("Input "+internalCount);
  System.out.println("Reading item "+internalCount);
  return source;
 }
        //Getters and Setters
}
TestProcessor.java
package com.test.batch;

import org.springframework.batch.item.ItemProcessor;
import com.test.entity.Destination;
import com.test.entity.Source;

public class TestProcessor implements ItemProcessor<Source, Destination> {

 public Destination process(Source source) throws Exception {
  Destination destination = new Destination();
  destination.setOutput(source.getInput().replace("Input", "Output"));
  System.out.println("Converted "+source.getInput()+" to "+destination.getOutput());
  return destination;
 }

}
TestWriter.java
package com.test.batch;

import java.util.List;
import org.springframework.batch.item.ItemWriter;
import com.test.entity.Destination;

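public class TestWriter implements ItemWriter<Destination> {

 // Receives one whole chunk (up to commit-interval items) per call
 public void write(List<? extends Destination> items) throws Exception {
  for(Destination dest : items)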
  {
   System.out.println("Writing : "+dest.getOutput());
  }
  System.out.println("Finished Writing");
 }

}
TestTasklet.java
package com.test.batch;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class TestTasklet implements Tasklet {

 private boolean fail = false;

 public RepeatStatus execute(StepContribution arg0, ChunkContext arg1)
   throws Exception {
  if (!fail) {
   System.out.println("Finished successfully");
   return RepeatStatus.FINISHED;
  } else {
   System.out.println("Exception... So rollback should take place");
   return RepeatStatus.FINISHED;
  }
 }
        //Getters and Setters
}

Output
  • When we run the job sucessJob
INFO: Executing step: [firstStep]
Reading item 1
Reading item 2
Reading item 3
Reading item 4
Reading item 5
Converted Input 1 to Output 1
Converted Input 2 to Output 2
Converted Input 3 to Output 3
Converted Input 4 to Output 4
Converted Input 5 to Output 5
Writing : Output 1
Writing : Output 2
Writing : Output 3
Writing : Output 4
Writing : Output 5
Finished Writing
Reading item 6
Reading item 7
Reading item 8
Reading item 9
Reached max count 10
Converted Input 6 to Output 6
Converted Input 7 to Output 7
Converted Input 8 to Output 8
Converted Input 9 to Output 9
Writing : Output 6
Writing : Output 7
Writing : Output 8
Writing : Output 9
Finished Writing
INFO: Executing step: [secondStep]
Reached max count 11
INFO: Job: [FlowJob: [name=sucessJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
  • When we run anotherJob, the output looks like:
INFO: Executing step: [failStep]
SEVERE: Encountered an error executing the step
java.lang.Exception: New Exception
 at com.test.batch.TestReader.read(TestReader.java:24)
 at com.test.batch.TestReader.read(TestReader.java:1)
 at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:90)
 ...
Exception occured while reading
Jan 02, 2014 11:34:30 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [failedStep]
Exception... So rollback should take place
INFO: Job: [FlowJob: [name=anotherJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
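
One detail in the sucessJob output above deserves a note: secondStep prints only "Reached max count 11" and reads nothing. Both steps use the same sucessTestReader bean, and TestReader keeps its position in a static counter that is never reset, so the reader is already exhausted when secondStep starts. The plain-Java sketch below reproduces this. The class ReaderStateSketch, the nested CountingReader and its reset method are hypothetical stand-ins, not part of the code above.

```java
public class ReaderStateSketch {

    /** Hypothetical stand-in for TestReader: same counting logic, same static counter. */
    static class CountingReader {
        private static int internalCount = 0;
        private final int count;

        CountingReader(int count) {
            this.count = count;
        }

        /** Returns the next item, or null once the counter reaches count. */
        String read() {
            internalCount++;
            if (internalCount >= count) {
                return null;
            }
            return "Input " + internalCount;
        }

        /** Not present in TestReader; added here only to show the effect of resetting state. */
        static void reset() {
            internalCount = 0;
        }
    }

    /** Plays the role of one step: reads until null and returns how many items were read. */
    static int runStep(CountingReader reader) {
        int items = 0;
        while (reader.read() != null) {
            items++;
        }
        return items;
    }

    public static void main(String[] args) {
        CountingReader reader = new CountingReader(10);
        System.out.println("firstStep read " + runStep(reader) + " items");
        System.out.println("secondStep read " + runStep(reader) + " items");
        CountingReader.reset();
        System.out.println("after reset, read " + runStep(reader) + " items");
    }
}
```

The sketch reads 9 items in the first pass, 0 in the second, and 9 again after the counter is reset, which matches the firstStep and secondStep lines in the log.
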