1. 계층형 아키텍처의 문제는 무엇일까?

계층형 아키텍처란?

계층(layer)으로 구성된 (웹) 애플리케이션은 전통적인 웹 애플리케이션 구조를 말한다.

크게보면 웹 → 도메인 → 영속성 으로 구성된 3계층으로 많이 표현되는데

  • 웹 : 요청을 받아서 서비스로 요청을 보냄
  • 도메인(비즈니스) : 필요한 비즈니스 로직을 수행하고 엔티티의 현재 상태를 조회하거나 변경하기 위해 영속성 계층의 컴포넌트 호출
  • 영속성 : 엔티티, 리포지터리 등 데이터베이스 관련 컴포넌트

오랫동안 사용한만큼 견고한 아키텍처 패턴이 맞고 잘 이해하고 사용하면 각 계층에 독립적으로 로직을 작성하고 기존 기능에 영향없는 기능 추가도 가능하다.

다만, 계층형은 코드에 나쁜 습관들이 스며들기 쉽고 시간이 지날수록 유지보수가 힘들어지는 단점들이 있다.

계층형 아키텍처는 데이터베이스 주도 설계를 유도한다.

계층형의 토대는 데이터베이스라서 웹은 도메인을, 도메인은 영속성을 의존하다보니 모두 데이터베이스에 의존하게 된다.

보통 애플리케이션을 만들때 비즈니스를 관장하는 규칙이나 정책을 반영한 모델을 만드는데 이때 우리는 상태(state)가 아니라 행동(behavior)을 중심으로 모델링한다.

하지만 계층형의 설계는 보통 데이터베이스를 토대로 도메인 로직을 구현하는 방식이라서 아키텍처의 구현으로는 맞더라도 비즈니스 관점에선 다르다.

가장 중요한 도메인 로직을 먼저 만들어야 로직을 제대로 이해하는지 확인하고 이를 토대로 웹과 영속성 계층을 설계할 수 있기 때문이다.

ORM(object-relational-mapping, 객체 관계매핑) 프레임워크(JPA, 하이버네이트 등)를 사용하면 비즈니스 규칙을 영속성 관점에 섞고 싶은 생각이 들게 된다.

ORM에 의해 관리되는 엔티티들은 일반적으로 영속성 계층에 두고 도메인계층에선 엔티티에 접근가능한데 이러한 구조는 영속성 계층과 도메인 계층 사이에 강한 결합을 만들게 된다.

서비스에서 영속성 모델을 마치 비즈니스 모델처럼 사용하다보면 도메인 로직뿐만 아니라 영속성 계층과 관련된 작업들도 해줘야 한다.

영속성 코드가 사실상 도메인 코드에 녹아들면서 둘 중 하나만 바꾸는게 어려워져서 계층형의 목표와 대치되는 코드가 된다.

지름길을 택하기 쉬워진다.

계층형 아키텍처는 특정한 계층에서는 같은 계층에 있는 컴포넌트나 아래에 있는 계층에만 접근 가능하다는 규칙이 있다.

만약 상위 계층에 위치한 컴포넌트에 접근해야 한다면? 컴포넌트를 계층 아래로 내려버리면 된다. 한번은 괜찮을 수 있다. 근데 2번, 3번이 넘고 나 뿐만 아니라 다른 동료들도 그렇게 하게 되면?

유틸리티나 헬퍼 컴포넌트 등이 아래 계층으로 내려오게 되면 영속성 계층은 모든 것에 접근 가능하기 때문에 시간이 지날 수록 점점 비대해 질 것이다.

테스트하기 어려워진다.

