Batch Processing in Microservices Using Spring Batch

Have you ever had to deal with a long-running job that runs for hours to generate a huge report or insert bulk records into a database?

Audience for this article

  • Java and Spring developers
  • Java solution architects

While developers were building tons of frameworks for web apps and microservices, batch processing got left behind. Companies still need to run these big data jobs, and instead of building your own custom solution, you can use Spring Batch.

Spring Batch: Introduction and Uses

Companies need to process huge amounts of data without anyone clicking buttons or watching screens. Think of long-running tasks such as calculating payroll at month-end, sending out thousands of bills, or generating reports. These jobs need to run reliably in the background and handle massive datasets efficiently.

Spring Batch is a Java framework that makes building these kinds of automated data processing jobs easier. Spring Batch doesn’t schedule when jobs run. Instead, it works alongside scheduling tools like Quartz or Control-M to handle the actual data processing part.
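
To make the split between "when" and "what" concrete, here is a minimal plain-JDK sketch of the scheduling side only. It is an illustration under stated assumptions, not Quartz or Control-M: the class name DailyReportTrigger and the 24-hour period are hypothetical, and the Runnable is where a real setup would hand off to the batch layer (for example, a call into the SpringBatchExecutor shown later).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical trigger: decides WHEN to run; the batch framework decides HOW to process.
public class DailyReportTrigger {

    // Time from 'now' until the next occurrence of 'runAt': later today, or tomorrow if it has passed.
    static Duration delayUntilNext(LocalDateTime now, LocalTime runAt) {
        LocalDateTime next = now.toLocalDate().atTime(runAt);
        if (!next.isAfter(now)) {
            next = next.plusDays(1);
        }
        return Duration.between(now, next);
    }

    // Fires 'launchJob' every 24 hours, starting at the next 'runAt'.
    // A fixed 24h period drifts by an hour across DST changes; cron-style triggers avoid that.
    static ScheduledExecutorService scheduleDaily(LocalTime runAt, Runnable launchJob) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long initialDelayMs = delayUntilNext(LocalDateTime.now(), runAt).toMillis();
        scheduler.scheduleAtFixedRate(launchJob, initialDelayMs,
                TimeUnit.DAYS.toMillis(1), TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

In production you would normally let Quartz, Control-M, or Spring's @Scheduled own this part and keep the batch code free of timing logic.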

The framework gives you ready-made building blocks for common batch needs: tracking what is happening, managing transactions, restarting failed jobs, and handling errors. It can also split work across multiple threads, or even multiple machines, to speed things up. Whether you are doing something simple, like loading a CSV file into a database, or something complex, like migrating millions of records between systems, Spring Batch takes care of the plumbing for you.
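
Spring Batch's own mechanism for splitting work is a partitioned step driven by a Partitioner, which gives each worker step its own slice of the data. The plain-Java sketch below only illustrates the underlying idea, range partitioning over record ids processed on a thread pool; it is not Spring Batch API, and the class name and the "count the records" placeholder work are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RangePartitioningDemo {

    // One partition: an inclusive range of record ids handled by a single worker.
    static final class Partition {
        final long minId;
        final long maxId;

        Partition(long minId, long maxId) {
            this.minId = minId;
            this.maxId = maxId;
        }
    }

    // Splits [minId, maxId] into at most gridSize contiguous ranges of (nearly) equal size.
    static List<Partition> partition(long minId, long maxId, int gridSize) {
        if (gridSize < 1) {
            throw new IllegalArgumentException("gridSize must be >= 1");
        }
        List<Partition> partitions = new ArrayList<>();
        long total = maxId - minId + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        for (long start = minId; start <= maxId; start += size) {
            partitions.add(new Partition(start, Math.min(start + size - 1, maxId)));
        }
        return partitions;
    }

    // Runs each partition on a worker thread and returns the total number of records handled.
    static long processInParallel(List<Partition> partitions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (Partition p : partitions) {
                // Placeholder work: a real worker would read, process and write ids in [minId, maxId].
                results.add(pool.submit(() -> p.maxId - p.minId + 1));
            }
            long processed = 0;
            for (Future<Long> result : results) {
                processed += result.get();
            }
            return processed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for partitions", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A partition failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, partition(1, 10, 3) yields the ranges 1-4, 5-8 and 9-10, each of which can be processed independently.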

What Does a Batch Job Actually Do?

  1. Read a chunk of data from a file or a database.
  2. Do something with that data (calculate, transform, validate).
  3. Write the results somewhere else, such as a file or a database table.
  4. Manage transactions (each chunk is committed as one transaction).
  5. Process data in chunks instead of loading everything into memory.
  6. Start, stop, and restart jobs.
  7. Retry or skip items that fail.
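
The read, process and write steps above, together with chunking and skipping, fit together roughly as follows. This is a plain-Java sketch of the chunk-oriented loop that Spring Batch runs for you, not Spring Batch code: chunkSize plays the role of the commit interval, and treating an IllegalArgumentException as a "bad record" is a hypothetical stand-in for a real skip policy.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class ChunkLoopSketch {

    // Reads up to chunkSize items, processes them, writes the results as one batch, repeats.
    // Returns the number of chunks, i.e. the number of commit points in a real step.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize, int skipLimit) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        int chunks = 0;
        int skipped = 0;
        while (reader.hasNext()) {
            // 1. Read: collect up to chunkSize input items
            List<I> inputs = new ArrayList<>(chunkSize);
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            // 2. Process: transform each item, skipping bad ones until the limit is exceeded
            List<O> outputs = new ArrayList<>(inputs.size());
            for (I item : inputs) {
                try {
                    outputs.add(processor.apply(item));
                } catch (IllegalArgumentException e) {
                    if (++skipped > skipLimit) {
                        throw new IllegalStateException("Skip limit of " + skipLimit + " exceeded", e);
                    }
                }
            }
            // 3. Write the whole chunk at once; in Spring Batch the chunk's transaction commits after this
            if (!outputs.isEmpty()) {
                writer.accept(outputs);
            }
            chunks++;
        }
        return chunks;
    }
}
```

With ten input records, a chunk size of 4 and one bad record, the writer is called three times and only the bad record is dropped.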

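Spring Batch also records the status of every job and step in the database. It needs its own metadata tables in your schema for this (with the default prefix: BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_STEP_EXECUTION and a few others), so you can build a UI on top of these tables that lets users view and manage reports and other long-running tasks.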
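
A UI built on those tables mostly has to decide which actions to offer for the latest execution. The helper below is a hypothetical sketch of that decision: the enum mirrors the status names Spring Batch stores in the STATUS column of BATCH_JOB_EXECUTION, and the stop/restart rules reflect our understanding of what Spring Batch enforces through JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException and JobRestartException. Treat them as a starting point and check them against your Spring Batch version.

```java
import java.util.Locale;

public class JobStatusRules {

    // Status values as Spring Batch writes them to the STATUS column of BATCH_JOB_EXECUTION.
    enum Status { COMPLETED, STARTING, STARTED, STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN }

    // Parses the raw column value read by the UI (e.g. from a plain SELECT on the metadata table).
    static Status fromColumn(String value) {
        return Status.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // "Stop" button: only executions that are starting or running can be asked to stop.
    static boolean canStop(Status status) {
        return status == Status.STARTING || status == Status.STARTED;
    }

    // "Restart" button: FAILED or STOPPED executions of a restartable job can be restarted.
    // COMPLETED/ABANDONED instances are finished, STARTED/STOPPING are still running,
    // and UNKNOWN needs manual cleanup first.
    static boolean canRestart(Status status) {
        return status == Status.FAILED || status == Status.STOPPED;
    }
}
```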

In this post, I'm going to show one such example of using Spring Batch with the MyBatis (formerly iBATIS) ORM and an Oracle database. In this use case, a report can complete within minutes or run for hours without any problem.

Use case for this example: I have an app that runs a scheduled report task at a particular time; depending on the data volume, it can run for hours. When it runs, it fetches data from the database in chunks and writes it to a CSV file. (Basically, report generation, but at a very large size.)

1. Create MyBatisConfig.java

import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

@Configuration
@EnableAutoConfiguration(exclude = { DataSourceAutoConfiguration.class })
@MapperScan("com.mapper")
public class MyBatisConfig {

  // DataSourceAutoConfiguration is excluded above, so this DataSource bean must be defined elsewhere
  @Autowired
  private DataSource dataSource;

  // Transaction manager for the DataSource; SpringBatchConfig reuses it for the job repository
  @Bean
  public DataSourceTransactionManager transactionManager() {
    return new DataSourceTransactionManager(this.dataSource);
  }

  // MyBatis session factory: type aliases from com.domain, settings from the Oracle config XML
  @Bean
  public SqlSessionFactory sqlSessionFactory() throws Exception {
    SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
    sessionFactory.setDataSource(this.dataSource);
    sessionFactory.setTypeAliasesPackage("com.domain");
    sessionFactory.setConfigLocation(new ClassPathResource("META-INF/mybatis-oracle-config.xml"));
    return sessionFactory.getObject();
  }

  // Thread-safe, Spring-managed SqlSession; the factory is injected, so @DependsOn is not needed
  @Bean
  public SqlSession getSqlSession(SqlSessionFactory sqlSessionFactory) {
    return new SqlSessionTemplate(sqlSessionFactory);
  }
}

2. Create SpringBatchConfig.java

import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.support.DatabaseType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

@Configuration
@ImportResource("classpath:META-INF/spring-batch-jdbc-config.xml")
@EnableAutoConfiguration
@EnableBatchProcessing
public class SpringBatchConfig {

  /** Transaction manager defined in MyBatisConfig (shared with MyBatis). */
  @Autowired
  DataSourceTransactionManager txManager;

  /** The data source that holds the Spring Batch metadata tables. */
  @Autowired
  DataSource dataSource;

  /**
   * Jdbc template.
   * @param dataSource the data source
   * @return the jdbc template
   */
  @Bean
  public JdbcTemplate jdbcTemplate(DataSource dataSource) {
    return new JdbcTemplate(dataSource);
  }

  /**
   * Job repository that stores job and step metadata in Oracle tables prefixed with "Batch_".
   * @return the job repository
   * @throws Exception if the repository cannot be created
   */
  @Bean
  public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setDatabaseType(DatabaseType.ORACLE.name());
    jobRepositoryFactoryBean.setTransactionManager(txManager);
    // Project constant holding a Spring isolation-level name such as "ISOLATION_READ_COMMITTED"
    // (the default for creating job executions is ISOLATION_SERIALIZABLE)
    jobRepositoryFactoryBean
    .setIsolationLevelForCreate(PropertiesConstants.DB_READ_ISOLATION_LEVEL);
    jobRepositoryFactoryBean.setTablePrefix("Batch_");
    // Disable the check that rejects creating a JobExecution inside an already active transaction
    jobRepositoryFactoryBean.setValidateTransactionState(false);
    jobRepositoryFactoryBean.afterPropertiesSet();
    return jobRepositoryFactoryBean.getObject();
  }

  /**
   * Simple job launcher.
   * SyncTaskExecutor runs the job on the calling thread, so run() returns only after the job
   * has finished (possibly hours later) and the returned JobExecution has its final status.
   *
   * @param jobRepository the job repository
   * @return the simple job launcher
   */
  @Bean
  public SimpleJobLauncher simpleJobLauncher(JobRepository jobRepository) {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    return simpleJobLauncher;
  }
}
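
The choice of SyncTaskExecutor matters for jobs that run for hours: the caller of run() is blocked until the job ends. With an asynchronous executor (for example Spring's SimpleAsyncTaskExecutor), run() returns right away while the job is still running, and the caller has to poll the job repository for the final status. The plain-Java sketch below is a hypothetical stand-in for the launcher, not Spring Batch code; it only contrasts the two behaviours using java.util.concurrent.Executor.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class LaunchModeSketch {

    // Hypothetical stand-in for a job launcher: hands the job to an Executor and
    // returns a live status holder that the caller can inspect.
    static AtomicReference<String> launch(Executor executor, Runnable job) {
        AtomicReference<String> status = new AtomicReference<>("STARTING");
        executor.execute(() -> {
            status.set("STARTED");
            job.run();
            status.set("COMPLETED");
        });
        return status;
    }
}
```

Passing Runnable::run as the Executor runs the job on the calling thread, like SyncTaskExecutor, so launch() only returns once the status is COMPLETED; passing a thread pool returns immediately with the job still in progress.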

3. Create the Spring Batch executor (SpringBatchExecutor.java)

import java.text.SimpleDateFormat;
import java.util.Date;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import com.google.common.base.Throwables;

@Component
public class SpringBatchExecutor {

	@Resource(name = "simpleJobLauncher")
	private SimpleJobLauncher simpleJobLauncher;

	@Autowired
	@Qualifier("tradeJob")
	private Job tradeJob;  // your job

	@Autowired
	private Environment env;

	private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchExecutor.class);

	/**
	 * Runs the trade job and returns COMPLETED, or throws MySystemException containing
	 * the collected stack traces if the job did not complete.
	 */
	public BatchStatus executeJob(ReportInputRequest reportInputRequest)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {
		JobParameters jobParameters = getJobParameterBuilder(reportInputRequest).toJobParameters();

		// Blocks until the job ends, because the launcher uses a SyncTaskExecutor
		JobExecution execution = simpleJobLauncher.run(tradeJob, jobParameters);

		if (execution.getStatus() != BatchStatus.COMPLETED) {
			LOGGER.error("Job execution {} ended with status {}", execution.getId(), execution.getStatus());
			throwExceptionOnFailedBatch(execution); // always throws
		}
		return BatchStatus.COMPLETED;
	}

	private void throwExceptionOnFailedBatch(JobExecution execution) {
		// Concatenate the stack traces of every failure recorded on the job and its steps
		StringBuilder errorMessage = new StringBuilder();
		for (Throwable t : execution.getAllFailureExceptions()) {
			errorMessage.append(Throwables.getStackTraceAsString(t));
		}
		throw new MySystemException(errorMessage.toString());
	}

	private JobParametersBuilder getJobParameterBuilder(ReportInputRequest reportInputRequest) {

		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();

		// Insert a yyyyMMddHHmm timestamp before the extension, e.g. report.csv -> report202401311530.csv
		String fileName = env.getRequiredProperty(ReportConstants.RESPONSE_FILE_NAME);
		String timestamp = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
		int dot = fileName.lastIndexOf('.');
		fileName = (dot < 0) ? fileName + timestamp
				: fileName.substring(0, dot) + timestamp + fileName.substring(dot);

		String folderPath = env.getRequiredProperty(ReportConstants.INPUT_FILE_PATH);

		// Entity name passed to the stored procedure, e.g. <triggerMessageType>_<processDate>
		String entityName = reportInputRequest.getTriggerMessageType()
				+ "_" + reportInputRequest.getProcessDate();

		jobParametersBuilder.addString(ReportConstants.INPUT_FILE_PATH, folderPath + fileName);
		// A unique value per launch, so every run creates a new job instance
		jobParametersBuilder.addLong("time", System.currentTimeMillis());
		jobParametersBuilder.addString(ReportConstants.REPORT_DATE, reportInputRequest.getProcessDate());
		jobParametersBuilder.addString(ReportConstants.ENTITY_NAME, entityName);

		reportInputRequest.setFileName(fileName);
		reportInputRequest.setFolderPath(folderPath);

		return jobParametersBuilder;
	}
}
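
Why add a "time" parameter at all? Spring Batch identifies a job instance by the job name plus its identifying job parameters, and a COMPLETED instance cannot be launched again (that is what JobInstanceAlreadyCompleteException signals). The toy model below is a hypothetical, in-memory illustration of that rule, not the real JobRepository: identical parameters are rejected after a successful run, a failed run can be launched again, and adding a changing "time" value produces a fresh instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class JobInstanceIdentityDemo {

    // Toy "job repository": last known status per job instance key.
    private final Map<String, String> lastStatusByInstance = new HashMap<>();

    // Instance key = job name + identifying parameters (sorted, so parameter order does not matter).
    static String instanceKey(String jobName, Map<String, String> params) {
        return jobName + new TreeMap<>(params);
    }

    // Launches an instance; a COMPLETED instance is rejected,
    // loosely mimicking JobInstanceAlreadyCompleteException.
    String launch(String jobName, Map<String, String> params, boolean succeeds) {
        String key = instanceKey(jobName, params);
        if ("COMPLETED".equals(lastStatusByInstance.get(key))) {
            throw new IllegalStateException("Job instance already complete: " + key);
        }
        lastStatusByInstance.put(key, succeeds ? "COMPLETED" : "FAILED");
        return key;
    }
}
```

The trade-off of a timestamp parameter is that every launch is a brand-new instance, so a failed report is re-run from scratch rather than restarted; to resume a failed run instead, relaunch it with exactly the same parameters.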

4. Create the trade job (TradeJob.java)

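import java.io.IOException;
import java.io.Writer;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import javax.sql.DataSource;
import oracle.jdbc.OracleTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.StoredProcedureItemReader;
import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.SqlOutParameter;
import org.springframework.jdbc.core.SqlParameter;

@Configuration
public class TradeJob {

	private static final Logger LOGGER = LoggerFactory.getLogger(TradeJob.class);

	// Step-scoped so job parameters can be injected; the ReportConstants.JOB_PARAM_* constants
	// must hold late-binding expressions of the form "#{jobParameters['<name>']}"
	@Bean
	@StepScope
	public StoredProcedureItemReader<TradeEntity> tradeReader(
			DataSource dataSource,
			@Value(ReportConstants.JOB_PARAM_REPORT_DATE) final String reportDate,
			@Value(ReportConstants.JOB_PARAM_ENTITY_NAME) final String entityName) {

		StoredProcedureItemReader<TradeEntity> reader = new StoredProcedureItemReader<>();
		reader.setName("tradeReader"); // prefix for the reader's restart state in the execution context
		reader.setDataSource(dataSource);
		reader.setProcedureName(ReportConstants.STORED_PROC_TRADE_NAME);

		// Procedure signature: (IN report date, IN entity name, OUT ref cursor)
		SqlParameter[] parameters = {
				new SqlParameter("IN_REPORT_DATE", OracleTypes.VARCHAR),
				new SqlParameter("IN_ENTITY_NAME", OracleTypes.VARCHAR),
				new SqlOutParameter(ReportConstants.CURSOR_OUT, OracleTypes.CURSOR)
		};
		reader.setParameters(parameters);
		reader.setRefCursorPosition(3); // 1-based position of the OUT ref cursor parameter
		reader.setRowMapper(new TradeRowMapper());
		reader.setPreparedStatementSetter(new PreparedStatementSetter() {
			@Override
			public void setValues(PreparedStatement ps) throws SQLException {
				ps.setString(1, reportDate);
				ps.setString(2, entityName);
			}
		});

		// Logged when the step-scoped bean is created, not when rows are read
		LOGGER.info("Trade reader created for reportDate={}, entityName={}", reportDate, entityName);
		return reader;
	}

	@Bean
	@StepScope
	public FlatFileItemWriter<TradeEntity> tradeWriter(
			@Value(ReportConstants.JOB_PARAM_INPUT_FILE_PATH) final String filepath) {
		FlatFileItemWriter<TradeEntity> writer = new FlatFileItemWriter<>();
		writer.setName("tradeWriter");
		writer.setResource(new FileSystemResource(filepath));

		// Column (bean property) names to extract from TradeEntity, in output order.
		// Collected into a list so the array matches the configured columns exactly;
		// a fixed new String[136] leaves null names when fewer columns are configured.
		List<String> fieldNames = new ArrayList<>();
		StringTokenizer strToken = new StringTokenizer(
				ReportConstants.TRADE_EXTR_COLS,
				ReportConstants.COMMA_STRING);
		while (strToken.hasMoreTokens()) {
			fieldNames.add(strToken.nextToken().trim());
		}
		BeanWrapperFieldExtractor<TradeEntity> fieldExtractor = new BeanWrapperFieldExtractor<>();
		fieldExtractor.setNames(fieldNames.toArray(new String[0]));

		DelimitedLineAggregator<TradeEntity> delLineAgg = new DelimitedLineAggregator<>();
		delLineAgg.setDelimiter(ReportConstants.DELIMITER);
		delLineAgg.setFieldExtractor(fieldExtractor);
		writer.setLineAggregator(delLineAgg);

		// No-op footer; write a trailer (for example a record count) here if the report needs one
		writer.setFooterCallback(new FlatFileFooterCallback() {
			@Override
			public void writeFooter(Writer footerWriter) throws IOException {
			}
		});
		LOGGER.info("Trade writer created for {}", filepath);
		return writer;
	}

	@Bean
	public Job tradeJob(JobBuilderFactory jobs, Step tradeJobStep) {
		return jobs.get("tradeJob").flow(tradeJobStep).end().build();
	}

	// Chunk-oriented step: read CHUNK_SIZE rows from the cursor, write them to the file, commit, repeat
	@Bean
	public Step tradeJobStep(StepBuilderFactory stepBuilderFactory,
			ItemReader<TradeEntity> tradeReader,
			ItemWriter<TradeEntity> tradeWriter) {
		return stepBuilderFactory.get("tradeJobStep")
				.<TradeEntity, TradeEntity>chunk(ReportConstants.CHUNK_SIZE)
				.reader(tradeReader)
				.writer(tradeWriter)
				.build();
	}
}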
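
Conceptually, the writer turns each TradeEntity into one CSV line by pulling the named properties in order (BeanWrapperFieldExtractor) and joining them with the delimiter (DelimitedLineAggregator). The plain-Java sketch below mimics that pipeline with a Map instead of a bean; the column names are hypothetical, and writing null as an empty string is this sketch's own choice, which may differ from Spring Batch's behaviour.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DelimitedLineSketch {

    // Splits a comma-separated column list (like TRADE_EXTR_COLS) into trimmed, non-empty names.
    static List<String> parseColumns(String columns) {
        return Arrays.stream(columns.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Builds one output line: take the configured fields in order, join them with the delimiter.
    // Missing or null values become empty strings; no quoting or escaping is done.
    static String toLine(Map<String, Object> row, List<String> fieldNames, String delimiter) {
        return fieldNames.stream()
                .map(name -> {
                    Object value = row.get(name);
                    return value == null ? "" : String.valueOf(value);
                })
                .collect(Collectors.joining(delimiter));
    }
}
```

Because nothing is quoted, a value that itself contains the delimiter would break the column layout; pick a delimiter that cannot occur in the data or add quoting.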

5. Create the MyBatis mapper (TradeReportMapper.java)

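import java.util.Map;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.mapping.StatementType;

public interface TradeReportMapper {
	// Calls TRADE_PKG.CHECK_TRADE(IN report date, IN entity name, OUT ref cursor).
	// MyBatis maps an Oracle ref cursor with jdbcType=CURSOR and needs a resultMap for its rows;
	// "tradeResultMap" is a placeholder name that must be defined in your mapper configuration.
	String CHECK_TRADE = "{CALL TRADE_PKG.CHECK_TRADE(#{in_report_date, mode=IN, jdbcType=VARCHAR},"
			+ "#{in_entity_name, mode=IN, jdbcType=VARCHAR},"
			+ "#{OUT_CURSOR, mode=OUT, jdbcType=CURSOR, javaType=java.sql.ResultSet, resultMap=tradeResultMap})}";

	// Pass a map containing in_report_date and in_entity_name; after the call,
	// MyBatis stores the mapped cursor rows in the same map under OUT_CURSOR.
	@Select(CHECK_TRADE)
	@Options(statementType = StatementType.CALLABLE)
	void checkTrade(Map<String, Object> params);
}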
