ThreadLocal과 Netty

Spring에서 Bean들은 Spring Container에 의해서 싱글톤으로 관리되는데 이는 애플리케이션에 한개의 인스턴스만 존재하는데 여러 Thread가 동시에 접근할 경우 동시성 이슈가 발생할 수 있는데 이를 해결하기 위해 Java에서는 ThreadLocal 객체를 활용할 수 있다.

ThreadLocal은 Thread만 접근가능한 저장소로 여러 Thread가 접근할 경우 각각의 Thread를 식별해서 저장소를 구분한다. 따라서 같은 인스턴스의 ThreadLocal 필드에 여러 Thread가 접근하더라도 상관이 없다.

get(), set(), remove() 같은 메서드를 통해 조회, 저장, 저장소를 초기화한다.

톰캣과 같은 WAS에선 Thread Pool을 만들고 Request가 들어오면 각 Thread가 해당 요청을 담당하여 프로세스를 처리한다.

Spring Boot가 실행되면 내부적으로 ThreadPoolExecutor 구현체를 생성해서 내장 톰캣이 사용할 Thread Pool을 생성하는 구조
Thread Pool 을 사용하는 환경에선 Thread Pool을 통해 Thread가 재사용될 수 있으므로 사용이 끝나면 명시적으로 초기화를 해줘야 한다.
ThreadPoolExecutor를 확장해서 beforeExecute()와 afterExecute() 메서드에서 이러한 문제들을 해결할 수 있다.
기존 서블릿 기반의 Spring Boot는 Tomcat을 기본 Embeded WAS로 사용하는데 WebFlux의 경우 Netty를 기본으로 사용한다.

Tomcat은 요청 당 하나의 Thread가 동작하지만 Netty는 1개의 이벤트를 받는 Thread와 다수의 Worker Thread로 동작하게 된다.
Netty는 channel에서 발생하는 이벤트를 EventLoop가 처리하는 구조로 동작하는데 EventLoop는 이벤트를 실행하기 위한 무한루프 Thread라고 볼수 있다.

Netty에 대한 자세한 내용은 다음에 더 찾아보자.

아래는 webFlux에서 ThreadLocal에 저장하던 데이터를 어떻게 보관하는지 찾아본 글 내용이 흥미로워서 나중에 보려고 추가함
https://sightstudio.tistory.com/15

[WebFlux] map과 flatMap에 대해서

spring WebFlux를 사용하다보면 체인 연산자로 FlatMap을 사용해야할지, Map을 사용해야할지 헷갈리는 경우가 있어서 정리해봤다.

map 함수

public final Flux map(Function<? superT,? extends V> mapper)

  • Transform the items emitted by this Flux by applying a synchronous function to each item.

flux.map

map 함수는 데이터 스트림의 각 요소를 변환하는데 사용되는 함수로 각 요소를 동기적으로 처리하며, 각 요소를 변환하여 새로운 데이터 스트림을 반환한다.

간단한 샘플코드를 보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void sampleMapTest() {
// Arrange
Function<String, String> mapper = (str) -> str.toUpperCase();
Flux<String> testFlux = Flux.just("apple", "banana", "carrot");

// Act
Flux<String> resultFlux = testFlux.map(mapper);

// Assert
StepVerifier.create(resultFlux)
.expectNext("APPLE", "BANANA", "CARROT")
.expectComplete()
.verify();
}

Function mapper는 스트림을 구독할 때 실행되며 각 요소의 순서를 유지하며 입력값을 변환하여 새로운 값으로 출력하게 된다.

flatMap 함수

public final Flux flatMap(Function<? superT,? extendsPublisher<? extends V>> mapper, int concurrency)

  • Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave.

flux.flatMap

flatMap 함수는 데이터 스트림의 각 요소별로 publisher를 생성하여 여러개의 하위 스트림이 생성되고 subscribe를 통해 단일 스트림으로 묶은 Publisher로 반환한다.

이러한 Publisher를 비동기적으로 처리하여 순서를 보장하지 않기 때문에 순서를 보장하고 싶으면 flatMapSequential 이나 concatMap 등을 사용할 수 있다.

