티스토리 뷰
## 4. 리액터 핵심 특징(Reactor core features) 상당 의역.
리액터 프로젝트의 주된 구조는 리액터-코어(reactor-core)로, Java8을 타겟으로 하며 리액티브 스트림 명세에 집중한 리액티브 라이브러리다.
리액터는 퍼플리셔 역할을 하는 플럭스와 모노 두 타입을 제공한다. 플럭스 객체란 0..N개의 반응형 항목 순서에 대응하는 개념이며, 모노 객체는 단일 값이거나 값 없음(0..)의 결과에 대응하는 개념이다.
이 구분을 통해 비동기 처리의 단위가 되는 개념들을 플럭스와 모노로 표현할 수 있다. 예를 들자. HTTP 요청은 유일한 응답만을 생성한다. 따라서 숫자를 셈하는 연산에는 적합하지 아니하다. 그렇기에 이러한 HTTP 요청은 결과를 모노로 표현할 수 있다. 이는 플럭스로 표현하는 것보다는 적합하다. 왜냐면 모노가 없거나 유일한 결과를 다루는 모델이기 때문이다.
처리 카디널리티가 변하는 연산 또한 두 타입으로 표현할 수 있다. 즉, 가령 카운트 연산은 플럭스를(여러 값들이 필요) 통해 수행되지만 그 결과는 모노(합계는 한개)를 통해 표현될 수 있는 것처럼 말이다.
역자: 여기서 이해가 잘 안되는 부분은 카디널리티라는 부분이다. 여기서 카디널리티는 단위 원소수라는 것으로 집합수(원소수)를 의미한다. 그저 특정 연산(가령 메서드)이 수행되는데 관여되는 데이터가 N개이면 그 연산의 카디널리티는 최대 N개가 되는 식이다. 즉 연산에 관계된 데이터가 0개 이상이면 플럭스로 나타내고 0 또는 1개이면 모노로 표현하면 된다는 내용 같다(틀리면 수정 부탁드립니다).
### 플럭스, 0~N개의 항목에 대한 비동기 순열

플럭스 Flux는 Publisher로, 0 ~ N 개의 순열을 방출하거나 완료 신호를 받아 종료되기도 하고 에러로 종료되기도 한다. 이는 리액티브 스트림 명세에 따르면 각각 뒤에 연결된 구독자(Subscriber)의 onNext, onComplete, onError에 대응한다.
플럭스는 다양한 범주의 신호를 사용하는 다목적용 리액티브 타입이다. 모든 이벤트 혹은 심지어 종료 이벤트들을 사용하는 건 선택적이다 - onNext 이벤트 없이onComplete 이벤트로 빈(empty) 유한한 순열을 표시할 수도 있으며 여기에 onComplete 이벤트도 제거하면 무한한 비어 있는 순열을 얻게 된다(딱히 유용하지 않으며, 취소 관련한 테스트에나 쓸 법하다). 유사하게 무한한 순열은 꼭 비어 있을 필요는 없다. 가령, Flux.interval(Duration)는 Flux을 생성하며 이는 무한하고 클렁당 정기적인 틱을 발생시킨다.
### 모노, 비동기적인 0-1 결과

