Bepoz
파즈의 공부 일기
Bepoz
전체 방문자
오늘
어제
  • 분류 전체보기 (232)
    • 공부 기록들 (85)
      • 우테코 (17)
    • Spring (85)
    • Java (43)
    • Infra (17)
    • 책 정리 (0)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

인기 글

태그

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
Bepoz

파즈의 공부 일기

Spring

[Spring Batch] 이벤트 리스너 내용 정리

2023. 5. 4. 17:03

Spring Batch 이벤트 리스너 내용 정리

  • Listener는 배치 흐름 중에 Job, Step, Chunk 단계의 실행 전후에 발생하는 이벤트를 받아 용도에 맞게 활용할 수 있도록 제공하는 인터셉터 개념의 클래스
  • 이벤트를 받기 위해 Listener를 등록해야 하며 등록은 API 설정에서 각 단계별로 지정 가능하다.
  • Job
    • JobExecutionListener - Job 실행 전 후
  • Step
    • StepExecutionListener - Step 실행 전 후
    • ChunkListener - Chunk 실행 전 후 (Tasklet 실행 전 후), 오류 시점
    • ItemReadListener - ItemReader 실행 전 후, 오류 시점, item이 null 일 경우 호출 안됨
    • ItemProcessListener - ItemProcessor 실행 전 후, 오류 시점, item이 null 일 경우 호출 안됨
    • ItemWriteListener - ItemWriter 실행 전 후, 오류 시점, item이 null 일 경우 호출 안됨
  • SkipListener - 읽기, 쓰기, 처리 Skip 실행 시점, Item 처리가 Skip 될 경우 Skip 된 item을 추적
  • RetryListener - Retry 시작, 종료, 에러 시점

image

위와 같은 인터페이스를 구현해서 사용하는 방식도 있지만, @BeforeRead와 같은 어노테이션을 이용해서 사용할 수도 있다.


JobExecutionListener / StepExecutionListener

public interface JobExecutionListener {

    /**
     * Callback before a job executes.
     *
     * @param jobExecution the current {@link JobExecution}
     */
    void beforeJob(JobExecution jobExecution);

    /**
     * Callback after completion of a job. Called after both both successful and
     * failed executions. To perform logic on a particular status, use
     * "if (jobExecution.getStatus() == BatchStatus.X)".
     *
     * @param jobExecution the current {@link JobExecution}
     */
    void afterJob(JobExecution jobExecution);

}
public interface StepExecutionListener extends StepListener {

    /**
     * Initialize the state of the listener with the {@link StepExecution} from
     * the current scope.
     *
     * @param stepExecution instance of {@link StepExecution}.
     */
    void beforeStep(StepExecution stepExecution);

    /**
     * Give a listener a chance to modify the exit status from a step. The value
     * returned will be combined with the normal exit status using
     * {@link ExitStatus#and(ExitStatus)}.
     *
     * Called after execution of step's processing logic (both successful or
     * failed). Throwing exception in this method has no effect, it will only be
     * logged.
     *
     * @param stepExecution {@link StepExecution} instance.
     * @return an {@link ExitStatus} to combine with the normal value. Return
     * {@code null} to leave the old value unchanged.
     */
    @Nullable
    ExitStatus afterStep(StepExecution stepExecution);
}
  • Job / Step의 성공여부와 상관 없이 호출된다.
  • 성공 / 실패 여부는 JobExecution / StepExecution 을 통해 알 수 있다.
public class CustomJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getJobName() : " + jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getStatus() : " + jobExecution.getStatus());
        long startTime = jobExecution.getStartTime().getTime();
        long endTime = jobExecution.getEndTime().getTime();
        System.out.println("(endTime-startTime) = " + (endTime - startTime));
    }
}

@Component
public class CustomStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("stepExecution.getStepName() : " + stepExecution.getStepName());
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        System.out.println("stepExecution.getStatus() : " + stepExecution.getStatus());
        return ExitStatus.COMPLETED;
    }
}

Job과 Step에 대한 리스너 클래스 2개를 만들었다. StepListener는 빈 등록을 위해 @Component를 붙여주었다.
어노테이션을 사용하는 경우에는

public class CustomJobListener {

    @BeforeJob
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getJobName() : " + jobExecution.getJobInstance().getJobName());
    }

    @AfterJob
    public void afterJob(JobExecution jobExecution) {
        System.out.println("JobExecution.getStatus() : " + jobExecution.getStatus());
        long startTime = jobExecution.getStartTime().getTime();
        long endTime = jobExecution.getEndTime().getTime();
        System.out.println("(endTime-startTime) = " + (endTime - startTime));
    }
}

