BackEnd/Spring Batch

[Spring Batch 정리하기] 3. 도메인

Wonol 2024. 7. 25. 09:08
반응형

1. Job

public interface Job {

	String getName();

	boolean isRestartable();

	void execute(JobExecution execution);

	@Nullable
	JobParametersIncrementer getJobParametersIncrementer();

	JobParametersValidator getJobParametersValidator();

}

1-1) 기본 개념

- 배치 계층 구조에서 최상위의 개념으로 전체 배치 프로세스를 캡슐화한 Entity 

- Job Configuration 을 통해 생성되는 객체 단위로 배치 작업을 어떻게 구성하고 실행할 것인지 명세해 놓은 객체

- 여러 Step 을 포함하고 있는 컨테이너로 반드시 1개 이상의 Step으로 구성

1-2) 기본 구현체

  • SimpleJob
    - 순차적으로 Step 을 실행시키는 Job
  • FlowJob
    - 특정한 조건과 흐름에 따라 Step 을 구성하여 실행시키는 Job

1-3) AbstractJob

- Job 을 구현하고 있는 추상클래스로 여러 필드 및 메소드 존재

public abstract class AbstractJob implements Job, StepLocator, BeanNameAware, InitializingBean {
    protected static final Log logger = LogFactory.getLog(AbstractJob.class);
    private String name;
    private boolean restartable = true;
    private JobRepository jobRepository;
    private CompositeJobExecutionListener listener = new CompositeJobExecutionListener();
    private JobParametersIncrementer jobParametersIncrementer;
    private JobParametersValidator jobParametersValidator = new DefaultJobParametersValidator();
    private StepHandler stepHandler;

    public AbstractJob() {
    }

    public AbstractJob(String name) {
        this.name = name;
    }
    
    ...
}
  • name : job 이름
  • restartable : 재시작 여부(기본값 true)
  • JobRepository : 메타데이터 저장소
  • JobExecutionListener : job 이벤트 리스너
  • JobParametersIncrementer : JobParameter 증가기
  • JobParametersValidator : JobParameter 검증기
  • StepHandler : Step 실행 핸들러

2. JobInstance

public class JobInstance extends Entity implements javax.batch.runtime.JobInstance{

	private final String jobName;

	public JobInstance(Long id, String jobName) {
		super(id);
		Assert.hasLength(jobName, "A jobName is required");
		this.jobName = jobName;
	}

	/**
	 * @return the job name. (Equivalent to getJob().getName())
	 */
	@Override
	public String getJobName() {
		return jobName;
	}

	@Override
	public String toString() {
		return super.toString() + ", Job=[" + jobName + "]";
	}

	@Override
	public long getInstanceId() {
		return super.getId();
	}
}

2-1) 기본 개념

- Job 이 실행되는 시점에서 생성되는 Job 의 논리적 작업 실행 개념

- Job 이 실행되는 시점에 처리하는 내용이 다르기 때문에 Job 의 실행을 구분해야 함 -> 1월 1일 실행, 1월 2일 실행을 하게 되면 각각의 JobInstance 가 생성되고, 만약 1월 1일에 실행한 JobInstance 가 실패하여 이후 재실행하더라도 이 JobInstance 는 1월 1일 데이터에 대해서만 처리

- 처음 시작하는 Job & JobParameter 의 경우 새로운 JobInstance 생성

- 이전과 동일한 Job & JobParameter 의 경우 이전에 만든 JobInstance 리턴

- Meta Data Schema 의 BATCH_JOB_INSTANCE 테이블과 매핑

2-2) JobInstance 과정

- JobLauncher 는 실제 Batch Job 을 실행시키는 Launcher 로 Job 을 실행(run)할 때 Job, JobParameters 두 개의 인자를 받아서 Job 을 실행

- 위 이미지처럼 Job = 일별정산, JobParameter = '2021.01.01' 이라는 인자를 받아 Job을 실행하면, JobRepository 라는 클래스(메타 데이터의 저장소)가 Job & JobParameters 를 Key 로 하여 이전에 실행한 내역을 DB에서 조회하고 JobInstatnce 를 반환

- 동일한 Job & JobParameters 로 두 번째 실행을 하면 배치 실행이 실패

- 실패 내용은 JobInstance 가 이미 존재하기 때문이고, 동일한 Job 을 다시 실행하고 싶은 경우에는 JobParameters 변경이 필요

3. JobParameters

public class JobParameters implements Serializable {

	private final Map<String,JobParameter> parameters;

	public JobParameters() {
		this.parameters = new LinkedHashMap<>();
	}

	public JobParameters(Map<String,JobParameter> parameters) {
		this.parameters = new LinkedHashMap<>(parameters);
	}

	@Nullable
	public Long getLong(String key){
		if (!parameters.containsKey(key)) {
			return null;
		}
		Object value = parameters.get(key).getValue();
		return value==null ? null : ((Long)value).longValue();
	}
    
