Spring

[Spring Batch] 반복 및 오류 제어 내용 정리

Bepoz 2022. 10. 24. 03:07

Spring Batch 반복 및 오류 제어 내용 정리

Repeat

  • Spring Batch는 특정 조건이 충족 될 때 까지(또는 반대) Job 또는 Step을 반복하도록 배치 어플리케이션을 구성 할 수 있다.
  • Step의 반복과 Chunk 반복을 RepeatOperation을 사용해서 처리하고 있다.
  • 기본 구현체로 RepeatTemplate을 제공한다.

Job -> Step -> ( RepeatTemplate -> Tasklet -> ( RepeatTemplate -> Chunk) 반복 )반복

Tasklet은 이제 Chunk 사용 시에 ChunkOrientedTasklet 를 사용하게 되는데 이곳의 ChunkProviderRepeatTemplate 을 가지고 ItemReader 에서 데이터를 읽어오는 것을 반복한다.

반복을 종료할 것인지 여부를 결정하는 세 가지 항목

  • RepeatStatus
    • 스프링 배치의 처리가 끝났는지 판별하기 위한 열거형
      • CONTINUABLE: 작업이 남아 있음
      • FINISHED: 더 이상의 반복 없음
  • CompletionPolicy
    • RepeatTemplate의 iterate 메서드 안에서 반복을 중단할지 결정
    • 실행 횟수 또는 완료시기, 오류 발생 시 수행 할 작업에 대한 반복여부 결정
    • 정상 종료를 알리는데 사용된다
  • ExceptionHandler
    • RepeatCallback 안에서 예외가 발생하면 RepeatTemplate가 ExceptionHandler를 참조해서 예외를 다시 던질지 여부 결정
    • 예외를 받아서 다시 던지게 되면 반복 종료
    • 비정상 종료를 알리는데 사용된다

RepeatOperation <--- RepeatTemplate->RepeatCallback -> RepeatStatus & RepeatContext

RepeatOperation 인터페이스는 RepeatStatus iterate(RepeatCallback callback) 을 가지고 있다.
이를 RepeatTemplate 이 구현하고, RepeatTemplate 에서 RepeatCallback
RepeatStatus doInIteration(Repeatcontext context) 를 호출하게 된다.

image
image

메서드를 타고타고 밑의 메서드에서 doinIteration 을 호출

image

결국, Step -> RepeatTemplate -> ( RepeatCallback --(doInIteration())--> tasklet ) 반복
tasklet 에서 예외가 발생했다면 ExceptionHandler 를 통해 예외 정책에 따라 반복여부를 결정할 수 있다.
예외가 발생하지 않았다면 completionPolicy 를 참조해서 종료 정책에 따라 반복 여부를 결정할 수 있게된다.
만약 종료하지 않는다면 RepeatStatus 를 확인하게된다. 만약 FINISHED 면 반복문이 종료된다.
여기서 FINISHED 가 아니면 반복문이 유지하게 된다.


CompletionPolicy 에는 2개의 시그니처가 있다.

  • boolean isComplete(RepeatContext context, RepeatStatus result)
    • 콜백의 최종 상태 결과를 참조하여 배치가 완료되었는지 확인
  • boolean isComplete(RepeatContext context)
    • 콜백이 완료 될 때 까지 기다릴 필요없이 완료되었는지 확인

image

스프링 배치에서 기본적으로 지원되는 CompletionPolicy는 위와 같은데 간단히 살펴보자면,

  • TimeoutTerminationPolicy: 반복시점부터 현재시점까지 소요된 시간이 설정된 시간보다 크면 반복종료
  • CountingCompletionPolicy: 일정한 카운트를 계산 및 집계해서 카운트 제한 조건이 만족하면 반복종료
  • SimpleCompletionPolicy: default 값, 현재 반복 횟수가 Chunk 개수보다 크면 반복종료

ExceptionHandler

  • void handleException(RepeatContext context, Throwable throwable) throws Throwable
    • 예외를 다시 던지기위한 전략을 허용하는 핸들러