계층형 아키텍처에서 계층을 건너뛰도록 하는 경우도 있다. 엔티티의 필드를 딱 하나만 조작하면 될 경우에 웹 계층에서 바로 영속성 계층에 접근하면 도메인 계층을 건너 뛰게 된다. 이런 경우 크게 두가지 문제가 발생하는데

  1. 도메인 로직을 웹 계층에 구현하게 된다.
    만약 유스케이스가 확장된다면 더 많은 도메인 로직이 웹 계층에 추가되면서 애플리케이션 전반으로 책임이 섞이고 핵심 도메인 로직들이 퍼져나갈 수 있다.
  2. 웹 계층 테스트에서 도메인 계층뿐만 아니라 영속성 계층도 모킹(mocking)해야 한다.
    이 경우 단위 테스트의 복잡도가 올라가고 이렇게 복잡한 설정을 할 시간이 없어서 테스트를 안하게 되는 시작이 된다.

유스케이스를 숨긴다.

기능을 추가하거나 변경할 적절한 위치를 찾는 일이 빈번하기 때문에 ㅐ아키텍처는 코드를 빠르게 탐색하는데 도움이 돼야 한다.

계층형 아키텍처에서는 도메인 로직이 여러 계층에 걸쳐 흩어지기 쉬운 환경이라 유스케이스가 “간단”해서 도메인 계층을 생략하면 웹 계층에 존재할 수도 있고, 도메인과 영속성 모두에 접근할 수 있도록 컴포넌트의 계층을 내리면 영속성 계층에 존재할 수도 있다.

이런 경우 새로운 기능을 추가할 적당한 위치를 찾기 어려워지고 여러 개의 유스케이스를 담당하는 아주 넓은 서비스가 만들어질 수도 있다.

넓은 서비스는 영속성 계층에 많은 의존성을 갖게되고, 웹 레이어의 많은 컴포넌트가 이 서비스에 의존하게 된다. 서비스는 점점 더 복잡해지고 테스트하기도 어려워진다.

동시 작업이 어려워진다.

새로운 기능을 추가하기 위해 3명의 개발자가 있을때 각 계층에 각각의 기능을 동시에 개발할 수 있을까?

계층형에선 영속성 계층 위에 모든 것이 만들어지기 때문에 영속성 계층을 먼저 개발ㄹ해야 하고, 그 다음에 도메인 계층, 웹 계층을 만들어야 한다.

동시에 한꺼번에가 아니라 한번에 한명의 개발자만 일할 수 있는 것이다.

또한 넓은 서비스가 있다면 서로 다른 기능을 동시에 작업하기 어려운데 병합 충돌(merge conflict)이나 롤백이 필요한 문제가 발생할 수 있다.

유지보수 가능한 소프트웨어를 만드는 데 어떻게 도움이 될까?

올바르게 구축하고 몇 가지 추가적인 규칙들을 잘 적용한다면 계층형 아키텍처는 유지보수하기 매우 쉬워지며 코드를 쉽게 변경하거나 추가할 수 있다.

하지만 잘못된 방향으로 흘러가기 쉽다보니 계층형 아키텍처로 만들든 다른 아키텍처 스타일로 만들든, 지름길을 택하지 않고 유지보수하기에 더 쉬운 솔루션을 만드는 데 도움이될 것이다.

4-2. 리액터 프로젝트 심화학습

리액티브 스트림의 수명 주기

조립(assembling) 단계

  • 처리 흐름에서 사용하는 연산자를 조합한 빌더 API처럼 보이지만 일반적인 빌더 패턴과 달리 리액터 API는 불변성(Immutability)을 제공한다.(적용된 각각의 연산자가 새로운 객체를 생성한다.)
  • 스트림 구성을 조작하고 더나은 스트림 전달을 위한 다양한 기술을 적용할 수 있는 단계

구독 단계

  • 특정 Publisher를 구독할 때 발생
  • 조립 단계에서 일련의 Publisher 체인이 연결되었고 최상위 래퍼를 구독하면 해당 체인에 대한 구독 프로세스가 시작된다.
  • 조립단계와 동일한 최적화를 수행할 수 있다.
  • 리액터에서 멀티 스레딩을 지원하는 일부 연산자는 구독이 발생하는 작업자를 변경할 수 있다.

