BackEnd/Spring Batch
[Spring] Spring Batch 사용해보기(3) - step(tasklet, chunk)
Wonol
2022. 9. 18. 15:43
반응형
1. Tasklet
- Tasklet 은 하나의 메소드로 구성되어 있는 인터페이스.
- Step 안에서 수행될 기능들을 명시.
- 실패를 알리기위해 예외를 반환하거나 Throw 할 때까지 execute 를 반복적으로 호출.
- Tasklet 은 Step 안에서 단일로 수행도리 커스텀한 기능들을 선언할 때 사용
- Tasklet 은 하나와 Reader & Processor & Writer 한 묶음이 같은 레벨로서, Reader & Processor 와 함께 Tasklet 을 사용할 수 없다.
1-1. Sample(With. Lambda)
@Slf4j
@EnableBatchProcessing
@Configuration
@RequiredArgsConstructor
public class TaskletJobConfig{
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job TaskletJob(){
Job customJob = jobBuilderFactory.get("taskletJob")
.start(TaskStep())
.build();
return customJob;
}
@Bean
public Step TaskStep(){
return stepBuilderFactory.get("taskletStep")
.tasklet((contribution, chunkContext) ->{
//비즈니스 로직
for(int idx = 0; idx < 5; idx ++){
log.info("[idx] = " + idx);
}
return RepeatStatus.FINISHED;
}).build();
}
}
1-2. Sample(With. 외부클래스)
@Slf4j
@EnableBatchProcessing
@Configuration
@RequiredArgsConstructor
public class TaskletJobConfig{
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job TaskletJob(){
Job customJob = jobBuilderFactory.get("taskletJob")
.start(TaskStep())
.build();
return customJob;
}
@Bean
public Step TaskStep(){
return stepBuilderFactory.get("taskletStep1")
.tasklet(new CustomTasklet())
.build();
}
}
@Slf4j
public class CustomTasklet implements Tasklet, StepExecutionListener {
@Override
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
log.info("Before Step Start!");
}
@Override
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("After Step Start!");
return ExitStatus.COMPLETED;
}
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//비즈니스 로직
for (int idx = 0; idx < 5; idx++) {
log.info("[idx] = " + idx);
}
return RepeatStatus.FINISHED;
}
}
2. Chunk
- 여러 개의 아이템을 묶은 하나의 덩어리를 의미.
- 처리되는 Commit Row 의 수
- 한 번에 하나의 아이템을 입력받아 Chunk 단위로 덩어리를 만들어 Chunk 단위로 Transaction(트랜잭션)을 처리.
Chunk 단위로 Commit 과 Rollback 이 동작
- 대용량 데이터를 한 번에 처리하는 것이 아닌 Chunk 단위로 쪼개어 반복 수행.
2-1. Chunk <I>, Chunk <O>
- Chunk<I>
- ItemRead 로 읽은 하나의 아이템을 Chunk 크기만큼 반복해서 저장 - Chunk<O>
- ItemReader 로 부터 전달받은 Chunk<I> 를 참조해서 ItemProcessor 에서 가공하여 ItemWriter 로 전달
2-2. 아키텍쳐
- ItemReader 가 데이터를 한 건씩 읽고 한 건씩 Chunk 크기만큼 Chunk<I> 에 저장.
- Chunk 크기 만큼 쌓였다면 Chunk<I> 를 ItemProcessor 에 전달.
- ItemProcessor 는 전달받은 Chunk 를 적절하게 가공해서 Chunk<O> 에 저장.
- Chunk<O> 를 ItemWriter 에 전달.
- ItemWriter 는 데이터를 쓰기 작업.
ItemReader 와 ItemProcessor 는 각각의 하나씩 아이템을 처리하지만, ItemWriter 는 Chunk 크기만큼을 한 번에 일괄 처리한다.
2-3. sample(with. JPA)
@Slf4j
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class ChunkJdbcJobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job ChunkJob() {
Job chunkJob = jobBuilderFactory.get("ChunkJob0")
.start(step())
.build();
return chunkJob;
}
@Bean
@JobScope
public Step step() {
return stepBuilderFactory.get("step")
.<Member, Member>chunk(5)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public JpaPagingItemReader<Member> reader() {
Map<String,Object> parameterValues = new HashMap<>();
log.info("ItemReader 실행됌");
return new JpaPagingItemReaderBuilder<Member>()
.pageSize(5)
.parameterValues(parameterValues)
.queryString("SELECT m FROM Member m")
.entityManagerFactory(entityManagerFactory)
.name("JpaPagingItemReader")
.build();
}
@Bean
@StepScope
public ItemProcessor<Member, Member> processor(){
log.info("ItemProcessor 실행됌");
return new ItemProcessor<Member, Member>() {
@Override
public Member process(Member member) throws Exception {
log.error(" ID : " + member.getId() + " / Name : " + member.getName() + " / amount : " + member.getAmount());
member.setAmount(member.getAmount() + 1000);
log.info(" ID : " + member.getId() + " / Name : " + member.getName() + " / amount : " + member.getAmount());
return member;
}
};
}
@Bean
@StepScope
public JpaItemWriter<Member> writer(){
log.info("ItemWriter 실행됌");
return new JpaItemWriterBuilder<Member>()
.entityManagerFactory(entityManagerFactory)
.build();
}
}
참고
반응형