    ...
    
    @Nullable
	public String getString(String key){
		JobParameter value = parameters.get(key);
		return value==null ? null : value.toString();
	}
    
    ...
}

3-1) 기본 개념

- Job 을 실행할 때 외부에서 받는 파라미터를 가진 도메인 객체

- 하나의 Job 에 존재할 수 있는 여러 개의 JobInstance 를 구분하기 위한 용도

- JobParameters 와 JobInstance 는 1:1 관계

- Meta Data Schema 의 BATCH_JOB_EXECUTION_PARAM 테이블과 매핑

3-2) 생성 및 바인딩

- 어플리케이션 실행 시 주입 : java -jar application.jar requestDate=20210101

- 코드로 생성 : JobParameterBuilder

- SpEL 이용 : @Value("#(jobParameter[requestDate]}") -> @JobScope, @StepScope 선언 필수

  • @JobScope, @StepScope 는 Job, Step 이 실행되는 시점에 Bean 이 생성되도록 Bean 의 생성 시점을 지연시키는 역할
  • @Value 어노테이션의 경우 어플리케이션이 실행될 때 바인딩을 하므로 @JobScope, @StepScope 어노테이션이 존재하지 않으면 NPE 예외 발생

- JobParamters 를 외부에서 전달받음으로써 특정 값에 따라 분기 처리가 필요한 내부 로직 등 여러 곳에서 굉장히 유용하게 활용 가능

4. JobExecution

public class JobExecution extends Entity {

	private final JobParameters jobParameters;

	private JobInstance jobInstance;

	private volatile Collection<StepExecution> stepExecutions = Collections.synchronizedSet(new LinkedHashSet<>());

	private volatile BatchStatus status = BatchStatus.STARTING;

	private volatile Date startTime = null;

	private volatile Date createTime = new Date(System.currentTimeMillis());

	private volatile Date endTime = null;

	private volatile Date lastUpdated = null;

	private volatile ExitStatus exitStatus = ExitStatus.UNKNOWN;

	private volatile ExecutionContext executionContext = new ExecutionContext();

	private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<>();

	private final String jobConfigurationName;
    
    ...
}

4-1) 기본 개념

- JobExecution 은 Job 에 대한 한 번의 시도를 의미하는 객체

- Job 실행 중 발생한 정보들을 저장하고 있는 객체 -> 시작시간, 종료시간, 상태 등

- JobInstance 와의 관계

  • JobExecution 의 실행 상태(Status) 가 'COMPLETED' -> Job 재실행 불가능
  • JobExecution 의 실행 상태(Status) 가 'FAILED' -> Job 재실행 가능
  • JobExecution 의 실행 상태(Status) 가 'COMPLETED' 가 될 때까지 하나의 JobInstance 로 여러 번 시도 가능
  • JobInstance 와 JobExecution 은 1:N 관계

- Meta Data Schema 의 BATCH_JOB_EXECUTION 테이블과 매핑

4-2) JobExecution 과정

  1. JobLuancher 가 Job 실행
  2. JobRepository 가 Job & JobParameters 로 DB 조회(Meta Schema)
  3. JobInstance 가 존재할 경우 기존 JobInstance 를 Return, 존재하지 않을 경우 새로운 JobInstnace 생성
  4. JobInstance 가 생성되면 JobExecution 도 새롭게 생성
  5. 이미 존재하는 JobInstance 에서 BatchStatus 의 조건에 따라 Job 재실행 여부 판단

5. Step

public interface Step {

	static final String STEP_TYPE_KEY = "batch.stepType";

	String getName();

	boolean isAllowStartIfComplete();

	int getStartLimit();

	void execute(StepExecution stepExecution) throws JobInterruptedException;

}

 

5-1) 기본 개념

- Job 을 구성하는 독립적인 하나의 단계로 실제 배치 처리를 정의하고 컨트롤하는 데 필요한 정보를 가지고 있는 객체

- 개발자가 원하는 만큼 간단하거나 복잡할 수 있음

- 모든 Job 은 하나 이상의 Step 으로 구성

5-2) 기본 구현체

  • TaskletStep
    - 가장 기본이 되는 클래스로 Tasklet 타입의 구현체들을 제어(Tasklet / ChunkOrientedTasklet 방식)
  • PartionStep
    - 멀티 스레드 방식으로 Step 을 여러 개로 분리해서 실행
  • JobStep
    - Step 내에서 Job 을 실행
  • FlowStep
    - Step 내에서 Flow 를 실행

 

6. StepExecution

public class StepExecution extends Entity {

	private final JobExecution jobExecution;

	private final String stepName;

	private volatile BatchStatus status = BatchStatus.STARTING;

	private volatile int readCount = 0;

	private volatile int writeCount = 0;

	private volatile int commitCount = 0;

	private volatile int rollbackCount = 0;
    
