[WebFlux] publishOn과 subscribeOn

회사에서 WebFlux로 개발하면서 publishOn과 subscribeOn의 사용법이 매번 헷갈려서 최대한 간단하게 정리를 해봤다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Flux.range(1,3)
.subscribeOn(Schedulers.newBoundedElastic(1,1,"sub2"))
.doOnNext(i -> log.info("main 1: {}, number: {}", Thread.currentThread().getName(), i))
.publishOn(Schedulers.newBoundedElastic(1,1,"pub1"))
.doOnNext(i -> log.info("main 2: {}, number: {}", Thread.currentThread().getName(), i))
.subscribeOn(Schedulers.newBoundedElastic(1,1,"sub1"))
.subscribe();

// log
main 1: sub2-3, number: 1
main 1: sub2-3, number: 2
main 1: sub2-3, number: 3
main 2: pub1-2, number: 1
main 2: pub1-2, number: 2
main 2: pub1-2, number: 3

subscribeOn

subscribeOn은 호출된 곳의 앞쪽 publisher 체인과 뒷쪽의 subscriber 체인을 한꺼번에 별도의 스레드로 분리한다.
느린 publisher 와 빠른 subscriber 의 조합에서 쓰기 적합하다고 하는데… 무슨 말인지 잘 모르겠다. 외부서비스에서 데이터를 읽어올때 쓰기 좋다고 한다.

위 예시처럼 subscribeOn을 동일한 체인에서 중첩해서 사용할 경우
첫번째 subscribeOn 앞뒤 체인을 묶어서 sub2 스케줄러에서 담당하고
두번째 subscribeOn 앞뒤 체인을 묶어서 sub1 스케줄러가 담당해야 하지만 이미 sub2에서 담당해서 그대로 sub2로 돌게 된다.

publishOn

publishOn은 호출된 곳의 앞쪽 publisher 체인은 그대로 두고 뒷쪽 subscriber 체인만 별도의 스레드로 분리한다.
빠른 publisher와 느린 subscriber 일때 쓰기 좋다고 하고 외부서비스로 데이터를 쓰는 작업할 때 좋닫고 한다.
위 예시에서 subscribeOn으로 sub2 스레드에서 돌다가 publishOn을 만나면서 pub1 스케줄러에 작업을 할당하고 나머지 연산을 진행한다.

🔗 출처
Reactor Flux의 publishOn subscribeOn을 이용한 스케쥴링

4-2. 리액터 프로젝트 심화학습

리액티브 스트림의 수명 주기

조립(assembling) 단계

  • 처리 흐름에서 사용하는 연산자를 조합한 빌더 API처럼 보이지만 일반적인 빌더 패턴과 달리 리액터 API는 불변성(Immutability)을 제공한다.(적용된 각각의 연산자가 새로운 객체를 생성한다.)
  • 스트림 구성을 조작하고 더나은 스트림 전달을 위한 다양한 기술을 적용할 수 있는 단계

구독 단계

  • 특정 Publisher를 구독할 때 발생
  • 조립 단계에서 일련의 Publisher 체인이 연결되었고 최상위 래퍼를 구독하면 해당 체인에 대한 구독 프로세스가 시작된다.
  • 조립단계와 동일한 최적화를 수행할 수 있다.
  • 리액터에서 멀티 스레딩을 지원하는 일부 연산자는 구독이 발생하는 작업자를 변경할 수 있다.

런타임 단계

  • 게시자와 구독자 간에 실제 신호가 교환되는 단계
  • 교환하는 처음 두 신호는 onSubscribe, request
    • onSubscribe 메서드는 최상위 소스에서 호출
    • 구독이 모든 구독자 체인을 통과하여 마지막 구독자가 구독 체인에 대한 정보를 수신하고 메시지 수신을 시작하려면 Subscription#request 메서드를 호출해 전송을 시작해야 한다.
  • 런타임 중에도 request를 줄이기 위한 최적화를 적용할 수 있다.