모노 Mono는 특화된 Publisher이며 최대 1개 아이템만을 방출하고 onComplete 신호나 onError 신호와 함께 선택적으로 종료된다.
모노는 플럭스가 제공하는 연산집합의 부분만을 제공하며, 일부 연산은 플럭스를 리턴하는 연산이다(주로 모노를 다른 Publisher와 결합하는). 예를 들어, Mono#concatWith(Publisher)는 플럭스를 리턴하며 Mono#then(Mono)는 또다른 모노를 리턴한다.
모노는 값이 없는(?) 비동기 프로세스를 표현하는데 쓰일 수 있으니 명심하시고, 그런 프로세스는 완료의 의미만을 갖는다는 사실을 적시하시오(Runnable과 유사). 이거 만드려면 Mono\<Void\>를 만드세요.
### 4.3 플럭스와 모노를 만드는 기본 방법 그리고 그것을 구독하는 법
플럭스와 모노를 사용해보는 가장 쉬운 방법은 그들 각자 클래스들의 수많은 팩토리 메서드 중 하나를 사용해보는 것이다.
예를 들자꾸나. 문자열(String)의 순열을 생성하기 위해서는 그저 열거하거나, 컬렉션에 다 넣고 그 컬랙션을 기반으로 플럭스를 만들어 볼 수 있다. 아래와 같이.
```
Flux seq1 = Flux.just("foo", "bar", "foobar");
List iterable = Arrays.asList("foo", "bar", "foobar");
Flux seq2 = Flux.fromIterable(iterable);
```
또는 아래와 같이 할 수 있다.
```
Mono noData = Mono.empty(); [1]
Mono data = Mono.just("foo");
Flux numbersFromFiveToSeven = Flux.range(5, 3); [2]
```
[1] 팩토리 메서드가 비어 있더라도 제네릭 타입을 기술해야 한다.
[2] 첫 파라미터는 시작 인덱스이고 두번째는 몇 개를 생산할지에 대한 인자이다.
이제 구독에 대해 살피자. 플럭스와 모노는 자바 람다식을 활용할 수 있다. subscribe() 메서드의 수많은 변형을 사용할 수 있다. 변형이란 수많이 오버로드된 메서드들이다.
```
subscribe(); [1]
subscribe(Consumer<? super T> consumer); [2]
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer); [3]
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer); [4]
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer); [5]
```
[1] 구독과 동시에 순열을 트리거하오.
[2] 각각 값이 생산될 때마다 작업을 개시한다오.
[3] 값을 다룰 때 에러에도 반응한다오.
[4] 값과 에러를 둘 다 다루는데 순열이 성공적으로 종료된 경우에 부가적인 코드를 수행한다오.
[5] 값과 에러와 성공시 부가적인 코드를 다루는데 subscribe 호출에 관계되었던 Subscription 객체를 기준으로 특정 작업을 수행한다오.
> 이 메서드들의 변형들은 Subscription에 대한 레퍼런스를 리턴하는데 이 레퍼런스로 더 데이터가 필요 없을 때 구독을 취소할 수 있다오. 취소가 이뤄지면, 데이터를 제공하는 측은 값의 생산을 멈추고 초기화했던 자원들을 해제하는 작업을 수행한다오. 이 취소와 클리닝 작업을 위한 다목적 인터페이스인 Disposable 가 존재한다네.
## 4.3.1 구독 메서드 예시
이 구역에서 최소 예시를 살펴본다. 각각은 subscribe 메서드 5개의 시그니처를 가진다. 아래 나오는 코드는 인자 없는 기본 메서드의 사용례이다.
```
Flux ints = Flux.range(1, 3); [1]
ints.subscribe(); [2]
```
[1] 구독자가 붙으면 바로 3개의 값을 생산하는 플럭스를 만든다.
[2] 초 간단한 방식으로 구독한다.
위 코드는 명시적인 출력이 없지만 동작한다. 플럭스는 3개의 값을 생산할 것이다. 람다식을 쓴다면 그 값을 출력할 수 있다. 다음이 그 예다.
```
Flux ints = Flux.range(1, 3); [1]
ints.subscribe(i -> System.out.println(i)); [2]
```
[1] 구독자가 붙으면 바로 3개의 값을 생산하는 플럭스를 만든다.
[2] 값을 출력하는 구독자로 구독한다.
그 결과는 이와 같다.
```
1
2
3
```
다음 시그니처를 시연하기 위해, 의도적으로 예외를 발생시키는 아래의 예제를 준비하였다.
```
lux ints = Flux.range(1, 4) [1]
.map(i -> { [2]
if (i <= 3) return i; [3]
throw new RuntimeException("Got to 4"); [4]
});
ints.subscribe(i -> System.out.println(i), [5]
error -> System.err.println("Error: " + error));
```
[1] 구독과 동시에 값을 생산하는 플럭스 생성
[2] 값을 다르게 다루기 위한 map 연산
[3] 대부분의 값 리턴
[4] 한 개 값에선 에러 발생
[5] 예외 핸들러를 포함하는 구독자를 통한 구독
2개의 람다식을 썼다. 하나는 우리가 기대하는 정상 흐름과 나머지는 예외를 위한 것. 이 코드는 아래의 출력을 만든다.
```
1
2
3
Error: java.lang.RuntimeException: Got to 4
```
다음 시그니처는 에러 핸들러 및 완료 핸들러가 추가되었다.
```
Flux ints = Flux.range(1, 4); [1]
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done")); [2]
```
[1] 구독과 동시에 값을 생산하는 플럭스 생성
[2] 완료 핸들러가 포함된 구독자로 구독
에러 신호 또는 완료 신호 두 개는 모두 종료 이벤트이며 각각은 상호배제적이다(두 개 중 하나만 받을 수 있다). 완료 이벤트가 발생하도록 예외가 발생하지 않도록 하였다.
완료 콜백은 입력을 가지지 않으며 빈 괄호로 표기되었다. Runnable 인터페이스의 run과 같다. 출력은 이하와 같다.
```
1
2
3
4
Done
```
마지막 시그니처는 Consumer\<Subscription\>을 포함한다. 여기서 Subscription객체를 통해 특정 작업(request(long) 또는 cancel())을 할 수 있다. 그렇지 않으면 플럭스는 잠긴다(hang).
```
Flux ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> System.out.println("Done"),
sub -> sub.request(10)); [1]
```
[1] 구독하면 우리는 Subscription을 얻게 된다. 데이터 제공자로부터 최대 10 개까지의 요소를 원하는 신호를 보낸다(실제로는 4개 요소만 방출되고 완료된다).
## 4.3.2 구독 취소와 Disposable
모든 람다 기반의 subscribe() 메서드들은 Disposable을 리턴한다. Disposable은 구독이 취소될 수 있다는 것을 뜻하며 이는 해당 인터페이스의 dispose() 메서드의 호출함으로써 가능하다.
플럭스와 모노에 있어서, 취소는 데이터 제공자에게 요소의 생산을 멈춰야 한다는 신호를 주는 것과 같다. 그러나 즉발성은 보장되지 않는다. 몇몇 데이터 제공자들은 데이터를 너무 빨리 생산해서 취소 지시가 있기 전에 이미 생산을 마쳤을 수 있다.
Disposables 클래스에는 Disposable 에서 유용한 몇 유틸리티가 있다. 이들 중 Disposables.swap()은 Disposable을 감싸는 Wrapper를 생성하는데, 이를 통해 원자적으로 구체적인 Disposable을 취소하고 교환할 수 있다. 이 기능이 유용할 때가 있다. 그것은, 사용자가 버튼을 클릭할 때마다 요청을 취소하고 새 요청을 만들어야 하는 UI 시나리오와 같은 때이다. Wrapper를 끝내면(Dispose) wrapper는 닫히고, 현재 구체적인 값과 모든 미래에 시도될 치환작업이 종료된다.
또다른 재미있는 유틸리티는 Disposables.composite(...)이다. 서너개의 서비스 콜과 관련된 진행중인 여러 요청과 같은 Disposable을 서너개 모을 수 있으며 나중에 한번에 종료(dispose)할 수 있다. 한번 composite의 dispose()가 호출되면, 이후에 Disposable을 추가하는 시도는 즉시 추가된 Disposable를 종료시킨다.
## 4.3.3 람다식의 대체 : BaseSubscriber
또 하나의 subscribe 메서드가 있다. 더 일반적이고 구독자를 람다식으로 구성하는 것보다도 더 완벽한 기능을 갖춘 Subscriber다. 이러한 Subscriber를 구현하는 작업을 덜기 위해 확장가능한 클래스 BaseSubscriber가 제공된다.
이들 중 하나를 구현해보자. SampleSubscriber라 명명할 것이며 플럭스에 어떻게 적용되는지 아래에서 살펴본다.
```
SampleSubscriber ss = new SampleSubscriber();
Flux ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
error -> System.err.println("Error " + error),
() -> {System.out.println("Done");},
s -> s.request(10));
ints.subscribe(ss);
```
이제 SampleSubscriber가 어떻게 생겼는지 보자. BaseSubscriber의 최소한의 구현이다.
```
package io.projectreactor.samples;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
public class SampleSubscriber extends BaseSubscriber {
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
}
```
이 SampleSubscriber 클래스는 BaseSubscriber를 상속한다. 이는 리액터에서 사용자가 Subscriber를 구독할 때 권장되는 추상 클래스다. 이 클래스는 구독자의 행동을 튜팅할 수 있도록 오버라이딩 가능한 훅을 제공한다. 기본적으로, 이 구독자는 무한한 요청을 보내고 정확하게 subscribe()가 호출된 것과 같이 동작한다. 그러나 BseeSubscriber를 상속하면 요청 횟수를 사용자 정의하고 싶을 때 꽤나 유용하다.