인터페이스 구현이 아니라 어노테이션을 달아주면 된다. 메서드 명은 자유롭게 하면된다.

@RequiredArgsConstructor
@Configuration
@Slf4j
public class JobAndStepListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomStepListener customStepListener;

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .next(step2())
                .listener(new CustomJobListener())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
//                        throw new RuntimeException("failed");
                        return RepeatStatus.FINISHED;
                    }
                })
                .listener(customStepListener)
                .build();
    }

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        return RepeatStatus.FINISHED;
                    }
                })
                .listener(customStepListener)
                .build();
    }
}

.listener()에 리스너를 등록해주고(어노테이션을 사용한 클래스도 마찬가지) 하면 동작하게 된다.

image

정상적으로 동작하는 것을 확인할 수가 있다.


ChunkListener / ItemReadListener / ItemProcessorListener / ItemWriterListener

public interface ChunkListener extends StepListener {

    static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);
}
public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);
}
public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, @Nullable S result);
    void onProcessError(T item, Exception e);
}
public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);
}

인터페이스 시그니처는 위와 같다. 메서드 명으로 파악할 수 있듯이 전/후에 수행할 행동과 에러가 났을 때 수행할 행동을 정의한다.

@RequiredArgsConstructor
@Configuration
public class ChunkListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomChunkListener customChunkListener;

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("batchJob")
                                .incrementer(new RunIdIncrementer())
                                .start(step1())
                                .build();
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                                 .<Integer, String>chunk(10)
                                 .listener(customChunkListener)
                                 .listener(new CustomItemReadListener())
                                 .listener(new CustomItemProcessListener())
                                 .listener(new CustomItemWriteListener())
                                 .reader(listItemReader())
                                 .processor((ItemProcessor) item -> {
//                                     throw new RuntimeException("failed");
                    return "item" + item;
                                 })
                                 .writer((ItemWriter<String>) items -> {
//                                     throw new RuntimeException("failed");
                                 })
                                 .build();
    }

    @Bean
    public ItemReader<Integer> listItemReader() {
        List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        return new ListItemReader<>(list);
    }
}

위와 같이 리스너를 모두 등록하고 돌려보면(각 리스너에서 현재 어떤 단계인지 출력하게끔 구현해놨다)

>> Before the chunk : step1
>> beforeRead
>> afterRead : 1
>> beforeRead
>> afterRead : 2
...
>> beforeRead
>> afterRead : 10
>> beforeProcess
>> afterProcess : 1
>> afterProcess : item1
>> beforeProcess
>> afterProcess : 2
>> afterProcess : item2
...
>> beforeWrite
>> afterWrite : [item1, item2, item3, item4, item5, item6, item7, item8, item9, item10]
>> After the chunk : step1
>> Before the chunk : step1
>> beforeRead
>> After the chunk : step1

위와 같이 나온다. read와 write 옆의 숫자는 reader의 list 내부 item을 출력하게끔 해둔 것이다.

1청크가 끝난 후 다음 청크 단위를 넘어가려 했지만 reader가 1~10 까지 이므로 요소가 없어서 바로 processor로 넘어가지 않고 마무리 되고 after the chunk가 호출된 것을 확인할 수가 있다.

.processor((ItemProcessor) item -> {
  throw new RuntimeException("failed");
//return "item" + item;
})

//현재 processorListener의 구현은 아래와 같음
@Component
public class CustomItemProcessListener implements ItemProcessListener {

    @Override
    public void beforeProcess(Object item) {
        System.out.println(">> beforeProcess");
    }

    @Override
    public void afterProcess(Object item, Object result) {
        System.out.println(">> afterProcess : "+ item);
        System.out.println(">> afterProcess : "+ result);
    }

    @Override
    public void onProcessError(Object item, Exception e) {
        System.out.println(">> onProcessError : " + e.getMessage());
        System.out.println(">> onProcessError : " + item);
    }
}

processor에서 예외를 던지게끔 변경하면

...
>> afterRead : 10
>> beforeProcess
>> onProcessError : failed
>> onProcessError : 1
>> After the chunk : step1

예상한 대로 onProcessError가 호출이 되고 chunk가 끝나게 된다. reader, writer에서의 예외 처리도 위와 동일하게 이루어진다.


SkipListener / RetryListener

SKipListener

public interface SkipListener<T,S> extends StepListener {

    /**
     * Callback for a failure on read that is legal, so is not going to be
     * re-thrown. In case transaction is rolled back and items are re-read, this
     * callback will occur repeatedly for the same cause.  This will only happen
     * if read items are not buffered.
     * 
     * @param t cause of the failure
     */
    void onSkipInRead(Throwable t);