image

스프링 배치에서 기본적으로 지원되는 ExceptionHandler는 위와 같은데 간단히 살펴보자면,

  • LogOrrethrowExceptionHandler: 예외를 로그로 기록할지 아니면 다시 던질 지 결정
  • RethrownOnThresholdExceptionHandler: 지정된 유형의 예외가 임계 값에 도달하면 다시 발생
  • SimpleLimitExceptionHandler: default, 예외 타입 중 하나가 발견되면 카운트가 증가하고 한계가 초과되었는지 여부를 확인하고 Throwable를 다시 던짐

이제 코드 흐름을 따라가 보겠다.
앞서 언급한 Job -> Step -> ( RepeatTemplate -> Tasklet -> ( RepeatTemplate -> Chunk) 반복 )반복을 기억하자.
그리고 또한 TaskletStep -> ChunkOrientedTasklet -> ChunkProvider -> ItemReader -> ChunkOrientedTasklet -> ChunkProcessor -> ItemProcessor -> ChunkProcessor -> ItemWriter 의 흐름 또한 기억해두자.

위 2개의 흐름을 합친 최종 정리는 마지막에 정리하겠다.

먼저 job과 step은 다음과 같다.

image

이제 배치를 돌려보겠다.

image

TaskletStepdoExecute 에 들어왔다. 흐름상 Step 에서는 RepeatTemplate 을 반복시킨다.
위 코드의 stepOperation 이 바로 RepeatTemplate 이고 callback을 정의해둔 것이다.

image

RepeatTemplateRepeatOperation 의 구현체인 것을 기억하고 있을 것이라고 생각한다.

image

TaskletStep 에서 호출한 RepeatTemplateiterate 메서드에 들어왔다.
result = executeInternal(callback); 에 들어가게되고,

image

executeInternal 메서드에서 반복문을 돌게된다. 이후 getNextResult 를 호출한다.

image

이곳에서 callback 의 doInIteration 을 호출하게되고 이는 곧 TaskletStep 에서 정의해준 StepContextRepeatCallbackdoInIteration 메서드를 호출하게 된다(2번째 코드 사진에 나와있는 콜백 클래스). 그리고 doInIteration 에서 오버라이딩 해준 메서드인 doInChunkContext 를 호출하고 이곳에서 TransactionTemplate 호출 그리고 ChunkOrientedTaskletexecute 를 호출하게 된다. (doInIteration, doInChunkContext, TransactionTemplate 의 흐름사진은 생략)

image

이제 ChunkProviderprovide 를 호출한다.

image

이곳에서 부터 RepeatTemplate 의 반복이 또 다시 이루어지게 된다.

image

동일한 흐름을 쭉 타면서 ChunkProviderprovide 메서드에 오버라이딩한 메서드 doInIteration 을 호출하게되고,

image

이곳에서 ItemReader 를 통해 읽어들이고 그 값이 null 이면 RepeatStatus.FINISHED 를 그렇지 않다면 RepeatStatus.CONTINUABLE 를 리턴하게되고,

image

위에서 한 번 봤던 RepeatTemplateexecutionInternal 의 해당 라인에서 result로 받고 그 밑에 있는 isComplete 메서드를 통해 CompletionPolicy 의 완료 조건에 해당되거나 RepeatStatus.CONTINUABLE 이 아닌 경우 while 문 지속의 판단 기준인 running 을 false로 바꿔주게된다. 그러면

image

RepeatStatus.FINISHED 를 담은 result 를 return 하게 되고,

image

ChunkProvider 의(크게는 ChunkOrientedTasklet 의) RepeatTemplate 반복이 끝이난게 된다.

image

ChunkOrientedTasklet 의 남은 흐름을 쭉 타고 최종적으로 RepeatStatus.FINISHED 를 리턴,

image

역순으로 result를 return 하게 된다. 위의 코드는 TakletStep 이다(도입부와 마찬가지로 doInIteration, doInChunkContext, TransactionTemplate 의 흐름을 타게된다. 생략함)

