CompletableFuture

2019.05.16 09:29JAVA

1. CompletableFuture
 

  • 하나씩 순차적으로 스테이지를 실행하고자 할때 사용하면 좋다.

  • 병렬처리작업중 순차적으로 작업을 진행해야 하는경우 Callback 의 늪에 빠질수 있다, 이때도 콜백업이 사용가능.

  • CompletableFuture를 사용하면. 1.7에서 자공하는  Feture 의 기능으로 async 작업이 가능 하지만,  isDone() 혹은 get()을 이용 하여 결과값을 다음 스테이지 에서 사용할때 Blocking으로 변경되는 단점을 보안한다.            

A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage,supporting dependent functions and actions that trigger upon its completion.
When two or more threads attempt to complete, completeExceptionally, or cancel a CompletableFuture, only one of them succeeds.

In addition to these and related methods for directly manipulating status and results, CompletableFuture implements interface CompletionStage with the following policies:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any otheraller of a completion method.
All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool() (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task). To simplify monitoring, debugging, and tracking, all generated asynchronous tasks are instances of the marker interface CompletableFuture.AsynchronousCompletionTask.
  All CompletionStage methods are implemented independently of other public methods, so the behavior of one method is not impacted by overrides of others in subclasses.
CompletableFuture also implements Future with the following policies:
  Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion. Method cancel has the same effect as completeExceptionally(new CancellationException()). Method isCompletedExceptionally can be used to determine if a CompletableFuture completed in any exceptional fashion.
  In case of exceptional completion with a CompletionException, methods get() and get(long, TimeUnit) throw an ExecutionException with the same cause as held in the corresponding CompletionException. To simplify usage inmost contexts, this class also defines methods join() and getNow that instead throw the CompletionException directly in thesecases.


명시 적으로 완료 (값과 상태 설정) 될 수있는 Future이며, CompletionStage로 사용되어 종속 기능 및 완료시 실행되는 작업을 지원합니다.
2 개 이상의 thread가 완료하려고하면, CompletableFuture를 완료하는지, 완료하면 그 중 1 개 (살)만이 성공합니다.
CompletableFuture는 상태 및 결과를 직접 조작하기위한 이러한 메소드 및 관련 메소드 외에도 CompletionStage 인터페이스를 다음 정책과 함께 구현합니다.
비동기 메소드의 종속 완료를 위해 제공된 조치는 현재 CompletableFuture를 완료하는 스레드 또는 완료 메소드의 다른 호출자가 수행 할 수 있습니다.
명시 적 Executor 인수가없는 모든 비동기 메소드는 ForkJoinPool.commonPool ()을 사용하여 수행됩니다 (병렬 처리 수준이 최소 두 개는 지원하지 않는 경우가 아니면 각 스레드를 실행하기 위해 새 스레드가 작성됩니다). 모니터링, 디버깅 및 추적을 단순화하기 위해 생성 된 모든 비동기 작업은 마커 인터페이스 CompletableFuture.AsynchronousCompletionTask의 인스턴스입니다.
모든 CompletionStage 메소드는 다른 public 메소드와 독립적으로 구현되므로 한 메소드의 동작은 하위 클래스의 다른 메소드보다 우선 적용되지 않습니다.
CompletableFuture는 또한 다음 정책을 사용하여 Future를 구현합니다.
FutureTask와는 달리이 클래스는 완료를 유발하는 계산을 직접 제어 할 수 없기 때문에 취소는 예외적 인 완료의 또 다른 형태로 취급됩니다. 메소드 취소는 completeExceptionly와 동일한 효과를 갖습니다 (new CancellationException ()). isCompleted 메서드는 CompletableFuture가 예외적 인 방식으로 완료되었는지 판단하는 데 사용할 수 있습니다.
CompletionException에 의해 예외적으로 완료했을 경우, get () 및 get (long, TimeUnit) 메소드는, 대응하는 CompletionException와 같은 원인으로 ExecutionException를 Throw합니다. 대부분의 컨텍스트에서 사용을 단순화하기 위해이 클래스는 join () 및 getNow 메소드를 정의하며이 경우 대신 CompletionException을 직접 throw합니다.

 