샘플코드를 보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void sampleFlatMapTest() {
// Arrange
Function<String, Publisher<String>> mapper = (str) -> Flux.just(str.toUpperCase().split("")).subscribeOn(Schedulers.boundedElastic());
Flux<String> testFlux = Flux.just("apple", "banana", "carrot");

// Act
Flux<String> resultFlux = testFlux.flatMap(mapper);

List<String> output = new ArrayList<>();
resultFlux.subscribe(output::add);

// Assert
assertThat(output).containsExactlyInAnyOrder("A", "P", "P", "L", "E", "B", "A", "N", "A", "N", "A", "C", "A", "R", "R", "O", "T");
}

mapper에서 Publisher로 반환되는데 별도의 스레드를 할당하고 flatMap에서 비동기로 동작하게 되면서 순서를 보장하지 않는 flatMap의 응답값은 apple, banana, carrot의 글자가 섞이게 된다.

사용 시 고려사항

map은 각 요소가 1:1로 매핑되서 동기적으로 처리된다.

flatMap은 각 요소에 대해서 1:다 매핑이 되서 단일 스트림으로 병합되고 publisher의 동작에 따라서 동기적, 혹은 비동기적으로 동작할 수 있다.

단순히 DTO 객체변환이라든지, 단일요소에 대한 간단한 계산같은건 map을 사용하면 좋고 각 요소의 처리 시간이 다를 수 있는 작업이나 외부 서비스, DB 호출 등의 작업을 할때는 비동기 동작이 가능한 flatMap을 사용하는게 좋다.

map은 단일 데이터 스트림의 각 요소를 독립적으로 처리하기 때문에 요소의 크기에 따라 성능차이가 크진 않지만 flatMap은 데이터 스트림을 개별적으로 처리 후 병합하기 위해 데이터 스트림의 크기나 작업량에 따라서 성능적으로 영향이 map에 비해서 상대적으로 클 수 있다.

log4j2 설정

사내 프로젝트의 여러가지 설정, 프로퍼티들은 dev, test, master로 분리되어있는데 살펴보다보니 log4j2-test.xml이 없고 test 프로파일로 빌드 시 log4j2-dev.xml을 바라보고 있었다.

시간도 좀 여유가 있던 시점이라 그냥 내가 만들어서 분리시켰는데 하는김에 SQL 관련 로그 설정을 변경했고 공부도 할겸 내용을 정리했다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!-- SQL Logger -->
<logger name="jdbc.connection" additivity="false">
<level value="OFF" />
<appender-ref ref="console"/>
</logger>

<logger name="jdbc.audit" additivity="false">
<level value="OFF"/>
<appender-ref ref="console"/>
</logger>

<logger name="jdbc.resultset" additivity="false">
<level value="OFF" />
<appender-ref ref="console"/>
</logger>

<logger name="jdbc.sqlonly" additivity="false">
<level value="OFF"/>
<appender-ref ref="console"/>
</logger>

<logger name="jdbc.resultsettable" additivity="false">
<level value="info"/>
<appender-ref ref="console"/>
</logger>

<logger name="jdbc.sqltiming" additivity="false">
<level value="info" />
<appender-ref ref="console"/>
</logger>
내용
jdbc.connection 열려있는 모든 번호와 연결 수립 및 해제 이벤트를 기록. 이는 연결 문제를 찾아내는데 매우 유용 (Connection Pool 설정)
jdbc.audit ResultSet을 제외한 모든 JDBC 호출 정보를 로그로 남긴다. 많은 양의 로그가 생성되므로 특별히 JDBC 문제를 추적해야 할 필요가 있는 경우를 제외하고는 사용을 권장하지 않는다.
jdbc.resultset ResultSet을 포함한 모든 JDBC 호출 정보를 로그로 남기므로 매우 방대한 양의 로그가 생성된다.
jdbc.sqlonly SQL문만을 로그로 남기며, PreparedStatement일 경우 관련된 argument 값으로 대체된 SQL문이 보여진다. (원래는 ? 로 표시되는데, log4j 덕분에 완전한 쿼리로 보여지는 것임)
jdbc.resultsettable SQL 결과 조회된 데이터의 table을 로그로 남긴다.
jdbc.sqltiming SQL문과 해당 SQL을 실행시키는데 수행된 시간 정보(milliseconds)를 포함한다.