image

return 받은 RepeatStatus.FINISHED 값을 가지고 다시 RepeatTemplate 의 남은 흐름을 쭉 타고나서 이것을 호출했던 TaskletStep 으로 돌아가게 되고 마무리된다.

위에서 살펴본 흐름을 간략히 정리하자면,

TaskletStep -> {RepeatTemplate -> ChunkOrientedTasklet ->
ChunkProvider -> (RepeatTemplate -> ItemReader) 반복 -> ChunkOrientedTasklet -> ChunkProcessor -> ItemProcessor -> ChunkProcessor -> ItemWriter} 반복

이라고 볼 수 있겠다. 그러나, 아래와 같이 processor 에 커스텀한 RepeatTemplate 을 작성했을 경우에 ItemProcessor 이후에 다시 RepeatTemplate 이 반복되는 흐름을 가지게 될 것이다. 이는 reader, writer 도 마찬가지다. 그리고 밑의 코드는 RepeatStatus.FINISHED 를 리턴하는 경우가 없으므로 CompletionPolicy 의 완료조건에 의해서만 RepeatTemplate 반복이 끝나게 될 것이다.

.processor(new ItemProcessor<String, String>() {
     RepeatTemplate repeatTemplate = new RepeatTemplate();

     @Override
     public String process(String item) throws Exception {
         repeatTemplate.setCompletionPolicy(new SimpleCompletionPolicy(3));
         repeatTemplate.iterate(new RepeatCallback() {
             @Override
             public RepeatStatus doInIteration(RepeatContext context) throws Exception {
                 System.out.println("Testing repeatTemplate");
                 return RepeatStatus.CONTINUABLE;
             }
         });
         return item;
     }
 })

그리고 repeatTemplate.setExceptionHandler(); 구문을 통해 ExceptionHandler 를 등록해줄 수도 있다.


FaultTolerant

  • 스프링 배치는 Job 실행 중에 요류가 발생할 경우 장애를 처리하기 위한 기능을 제공하며 이를 통해 복원력을 향상시킬 수 있다.
  • 오류가 발생해도 Step 이 즉시 종료되지 않고 Retry 혹은 Skip 기능을 활성화 함으로써 내결함성 서비스가 가능하도록 한다.
  • 프로그램의 내결함성을 위해 SkipRetry 기능을 제공한다.
    • Skip
      • ItemReader / ItemProcessor / ItemWriter 에 적용할 수 있다.
    • Retry
      • ItemProcessor / ItemWriter 에 적용할 수 있다.
  • FaultTolerant 구조는 청크 기반의 프로세스 기반위에 SkipRetry 기능이 추가되어 재정의 되어 있다.
public Step batchStep() {
  return stepBuilderFactory.get("batchStep")
     .<I,O>chunk(10)
     .reader(ItemReader)
     .writer(ItemWriter)
     .faultTolerant()                          //내결함성 기능 활성화
     .skip(Class<? extends Throwable> type)    //에외 발생 시 Skip 할 예외 타입 설정
     .skipLimit(int skipLimit)                 //Skip 제한 횟수 설정
     .skipPolicy(SkipPolicy skipPolicy)]       //Skip을 어떤 조건과 기준으로 적용 할 것인지 정책 설정
     .noSkip(Class<? extends Throwable> type)  //예외 발생 시 Skip 하지 않을 예외 타입 설정
     .retry(Class<? extends Throwable> type)   //예외 발생 시 Retry 할 예외 타입 설정
     .retryLimit(int retryLimit)               //Retry 제한 횟수 설정
     .retryPolicy(RetryPolicy retryPolicy)     //Retry를 어떤 조건과 기준으로 적용 할 것인지 정책 설정
     .backOffPolicy(BackOffPolicy backOffPolicy) //다시 Retry 하기 까지의 지연시간(ms)을 설정
     .noRetry(Class<? extends Throwable> type) //예외 발생 시 Retry 하지 않을 예외 타입 설정
     .noRollback(Class<? extends Throwable> type) //예외 발생 시 Rollback 하지 않을 예외 타입 설정
     .build();
}