    ...
}

6-1) 기본 개념

- Step 에 대한 한 번의 시도를 의미하는 객체로 Step 실행에 대한 모든 정보를 저장

- JobExecution 과 유사하게 Step 이 실행될 때마다 새로운 StepExecution 이 생성

- JobExecution 과 관계

  • Step 의 StepExecution 이 모두 정상적으로 완료되어야 JobExecution 이 정상 완료
  • 여러 Step 중 하나라도 실패하면 JobExecution 은 실패
  • JobExecution 과 StepExecution 는 1:N 관계

- Meta Data Schema 의 BATCH_STEP_EXECUTION 테이블과 매핑

7. StepContribution

public class StepContribution implements Serializable {

	private volatile int readCount = 0;

	private volatile int writeCount = 0;

	private volatile int filterCount = 0;

	private final int parentSkipCount;

	private volatile int readSkipCount;

	private volatile int writeSkipCount;

	private volatile int processSkipCount;

	private ExitStatus exitStatus = ExitStatus.EXECUTING;

	private volatile StepExecution stepExecution;
    
    ...
}

7-1) 기본 개념

- 청크 프로세스(Chunk)의 변경 사항을 버퍼링 한 후 StepExecution 상태를 업데이트하는 도메인 객채

- 청츠 Commit 직전에 StepExecution 의 apply 메소드를 호출하여 상태 업데이트

8. ExecutionContext

public class ExecutionContext implements Serializable {

	private volatile boolean dirty = false;

	private final Map<String, Object> map;

	public ExecutionContext() {
		this.map = new ConcurrentHashMap<>();
	}

	public ExecutionContext(Map<String, Object> map) {
		this.map = new ConcurrentHashMap<>(map);
	}

	public ExecutionContext(ExecutionContext executionContext) {
		this();
		if (executionContext == null) {
			return;
		}
		for (Entry<String, Object> entry : executionContext.entrySet()) {
			this.map.put(entry.getKey(), entry.getValue());
		}
	}

	public void putString(String key, @Nullable String value) {

		put(key, value);
	}
    
    ...
}

8-1) 기본 개념

- Spring Batch 프레임워크에서 Key/Value 형식 고유 객체로 StepExecution 또는 JobExecution 객체의 상태를 저장하는 공유 객체

- 공유 범위

  • Job 범위 -> 각 Job 의 JobExecution 에 저장되며 Job 간 서로 공유가 안되며 해당 Job 의 Step 간 서로 공유
  • Step 범위 -> 각 Step 의 StepExecution 에 저장되며 Step 간 서로 공유 안됨

9. JobRepository

public interface JobRepository {

	boolean isJobInstanceExists(String jobName, JobParameters jobParameters);

	JobInstance createJobInstance(String jobName, JobParameters jobParameters);

	JobExecution createJobExecution(JobInstance jobInstance, JobParameters jobParameters, String jobConfigurationLocation);

	JobExecution createJobExecution(String jobName, JobParameters jobParameters)
			throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException;

	void update(JobExecution jobExecution);

	void add(StepExecution stepExecution);
    
    ...
}

9-1) 기본 개념

- 배치 작업 중의 정보를 저장하는 저장소 역할

- Job 에 대한 Meta Data 에 대해 CRUD 기능 처리

10. @JobScope, @StepScope

10-1) Scope

- Scope 는 스프링 컨테이너에서 Bean 이 관리되는 범위

- singleton, prototype, request, session, application 이 존재하고 기본으로 singleton 으로 생성

10-2) SpringBatch Scope

- Job 과 Step 의 Bean 생성과 실행에 관여

- Proxy 모드가 기본값

- 해당 Scope 가 선언되면 Bean 의 생성이 어플리케이션 구동시점이 아닌 Bean 의 실행 시점에 이루어짐 -> Lazy Binding

  • Job, Step 이 실행되는 시점에 Bean 이 생성
  • @Value 어노테이션의 경우 어플리케이션이 실행될 때 바인딩을 하기 때문에 Scope 를 통해 생성시점을 지연시켜 바인딩을 이룸
  • @Value("#jobParameters[파라미터명]}")

- Proxy 모드로 Bean 이 선언되기 때문에 어플리케이션 구동 시점에는 Bean 의 프록시 객체가 생성되고, 메소드 실행 시점에 실제 Bean 을 호출(AOP)

- 병렬 처리 시 각 Thread 마다 생성된 스코프 빈이 할당되기 때문에 Thread-safe 가능

- @JobScope

  • Step 선언문에 정의
  • @Value : jobParameter, ItemWriter, ItemProcessor 선언문에 정의

- @StepScope

  • Tasklet 이나 ItemReader, ItemWriter, ItemProcessor 선언문에 정의
  • @Value : jobParamter, JobExecutionContext, stepExecutionContext 사용 가능

10-3) 예제


참고

- https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98/dashboard

반응형