SQL관련

sqlonly와 sqltiming은 sql로그를 찍는 공통적인 기능을 하며 sqltiming이 sqlonly를 포함하고 있습니다.

#{id} 등 파라미터로 넣어준 값도 모두 치환하여 출력해줍니다.

둘다 찍도록 하면 sql문이 두번 찍히니 하나만 찍어줍니다.

  • jdbc.sqlonly : SQL문을 로그로 남깁니다.
  • jdbc.sqltiming : SQL문과 수행된 시간을 로그로 남깁니다.

TABLE관련

결과를 table형식으로 만들어 출력해줍니다.

  • jdbc.resultsettable : SQL 결과 조회된 데이터를 table 형식으로 만들어줍니다.

JDBC호출 관련

아래 두개의 설정은 일반적으로는 킬 필요가 없는 설정입니다.

두개가 묶음으로 JDBC 호출정보를 출력해주는 기능을 하며 ResultSet을 포함하느냐 포함하지 않느냐의 차이입니다.

  • jdbc.audit : ResultSet을 제외한 JDBC 호출 정보를 로그로 남긴다.
  • jdbc.resultset : ResultSet을 포함한JDBC 호출 정보를 로그로 남깁니다.

additivity : 중복된 로그를 남길 지 여부를 설정한다.

기본적으로 logger는 정의한 패키지의 조상으로부터 모든 appender를 상속받아서 만약 어떤 Logger가 같은 appender를 사용한다면 메시지가 2번 찍히게 된다.

additivity=”false” 설정하면 이를 방지할 수 있다.

Log Level

로그 레벨은 다음과 같다. (OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL)

레벨 범위
OFF 이벤트를 기록하지 않음
FATAL 아주 심각한 에러가 발생한 상태를 나타냄. 시스템적으로 심각한 문제가 발생해서 어플리케이션 작동이 불가능할 경우가 해당하는데, 일반적으로는 어플리케이션에서는 사용할 일이 없음.
ERROR 요청을 처리하는중 문제가 발생한 상태를 나타냄.
WARN 처리 가능한 문제이지만, 향후 시스템 에러의 원인이 될 수 있는 경고성 메시지를 나타냄.
INFO 로그인, 상태변경과 같은 정보성 메시지를 나타냄.
DEBUG 개발시 디버그 용도로 사용한 메시지를 나타냄.
TRACE 디버그 레벨이 너무 광범위한 것을 해결하기 위해서 좀더 상세한 상태를 나타냄.
ALL 모든 이벤트를 기록함

[WebFlux] Mono.defer(), Mono.fromCallable()

Mono.defer()Mono.fromCallable() 모두 지연 평가(lazy evaluation)를 제공하는데 사용된다. 지연 평가란 호출 시점을 지연시켜서 필요한 시점에서 코드를 실행하는 방식을 의미한다.

Mono.defer() 메서드는 Mono의 factory 메서드 중 하나로, 인자로 전달받은 Supplier 함수를 사용하여 Mono 객체를 생성한다. 이때 Supplier 함수가 호출되는 시점은 Mono 객체를 subscribe 할 때이다.

Mono.fromCallable() 메서드는 Mono의 factory 메서드 중 하나로, 인자로 전달받은 Callable 함수를 사용하여 Mono 객체를 생성한다. 이때 Callable함수가 호출되는 시점은 Mono객체 생성 시점이다.

따라서 Mono.defer()는 Mono 객체를 생성할 때마다 Supplier함수를 실행하여 객체를 생성하는 방식을 사용하며, Mono.fromCallable()은 Mono객체를 생성하는 시점에서 Callable함수를 실행하여 객체를 생성하는 방식을 사용한다. Mono.fromCallable()은 즉시 실행되므로, Mono.defer()보다 더 많은 자원을 소비할 수 있다.