FaultTolerant 를 사용하게 될 경우 구조 살짝 바뀌게 된다.

  • SimpleStepBuilder 를 상속받는 FaultTolerantStepBuilder
  • SimpleChunkProvider 를 상속받는 FaultTolerantChunkProvider
  • SimpleChunkProvider 를 상속받는 FaultTolerantChunkProcessor

이렇게 FaultTolerant 사용을 위해 기존의 클래스를 상속받는 클래스를 사용하게 된다.

그리고 read, process, write을 하는 과정에서 예외가 발생하게 되면 skip count 만큼 예외를 건너뛰게 된다.
process와 write는 retry도 생각해야 한다. FaultTolerantChunkProcessorRetryTemplateexecute() 메서드를 호출하는 방식을 사용하게 된다.(ChunkProviderRepeatTemplate 을 호출해서 진행하는 것 처럼)
이때 에외가 발생하게 되면 retry count 만큼 재시도를 하게되고 다시 예외가 발생하게 되면 skip 흐름으로 들어가 skip count 만큼 예외가 발생하게 된다.

이제 간단히 흐름을 살펴보겠다.

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<String, String>chunk(5)
            .reader(new ItemReader<String>() {
                int i = 0;

                @Override
                public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                    i++;
                    if (i == 1) {
                        throw new IllegalArgumentException("this exception should be skipped.");
                    }
                    return i > 3 ? null : "item" + i;
                }
            })
            .processor(new ItemProcessor<String, String>() {
                @Override
                public String process(String item) throws Exception {
                    throw new IllegalStateException("this exception should be retried.");
                }
            })
            .writer(System.out::println)
            .faultTolerant()                            //faultTolerant 사용
            .skip(IllegalArgumentException.class)
            .skipLimit(2)
            .retry(IllegalStateException.class)
            .retryLimit(2)
            .build();
}

image

faultTolerant() 호출을 하면 위와 같이 FaultTolerantStepBuilder 를 사용하게 된다.

image

build() 를 호출하게되면 FaultTolerantStepBuilder -> SimpleStepBuilder -> AbstractTaskletStepBuilder 의 흐름을 타며 build() 를 호출하게 된다.

image

createTasklet() 호출하게 되면 FaultTolerantStepBuilder 에서 createChunkProvider()createChunkProcessor() 를 호출하게되고,

image

해당 메서드에서 FaultTolerantChunkProviderFaultTolerantChunkProcessor를 생성해주는 것을 확인할 수가 있다.

reader에 i == 1 일 경우 exception 을 던지게끔 해놨었는데,

image

image

FaultTolerantChunkProviderread() 메서드를 보면 catch 구문에 shouldSkip 메서드를 이용해 skip 여부를 확인하는 구문이 있는 것을 확인할 수가 있다.

image

image

그리고 FaultTolerantChunkProcessortransform 메서드 내부에서 RetryTemplateexecute 를 호출하는 것을 확인할 수가 있다. 해당 구문 위쪽에 RetryCallback 가 정의되어 있다.

일단 크게 대략 이 정도의 흐름이고 흐름은 뒷 쪽에서 차차 살펴보도록 하겠다.


