[WebFlux] Flux와 Stream의 reduce, 그리고 chatGPT

평소처럼 개발하던 중 reduce를 사용한 코드에서 오류가 발생했다.

1
2
3
4
5
java: cannot find symbol
symbol: variable accumulatedMap

java: cannot find symbol
symbol: variable currentMap

오류가 발생한 상황을 샘플코드로 대략적으로 재현해봤다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// sample 1
List<Integer> list = IntStream.range(0,10).boxed().collect(Collectors.toList());
var collectMap = list.stream()
.map(i -> {
Map<String, Integer> map = new HashMap<>();
map.put("key" + i, i * 10);
return map;
})
.reduce(**Collections.emptyMap()**, (accumulatedMap, currentMap) -> new HashMap<>() {
{
putAll(accumulatedMap);
putAll(currentMap);
}
});
System.out.println("Stream reduce Map size: " + collectMap.size());
// Stream reduce Map size: 10

// sample 2
Flux.range(1, 10)
.map(i -> {
Map<String, Integer> map = new HashMap<>();
map.put("key" + i, i * 10);
return map;
})
//.reduce(**Collections.emptyMap()** ,(accumulatedMap, currentMap) -> new HashMap<>() { // compile error!! > `java: cannot find symbol`
.reduce((accumulatedMap, currentMap) -> new HashMap<>() {
{
putAll(accumulatedMap);
putAll(currentMap);
}
})
.doOnNext(map -> {
System.out.println("Flux reduce Map size: " + map.size());
})
.subscribe();
// Flux reduce Map size: 10

Stream과 Flux의 reduce를 호출 시 initial 파라미터로 각각 Collections.emptyMap()를 사용하였으나 Stream은 정상동작하지만 Flux에선 컴파일 오류가 발생하였다.

여러 케이스로 바꿔가면서 테스트를 해보니 Collections.emptyMap()의 반환값이 불변객체라서 안되는거 같다는 추론까진 도달했지만 확신은 없었다.

구글링으로 찾아보려고 했지만 공식문서를 봐도 Flux에선 안되고 Stream에선 되는 이유에 대해서 명확한 답변을 찾기 어려워서 고민하던차에 요즘 핫한 chatGPT에 물어볼까 하는 생각이 들어서 바로 실행해봤다.

아래는 chatGPT 와 대화한 내용


i want public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator) example code.

  • Here’s an example of how you could use the reduce method in Spring WebFlux:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.util.function.BiFunction;

    public class ReduceExample {
    public static void main(String[] args) {
    Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);

    BiFunction<Integer, Integer, Integer> accumulator = (a, b) -> a + b;
    Mono<Integer> sum = numbers.reduce(0, accumulator);

    sum.subscribe(result -> System.out.println("The sum of the numbers is: " + result));
    }
    }

    This example creates a Flux of integers and then uses the reduce method to sum them up. The reduce method takes an initial value, in this case 0, and a BiFunction that specifies how the current accumulation (a) is combined with each element from the flux (b) to produce the next accumulation.
    The subscribe method is then used to subscribe to the Mono returned by reduce and print out the result.

initial Type is HashMap

  • Here’s an example of how you could use the reduce method with an initial value of a HashMap:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.util.HashMap;
    import java.util.function.BiFunction;

    public class ReduceExample {
    public static void main(String[] args) {
    Flux<String> words = Flux.just("apple", "banana", "cherry", "apple", "banana");

    BiFunction<HashMap<String, Integer>, String, HashMap<String, Integer>> accumulator = (map, word) -> {
    if (!map.containsKey(word)) {
    map.put(word, 1);
    } else {
    map.put(word, map.get(word) + 1);
    }
    return map;
    };
    Mono<HashMap<String, Integer>> wordCounts = words.reduce(new HashMap<>(), accumulator);

    wordCounts.subscribe(result -> System.out.println("The word counts are: " + result));
    }
    }

    This example creates a Flux of strings and uses the reduce method to count the occurrences of each word. The reduce method takes an initial value, in this case a new empty HashMap, and a BiFunction that specifies how the current HashMap (map) is combined with each word from the flux (word) to produce the next HashMap.
    The subscribe method is then used to subscribe to the Mono returned by reduce and print out the result.

Why does Mono<HashMap<String, Integer>> wordCounts = words.reduce(Collections.emptyMap(), accumulator); error occur?

  • The error occurs because Collections.emptyMap() returns an unmodifiable map, which means you can’t modify it. The reduce method expects the accumulator function to modify the accumulator object, which is not possible with an unmodifiable map.
    You can resolve the issue by creating a new empty HashMap as the initial value instead of using Collections.emptyMap(), like this:

    1
    Mono<HashMap<String, Integer>> wordCounts = words.reduce(new HashMap<>(), accumulator);

    This will give you a modifiable map that can be updated by the accumulator function as required by the reduce method.

But why does Java Stream API succeed?

  • The Java Stream API allows you to use the reduce method with an initial value of Collections.emptyMap() because the reduce operation in the Stream API operates on an internal data structure that is separate from the original data source. This allows the Stream API to handle unmodifiable maps as the initial value without any issues.
    However, in reactive programming with Spring WebFlux, the reduce method operates on the data stream directly, so you need to provide an accumulator object that is mutable. That’s why you need to use a new empty HashMap instead of Collections.emptyMap() as the initial value.

별로 기대하지 않았는데 원하는 내용에 대해서 생각보다 정확한 답변을 들을 수 있었다.

  1. 먼저 Flux의 reduce 호출 시initial가 HashMap인 케이스로 샘플코드를 요청하고 Collections.emptyMap() 을 초기값으로 사용 시 왜 오류가 나는지 물어봤고
    Collections.emptyMap()으로 생성되는 객체는 returns an unmodifiable map 즉, immutable(불변)객체인데 reduce 메서드는 초기값으로 들어온 accumulator 객체를 수정할 것으로 예상했지만 수정이 불가능한 불변객체가 들어와서 오류가 발생하는거라고 한다.
  2. Stream의 reduce에선 왜 성공하는지? 에 대해선
    Stream의 reduce는 원래 데이터와는 다른 별도의 내부 데이터 구조에서 동작하기 때문에 Collections.emptyMap()의 초기값으로 reduce를 사용할 수 있지만
    리액티브 프로그래밍 기반의 WebFlux에선 데이터 스트림에서 reduce 메서드가 직접 동작하기 때문에 변경가능한 accumulator 객체를 제공해야 하므로 변경이 가능한 빈 HashMap 객체를 생성해야 한다고 답변하였다.

앞으로 나의 코딩선생님이 하나 더 늘었다는 생각에 흥분되면서도 한편으론 chatGPT를 사용해서 구글의 코딩인터뷰 Level3도 통과했다는 뉴스처럼 문맥을 파악해서 의도한 바를 정확하게 알려주는 답변을 보고 소름이 돋았다.

[WebFlux] Flux와 Stream의 reduce, 그리고 chatGPT

https://yoo0926.github.io/2023/02/14/spring/WebFlux/spring-webflux-2/

Author

Jaeyong Yoo

Posted on

2023-02-14

Updated on

2023-06-13

Licensed under

댓글