Mono.defer()는 Mono의 객체 생성 시점을 늦추고, 객체 생성과 동시에 상태를 초기화할 수 있는 장점이 있고 반면에 Mono.fromCallable()은 Callable함수의 실행 결과를 즉시 Mono객체에 포함시키므로, 결과를 즉시 필요로 하는 경우에 유용하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args)
{
callExternalService()
.repeat(3)
.doOnNext(System.out::println)
.subscribe();
}

private static Mono<?> callExternalService()
{
System.out.println("External service is called.");
return Mono.just("result");
}

/*
External service is called.
result
result
result
result
*/

Mono.just(“result”)의 결과를 repeat(3)으로 세 번 반복하게 되면, 해당 Mono를 세 번 실행하게 된다.

하지만, Mono.just(“result”)는 호출될 때마다 값을 생성하는 게 아니라, 최초 1회 호출시 생성한 값을 계속해서 방출하게 된다. 그래서 "External service is called."는 처음 1회만 출력되고, 세 번 반복되는 Mono.just(“result”)는 이미 생성된 값을 방출하게 되므로, 추가로 호출할 필요가 없어지게 된다.

따라서 "External service is called."는 최초 1회만 출력되고, 세 번 반복되는 Mono.just(“result”)는 이미 생성된 값을 방출하게 된다.

[WebFlux] Flux와 Stream의 reduce, 그리고 chatGPT

평소처럼 개발하던 중 reduce를 사용한 코드에서 오류가 발생했다.

1
2
3
4
5
java: cannot find symbol
symbol: variable accumulatedMap

java: cannot find symbol
symbol: variable currentMap

오류가 발생한 상황을 샘플코드로 대략적으로 재현해봤다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// sample 1
List<Integer> list = IntStream.range(0,10).boxed().collect(Collectors.toList());
var collectMap = list.stream()
.map(i -> {
Map<String, Integer> map = new HashMap<>();
map.put("key" + i, i * 10);
return map;
})
.reduce(**Collections.emptyMap()**, (accumulatedMap, currentMap) -> new HashMap<>() {
{
putAll(accumulatedMap);
putAll(currentMap);
}
});
System.out.println("Stream reduce Map size: " + collectMap.size());
// Stream reduce Map size: 10

// sample 2
Flux.range(1, 10)
.map(i -> {
Map<String, Integer> map = new HashMap<>();
map.put("key" + i, i * 10);
return map;
})
//.reduce(**Collections.emptyMap()** ,(accumulatedMap, currentMap) -> new HashMap<>() { // compile error!! > `java: cannot find symbol`
.reduce((accumulatedMap, currentMap) -> new HashMap<>() {
{
putAll(accumulatedMap);
putAll(currentMap);
}
})
.doOnNext(map -> {
System.out.println("Flux reduce Map size: " + map.size());
})
.subscribe();
// Flux reduce Map size: 10

Stream과 Flux의 reduce를 호출 시 initial 파라미터로 각각 Collections.emptyMap()를 사용하였으나 Stream은 정상동작하지만 Flux에선 컴파일 오류가 발생하였다.

여러 케이스로 바꿔가면서 테스트를 해보니 Collections.emptyMap()의 반환값이 불변객체라서 안되는거 같다는 추론까진 도달했지만 확신은 없었다.

구글링으로 찾아보려고 했지만 공식문서를 봐도 Flux에선 안되고 Stream에선 되는 이유에 대해서 명확한 답변을 찾기 어려워서 고민하던차에 요즘 핫한 chatGPT에 물어볼까 하는 생각이 들어서 바로 실행해봤다.

아래는 chatGPT 와 대화한 내용


i want public final <A> Mono<A> reduce(A initial, BiFunction<A,? super T,A> accumulator) example code.

  • Here’s an example of how you could use the reduce method in Spring WebFlux:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.util.function.BiFunction;

    public class ReduceExample {
    public static void main(String[] args) {
    Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);

    BiFunction<Integer, Integer, Integer> accumulator = (a, b) -> a + b;
    Mono<Integer> sum = numbers.reduce(0, accumulator);

    sum.subscribe(result -> System.out.println("The sum of the numbers is: " + result));
    }
    }

    This example creates a Flux of integers and then uses the reduce method to sum them up. The reduce method takes an initial value, in this case 0, and a BiFunction that specifies how the current accumulation (a) is combined with each element from the flux (b) to produce the next accumulation.
    The subscribe method is then used to subscribe to the Mono returned by reduce and print out the result.

