[Spring Batch 정리하기] 3. 도메인
1. Job
public interface Job {
String getName();
boolean isRestartable();
void execute(JobExecution execution);
@Nullable
JobParametersIncrementer getJobParametersIncrementer();
JobParametersValidator getJobParametersValidator();
}
1-1) 기본 개념
- 배치 계층 구조에서 최상위의 개념으로 전체 배치 프로세스를 캡슐화한 Entity
- Job Configuration 을 통해 생성되는 객체 단위로 배치 작업을 어떻게 구성하고 실행할 것인지 명세해 놓은 객체
- 여러 Step 을 포함하고 있는 컨테이너로 반드시 1개 이상의 Step으로 구성
1-2) 기본 구현체
- SimpleJob
- 순차적으로 Step 을 실행시키는 Job - FlowJob
- 특정한 조건과 흐름에 따라 Step 을 구성하여 실행시키는 Job
1-3) AbstractJob
- Job 을 구현하고 있는 추상클래스로 여러 필드 및 메소드 존재
public abstract class AbstractJob implements Job, StepLocator, BeanNameAware, InitializingBean {
protected static final Log logger = LogFactory.getLog(AbstractJob.class);
private String name;
private boolean restartable = true;
private JobRepository jobRepository;
private CompositeJobExecutionListener listener = new CompositeJobExecutionListener();
private JobParametersIncrementer jobParametersIncrementer;
private JobParametersValidator jobParametersValidator = new DefaultJobParametersValidator();
private StepHandler stepHandler;
public AbstractJob() {
}
public AbstractJob(String name) {
this.name = name;
}
...
}
- name : job 이름
- restartable : 재시작 여부(기본값 true)
- JobRepository : 메타데이터 저장소
- JobExecutionListener : job 이벤트 리스너
- JobParametersIncrementer : JobParameter 증가기
- JobParametersValidator : JobParameter 검증기
- StepHandler : Step 실행 핸들러
2. JobInstance
public class JobInstance extends Entity implements javax.batch.runtime.JobInstance{
private final String jobName;
public JobInstance(Long id, String jobName) {
super(id);
Assert.hasLength(jobName, "A jobName is required");
this.jobName = jobName;
}
/**
* @return the job name. (Equivalent to getJob().getName())
*/
@Override
public String getJobName() {
return jobName;
}
@Override
public String toString() {
return super.toString() + ", Job=[" + jobName + "]";
}
@Override
public long getInstanceId() {
return super.getId();
}
}
2-1) 기본 개념
- Job 이 실행되는 시점에서 생성되는 Job 의 논리적 작업 실행 개념
- Job 이 실행되는 시점에 처리하는 내용이 다르기 때문에 Job 의 실행을 구분해야 함 -> 1월 1일 실행, 1월 2일 실행을 하게 되면 각각의 JobInstance 가 생성되고, 만약 1월 1일에 실행한 JobInstance 가 실패하여 이후 재실행하더라도 이 JobInstance 는 1월 1일 데이터에 대해서만 처리
- 처음 시작하는 Job & JobParameter 의 경우 새로운 JobInstance 생성
- 이전과 동일한 Job & JobParameter 의 경우 이전에 만든 JobInstance 리턴
- Meta Data Schema 의 BATCH_JOB_INSTANCE 테이블과 매핑
2-2) JobInstance 과정
- JobLauncher 는 실제 Batch Job 을 실행시키는 Launcher 로 Job 을 실행(run)할 때 Job, JobParameters 두 개의 인자를 받아서 Job 을 실행
- 위 이미지처럼 Job = 일별정산, JobParameter = '2021.01.01' 이라는 인자를 받아 Job을 실행하면, JobRepository 라는 클래스(메타 데이터의 저장소)가 Job & JobParameters 를 Key 로 하여 이전에 실행한 내역을 DB에서 조회하고 JobInstatnce 를 반환
- 동일한 Job & JobParameters 로 두 번째 실행을 하면 배치 실행이 실패
- 실패 내용은 JobInstance 가 이미 존재하기 때문이고, 동일한 Job 을 다시 실행하고 싶은 경우에는 JobParameters 변경이 필요
3. JobParameters
public class JobParameters implements Serializable {
private final Map<String,JobParameter> parameters;
public JobParameters() {
this.parameters = new LinkedHashMap<>();
}
public JobParameters(Map<String,JobParameter> parameters) {
this.parameters = new LinkedHashMap<>(parameters);
}
@Nullable
public Long getLong(String key){
if (!parameters.containsKey(key)) {
return null;
}
Object value = parameters.get(key).getValue();
return value==null ? null : ((Long)value).longValue();
}
...
@Nullable
public String getString(String key){
JobParameter value = parameters.get(key);
return value==null ? null : value.toString();
}
...
}
3-1) 기본 개념
- Job 을 실행할 때 외부에서 받는 파라미터를 가진 도메인 객체
- 하나의 Job 에 존재할 수 있는 여러 개의 JobInstance 를 구분하기 위한 용도
- JobParameters 와 JobInstance 는 1:1 관계
- Meta Data Schema 의 BATCH_JOB_EXECUTION_PARAM 테이블과 매핑
3-2) 생성 및 바인딩
- 어플리케이션 실행 시 주입 : java -jar application.jar requestDate=20210101
- 코드로 생성 : JobParameterBuilder
- SpEL 이용 : @Value("#(jobParameter[requestDate]}") -> @JobScope, @StepScope 선언 필수
- @JobScope, @StepScope 는 Job, Step 이 실행되는 시점에 Bean 이 생성되도록 Bean 의 생성 시점을 지연시키는 역할
- @Value 어노테이션의 경우 어플리케이션이 실행될 때 바인딩을 하므로 @JobScope, @StepScope 어노테이션이 존재하지 않으면 NPE 예외 발생
- JobParamters 를 외부에서 전달받음으로써 특정 값에 따라 분기 처리가 필요한 내부 로직 등 여러 곳에서 굉장히 유용하게 활용 가능
4. JobExecution
public class JobExecution extends Entity {
private final JobParameters jobParameters;
private JobInstance jobInstance;
private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile Date startTime = null;
private volatile Date createTime = new Date(System.currentTimeMillis());
private volatile Date endTime = null;
private volatile Date lastUpdated = null;
private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;
private volatile ExecutionContext executionContext = new ExecutionContext();
private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();
private final String jobConfigurationName;
...
}
4-1) 기본 개념
- JobExecution 은 Job 에 대한 한 번의 시도를 의미하는 객체
- Job 실행 중 발생한 정보들을 저장하고 있는 객체 -> 시작시간, 종료시간, 상태 등
- JobInstance 와의 관계
- JobExecution 의 실행 상태(Status) 가 'COMPLETED' -> Job 재실행 불가능
- JobExecution 의 실행 상태(Status) 가 'FAILED' -> Job 재실행 가능
- JobExecution 의 실행 상태(Status) 가 'COMPLETED' 가 될 때까지 하나의 JobInstance 로 여러 번 시도 가능
- JobInstance 와 JobExecution 은 1:N 관계
- Meta Data Schema 의 BATCH_JOB_EXECUTION 테이블과 매핑
4-2) JobExecution 과정
- JobLuancher 가 Job 실행
- JobRepository 가 Job & JobParameters 로 DB 조회(Meta Schema)
- JobInstance 가 존재할 경우 기존 JobInstance 를 Return, 존재하지 않을 경우 새로운 JobInstnace 생성
- JobInstance 가 생성되면 JobExecution 도 새롭게 생성
- 이미 존재하는 JobInstance 에서 BatchStatus 의 조건에 따라 Job 재실행 여부 판단
5. Step
public interface Step {
static final String STEP_TYPE_KEY = "batch.stepType";
String getName();
boolean isAllowStartIfComplete();
int getStartLimit();
void execute(StepExecution stepExecution) throws JobInterruptedException;
}
5-1) 기본 개념
- Job 을 구성하는 독립적인 하나의 단계로 실제 배치 처리를 정의하고 컨트롤하는 데 필요한 정보를 가지고 있는 객체
- 개발자가 원하는 만큼 간단하거나 복잡할 수 있음
- 모든 Job 은 하나 이상의 Step 으로 구성
5-2) 기본 구현체
- TaskletStep
- 가장 기본이 되는 클래스로 Tasklet 타입의 구현체들을 제어(Tasklet / ChunkOrientedTasklet 방식) - PartionStep
- 멀티 스레드 방식으로 Step 을 여러 개로 분리해서 실행 - JobStep
- Step 내에서 Job 을 실행 - FlowStep
- Step 내에서 Flow 를 실행
6. StepExecution
public class StepExecution extends Entity {
private final JobExecution jobExecution;
private final String stepName;
private volatile BatchStatus status = BatchStatus.STARTING;
private volatile int readCount = 0;
private volatile int writeCount = 0;
private volatile int commitCount = 0;
private volatile int rollbackCount = 0;
...
}
6-1) 기본 개념
- Step 에 대한 한 번의 시도를 의미하는 객체로 Step 실행에 대한 모든 정보를 저장
- JobExecution 과 유사하게 Step 이 실행될 때마다 새로운 StepExecution 이 생성
- JobExecution 과 관계
- Step 의 StepExecution 이 모두 정상적으로 완료되어야 JobExecution 이 정상 완료
- 여러 Step 중 하나라도 실패하면 JobExecution 은 실패
- JobExecution 과 StepExecution 는 1:N 관계
- Meta Data Schema 의 BATCH_STEP_EXECUTION 테이블과 매핑
7. StepContribution
public class StepContribution implements Serializable {
private volatile int readCount = 0;
private volatile int writeCount = 0;
private volatile int filterCount = 0;
private final int parentSkipCount;
private volatile int readSkipCount;
private volatile int writeSkipCount;
private volatile int processSkipCount;
private ExitStatus exitStatus = ExitStatus.EXECUTING;
private volatile StepExecution stepExecution;
...
}
7-1) 기본 개념
- 청크 프로세스(Chunk)의 변경 사항을 버퍼링 한 후 StepExecution 상태를 업데이트하는 도메인 객채
- 청츠 Commit 직전에 StepExecution 의 apply 메소드를 호출하여 상태 업데이트
8. ExecutionContext
public class ExecutionContext implements Serializable {
private volatile boolean dirty = false;
private final Map<String, Object> map;
public ExecutionContext() {
this.map = new ConcurrentHashMap<>();
}
public ExecutionContext(Map<String, Object> map) {
this.map = new ConcurrentHashMap<>(map);
}
public ExecutionContext(ExecutionContext executionContext) {
this();
if (executionContext == null) {
return;
}
for (Entry<String, Object> entry : executionContext.entrySet()) {
this.map.put(entry.getKey(), entry.getValue());
}
}
public void putString(String key, @Nullable String value) {
put(key, value);
}
...
}
8-1) 기본 개념
- Spring Batch 프레임워크에서 Key/Value 형식 고유 객체로 StepExecution 또는 JobExecution 객체의 상태를 저장하는 공유 객체
- 공유 범위
- Job 범위 -> 각 Job 의 JobExecution 에 저장되며 Job 간 서로 공유가 안되며 해당 Job 의 Step 간 서로 공유
- Step 범위 -> 각 Step 의 StepExecution 에 저장되며 Step 간 서로 공유 안됨
9. JobRepository
public interface JobRepository {
boolean isJobInstanceExists(String jobName, JobParameters jobParameters);
JobInstance createJobInstance(String jobName, JobParameters jobParameters);
JobExecution createJobExecution(JobInstance jobInstance, JobParameters jobParameters, String jobConfigurationLocation);
JobExecution createJobExecution(String jobName, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException;
void update(JobExecution jobExecution);
void add(StepExecution stepExecution);
...
}
9-1) 기본 개념
- 배치 작업 중의 정보를 저장하는 저장소 역할
- Job 에 대한 Meta Data 에 대해 CRUD 기능 처리
10. @JobScope, @StepScope
10-1) Scope
- Scope 는 스프링 컨테이너에서 Bean 이 관리되는 범위
- singleton, prototype, request, session, application 이 존재하고 기본으로 singleton 으로 생성
10-2) SpringBatch Scope
- Job 과 Step 의 Bean 생성과 실행에 관여
- Proxy 모드가 기본값
- 해당 Scope 가 선언되면 Bean 의 생성이 어플리케이션 구동시점이 아닌 Bean 의 실행 시점에 이루어짐 -> Lazy Binding
- Job, Step 이 실행되는 시점에 Bean 이 생성
- @Value 어노테이션의 경우 어플리케이션이 실행될 때 바인딩을 하기 때문에 Scope 를 통해 생성시점을 지연시켜 바인딩을 이룸
- @Value("#jobParameters[파라미터명]}")
- Proxy 모드로 Bean 이 선언되기 때문에 어플리케이션 구동 시점에는 Bean 의 프록시 객체가 생성되고, 메소드 실행 시점에 실제 Bean 을 호출(AOP)
- 병렬 처리 시 각 Thread 마다 생성된 스코프 빈이 할당되기 때문에 Thread-safe 가능
- @JobScope
- Step 선언문에 정의
- @Value : jobParameter, ItemWriter, ItemProcessor 선언문에 정의
- @StepScope
- Tasklet 이나 ItemReader, ItemWriter, ItemProcessor 선언문에 정의
- @Value : jobParamter, JobExecutionContext, stepExecutionContext 사용 가능
10-3) 예제
참고
- https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98/dashboard