[WebFlux] map과 flatMap에 대해서

spring WebFlux를 사용하다보면 체인 연산자로 FlatMap을 사용해야할지, Map을 사용해야할지 헷갈리는 경우가 있어서 정리해봤다.

map 함수

public final Flux map(Function<? superT,? extends V> mapper)

  • Transform the items emitted by this Flux by applying a synchronous function to each item.

flux.map

map 함수는 데이터 스트림의 각 요소를 변환하는데 사용되는 함수로 각 요소를 동기적으로 처리하며, 각 요소를 변환하여 새로운 데이터 스트림을 반환한다.

간단한 샘플코드를 보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void sampleMapTest() {
// Arrange
Function<String, String> mapper = (str) -> str.toUpperCase();
Flux<String> testFlux = Flux.just("apple", "banana", "carrot");

// Act
Flux<String> resultFlux = testFlux.map(mapper);

// Assert
StepVerifier.create(resultFlux)
.expectNext("APPLE", "BANANA", "CARROT")
.expectComplete()
.verify();
}

Function mapper는 스트림을 구독할 때 실행되며 각 요소의 순서를 유지하며 입력값을 변환하여 새로운 값으로 출력하게 된다.

flatMap 함수

public final Flux flatMap(Function<? superT,? extendsPublisher<? extends V>> mapper, int concurrency)

  • Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

flux.flatMap

flatMap 함수는 데이터 스트림의 각 요소별로 publisher를 생성하여 여러개의 하위 스트림이 생성되고 subscribe를 통해 단일 스트림으로 묶은 Publisher로 반환한다.

이러한 Publisher를 비동기적으로 처리하여 순서를 보장하지 않기 때문에 순서를 보장하고 싶으면 flatMapSequential 이나 concatMap 등을 사용할 수 있다.

샘플코드를 보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void sampleFlatMapTest() {
// Arrange
Function<String, Publisher<String>> mapper = (str) -> Flux.just(str.toUpperCase().split("")).subscribeOn(Schedulers.boundedElastic());
Flux<String> testFlux = Flux.just("apple", "banana", "carrot");

// Act
Flux<String> resultFlux = testFlux.flatMap(mapper);

List<String> output = new ArrayList<>();
resultFlux.subscribe(output::add);

// Assert
assertThat(output).containsExactlyInAnyOrder("A", "P", "P", "L", "E", "B", "A", "N", "A", "N", "A", "C", "A", "R", "R", "O", "T");
}

mapper에서 Publisher로 반환되는데 별도의 스레드를 할당하고 flatMap에서 비동기로 동작하게 되면서 순서를 보장하지 않는 flatMap의 응답값은 apple, banana, carrot의 글자가 섞이게 된다.

사용 시 고려사항

map은 각 요소가 1:1로 매핑되서 동기적으로 처리된다.

flatMap은 각 요소에 대해서 1:다 매핑이 되서 단일 스트림으로 병합되고 publisher의 동작에 따라서 동기적, 혹은 비동기적으로 동작할 수 있다.

단순히 DTO 객체변환이라든지, 단일요소에 대한 간단한 계산같은건 map을 사용하면 좋고 각 요소의 처리 시간이 다를 수 있는 작업이나 외부 서비스, DB 호출 등의 작업을 할때는 비동기 동작이 가능한 flatMap을 사용하는게 좋다.

map은 단일 데이터 스트림의 각 요소를 독립적으로 처리하기 때문에 요소의 크기에 따라 성능차이가 크진 않지만 flatMap은 데이터 스트림을 개별적으로 처리 후 병합하기 위해 데이터 스트림의 크기나 작업량에 따라서 성능적으로 영향이 map에 비해서 상대적으로 클 수 있다.

[WebFlux] Mono.defer(), Mono.fromCallable()

Mono.defer()Mono.fromCallable() 모두 지연 평가(lazy evaluation)를 제공하는데 사용된다. 지연 평가란 호출 시점을 지연시켜서 필요한 시점에서 코드를 실행하는 방식을 의미한다.

