4. 리액터 프로젝트 - 리액티브 앱의 기초

리액티브 스트림 스펙은 리액티브 라이브러리가 서로 호환할 수 있게 해주며 여러 중요한 개선 사항이 많았지만 API 및 규칙만 정의하고 일상적인 사용을 위한 라이브러리는 제공하지 않았다.

리액티브 프레임워크중에서 가장 유명한 라이브러리 중 하나인 리액터 프로젝트(Project Reactor)는 1.x 버전에서 리액터 패턴, 함수형 프로그래밍 및 리액티브 프로그래밍과 같은 메시지 처리에 대한 모범 사례를 통합하여 비동기 논블로킹 처리를 지원하도록 설계하였다.

이후 여러 부족한 부분들을 보완하면서 2.x를 거쳐 현재는 3.x 버전으로 릴리즈되어있다.

리액터 프로젝트 필수 요소

  • 비동기 파이프라인을 구축할 때 콜백 지옥깊게 중첩된 코드를 생략
  • 코드 가독성을 높이고 리액터 라이브러리에 의해 정의된 워크플로에 **조합성(composability)**을 추가
  • 리액터 API는 연산자를 연결해서 사용하는 것을 권장하며 이를 통해 복잡하고 재사용 가능한 실행 그래프(execution graph)를 작성할 수 있다.
    • 그래프는 실행 흐름만 정의하며 구독자가 실제 구독을 했을 때만 데이터 플로가 기동된다.
  • 오류 발생 가능성이 있는 비동기 요청의 결과를 효율적으로 처리하여 유연하지만 복원력 있는 코드를 작성할 수 있다.

배압은 리액티브 스트림 스펙의 핵심 속성으로 리액터 역시 동일하다.

1
2
3
(데이터 플로)--▶️          --▶️         --▶️
게시자 연산자 연산자 구독자
◀️-- ◀️-- ◀️--(요청)

배압 전파의 일반적인 모드를 모두 지원

  • 푸시 전용 : subscription.request(Long.MAX_VALUE)
  • 풀 전용 : subscription.request(1)
  • 풀-푸시(혼합형) : 구독자가 수요를 실시간으로 제어할 수 있고 게시자가 데이터 소비 속도에 적응할 수 있는 경우
  • 풀-푸시 모델을 지원하지 않는 이전 API를 적용할 때는 예전 스타일의 배압 메커니즘을 제공한다.

Flux와 Mono

데이터를 기반으로 리액티브 스트림을 생성하는 팩토리 메서드를 제공

Mono는 Flux와 비슷하지만 하나의 요소를 대상으로 사용되는데 HTTP 요청이나 DB 쿼리와 같은 비동기 작업을 래핑하는데 매우 유용

Flux와 Mono는 구독 루틴을 단순화하는 subscribe() 메서드를 람다 기반으로 재정의한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
Flux.just("A","B","C")
.subscribe(
data -> log.info("onNext: {}", data),
err -> { /* ignored */ },
() -> log.info("onComplete")
);

/*
onNext: A
onNext: B
onNext: C
onComplete
*/

또한 subscription으로 구독을 직접 제어하거나 직접 Subscriber 인터페이스를 구현하여 스트림을 구독할 수 있다.

연산자를 이용해 리액티브 시퀀스 변환하기

연산자의 종류가 너무 많아서 적절한 연산자를 선택하는 가이드를 포함한 아래 링크를 참조

Which operator do I need?

  • 원소 매핑 : map(1:1) …
  • 필터링 : filter …
  • 시퀀스 수집? 합치기? : collectList() …
  • 원소 줄이기 : reduce, scan …
  • 스트림 조합 : concat, merge, zip …
  • 스트림 내의 원소 일괄 처리
    • buffer : List와 같은 컨테이너를 이용한 Buffering, Flux<List<T>>
    • window : Flux<Flux<T>>와 같은 형태로 스트림을 스트림으로 Windowing
    • groupBy : Flux<GroupedFlux<K, T>> 유형의 스트림으로 Grouping
  • flatmap : 논리적으로 map과 flatten의 2가지 작업으로 구성
    • map파트는 들어오는 각 원소를 리액티브 스트림(T -> Flux<R>)으로 변환
    • flatten파트는 생성된 모든 리액티브 시퀀스를 R 타입의 원소를 통과시키는 새로운 리액티브 시퀀스로 병합
  • 샘플링 : sample 연산자를 사용하여 특정 기간 내 최근에 관찰된 값을 주기적으로 출력할 수 있다.
  • 블로킹 구조로 전환
    • 리액티브 애플리케이션에서 블로킹 처리를 해선 안되지만, 상위 API에서 필요로 하는 경우도 있음
    • blockFirst, blockLast, toIterable, toStream …
    • Mono#toFuture 를 제외한 모든 메서드는 "non-blocking only"로 표시된 스케줄러에서 호출되면 UnsupportedOperatorException을 발생시킨다.
  • 시퀀스 엿보기
    • doOnNext(Consumer <T>), doOnComplete(), doOnError(Throwable)
    • 최종 시퀀스를 수정하지 않고 프로세스 파이프라인의 중간에 있는 각 원소나 특정 시그널을 처리해야 하는 경우

Hot 스트림과 cold 스트림

1. 콜드 퍼블리셔(cold publisher)

  • 구독자가 나타날 때마다 시퀀스 데이터가 생성되는 방식
  • 구독자 없이는 데이터 생성 X
  • 대표적으로 HTTP 요청이 이런식으로 동작한다.

2. 핫 퍼블리셔(hot publisher)

  • 데이터 생성 시 구독자의 존재 여부에 의존하지 않는 방식
  • 첫 구독자가 없더라도 원소를 만들어 낼 수 있다.
  • 이때 구독자가 나타나면 이전 생성값 말고 새로운 값만 보낼 수도 있다.
  • 리액터 라이브러리에 포함된 대부분은 Processor 인터페이스를 상속한다.

콜드 퍼블리셔를 리액티브 변환을 통해 핫 퍼블리셔로 전환할 수 있다.

4. 리액터 프로젝트 - 리액티브 앱의 기초

https://yoo0926.github.io/2021/12/03/book/spring5-reactive/4/

Author

Jaeyong Yoo

Posted on

2021-12-03

Updated on

2023-06-10

Licensed under

댓글