리액터에서 스레드 스케줄링 모델

멀티스레딩 실행을 위해 제공하는 연산자 사이의 차이점에 대해서 확인

다른 워커로 실행을 전환할 수 있는 네 가지 연산자

publishOn 연산자

  • 런타임 실행의 일부를 지정된 워커로 이동
  • Scheduler 인터페이스를 사용하여 현재 스트림에 대한 특정 워커를 선택할 수 있다.
  • 내부적으로 전용 워커가 메시지를 하나씩 처리할 수 있도록 새로운 원소를 제공하는 큐를 가지고 있다.
  • 리액티브 스트림의 모든 원소는 하나씩(동시에는 아니지만) 처리되므로 항상 모든 이벤트에 순서를 엄격하게 정의할 수 있다.(이 속성을 **직렬성(serializability)**라고 한다.)
  • 병렬 처리를 할 수 없다는 말처럼 들리지만 병렬 처리도 가능한데 예를 들어 처리 단계 사이에 비동기 영역을 추가해서 독립적으로 작업해 비동기 처리를 할 수 있다.

subscribeOn 연산자

  • 구독체인에서 워커의 작업 위치를 변경
  • 보통 호출 시점에서 상위 스트림에 해당하는 부분의 스레드를 설정

parallel 연산자

  • 하위 스트림에 대한 플로 분할과 분할된 플로 간 균형 조정 역할
1
2
3
4
5
6
Flux.range(0, 10000)
.parallel()
.runOn(Schedulers.parallel())
.map()
.filter()
.subscribe()
  • parallel연산자를 사용하면 ParallelFlux를 동작시킨다.
    • 다수의 Flux를 추상화하여 Flux간에 데이터의 크기 균형을 이룬다.

Scheduler

  • Scheduler.schedule : Runnable 작업을 예약가능
  • Scheduler.createWorker : 동일한 방법으로 Runnable 작업 예약이 가능한 Worker 인터페이스의 인스턴스를 제공
  • Scheduler인터페이스 / Workder인터페이스의 차이점 : 워커 풀 / Thread 또는 리소스를 추상화한 것
  • 리액터에서 제공하는 스케줄러 인터페이스의 3가지 주요 구현체
    • SingleScheduler : 모든 작업을 한 개의 전용 워커에 예약가능, 시간에 의존적
    • ParallelScheduler : 고정된 크기의 작업자 풀에서 작동(CPU 코어 수로 기본크기 제한)
    • ElasticScheduler : 동적으로 작업자를 만들고 스레드 풀을 캐시, 생성된 스레드 풀의 최대 개수는 제한되지 않음

리액터 컨텍스트

  • Context는 스트림을 따라 전달되는 인터페이스
  • 런타임 단계에서 필요한 컨텍스트 정보에 엑세스할 수 있도록 하는 것
  • 멀티스레드 환경의 비동기 처리방식에서 ThreadLocal가 가지는 한계를 해결할 수 있다.
    • 변수에 데이터를 넣은 후 publishOn 등을 통해 다른 워커에서 작업 플로를 수행하면 데이터를 쌓은 스레드와 작업 스레드가 달라서 데이터에 접근할 수 없다.

4. 리액터 프로젝트 - 리액티브 앱의 기초

리액티브 스트림 스펙은 리액티브 라이브러리가 서로 호환할 수 있게 해주며 여러 중요한 개선 사항이 많았지만 API 및 규칙만 정의하고 일상적인 사용을 위한 라이브러리는 제공하지 않았다.

리액티브 프레임워크중에서 가장 유명한 라이브러리 중 하나인 리액터 프로젝트(Project Reactor)는 1.x 버전에서 리액터 패턴, 함수형 프로그래밍 및 리액티브 프로그래밍과 같은 메시지 처리에 대한 모범 사례를 통합하여 비동기 논블로킹 처리를 지원하도록 설계하였다.