런타임 단계

  • 게시자와 구독자 간에 실제 신호가 교환되는 단계
  • 교환하는 처음 두 신호는 onSubscribe, request
    • onSubscribe 메서드는 최상위 소스에서 호출
    • 구독이 모든 구독자 체인을 통과하여 마지막 구독자가 구독 체인에 대한 정보를 수신하고 메시지 수신을 시작하려면 Subscription#request 메서드를 호출해 전송을 시작해야 한다.
  • 런타임 중에도 request를 줄이기 위한 최적화를 적용할 수 있다.

리액터에서 스레드 스케줄링 모델

멀티스레딩 실행을 위해 제공하는 연산자 사이의 차이점에 대해서 확인

다른 워커로 실행을 전환할 수 있는 네 가지 연산자

publishOn 연산자

  • 런타임 실행의 일부를 지정된 워커로 이동
  • Scheduler 인터페이스를 사용하여 현재 스트림에 대한 특정 워커를 선택할 수 있다.
  • 내부적으로 전용 워커가 메시지를 하나씩 처리할 수 있도록 새로운 원소를 제공하는 큐를 가지고 있다.
  • 리액티브 스트림의 모든 원소는 하나씩(동시에는 아니지만) 처리되므로 항상 모든 이벤트에 순서를 엄격하게 정의할 수 있다.(이 속성을 **직렬성(serializability)**라고 한다.)
  • 병렬 처리를 할 수 없다는 말처럼 들리지만 병렬 처리도 가능한데 예를 들어 처리 단계 사이에 비동기 영역을 추가해서 독립적으로 작업해 비동기 처리를 할 수 있다.

subscribeOn 연산자

  • 구독체인에서 워커의 작업 위치를 변경
  • 보통 호출 시점에서 상위 스트림에 해당하는 부분의 스레드를 설정

parallel 연산자

  • 하위 스트림에 대한 플로 분할과 분할된 플로 간 균형 조정 역할
1
2
3
4
5
6
Flux.range(0, 10000)
.parallel()
.runOn(Schedulers.parallel())
.map()
.filter()
.subscribe()
  • parallel연산자를 사용하면 ParallelFlux를 동작시킨다.
    • 다수의 Flux를 추상화하여 Flux간에 데이터의 크기 균형을 이룬다.

Scheduler

  • Scheduler.schedule : Runnable 작업을 예약가능
  • Scheduler.createWorker : 동일한 방법으로 Runnable 작업 예약이 가능한 Worker 인터페이스의 인스턴스를 제공
  • Scheduler인터페이스 / Workder인터페이스의 차이점 : 워커 풀 / Thread 또는 리소스를 추상화한 것
  • 리액터에서 제공하는 스케줄러 인터페이스의 3가지 주요 구현체
    • SingleScheduler : 모든 작업을 한 개의 전용 워커에 예약가능, 시간에 의존적
    • ParallelScheduler : 고정된 크기의 작업자 풀에서 작동(CPU 코어 수로 기본크기 제한)
    • ElasticScheduler : 동적으로 작업자를 만들고 스레드 풀을 캐시, 생성된 스레드 풀의 최대 개수는 제한되지 않음

리액터 컨텍스트

  • Context는 스트림을 따라 전달되는 인터페이스
  • 런타임 단계에서 필요한 컨텍스트 정보에 엑세스할 수 있도록 하는 것
  • 멀티스레드 환경의 비동기 처리방식에서 ThreadLocal가 가지는 한계를 해결할 수 있다.
    • 변수에 데이터를 넣은 후 publishOn 등을 통해 다른 워커에서 작업 플로를 수행하면 데이터를 쌓은 스레드와 작업 스레드가 달라서 데이터에 접근할 수 없다.

4. 리액터 프로젝트 - 리액티브 앱의 기초

리액티브 스트림 스펙은 리액티브 라이브러리가 서로 호환할 수 있게 해주며 여러 중요한 개선 사항이 많았지만 API 및 규칙만 정의하고 일상적인 사용을 위한 라이브러리는 제공하지 않았다.