Skip

  • Skip은 데이터를 처리하는 동안 설정된 Exception이 발생했을 경우, 해당 데이터 처리를 건너뛰는 기능
  • ItemReader 과정에서 예외가 발생하면 해당 아이템만 스킵하고 진행
  • ItemProcessor 과정에서 예외가 발생하면 다시 Chunk의 처음으로 돌아가서 read 하게되고(이 때 read 할 item 들은 캐싱이 되어있음) 이전 process 과정에서 예외가 발생한 아이템은 체크가 되어있기 때문에 해당 아이템을 제외한 나머지 아이템들을 가지고 처리하게 됨
  • ItemWriter 과정에서 예외 발생 시 ItemProcessor와 마찬가지로 처음부터 돌아가서 동작을 하고 예외가 발생한 아이템은 건너뛰게 된다. 이 과정에서 원래라면 List 이렇게 리스트 형식으로 writer에 들어올 것을 processor에서 요소를 하나씩 처리하고 writer에 보내게 된다. processor에서 1, 3, 5 값이 있다면 한 번에 3개를 writer에 보내는 것이 아니라 processor에서 1처리 -> writer 1처리, processor에서 3처리 -> writer에서 3처리 ... 이런식으로 동작하게된다.
  • Skip은 내부적으로 SkipPolicy 를 통해서 구현되어 있다.
  • 스킵 대상에 포함된 예외인지 여부, 스킵 카운터를 초과 했는지 여부에 따라 Skip 가능 여부를 판별한다.

Step -> RepeatTemplate -> Chunk -> Exception -> SkipPolicy -> Classifier -> skip?
위의 흐름을 따라간다. StepBuilderFactory 를 이용해 .skip(class), noSkip(class), skipLimit(count) 를 설정할 수 있고 이 경우에는 LimitCheckingItemSkipPolicy 가 생성되어 작동하게 된다.

내부적으로 SubclassClassifier<Throwable, Boolean> 을 가지고 있으며 특정 클래스에 대한 스킵 여부 boolean 값을 가지고 있으며 skip? 물음에 대하여 해당 boolean 값을 보거나 skipLimit을 체크한다.(true면 skip 한다)

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
                             .<String, String>chunk(5)
                             .reader(new ItemReader<String>() {
                                 int i = 0;

                                 @Override
                                 public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                                     i++;
                                     if (i == 3) {
                                         System.out.println("error occurred item: " + i);
                                         throw new SkippableException("skip"); //커스텀 예외
                                     }
                                     System.out.println("ItemReader: " + i);
                                     return i > 20 ? null : String.valueOf(i);
                                 }
                             })
                             .processor(itemProcess())
                             .writer(itemWriter())
                             .faultTolerant()
                             .skip(SkippableException.class)
                             .skipLimit(2)
                             .build();
}

private ItemWriter<? super String> itemWriter() {
    return new SkipItemWriter();
}

private ItemProcessor<? super String, String> itemProcess() {
    return new SkipItemProcessor();
}

---
public class SkipItemProcessor implements ItemProcessor<String, String> {

    @Override
    public String process(String item) throws Exception {
        if (item.equals("6") || item.equals("7")) {
            System.out.println("error occurred item: " + item);
            throw new SkippableException("Process failed cnt :");
        }

        System.out.println("ItemProcess : " + item);
        return String.valueOf(Integer.parseInt(item) * -1);
    }
}

---

public class SkipItemWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {

        for (String item : items) {
            if (item.equals("-12")) {
                System.out.println("error occurred item: " + item);
                throw new SkippableException("Write failed cnt : ");
            }
            System.out.println("ItemWriter: " + item);
        }
    }
}

위의 코드를 돌렸을 때 결과는 다음과 같이 나온다.

ItemReader: 1
ItemReader: 2
error occurred item: 3                    // 3에서 첫 에러 발생
ItemReader: 4
ItemReader: 5
ItemReader: 6
ItemProcess : 1
ItemProcess : 2
ItemProcess : 4
ItemProcess : 5
error occurred item: 6                        // 6에서 두번째 에러 발생
ItemProcess : 1                                        // 해당 chunk 다시 시작, 에러난 item인 6은 pass
ItemProcess : 2
ItemProcess : 4
ItemProcess : 5
ItemWriter: -1
ItemWriter: -2
ItemWriter: -4
ItemWriter: -5
ItemReader: 7
ItemReader: 8
ItemReader: 9
ItemReader: 10
ItemReader: 11
error occurred item: 7                    // Processor에서 7 만나자마자 에러 발생, 3번째 에러이고 limit 넘었으므로 종료

이제 limit을 3으로 설정하고 돌리면 다음과 같이 나오게 된다.