이후 여러 부족한 부분들을 보완하면서 2.x를 거쳐 현재는 3.x 버전으로 릴리즈되어있다.

리액터 프로젝트 필수 요소

  • 비동기 파이프라인을 구축할 때 콜백 지옥깊게 중첩된 코드를 생략
  • 코드 가독성을 높이고 리액터 라이브러리에 의해 정의된 워크플로에 **조합성(composability)**을 추가
  • 리액터 API는 연산자를 연결해서 사용하는 것을 권장하며 이를 통해 복잡하고 재사용 가능한 실행 그래프(execution graph)를 작성할 수 있다.
    • 그래프는 실행 흐름만 정의하며 구독자가 실제 구독을 했을 때만 데이터 플로가 기동된다.
  • 오류 발생 가능성이 있는 비동기 요청의 결과를 효율적으로 처리하여 유연하지만 복원력 있는 코드를 작성할 수 있다.

배압은 리액티브 스트림 스펙의 핵심 속성으로 리액터 역시 동일하다.

1
2
3
(데이터 플로)--▶️          --▶️         --▶️
게시자 연산자 연산자 구독자
◀️-- ◀️-- ◀️--(요청)

배압 전파의 일반적인 모드를 모두 지원

  • 푸시 전용 : subscription.request(Long.MAX_VALUE)
  • 풀 전용 : subscription.request(1)
  • 풀-푸시(혼합형) : 구독자가 수요를 실시간으로 제어할 수 있고 게시자가 데이터 소비 속도에 적응할 수 있는 경우
  • 풀-푸시 모델을 지원하지 않는 이전 API를 적용할 때는 예전 스타일의 배압 메커니즘을 제공한다.

Flux와 Mono

데이터를 기반으로 리액티브 스트림을 생성하는 팩토리 메서드를 제공

Mono는 Flux와 비슷하지만 하나의 요소를 대상으로 사용되는데 HTTP 요청이나 DB 쿼리와 같은 비동기 작업을 래핑하는데 매우 유용

Flux와 Mono는 구독 루틴을 단순화하는 subscribe() 메서드를 람다 기반으로 재정의한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
Flux.just("A","B","C")
.subscribe(
data -> log.info("onNext: {}", data),
err -> { /* ignored */ },
() -> log.info("onComplete")
);

/*
onNext: A
onNext: B
onNext: C
onComplete
*/

또한 subscription으로 구독을 직접 제어하거나 직접 Subscriber 인터페이스를 구현하여 스트림을 구독할 수 있다.

연산자를 이용해 리액티브 시퀀스 변환하기

연산자의 종류가 너무 많아서 적절한 연산자를 선택하는 가이드를 포함한 아래 링크를 참조

Which operator do I need?

  • 원소 매핑 : map(1:1) …
  • 필터링 : filter …
  • 시퀀스 수집? 합치기? : collectList() …
  • 원소 줄이기 : reduce, scan …
  • 스트림 조합 : concat, merge, zip …
  • 스트림 내의 원소 일괄 처리
    • buffer : List와 같은 컨테이너를 이용한 Buffering, Flux<List<T>>
    • window : Flux<Flux<T>>와 같은 형태로 스트림을 스트림으로 Windowing
    • groupBy : Flux<GroupedFlux<K, T>> 유형의 스트림으로 Grouping
  • flatmap : 논리적으로 map과 flatten의 2가지 작업으로 구성
    • map파트는 들어오는 각 원소를 리액티브 스트림(T -> Flux<R>)으로 변환
    • flatten파트는 생성된 모든 리액티브 시퀀스를 R 타입의 원소를 통과시키는 새로운 리액티브 시퀀스로 병합
  • 샘플링 : sample 연산자를 사용하여 특정 기간 내 최근에 관찰된 값을 주기적으로 출력할 수 있다.
  • 블로킹 구조로 전환
    • 리액티브 애플리케이션에서 블로킹 처리를 해선 안되지만, 상위 API에서 필요로 하는 경우도 있음
    • blockFirst, blockLast, toIterable, toStream …
    • Mono#toFuture 를 제외한 모든 메서드는 "non-blocking only"로 표시된 스케줄러에서 호출되면 UnsupportedOperatorException을 발생시킨다.
  • 시퀀스 엿보기
    • doOnNext(Consumer <T>), doOnComplete(), doOnError(Throwable)
    • 최종 시퀀스를 수정하지 않고 프로세스 파이프라인의 중간에 있는 각 원소나 특정 시그널을 처리해야 하는 경우