initial Type is HashMap

  • Here’s an example of how you could use the reduce method with an initial value of a HashMap:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import java.util.HashMap;
    import java.util.function.BiFunction;

    public class ReduceExample {
    public static void main(String[] args) {
    Flux<String> words = Flux.just("apple", "banana", "cherry", "apple", "banana");

    BiFunction<HashMap<String, Integer>, String, HashMap<String, Integer>> accumulator = (map, word) -> {
    if (!map.containsKey(word)) {
    map.put(word, 1);
    } else {
    map.put(word, map.get(word) + 1);
    }
    return map;
    };
    Mono<HashMap<String, Integer>> wordCounts = words.reduce(new HashMap<>(), accumulator);

    wordCounts.subscribe(result -> System.out.println("The word counts are: " + result));
    }
    }

    This example creates a Flux of strings and uses the reduce method to count the occurrences of each word. The reduce method takes an initial value, in this case a new empty HashMap, and a BiFunction that specifies how the current HashMap (map) is combined with each word from the flux (word) to produce the next HashMap.
    The subscribe method is then used to subscribe to the Mono returned by reduce and print out the result.

Why does Mono<HashMap<String, Integer>> wordCounts = words.reduce(Collections.emptyMap(), accumulator); error occur?

  • The error occurs because Collections.emptyMap() returns an unmodifiable map, which means you can’t modify it. The reduce method expects the accumulator function to modify the accumulator object, which is not possible with an unmodifiable map.
    You can resolve the issue by creating a new empty HashMap as the initial value instead of using Collections.emptyMap(), like this:

    1
    Mono<HashMap<String, Integer>> wordCounts = words.reduce(new HashMap<>(), accumulator);

    This will give you a modifiable map that can be updated by the accumulator function as required by the reduce method.

But why does Java Stream API succeed?

  • The Java Stream API allows you to use the reduce method with an initial value of Collections.emptyMap() because the reduce operation in the Stream API operates on an internal data structure that is separate from the original data source. This allows the Stream API to handle unmodifiable maps as the initial value without any issues.
    However, in reactive programming with Spring WebFlux, the reduce method operates on the data stream directly, so you need to provide an accumulator object that is mutable. That’s why you need to use a new empty HashMap instead of Collections.emptyMap() as the initial value.

별로 기대하지 않았는데 원하는 내용에 대해서 생각보다 정확한 답변을 들을 수 있었다.

  1. 먼저 Flux의 reduce 호출 시initial가 HashMap인 케이스로 샘플코드를 요청하고 Collections.emptyMap() 을 초기값으로 사용 시 왜 오류가 나는지 물어봤고
    Collections.emptyMap()으로 생성되는 객체는 returns an unmodifiable map 즉, immutable(불변)객체인데 reduce 메서드는 초기값으로 들어온 accumulator 객체를 수정할 것으로 예상했지만 수정이 불가능한 불변객체가 들어와서 오류가 발생하는거라고 한다.
  2. Stream의 reduce에선 왜 성공하는지? 에 대해선
    Stream의 reduce는 원래 데이터와는 다른 별도의 내부 데이터 구조에서 동작하기 때문에 Collections.emptyMap()의 초기값으로 reduce를 사용할 수 있지만
    리액티브 프로그래밍 기반의 WebFlux에선 데이터 스트림에서 reduce 메서드가 직접 동작하기 때문에 변경가능한 accumulator 객체를 제공해야 하므로 변경이 가능한 빈 HashMap 객체를 생성해야 한다고 답변하였다.

앞으로 나의 코딩선생님이 하나 더 늘었다는 생각에 흥분되면서도 한편으론 chatGPT를 사용해서 구글의 코딩인터뷰 Level3도 통과했다는 뉴스처럼 문맥을 파악해서 의도한 바를 정확하게 알려주는 답변을 보고 소름이 돋았다.