Mono.defer() 메서드는 Mono의 factory 메서드 중 하나로, 인자로 전달받은 Supplier 함수를 사용하여 Mono 객체를 생성한다. 이때 Supplier 함수가 호출되는 시점은 Mono 객체를 subscribe 할 때이다.

Mono.fromCallable() 메서드는 Mono의 factory 메서드 중 하나로, 인자로 전달받은 Callable 함수를 사용하여 Mono 객체를 생성한다. 이때 Callable함수가 호출되는 시점은 Mono객체 생성 시점이다.

따라서 Mono.defer()는 Mono 객체를 생성할 때마다 Supplier함수를 실행하여 객체를 생성하는 방식을 사용하며, Mono.fromCallable()은 Mono객체를 생성하는 시점에서 Callable함수를 실행하여 객체를 생성하는 방식을 사용한다. Mono.fromCallable()은 즉시 실행되므로, Mono.defer()보다 더 많은 자원을 소비할 수 있다.

Mono.defer()는 Mono의 객체 생성 시점을 늦추고, 객체 생성과 동시에 상태를 초기화할 수 있는 장점이 있고 반면에 Mono.fromCallable()은 Callable함수의 실행 결과를 즉시 Mono객체에 포함시키므로, 결과를 즉시 필요로 하는 경우에 유용하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args)
{
callExternalService()
.repeat(3)
.doOnNext(System.out::println)
.subscribe();
}

private static Mono<?> callExternalService()
{
System.out.println("External service is called.");
return Mono.just("result");
}

/*
External service is called.
result
result
result
result
*/

Mono.just(“result”)의 결과를 repeat(3)으로 세 번 반복하게 되면, 해당 Mono를 세 번 실행하게 된다.

하지만, Mono.just(“result”)는 호출될 때마다 값을 생성하는 게 아니라, 최초 1회 호출시 생성한 값을 계속해서 방출하게 된다. 그래서 "External service is called."는 처음 1회만 출력되고, 세 번 반복되는 Mono.just(“result”)는 이미 생성된 값을 방출하게 되므로, 추가로 호출할 필요가 없어지게 된다.

따라서 "External service is called."는 최초 1회만 출력되고, 세 번 반복되는 Mono.just(“result”)는 이미 생성된 값을 방출하게 된다.

[WebFlux] Flux와 Stream의 reduce, 그리고 chatGPT

평소처럼 개발하던 중 reduce를 사용한 코드에서 오류가 발생했다.

1
2
3
4
5
java: cannot find symbol
symbol: variable accumulatedMap

java: cannot find symbol
symbol: variable currentMap

오류가 발생한 상황을 샘플코드로 대략적으로 재현해봤다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// sample 1
List<Integer> list = IntStream.range(0,10).boxed().collect(Collectors.toList());
var collectMap = list.stream()
.map(i -> {
Map<String, Integer> map = new HashMap<>();
map.put("key" + i, i * 10);
return map;
})
.reduce(**Collections.emptyMap()**, (accumulatedMap, currentMap) -> new HashMap<>() {
{
putAll(accumulatedMap);
putAll(currentMap);
}
});
System.out.println("Stream reduce Map size: " + collectMap.size());
// Stream reduce Map size: 10

// sample 2
Flux.range(1, 10)
.map(i -> {
Map<String, Integer> map = new HashMap<>();
map.put("key" + i, i * 10);
return map;
})
//.reduce(**Collections.emptyMap()** ,(accumulatedMap, currentMap) -> new HashMap<>() { // compile error!! > `java: cannot find symbol`
.reduce((accumulatedMap, currentMap) -> new HashMap<>() {
{
putAll(accumulatedMap);
putAll(currentMap);
}
})
.doOnNext(map -> {
System.out.println("Flux reduce Map size: " + map.size());
})
.subscribe();
// Flux reduce Map size: 10