Hot 스트림과 cold 스트림

1. 콜드 퍼블리셔(cold publisher)

  • 구독자가 나타날 때마다 시퀀스 데이터가 생성되는 방식
  • 구독자 없이는 데이터 생성 X
  • 대표적으로 HTTP 요청이 이런식으로 동작한다.

2. 핫 퍼블리셔(hot publisher)

  • 데이터 생성 시 구독자의 존재 여부에 의존하지 않는 방식
  • 첫 구독자가 없더라도 원소를 만들어 낼 수 있다.
  • 이때 구독자가 나타나면 이전 생성값 말고 새로운 값만 보낼 수도 있다.
  • 리액터 라이브러리에 포함된 대부분은 Processor 인터페이스를 상속한다.

콜드 퍼블리셔를 리액티브 변환을 통해 핫 퍼블리셔로 전환할 수 있다.

3. 스트림의 새로운 표준 - 리액티브 스트림

API 불일치 문제

CompletableStage를 이용하는 자바 코어 라이브러리와 RxJava 같은 다양한 라이브러리가 있어서, 코드를 작성할 때 다양한 선택을 할 수 있지만 과도하게 많은 선택지는 시스템을 지나치게 복잡하게 만들 수 있다.

핵심적인 문제는 라이브러리 공급자가 일관된 API를 만들어낼 수 있는 표준화된 방법이 없다는 사실이다.

풀 방식과 푸시 방식

리액티브 초기 단계에서 모든 라이브러리의 데이터 흐름은 소스에서 구독자에게 푸시되는 방식이었다.

  • 풀 방식으로 요소를 하나씩 요청할 경우 비동기 논블로킹 방식을 사용하더라도 각 요소에 대한 요청을 처리 하면서 대기시간이 발생하여 전체 처리시간 중 많은 시간을 유휴 상태로 있게 된다.

  • 푸시 방식을 도입하면서 요청하는 횟수를 최소화하여 전체 처리 시간을 최적화할 수 있었다.

하지만 푸시 모델만 사용하는 것은 기술적 한계가 있는데

  • 메시지 기반 통신의 본질은 요청에 응답하는 것인데
  • 프로듀서가 컨슈머의 처리 능력을 무시하면 전반적인 시스템 안정성에 영향을 미칠 수 있기 때문이다.

흐름제어

  • 느린 프로듀서와 빠른 컨슈머

    순수한 푸시 모델은 동적으로 시스템의 처리량을 증가시키는 것이 불가능하다.

  • 빠른 프로듀서와 느린 컨슈머

    프로듀서는 컨슈머가 처리할 수 있는 것보다 더 많은 데이터를 전송할 수 있으며 이로 인해 부하를 받는 컴포넌트에 치명적인 오류가 발생할 수 있다.

이를 해결하기 위한 직관적인 방법은 큐에 수집하는 것인데 3가지 유형으로 구분할 수 있다.

  1. 무제한 큐: 메모리 한도에 도달하면 전체 시스템이 손상될 가능성이 있다.(복원력이 떨어짐)
  2. 크기가 제한된 드롭 큐: 메시지의 중요성이 낮을 때 사용되는 방법으로 큐가 가득 차면 메시지를 무시하는데 중요한건 데이터 세트가 변경된다는 점이다.
  3. 크기가 제한된 블로킹 큐: 가장 느린 컨슈머의 처리량에 의해 시스템의 전체 처리량이 제한된다. 시스템의 비동기 동작을 모두 무효화하여 절대 받아들일 수 없는 시나리오다.