리액티브 프레임워크중에서 가장 유명한 라이브러리 중 하나인 리액터 프로젝트(Project Reactor)는 1.x 버전에서 리액터 패턴, 함수형 프로그래밍 및 리액티브 프로그래밍과 같은 메시지 처리에 대한 모범 사례를 통합하여 비동기 논블로킹 처리를 지원하도록 설계하였다.

이후 여러 부족한 부분들을 보완하면서 2.x를 거쳐 현재는 3.x 버전으로 릴리즈되어있다.

리액터 프로젝트 필수 요소

  • 비동기 파이프라인을 구축할 때 콜백 지옥깊게 중첩된 코드를 생략
  • 코드 가독성을 높이고 리액터 라이브러리에 의해 정의된 워크플로에 **조합성(composability)**을 추가
  • 리액터 API는 연산자를 연결해서 사용하는 것을 권장하며 이를 통해 복잡하고 재사용 가능한 실행 그래프(execution graph)를 작성할 수 있다.
    • 그래프는 실행 흐름만 정의하며 구독자가 실제 구독을 했을 때만 데이터 플로가 기동된다.
  • 오류 발생 가능성이 있는 비동기 요청의 결과를 효율적으로 처리하여 유연하지만 복원력 있는 코드를 작성할 수 있다.

배압은 리액티브 스트림 스펙의 핵심 속성으로 리액터 역시 동일하다.

1
2
3
(데이터 플로)--▶️          --▶️         --▶️
게시자 연산자 연산자 구독자
◀️-- ◀️-- ◀️--(요청)

배압 전파의 일반적인 모드를 모두 지원

  • 푸시 전용 : subscription.request(Long.MAX_VALUE)
  • 풀 전용 : subscription.request(1)
  • 풀-푸시(혼합형) : 구독자가 수요를 실시간으로 제어할 수 있고 게시자가 데이터 소비 속도에 적응할 수 있는 경우
  • 풀-푸시 모델을 지원하지 않는 이전 API를 적용할 때는 예전 스타일의 배압 메커니즘을 제공한다.

Flux와 Mono

데이터를 기반으로 리액티브 스트림을 생성하는 팩토리 메서드를 제공

Mono는 Flux와 비슷하지만 하나의 요소를 대상으로 사용되는데 HTTP 요청이나 DB 쿼리와 같은 비동기 작업을 래핑하는데 매우 유용

Flux와 Mono는 구독 루틴을 단순화하는 subscribe() 메서드를 람다 기반으로 재정의한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
Flux.just("A","B","C")
.subscribe(
data -> log.info("onNext: {}", data),
err -> { /* ignored */ },
() -> log.info("onComplete")
);

/*
onNext: A
onNext: B
onNext: C
onComplete
*/

또한 subscription으로 구독을 직접 제어하거나 직접 Subscriber 인터페이스를 구현하여 스트림을 구독할 수 있다.

연산자를 이용해 리액티브 시퀀스 변환하기

연산자의 종류가 너무 많아서 적절한 연산자를 선택하는 가이드를 포함한 아래 링크를 참조