ItemReader: 1
ItemReader: 2
error occurred item: 3
ItemReader: 4
ItemReader: 5
ItemReader: 6
ItemProcess : 1
ItemProcess : 2
ItemProcess : 4
ItemProcess : 5
error occurred item: 6
ItemProcess : 1
ItemProcess : 2
ItemProcess : 4
ItemProcess : 5
ItemWriter: -1
ItemWriter: -2
ItemWriter: -4
ItemWriter: -5
ItemReader: 7
ItemReader: 8
ItemReader: 9
ItemReader: 10
ItemReader: 11
error occurred item: 7
ItemProcess : 8
ItemProcess : 9
ItemProcess : 10
ItemProcess : 11
ItemWriter: -8
ItemWriter: -9
ItemWriter: -10
ItemWriter: -11
ItemReader: 12
ItemReader: 13
ItemReader: 14
ItemReader: 15
ItemReader: 16
ItemProcess : 12
ItemProcess : 13
ItemProcess : 14
ItemProcess : 15
ItemProcess : 16
error occurred item: -12
ItemProcess : 12
error occurred item: -12

내부 코드 흐름을 간단히 살펴보자면,

image

.faultTolerant() 를 입력하면 위의 FaultTolerant 파트에서 언급했듯이 SimpleStepBuilder 대신 FaultTolerantStepBuilder 를 사용하게 된다.

image

.skip() 을 통해 스킵될 예외들을 추가해주게 되고,

image image

FaultTolerantStepBuilder에서 FaultTolerantChunkProvider 를 만드는 과정에서 SkipPolicy를 생성하게되고 내가 설정한 skipLimit과 skip할 exception에 대한 정보가 들어가있는 map이 생성자 파라미터로 들어가게 된다. 위의 코드에서는 따로 SkipPolicy를 설정해주지 않았으므로 default 값인 LimitCheckingItemSkipPolicy 를 이용한 것이다.

image

그리고 이 SkipPolicy 에는 shouldSkip 메서드가 존재하는데(위의 코드는 LimitCheckingItemSkipPolicyshouldSkip 메서드이다)

image

FaultTolerantChunkProviderread 메서드에서 shouldSkip 을 호출하는 것을 확인할 수가 있다.
위의 흐름은 ItemWriter의 경우의 흐름이고 ItemProcessorItemWriter는 조금 다르다. 이는 Retry 파트에서 살펴보겠다.

.skipPolicy(limitCheckingItemSkipPolicy())

---

@Bean
public SkipPolicy limitCheckingItemSkipPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionClass = new HashMap<>();
    exceptionClass.put(SkippableException.class, true);

    return new LimitCheckingItemSkipPolicy(3, exceptionClass);
}

위와 같이 SkipPolicy를 따로 정해서 .skip(), .skipLimit() 대신 넣어줄 수도 있다. 물론 SkipPolicy 인터페이스를 커스텀하게 구현해서 넣어도 된다.

image

스프링 배치가 기본적으로 제공하는 SkipPolicy는 위와 같다.


Retry

  • Retry는 ItemProcess, ItemWriter에서 설정된 Exception이 발생했을 경우, 지정한 정책에 따라 데이터 처리를 재시도하는 기능
  • Skip과 마찬가지로 Retry를 함으로써, 배치수행의 빈번한 실패를 줄일 수 있게 함

RepeatOperationsRepeatTemplate 이 있는 것 처럼 Retry 또한 RetryOperationsRetryTemplate이 있고 내부적으로 RetryCallbackRecoveryCallback 을 수행하게된다.

대략적인 흐름은 Step -> RepeatTemplate -> RetryTemplate -> RetryCallback/RecoveryCallback -> Chunk
-> Exception -> RetryPolicy -> BackOffPolicy/Step 와 같다.

RetryTemplate 에서 retry 할 예외인지, retryCount는 limit을 넘었는지 여부에 따라 RetryCallbackRecoveryCallback 으로 분기를 타게되고, RetryPolicy 에서도 마찬가지로 판단 후에 Step 반복문 처음부터 다시 시작할 것인지 아니면 Step을 종료할 것인지 판단하게 된다.