이런 시스템 부하에 적절하게 대응하는 방법으로 배압 제어 메커니즘이 있다.

리액티브 스트림의 기본 스펙

리액티브 스트림 스펙에는 Publisher, Subscriber, Subscription, Processor의 네 가지 기본 인터페이스가 정의돼 있다.

  • Publisher : Observable과 비교하면 Publisher와 Subscriber를 연결하기 위한 표준화된 진입점을 의미

  • Subscriber : Observer와 비슷한데 onSubscribe라는 추가 메서드를 제공하는데 Subscriber에게 구독이 성공했음을 알리는 API 메서드

  • Subscription : 원소 생성을 제어하기 위해 기본적인 사항을 제공

    • cancel() : 스트림에서 구독을 취소하거나 발행을 완전히 취소 가능

    • request(long n) : 요청하는 Publisher가 보내줘야 하는 데이터 크기를 알려줄 수 있음 ▶️ Publisher에서 유입되는 원소의 개수가 처리할 수 있는 제한을 초과하지 않을 것을 확신할 수 있다.

      리액티브 스트림은 순수 푸시 모델과는 달리 배압을 적절하게 제어할 수 있는 하이브리드 푸시-풀 모델을 제공한다.

      • 순수 푸시 모델을 사용하고 싶으면 최대 개수 요청 request(Long.MAX_VALUE)
      • 순수 풀 모델을 사용하고 싶으면 onNext()가 호출될 때마다 요청
  • Processor : Publisher와 Subscriber의 혼합 형태로 Publisher와 Subscriber 사이에 몇가지 처리 단계를 추가하도록 설계됐다.

1
2
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

리액티브 스트림 기술 호환성 키트(TCK)

모든 동작을 검증하고 반응 라이브러리를 표준화하여 서로 호환하는지 확인하는 공통 도구로 모든 리액티브 스트림 코드를 방어하고 지정된 규칙에 따라 구현을 테스트 한다.

TCK github : https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck


리액티브 스트림을 활용한 비동기 및 병렬

  • 리액티브 스트림 API는 Publisher가 생성하고 Subscriber가 소비한 모든 신호는 처리 중에 논블로킹이어야 하며 방해받지 않아야 한다고 규칙에 명시되어 있다.

  • 모든 프로세서나 코어를 효율적으로 사용하려면 병렬처리가 필요하고 이는 일반적으로 onNext 메서드를 병렬로 호출하는 것을 뜻하지만 on*** 메서드의 호출은 스레드 안전성을 보장하는 방식으로 신호를 보내야 하며 다중 스레드에서 수행되는 경우 외부적인 동기화를 사용해야 한다. 즉, 스트림의 요소를 병렬 처리할 수 없다.

  • 자원을 효율적으로 활용하기 위해 스트림 처리 파이프의 각 단계에 메시지를 비동기적으로 전달하는 것이다.

    • 상황에 따라서 처리단계를 각각 별도의 스레드로 처리하고 각 스레드 사이에 큐와 같은 데이터 구조를 적용하여 메시지를 독립적으로 제공하고 사용하도록 할수 있다.

2. 스프링을 이용한 리액티브 프로그래밍 - 기본 개념

관찰자(Observer) 패턴

  • 이벤트를 발생시키는 역할(주체Subject), 이벤트를 수신하는 역할(객체, 즉 관찰자Observer)의 두가지 핵심 요소가 존재
  • ObserverSubject에 등록되고 Subject로부터 알림을 수신