Which operator do I need?

  • 원소 매핑 : map(1:1) …
  • 필터링 : filter …
  • 시퀀스 수집? 합치기? : collectList() …
  • 원소 줄이기 : reduce, scan …
  • 스트림 조합 : concat, merge, zip …
  • 스트림 내의 원소 일괄 처리
    • buffer : List와 같은 컨테이너를 이용한 Buffering, Flux<List<T>>
    • window : Flux<Flux<T>>와 같은 형태로 스트림을 스트림으로 Windowing
    • groupBy : Flux<GroupedFlux<K, T>> 유형의 스트림으로 Grouping
  • flatmap : 논리적으로 map과 flatten의 2가지 작업으로 구성
    • map파트는 들어오는 각 원소를 리액티브 스트림(T -> Flux<R>)으로 변환
    • flatten파트는 생성된 모든 리액티브 시퀀스를 R 타입의 원소를 통과시키는 새로운 리액티브 시퀀스로 병합
  • 샘플링 : sample 연산자를 사용하여 특정 기간 내 최근에 관찰된 값을 주기적으로 출력할 수 있다.
  • 블로킹 구조로 전환
    • 리액티브 애플리케이션에서 블로킹 처리를 해선 안되지만, 상위 API에서 필요로 하는 경우도 있음
    • blockFirst, blockLast, toIterable, toStream …
    • Mono#toFuture 를 제외한 모든 메서드는 "non-blocking only"로 표시된 스케줄러에서 호출되면 UnsupportedOperatorException을 발생시킨다.
  • 시퀀스 엿보기
    • doOnNext(Consumer <T>), doOnComplete(), doOnError(Throwable)
    • 최종 시퀀스를 수정하지 않고 프로세스 파이프라인의 중간에 있는 각 원소나 특정 시그널을 처리해야 하는 경우

Hot 스트림과 cold 스트림

1. 콜드 퍼블리셔(cold publisher)

  • 구독자가 나타날 때마다 시퀀스 데이터가 생성되는 방식
  • 구독자 없이는 데이터 생성 X
  • 대표적으로 HTTP 요청이 이런식으로 동작한다.

2. 핫 퍼블리셔(hot publisher)

  • 데이터 생성 시 구독자의 존재 여부에 의존하지 않는 방식
  • 첫 구독자가 없더라도 원소를 만들어 낼 수 있다.
  • 이때 구독자가 나타나면 이전 생성값 말고 새로운 값만 보낼 수도 있다.
  • 리액터 라이브러리에 포함된 대부분은 Processor 인터페이스를 상속한다.

콜드 퍼블리셔를 리액티브 변환을 통해 핫 퍼블리셔로 전환할 수 있다.

3. 스트림의 새로운 표준 - 리액티브 스트림

API 불일치 문제

CompletableStage를 이용하는 자바 코어 라이브러리와 RxJava 같은 다양한 라이브러리가 있어서, 코드를 작성할 때 다양한 선택을 할 수 있지만 과도하게 많은 선택지는 시스템을 지나치게 복잡하게 만들 수 있다.

핵심적인 문제는 라이브러리 공급자가 일관된 API를 만들어낼 수 있는 표준화된 방법이 없다는 사실이다.

풀 방식과 푸시 방식

리액티브 초기 단계에서 모든 라이브러리의 데이터 흐름은 소스에서 구독자에게 푸시되는 방식이었다.

  • 풀 방식으로 요소를 하나씩 요청할 경우 비동기 논블로킹 방식을 사용하더라도 각 요소에 대한 요청을 처리 하면서 대기시간이 발생하여 전체 처리시간 중 많은 시간을 유휴 상태로 있게 된다.

  • 푸시 방식을 도입하면서 요청하는 횟수를 최소화하여 전체 처리 시간을 최적화할 수 있었다.

하지만 푸시 모델만 사용하는 것은 기술적 한계가 있는데

  • 메시지 기반 통신의 본질은 요청에 응답하는 것인데
  • 프로듀서가 컨슈머의 처리 능력을 무시하면 전반적인 시스템 안정성에 영향을 미칠 수 있기 때문이다.

흐름제어

  • 느린 프로듀서와 빠른 컨슈머

    순수한 푸시 모델은 동적으로 시스템의 처리량을 증가시키는 것이 불가능하다.

  • 빠른 프로듀서와 느린 컨슈머

    프로듀서는 컨슈머가 처리할 수 있는 것보다 더 많은 데이터를 전송할 수 있으며 이로 인해 부하를 받는 컴포넌트에 치명적인 오류가 발생할 수 있다.