사용방법은 Skip과 비슷하다. FaultTolerant 설명할 때 있었던 예시코드인데 다시보고 파악해보자.

public Step batchStep() {
  return stepBuilderFactory.get("batchStep")
     .<I,O>chunk(10)
     .reader(ItemReader)
     .writer(ItemWriter)
     .faultTolerant()                          //내결함성 기능 활성화
     .skip(Class<? extends Throwable> type)    //에외 발생 시 Skip 할 예외 타입 설정
     .skipLimit(int skipLimit)                 //Skip 제한 횟수 설정
     .skipPolicy(SkipPolicy skipPolicy)]       //Skip을 어떤 조건과 기준으로 적용 할 것인지 정책 설정
     .noSkip(Class<? extends Throwable> type)  //예외 발생 시 Skip 하지 않을 예외 타입 설정
     .retry(Class<? extends Throwable> type)   //예외 발생 시 Retry 할 예외 타입 설정
     .retryLimit(int retryLimit)               //Retry 제한 횟수 설정
     .retryPolicy(RetryPolicy retryPolicy)     //Retry를 어떤 조건과 기준으로 적용 할 것인지 정책 설정
     .backOffPolicy(BackOffPolicy backOffPolicy) //다시 Retry 하기 까지의 지연시간(ms)을 설정
     .noRetry(Class<? extends Throwable> type) //예외 발생 시 Retry 하지 않을 예외 타입 설정
     .noRollback(Class<? extends Throwable> type) //예외 발생 시 Rollback 하지 않을 예외 타입 설정
     .build();
}

스프링 배치에서 기본적으로 제공하는 RetryPolicy는 다음과 같다.

image

default는 SimpleRetryPolicy로 내가 설정한 retry 할 예외들, retryLimit 등이 파라미터로 들어가게 된다.

이제 코드를 살펴보면서 흐름을 파악해보겠다. 일단 기본 코드는 다음과 같이 생성해놓았다.

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
                             .<String, String>chunk(5)
                             .reader(reader())
                             .processor(processor())
                             .writer(items -> items.forEach(System.out::println))
                             .faultTolerant()
                             .retry(RetryableException.class)
                             .retryLimit(2)
                             .build();
}

@Bean
public ListItemReader<String> reader() {
    List<String> items = new ArrayList<>();
    for (int i = 0; i < 30; i++) {
        items.add(String.valueOf(i));
    }
    return new ListItemReader<>(items);
}

@Bean
public ItemProcessor<? super String, String> processor() {
    return new RetryItemProcessor();
}

---

public class RetryItemProcessor implements ItemProcessor<String, String> {

    private int cnt = 0;

    @Override
    public String process(String item) throws Exception {
        cnt++;
        throw new RetryableException();
    }
}

---

public class RetryableException extends Exception {

    public RetryableException() {
        super();
    }

    public RetryableException(String message) {
        super(message);
    }
}

이제 실행시켜보면서 살펴보겠다.

image image

ChunkOrientedTasklet에서 호출한 FaultTolerantChunkProcessor 에서 아이템 수만큼 for문을 돌면서 RetryCallbackRecoveryCallback이 생성되고 RetryTemplate 에서 이를 가지고 수행하게 된다. 즉 아이템마다 RetryCallbackRecoveryCallback 을 가지게 된다는 것이다. 그리고 이 아이템마다 retry를 따로 측정하기 때문에 이 내용은 꼭 기억하고 있자. 하이라이트 된 코드의 DefaultRetryState은 아이템의 retry정보가 담긴 RetryContext를 찾을 때 필요한 정보를 담고 있는 객체고 현재 아이템 값으로 key를 생성해 넣어주고 이 객체는 밑의 doExecute() 메서드에서 사용하게 된다.

image image