2. 샘플 코드

List<CompletableFuture<?>> completableFutures =  new ArrayList<>(); 

////////////////////////////////////////////////////////////////////// 
// ExecutorService 를 Single로 지정하여 하나씩 순차적으로 실행하도록 한다. 
//   - executorService 를 실행하지 않을경우 CPU*2 (ForkJoin) 실행 
////////////////////////////////////////////////////////////////////// 
ExecutorService executorService =  Executors.newSingleThreadExecutor(); 


for (String targetKey: ids) { 
    CompletableFuture<?> completableFuture =  CompletableFuture.runAsync(() -> { 

    log.info("////////////////////////////////////////////////////"); 
    log.info("// 실행  시작 "); 
    log.info("////////////////////////////////////////////////////"); 

    // 1. DB 상태 변경 
    this.reportStatusChange(targetKey, mediaCode, ReportSetStatus.RUNNING); 
    log.debug("1. DB 상태 변경 "); 

    },executorService).thenAccept(aVoid -> { 

    // 2. 데이터 수집및 저장 
    List crawlerTargets = this.crawlerTargetCreation(targetKey, mediaCode); 
    this.dataCrawling(crawlerTargets); 

    log.debug("2. 데이터 타겟 생성 및 수집 "); 

    }) 
        .thenRun(() -> { 
        //3. DB 상태 변경 
        this.reportStatusChange(targetKey, mediaCode, ReportSetStatus.COMPLETE); 
        log.debug("3. 완료후 DB 변경 "); 

        log.info("////////////////////////////////////////////////////"); 
        log.info("// 수동 실행  종료"); 
        log.info("////////////////////////////////////////////////////"); 

        }) 
        .exceptionally(throwable -> { 

        log.error("4. 실패에 대한 로그", throwable); 
        this.reportStatusChange(targetKey, mediaCode, ReportSetStatus.ERROR); 
        throw new APIRuntimeException("batch manual crawler error ",throwable); 

        }); 

    completableFutures.add(completableFuture); 

} 


/////////////////////////////////////////////////////////// 
// 모든 작업이 완료된 뒤에 실행할 행동 지정, CompletableFuture 
/////////////////////////////////////////////////////////// 

CompletableFuture 
    .allOf(completableFutures.toArray(new CompletableFuture[ids.length])) 
    .thenAccept(aVoid ->  log.info("모든 작업이 완료 되었습니다.")); 

executorService.shutdown(); 


3. 중요 Method 설명 

.thenRun : Runnable 을 파라미터로 하여, 정상실행후 CompletableFuture 리턴, 다음스테이지 실행에 반환값 없음
.supplyAsync : Supplier 을 파라미터로 하며, 정상실행후  CompletableFuture 리턴, 다음스테이로 반환값 전달
.thenAccept(Async) : Consumer 형태의 파이프라인 실행 , 인자가 Consumer 이기때문에 다음 스테이지로 결과값을 넘겨줄수 없다.
.thenApply(Async)  : Function 형태의 파이프라인 실행, 인자가 Function 이기때문에 다음 스테이지로 결과값을 넘겨줄수 있다.
.thenCompose : CompletableFuture 를 반환하는 Method Chain으로 실행하고자 할때. 
.exceptionally : 예외 사항 처리 

.allOf : 동시에 N개의 요청을 호출하고 나서 모든 스테이지가 완료되면 다음 스테이지를 실행한다.
.anyOf : 동시에 N개의 요청을 호출하고 나서 하나라도 호출이 완료되면 다음 스테이지를 실행한다.



'JAVA' 카테고리의 다른 글

java null safe stream 생성 방법  (0) 2019.09.05
병렬처리와 동시성  (0) 2019.05.27
CompletableFuture  (0) 2019.05.16
API call back off time(재요청 타시간)  (0) 2019.05.07
java throws bubble up 예제  (0) 2019.05.07
java 8 stream throw exception bubble up (예외사항 전파)  (0) 2019.05.07