Stream과 Flux의 reduce를 호출 시 initial 파라미터로 각각 Collections.emptyMap()를 사용하였으나 Stream은 정상동작하지만 Flux에선 컴파일 오류가 발생하였다.

여러 케이스로 바꿔가면서 테스트를 해보니 Collections.emptyMap()의 반환값이 불변객체라서 안되는거 같다는 추론까진 도달했지만 확신은 없었다.

구글링으로 찾아보려고 했지만 공식문서를 봐도 Flux에선 안되고 Stream에선 되는 이유에 대해서 명확한 답변을 찾기 어려워서 고민하던차에 요즘 핫한 chatGPT에 물어볼까 하는 생각이 들어서 바로 실행해봤다.

아래는 chatGPT 와 대화한 내용


i want public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator) example code.

  • Here’s an example of how you could use the reduce method in Spring WebFlux:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.util.function.BiFunction;

    public class ReduceExample {
    public static void main(String[] args) {
    Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);

    BiFunction<Integer, Integer, Integer> accumulator = (a, b) -> a + b;
    Mono<Integer> sum = numbers.reduce(0, accumulator);

    sum.subscribe(result -> System.out.println("The sum of the numbers is: " + result));
    }
    }

    This example creates a Flux of integers and then uses the reduce method to sum them up. The reduce method takes an initial value, in this case 0, and a BiFunction that specifies how the current accumulation (a) is combined with each element from the flux (b) to produce the next accumulation.
    The subscribe method is then used to subscribe to the Mono returned by reduce and print out the result.

initial Type is HashMap

  • Here’s an example of how you could use the reduce method with an initial value of a HashMap:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.util.HashMap;
    import java.util.function.BiFunction;

    public class ReduceExample {
    public static void main(String[] args) {
    Flux<String> words = Flux.just("apple", "banana", "cherry", "apple", "banana");

    BiFunction<HashMap<String, Integer>, String, HashMap<String, Integer>> accumulator = (map, word) -> {
    if (!map.containsKey(word)) {
    map.put(word, 1);
    } else {
    map.put(word, map.get(word) + 1);
    }
    return map;
    };
    Mono<HashMap<String, Integer>> wordCounts = words.reduce(new HashMap<>(), accumulator);

    wordCounts.subscribe(result -> System.out.println("The word counts are: " + result));
    }
    }

    This example creates a Flux of strings and uses the reduce method to count the occurrences of each word. The reduce method takes an initial value, in this case a new empty HashMap, and a BiFunction that specifies how the current HashMap (map) is combined with each word from the flux (word) to produce the next HashMap.
    The subscribe method is then used to subscribe to the Mono returned by reduce and print out the result.

Why does Mono<HashMap<String, Integer>> wordCounts = words.reduce(Collections.emptyMap(), accumulator); error occur?

  • The error occurs because Collections.emptyMap() returns an unmodifiable map, which means you can’t modify it. The reduce method expects the accumulator function to modify the accumulator object, which is not possible with an unmodifiable map.
    You can resolve the issue by creating a new empty HashMap as the initial value instead of using Collections.emptyMap(), like this:

    1
    Mono<HashMap<String, Integer>> wordCounts = words.reduce(new HashMap<>(), accumulator);

    This will give you a modifiable map that can be updated by the accumulator function as required by the reduce method.

But why does Java Stream API succeed?

  • The Java Stream API allows you to use the reduce method with an initial value of Collections.emptyMap() because the reduce operation in the Stream API operates on an internal data structure that is separate from the original data source. This allows the Stream API to handle unmodifiable maps as the initial value without any issues.
    However, in reactive programming with Spring WebFlux, the reduce method operates on the data stream directly, so you need to provide an accumulator object that is mutable. That’s why you need to use a new empty HashMap instead of Collections.emptyMap() as the initial value.