doExecute() 메서드 내에서 retry 할 수 있는지 체크를 하고 while문을 돌게되는데 이때 위에서 doExecute() 메서드 3번째 코드라인에서 생성된 context를 이용하게 된다. 이 context는 몇 번의 시도를 했는지, 최대 limit은 몇인지 등에 대한 상태정보를 가지고 있다. 해당 정보를 이용해 retry 할 수 있는지 체크하게되는 것이다.

그리고 retryCallback.doWithRetry(context) 를 호출하고 이는 아까 정의되어있던 RetryCallback 메서드를 호출하게되고 내부적으로 doProcess를 호출하고 결국 ItemProcessor 로직을 타게된다.

image

예외가 던져지면,

image

RetryCallback에서 exception catch를 하게되고, 예외가 발생하게되면 이번에는 RecoveryCallback 의 흐름을 타게된다.

image

RecoveryCallback 에서는 Skip에 대한 여부가 나오게 된다. Skip 까지 설정해두었다면 해당 아이템을 item iterator에서 remove하고 다시 돌게될 것이다.
이 내용은 밑에서 한 번 더 다루겠다.

image

RetryTemplate에서도 마찬가지로 로 catch 흐름을 타게된다. retry 할 수 있는 여부로 판단하여 BackOffPolicy 흐름을 타야하는지 판단한다.

image

이후 다시 Step의 처음부터 시작하게된다. (위의 코드는 ChunkOrientedTaskletexecute())

Skip의 경우에는 해당 아이템을 체크해두고 다시 chunk로 돌아가서 해당 아이템을 제외하고 재시도한다면,
RetrySkip과 같은 데이터의 조작없이 다시 reader부터 처음부터 처리하게된다.

@Override
public String process(String item) throws Exception {
    System.out.println("item process: " + item);
    if (item.equals("2") || item.equals("3")) {
        cnt++;
        System.out.println("processor occurred! failed cnt: " + cnt);
        throw new RetryableException();
    }
    return item;
}

RetryItemProcessor 의 내용을 다음과 같이 수정하고 돌리면 다음과 같은 결과가 출력이 된다.

item process: 0
item process: 1
item process: 2
processor occurred! failed cnt: 1
item process: 0
item process: 1
item process: 2
processor occurred! failed cnt: 2
item process: 0
item process: 1
종료...

Skip은 예외가 발생한 아이템을 체크하고 건너뛰었지만 Retry는 청크의 처음부터 다시 돌아가기 때문에 위와같은 상황이 발생하게된다. Skip 까지 추가하고 살펴보겠다.

 .retry(RetryableException.class)
 .retryLimit(2)

Skip을 붙이고 돌리면 정상적으로 마무리가 된다. 출력결과는 다음과 같다.

item process: 0
item process: 1
item process: 2
processor occurred! failed cnt: 1
item process: 0
item process: 1
item process: 2
processor occurred! failed cnt: 2
item process: 0
item process: 1
item process: 3
processor occurred! failed cnt: 3
item process: 0
item process: 1
item process: 3
processor occurred! failed cnt: 4
item process: 0
item process: 1
item process: 4
0
1
4
item process: 5
item process: 6
item process: 7
item process: 8
item process: 9
5
6
7
8
9

예외 2번이 터지고 이후 RecoveryCallback 콜백 메서드를 돌면서 예외가 터진 아이템을 skip 하게 된다. 이때 ItemProcessor 에서는 Skip 시에 해당 아이템을 Item Iterator 에서 remove 하고 진행하게 된다.

이후 다시 돌면서 아이템 '3' 을 마주하게되고 RetryCallbackRecoveryCallback은 앞서 말한 것 처럼 아이템마다 할당되기 때문에 다시 retry 2번을 거치고 skip 흐름에서 item remove를 하고 이후 진행하게된다.

image

마지막으로 ItemReader, ItemWriter, ItemProcessor의 전체 흐름 이미지로 마무리하도록 하겠다.

ItemReader

image

ItemProcessor

image

ItemWriter

image


REFERENCE

정수원님 스프링 배치 강의