이를 해결하기 위한 직관적인 방법은 큐에 수집하는 것인데 3가지 유형으로 구분할 수 있다.

  1. 무제한 큐: 메모리 한도에 도달하면 전체 시스템이 손상될 가능성이 있다.(복원력이 떨어짐)
  2. 크기가 제한된 드롭 큐: 메시지의 중요성이 낮을 때 사용되는 방법으로 큐가 가득 차면 메시지를 무시하는데 중요한건 데이터 세트가 변경된다는 점이다.
  3. 크기가 제한된 블로킹 큐: 가장 느린 컨슈머의 처리량에 의해 시스템의 전체 처리량이 제한된다. 시스템의 비동기 동작을 모두 무효화하여 절대 받아들일 수 없는 시나리오다.

이런 시스템 부하에 적절하게 대응하는 방법으로 배압 제어 메커니즘이 있다.

리액티브 스트림의 기본 스펙

리액티브 스트림 스펙에는 Publisher, Subscriber, Subscription, Processor의 네 가지 기본 인터페이스가 정의돼 있다.

  • Publisher : Observable과 비교하면 Publisher와 Subscriber를 연결하기 위한 표준화된 진입점을 의미

  • Subscriber : Observer와 비슷한데 onSubscribe라는 추가 메서드를 제공하는데 Subscriber에게 구독이 성공했음을 알리는 API 메서드

  • Subscription : 원소 생성을 제어하기 위해 기본적인 사항을 제공

    • cancel() : 스트림에서 구독을 취소하거나 발행을 완전히 취소 가능

    • request(long n) : 요청하는 Publisher가 보내줘야 하는 데이터 크기를 알려줄 수 있음 ▶️ Publisher에서 유입되는 원소의 개수가 처리할 수 있는 제한을 초과하지 않을 것을 확신할 수 있다.

      리액티브 스트림은 순수 푸시 모델과는 달리 배압을 적절하게 제어할 수 있는 하이브리드 푸시-풀 모델을 제공한다.

      • 순수 푸시 모델을 사용하고 싶으면 최대 개수 요청 request(Long.MAX_VALUE)
      • 순수 풀 모델을 사용하고 싶으면 onNext()가 호출될 때마다 요청
  • Processor : Publisher와 Subscriber의 혼합 형태로 Publisher와 Subscriber 사이에 몇가지 처리 단계를 추가하도록 설계됐다.

1
2
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

리액티브 스트림 기술 호환성 키트(TCK)

모든 동작을 검증하고 반응 라이브러리를 표준화하여 서로 호환하는지 확인하는 공통 도구로 모든 리액티브 스트림 코드를 방어하고 지정된 규칙에 따라 구현을 테스트 한다.

TCK github : https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck


리액티브 스트림을 활용한 비동기 및 병렬

  • 리액티브 스트림 API는 Publisher가 생성하고 Subscriber가 소비한 모든 신호는 처리 중에 논블로킹이어야 하며 방해받지 않아야 한다고 규칙에 명시되어 있다.

  • 모든 프로세서나 코어를 효율적으로 사용하려면 병렬처리가 필요하고 이는 일반적으로 onNext 메서드를 병렬로 호출하는 것을 뜻하지만 on*** 메서드의 호출은 스레드 안전성을 보장하는 방식으로 신호를 보내야 하며 다중 스레드에서 수행되는 경우 외부적인 동기화를 사용해야 한다. 즉, 스트림의 요소를 병렬 처리할 수 없다.

  • 자원을 효율적으로 활용하기 위해 스트림 처리 파이프의 각 단계에 메시지를 비동기적으로 전달하는 것이다.

    • 상황에 따라서 처리단계를 각각 별도의 스레드로 처리하고 각 스레드 사이에 큐와 같은 데이터 구조를 적용하여 메시지를 독립적으로 제공하고 사용하도록 할수 있다.

2. 스프링을 이용한 리액티브 프로그래밍 - 기본 개념

관찰자(Observer) 패턴

  • 이벤트를 발생시키는 역할(주체Subject), 이벤트를 수신하는 역할(객체, 즉 관찰자Observer)의 두가지 핵심 요소가 존재
  • ObserverSubject에 등록되고 Subject로부터 알림을 수신