    /**
     * This item failed on write with the given exception, and a skip was called
     * for. 
     * 
     * @param item the failed item
     * @param t the cause of the failure
     */
    void onSkipInWrite(S item, Throwable t);

    /**
     * This item failed on processing with the given exception, and a skip was called
     * for. 
     * 
     * @param item the failed item
     * @param t the cause of the failure
     */
    void onSkipInProcess(T item, Throwable t);

}

SkipListener는 skip이 일어났을 때에 어디서 일어났는지에 따라 호출이 되게끔하는 리스너이다.

  @Bean
  public Step step1() throws Exception {
      return stepBuilderFactory.get("step1")
              .<Integer, String>chunk(10)
              .reader(listItemReader())
              .processor(new ItemProcessor<Integer, String>() {
                  @Override
                  public String process(Integer item) throws Exception {
                      if (item == 4) {
                          throw new CustomSkipException("process skipped");
                      }
                      System.out.println("process : " + item);
                      return "item" + item;
                  }
              })
              .writer(new ItemWriter<String>() {
                  @Override
                  public void write(List<? extends String> items) throws Exception {
                      for (String item : items) {
                          if (item.equals("item5")) {
                              throw new CustomSkipException("write skipped");
                          }
                          System.out.println("write : " + item);
                      }
                  }
              })
              .faultTolerant()
              .skip(CustomSkipException.class)
              .skipLimit(3)
              .listener(customSkipListener)
              .build();
  }

  @Bean
  public ItemReader<Integer> listItemReader() {
      List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
      return new LinkedListItemReader<>(list);
  }

------------------------------------------------------------------------------
@Component
public class CustomSkipListener implements SkipListener {

    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println(">> onSkipInRead : "+ t.getMessage());
    }

    @Override
    public void onSkipInWrite(Object item, Throwable t) {
        System.out.println(">> onSkipInWrite : "+ item);
        System.out.println(">> onSkipInWrite : "+ t.getMessage());
    }

    @Override
    public void onSkipInProcess(Object item, Throwable t) {
        System.out.println(">> onSkipInProcess : "+ item);
        System.out.println(">> onSkipInProcess : "+ t.getMessage());
    }
}

위와 같이 batch configuration이 있다. reader에서는 item이 3일 때에 예외, processor, writer에서는 각각 4, 5에서 예외가 발생하고 있는 상황이다. 이를 돌려보면 다음과 같은 결과가 나오게 된다.

read : 1
read : 2
read : 4
read : 5
read : 6
read : 7
read : 8
read : 9
read : 10
process : 1
process : 2
process : 1
process : 2
process : 5
process : 6
process : 7
process : 8
process : 9
process : 10
write : item1
write : item2
process : 1
write : item1
>> onSkipInProcess : 4
>> onSkipInProcess : process skipped
>> onSkipInRead : read skipped : 3
process : 2
write : item2
>> onSkipInRead : read skipped : 3
process : 5
process : 6
write : item6
>> onSkipInWrite : item5
>> onSkipInWrite : write skipped
>> onSkipInRead : read skipped : 3
process : 7
write : item7
>> onSkipInRead : read skipped : 3
process : 8
write : item8
>> onSkipInRead : read skipped : 3
process : 9
write : item9
>> onSkipInRead : read skipped : 3
process : 10
write : item10
>> onSkipInRead : read skipped : 3

ItemReader에서 skip이 발생하면 해당 item을 제외하고 진행되며 ItemProcessor와 ItemWriter 에서는 chunk의 처음으로 돌아가서 스킵된 아이템을 제외한 나머지 아이템들을 가지고 처리하게 되는 것을 까먹지 말아야 한다.

ItemReader로 읽어들인 아이템들은 캐싱이 되어있으므로 processor의 skip처리 때에 다시 read 호출이 되지 않는 것을 확인할 수가 있다.

ItemWriter의 경우에는 chunk의 처음부터 다시 동작하는 것이 맞긴하지만 processor와 writer 단계에서 item을 한 번에 처리하는 것이 아닌 요소 별로 처리를 하게된다.

그리고 skip된 아이템들에 대한 정보를 출력해주는 것을 확인할 수가 있다.


RetryListener

public interface RetryListener {