별로 기대하지 않았는데 원하는 내용에 대해서 생각보다 정확한 답변을 들을 수 있었다.

  1. 먼저 Flux의 reduce 호출 시initial가 HashMap인 케이스로 샘플코드를 요청하고 Collections.emptyMap() 을 초기값으로 사용 시 왜 오류가 나는지 물어봤고
    Collections.emptyMap()으로 생성되는 객체는 returns an unmodifiable map 즉, immutable(불변)객체인데 reduce 메서드는 초기값으로 들어온 accumulator 객체를 수정할 것으로 예상했지만 수정이 불가능한 불변객체가 들어와서 오류가 발생하는거라고 한다.
  2. Stream의 reduce에선 왜 성공하는지? 에 대해선
    Stream의 reduce는 원래 데이터와는 다른 별도의 내부 데이터 구조에서 동작하기 때문에 Collections.emptyMap()의 초기값으로 reduce를 사용할 수 있지만
    리액티브 프로그래밍 기반의 WebFlux에선 데이터 스트림에서 reduce 메서드가 직접 동작하기 때문에 변경가능한 accumulator 객체를 제공해야 하므로 변경이 가능한 빈 HashMap 객체를 생성해야 한다고 답변하였다.

앞으로 나의 코딩선생님이 하나 더 늘었다는 생각에 흥분되면서도 한편으론 chatGPT를 사용해서 구글의 코딩인터뷰 Level3도 통과했다는 뉴스처럼 문맥을 파악해서 의도한 바를 정확하게 알려주는 답변을 보고 소름이 돋았다.

[WebFlux] publishOn과 subscribeOn

회사에서 WebFlux로 개발하면서 publishOn과 subscribeOn의 사용법이 매번 헷갈려서 최대한 간단하게 정리를 해봤다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Flux.range(1,3)
.subscribeOn(Schedulers.newBoundedElastic(1,1,"sub2"))
.doOnNext(i -> log.info("main 1: {}, number: {}", Thread.currentThread().getName(), i))
.publishOn(Schedulers.newBoundedElastic(1,1,"pub1"))
.doOnNext(i -> log.info("main 2: {}, number: {}", Thread.currentThread().getName(), i))
.subscribeOn(Schedulers.newBoundedElastic(1,1,"sub1"))
.subscribe();

// log
main 1: sub2-3, number: 1
main 1: sub2-3, number: 2
main 1: sub2-3, number: 3
main 2: pub1-2, number: 1
main 2: pub1-2, number: 2
main 2: pub1-2, number: 3

subscribeOn

subscribeOn은 호출된 곳의 앞쪽 publisher 체인과 뒷쪽의 subscriber 체인을 한꺼번에 별도의 스레드로 분리한다.
느린 publisher 와 빠른 subscriber 의 조합에서 쓰기 적합하다고 하는데… 무슨 말인지 잘 모르겠다. 외부서비스에서 데이터를 읽어올때 쓰기 좋다고 한다.

위 예시처럼 subscribeOn을 동일한 체인에서 중첩해서 사용할 경우
첫번째 subscribeOn 앞뒤 체인을 묶어서 sub2 스케줄러에서 담당하고
두번째 subscribeOn 앞뒤 체인을 묶어서 sub1 스케줄러가 담당해야 하지만 이미 sub2에서 담당해서 그대로 sub2로 돌게 된다.

publishOn

publishOn은 호출된 곳의 앞쪽 publisher 체인은 그대로 두고 뒷쪽 subscriber 체인만 별도의 스레드로 분리한다.
빠른 publisher와 느린 subscriber 일때 쓰기 좋다고 하고 외부서비스로 데이터를 쓰는 작업할 때 좋닫고 한다.
위 예시에서 subscribeOn으로 sub2 스레드에서 돌다가 publishOn을 만나면서 pub1 스케줄러에 작업을 할당하고 나머지 연산을 진행한다.

🔗 출처
Reactor Flux의 publishOn subscribeOn을 이용한 스케쥴링