Java의 내장 클래스인 Observable, Observer는 java9 부터 deprecated 되었다.

  • interface가 아니라 class로 구현되어 있어서 이미 다른 클래스를 상속받은 클래스가 Observable을 상속할 수 없어서 재사용성에 제약이 생긴다.
  • Observable의 핵심 메소드 중 하나인 setChanged() 메소드가 protected로 정의되어 있어서 사용하려면 상속받은 서브클래스만 해당 메소드를 호출할 수 있다.
  • Observable의 알림은 순서를 보장할 수 없고 상태 변경 역시 1:1로 일치하지 않아서 멀티 스레드 환경에서 thread-safe 하지 않다.
  • Serializable을 구현하지 않기 때문에 Observable을 상속받은 서브클래스도 직렬화할 수 없다.

발행-구독 패턴

스프링 프레임워크는 이벤트처리를 위해 @EventListener어노테이션과 이벤트 발행을 위한 ApplicationEventPublisher클래스를 제공한다.

관찰자 패턴과의 차이점은 게시자와 구독자 사이에 간접적인 이벤트 채널(=메시지 브로커 or 이벤트 버스)을 제공하여 구독자는 이벤트 채널은 알고 있지만 게시자가 누구인지는 신경쓰지 않는다.

SseEmitter를 사용하면 스프링 프레임워크를 브로커를 사용하여 발행-구독 패턴을 구현할 수 있다.
다만 로직을 구현함에 있어 스프링의 내부 메커니즘을 사용했고 이는 프레임워크의 변경으로 인해 프로그램의 안정성을 보장할 수 없는 단점이 있다.

리액티브 프레임워크 RxJava

자바 플랫폼에서 리액티브 프로그래밍을 위한 표준 라이브러리는 RxJava 1.x 였고 현재(2021 기준)는 2.x를 지나 3.x까지 출시되었다.

RxJava 라이브러리는 **Reactive Extensions(혹은 ReactiveX)**의 자바 구현체로 종종 관찰자 패턴, 반복자 패턴 및 함수형 프로그래밍의 조합으로 정의된다.

RxJava의 기본적인 Observer 인터페이스는 아래와 같이 설계할 수 있다.

1
2
3
4
5
public interface RxObserver<T> {
void onNext(T next);
void onComplete();
void onError(Exception e);
}

RxJava에서 아래와 같이 주기적으로 비동기 이벤트 시퀀스를 생성할 경우 이벤트가 생성되는 것과 별개의 스레드에서 사용되기 때문에 메인 스레드가 종료되지 않도록 sleep()을 쓰거나 다른
방법으로 종료를 지연시킬 수 있다.

1
2
3
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Received: " +e));
Thread.sleep(5000);

마블 다이어그램

RxJava는 연산자를 통해 스트림의 원소를 조정하거나 구조 자체를 변경할 수 있다. 연산자가 복잡한 변환을 수행할 경우 이를 시각적으로 표현하여 그 동작을 효과적으로 설명하기 위한 목적으로 마블 다이어그램(marble diagram)이 발명됐다.

  • 위아래 실선(ㅡ>) : Observable의 시간흐름(Timeline)을 의미한다.
  • 각 도형(○,□) : Observable에서 발행하는 데이터로 발행될때마다 onNext 메서드가 호출된다.
  • 파이프(|) : 데이터 발행을 모두 완료했다는 의미로 onCompleted 메서드가 호출된다.
  • 위에서 아래로 점선(—>) : 함수의 입력,출력을 의미한다.
  • 가운데 박스 : 함수를 의미하며 입력된 값에 어떤 변환작업을 하는지 나타내고 있다.
  • 엑스(X) : 함수에서 입력된 값을 처리하는 중 에러가 발생하거나 비정상적으로 종료되었음을 의미하며 onError 메서드가 호출된다.

RxJava와 관련된 모든 연산자는 이러한 마블다이어그램으로 표현되고 있으니 익숙해질 필요가 있다.