Java의 내장 클래스인 Observable, Observer는 java9 부터 deprecated 되었다.

  • interface가 아니라 class로 구현되어 있어서 이미 다른 클래스를 상속받은 클래스가 Observable을 상속할 수 없어서 재사용성에 제약이 생긴다.
  • Observable의 핵심 메소드 중 하나인 setChanged() 메소드가 protected로 정의되어 있어서 사용하려면 상속받은 서브클래스만 해당 메소드를 호출할 수 있다.
  • Observable의 알림은 순서를 보장할 수 없고 상태 변경 역시 1:1로 일치하지 않아서 멀티 스레드 환경에서 thread-safe 하지 않다.
  • Serializable을 구현하지 않기 때문에 Observable을 상속받은 서브클래스도 직렬화할 수 없다.

발행-구독 패턴

스프링 프레임워크는 이벤트처리를 위해 @EventListener어노테이션과 이벤트 발행을 위한 ApplicationEventPublisher클래스를 제공한다.

관찰자 패턴과의 차이점은 게시자와 구독자 사이에 간접적인 이벤트 채널(=메시지 브로커 or 이벤트 버스)을 제공하여 구독자는 이벤트 채널은 알고 있지만 게시자가 누구인지는 신경쓰지 않는다.

SseEmitter를 사용하면 스프링 프레임워크를 브로커를 사용하여 발행-구독 패턴을 구현할 수 있다.
다만 로직을 구현함에 있어 스프링의 내부 메커니즘을 사용했고 이는 프레임워크의 변경으로 인해 프로그램의 안정성을 보장할 수 없는 단점이 있다.

리액티브 프레임워크 RxJava

자바 플랫폼에서 리액티브 프로그래밍을 위한 표준 라이브러리는 RxJava 1.x 였고 현재(2021 기준)는 2.x를 지나 3.x까지 출시되었다.

RxJava 라이브러리는 **Reactive Extensions(혹은 ReactiveX)**의 자바 구현체로 종종 관찰자 패턴, 반복자 패턴 및 함수형 프로그래밍의 조합으로 정의된다.

RxJava의 기본적인 Observer 인터페이스는 아래와 같이 설계할 수 있다.

1
2
3
4
5
public interface RxObserver<T> {
void onNext(T next);
void onComplete();
void onError(Exception e);
}

RxJava에서 아래와 같이 주기적으로 비동기 이벤트 시퀀스를 생성할 경우 이벤트가 생성되는 것과 별개의 스레드에서 사용되기 때문에 메인 스레드가 종료되지 않도록 sleep()을 쓰거나 다른
방법으로 종료를 지연시킬 수 있다.

1
2
3
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Received: " +e));
Thread.sleep(5000);

마블 다이어그램

RxJava는 연산자를 통해 스트림의 원소를 조정하거나 구조 자체를 변경할 수 있다. 연산자가 복잡한 변환을 수행할 경우 이를 시각적으로 표현하여 그 동작을 효과적으로 설명하기 위한 목적으로 마블 다이어그램(marble diagram)이 발명됐다.

  • 위아래 실선(ㅡ>) : Observable의 시간흐름(Timeline)을 의미한다.
  • 각 도형(○,□) : Observable에서 발행하는 데이터로 발행될때마다 onNext 메서드가 호출된다.
  • 파이프(|) : 데이터 발행을 모두 완료했다는 의미로 onCompleted 메서드가 호출된다.
  • 위에서 아래로 점선(—>) : 함수의 입력,출력을 의미한다.
  • 가운데 박스 : 함수를 의미하며 입력된 값에 어떤 변환작업을 하는지 나타내고 있다.
  • 엑스(X) : 함수에서 입력된 값을 처리하는 중 에러가 발생하거나 비정상적으로 종료되었음을 의미하며 onError 메서드가 호출된다.

RxJava와 관련된 모든 연산자는 이러한 마블다이어그램으로 표현되고 있으니 익숙해질 필요가 있다.