Jay's Blog

[Webflux] Reactor Mono와 Flux
웹플럭스 처음 배워 보기 - Mono, Flux

웹플럭스를 잘 쓰려면 두개는 우선 알아야 한다. Mono, Flux

웹플럭스는 비동기 기반의 프레임워크이기 때문에 가지는 특징이 있다.

메소드 시그니처로 구분한다니?

Kotlin

fun getMyType() : Mono<MyType>

Java

public Mono<MyType> getMyType() {}

리턴타입에 Mono가 정의됨으로써, 이 메소드는 Reactor를 사용하는 비동기 응답이라는 것을 나타낸다.

기존 코드에 변경을 주지 않으면서도 가장 간단하게 표현할 수 있는 방법이 아닌가 싶다.

이렇게 표현해주는 것만으로도 이제 Reactor의 세계에 입문하게 된 것이다.

Mono

쉽게 말하면 1개 혹은 0개의 값을 담을 수 있는 컨테이너이다.

Mono는 최대 하나의 element를 onNext시그널에 전달할 수 있고, onComplete 시그널에서는 값을 1개 혹은 값이 없이 끝나게 된다. 중간에 예외가 발생하게 되면 1개의 onError 시그널이 발생한다.

리액터에서 말하는 시그널이라는 것은 리액터의 구조와 연관이 있다. 비동기 이벤트 기반으로 구현하기 위해서는 모든 작업(operation)은 단일 쓰레드에서 동작해서는 안된다.

왜냐고..? 일단 쓰레드가 코어 갯수 밖에 없고, 단일 쓰레드를 계속해서 사용하면 다른 작업이 불가능하기 때문이다.

그래서 리액터는 네티 기반의 이벤트 핸들러를 이용해서 각각의 작업은 모두 분할 되어 있도록 한다. 분할 되어 있는 작업들은 실행이 되면서 각각의 작업이 서로 다른 쓰레드에서 실행이 되도록 한다.

예를 들어보자.

Kotlin

Mono.just(1)
    .map {
        it + 1  // thread 1: result 2
    }
    .map {
        it + 1  // thread 2: result 3
    }
    .onNext {
        log.info(it) // onNext에서는 성공한 데이터만 나오고, 데이터를 사용할 수만 있음
    }
    // 아무일도 일어나지는 않음
    .subscribe() // 호출되는 시점 부터 스케줄링해서 동작

보통 위와 같은 상황에서는 각각의 map 시그널이 분리되어 전달되고 각각 다른 쓰레드에서 실행이 된다. 다른 쓰레드에서 실행되기 때문에 기존에 MDC 같은 것을 사용한다면 동작하지 않는다. 비슷하게 사용하고 싶으면 방법을 강구해야 한다. (몇가지 방법은 있다)

위와 같이 Mono로 생성된 객체는 생성 당시에는 아무것도 할 수 없고, subscribe가 호출되는 시점부터 스케쥴링이 된다. Hot, Cold 퍼블리셔는 있지만, 이것은 나중에 이야기 하는게 나을 듯 하다.

onNext라던지 onComplete 같은 여러가지 시그널을 처리할 수 있는 방법이 있으니, 해당 내용은 리액터 공식문서를 확인하면서 데이터를 변경해보면 된다.

Flux

플럭스는 쉽게 이야기해서 0개-무한개 까지를 표현할 수 있는 컨테이너이다.

리액터에서는 모든 element가 subcribe한 시점부터 있을 필요는 없다. 이는 Mono든 Flux든 마찬가지이고, 따라서 Flux는 subscribe한 이후에 일련의 데이터가 계속해서 전달되는 것으로 이해하면 오히려 더 쉽게 접근가능할 수 있다.

리액터 공식문서를 볼때, 이미지를 잘 활용해서 보면 좋다. 어떤 시그널일 때, 어떤 식으로 동작하는지 자세히 알 수 있기 때문이다.

Kotlin

Flux.interval(Duration.ofSeconds(1))
    // 1초당 한번씩 데이터가 생성됨.
    // interval은 현재 몇번째인지를 데이터로 내려줌
    .map {
        log.info(it)
        it + 1
    } // 결과 값이 Flux로 나온다.

위 코드를 실행하게 되면, 계속해서 1초당 한번씩 로그를 찍으면서 데이터를 생성해내는 것을 알 수 있다.

Flux는 여러 데이터를 표현할 수 있기 때문에 자바에서 흔히 사용되는 컬렉션 객체를 Flux로 변환해서 사용할 수도 있고, 계속해서 생성되는 데이터를 Sink를 이용해서 전달해서 사용하는 방법도 있다. 이는 다음에 더 자세히 다루도록 하겠다.

웹플럭스에서 사용?

웹플럭스에서 Mono, Flux를 사용하는 것은 매우 간단하다.

Kotlin


@Controller
fun getData(req: Req) : Mono<MyData> {}

@Controller
fun getData(req: Req) : Flux<MyData> {}

위와 같이 사용하면 된다.

다음에는 publishOn vs subscribeOn / hot vs cold publisher 에 대해 이야기 해보고자 한다.




참고문헌

리액터 공식문서

*****
Written by Jay on 11 December 2021