    /**
     * Called before the first attempt in a retry. For instance, implementers can set up
     * state that is needed by the policies in the {@link RetryOperations}. The whole
     * retry can be vetoed by returning false from this method, in which case a
     * {@link TerminatedRetryException} will be thrown.
     * @param <T> the type of object returned by the callback
     * @param <E> the type of exception it declares may be thrown
     * @param context the current {@link RetryContext}.
     * @param callback the current {@link RetryCallback}.
     * @return true if the retry should proceed.
     */
    <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);

    /**
     * Called after the final attempt (successful or not). Allow the interceptor to clean
     * up any resource it is holding before control returns to the retry caller.
     * @param context the current {@link RetryContext}.
     * @param callback the current {@link RetryCallback}.
     * @param throwable the last exception that was thrown by the callback.
     * @param <E> the exception type
     * @param <T> the return value
     */
    <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);

    /**
     * Called after every unsuccessful attempt at a retry.
     * @param context the current {@link RetryContext}.
     * @param callback the current {@link RetryCallback}.
     * @param throwable the last exception that was thrown by the callback.
     * @param <T> the return value
     * @param <E> the exception to throw
     */
    <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);

}
  • open: 재시도 전 매번 호출됨, false를 반환하면 retry 시도를 하지 않음
  • close: 재시도 후 매번 호출됨
  • onError: 재시도 실패 시마다 호출됨
@RequiredArgsConstructor
@Configuration
public class RetryListenerConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final CustomRetryListener customRetryListener;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<Integer, String>chunk(10)
                .reader(listItemReader())
                .processor(new CustomItemProcessor())
                .writer(new CustomItemWriter())
                .faultTolerant()
                .retry(CustomRetryException.class)
                .retryLimit(2)
                .listener(customRetryListener)

                .build();
    }

    @Bean
    public ItemReader<Integer> listItemReader() {
        List<Integer> list = Arrays.asList(1,2,3,4);
//        List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
        return new LinkedListItemReader<>(list);
    }
}

---------------------------------------------------------------------------------------
public class CustomItemProcessor implements ItemProcessor<Integer,String> {

    int count = 0;

    @Override
    public String process(Integer item) throws Exception {

        if(count < 2) {
            if (count % 2 == 0) {
                count = count + 1;

            } else if (count % 2 == 1) {
                count = count + 1;
                throw new CustomRetryException();
            }
        }
        return String.valueOf(item);
    }
}

public class CustomItemWriter implements ItemWriter<String> {
    int count = 0;
    @Override
    public void write(List<? extends String> items) throws CustomRetryException {
        for (String item : items) {
            if(count < 2) {
                if (count % 2 == 0) {
                    count = count + 1;

                } else if (count % 2 == 1) {
                    count = count + 1;
                    throw new CustomRetryException();
                }
            }
            System.out.println("write : " + item);
        }
    }
}

@Component
public class CustomRetryListener implements RetryListener{

    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        System.out.println("open");
        return true;
    }

    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("close");
    }

    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("Retry Count: " + context.getRetryCount());
    }
}

위와 같은 코드일 때 실행시키면

open
close
open
Retry Count: 1
close
open
close
open
close
open
close
open
close
open
write : 1
Retry Count: 1
close
open
close
open
close
open
close
open
close
open
write : 1
write : 2
write : 3
write : 4
close

위와 같이 출력이 된다.

Retry의 경우 ChunkIterator를 통해 RetryTemplate을 호출해 내부 로직을 for문을 돌리게 되므로,
처음에 open close 후 retry 한 번 발생하고 이후 다시 process 과정을 거친다. write 에서도 마찬가지로 1을 write하고 2에서 예외가 발생하여 retry가 일어나게 되고 다시 청크의 처음부터 돌아가서 process작업을 한 후 write를 정상적으로 작성하게 된다.


REFERENCE

정수원님 스프링 배치 강의

'Spring' 카테고리의 다른 글

MongoDB 특정 필드만 가져오게끔 하는 projection  (0) 2023.08.13
[Spring] Filter와 server.compression 설정을 통한 api 압축  (0) 2023.07.29
[Spring Batch] Multi Thread Processing 내용 정리  (1) 2023.04.03
[Spring] Spring Data MongoDB 잊어버리는 것들 정리  (0) 2023.02.09
[Spring Batch] @JobScope, @StepScope 에 대해  (0) 2022.11.15
    'Spring' 카테고리의 다른 글
    • MongoDB 특정 필드만 가져오게끔 하는 projection
    • [Spring] Filter와 server.compression 설정을 통한 api 압축
    • [Spring Batch] Multi Thread Processing 내용 정리
    • [Spring] Spring Data MongoDB 잊어버리는 것들 정리
    Bepoz
    Bepoz
    https://github.com/Be-poz/TIL

    티스토리툴바