RxJS에서 연산자(operator)는 현재 Observable을 기반으로 새로운 Observable을 생성하는 순수 함수이다. 이때 Observable은 불변 객체이므로 현재의 Observable 인스턴스는 변경되지 않고 항상 새로운 Observable을 만든다.
type OperatorFunction<T, R> = (
source: Observable<T>,
) => Observable<R>;
map, filter, reduce
그렇다면 우선 대표적인 연산자인 map
, filter
, reduce
에 대한 구현을 살펴보자. Array.prototype
에 정의된 메서드와는 달리 이번에는 Observable에 대해서 적용할 수 있도록 해야한다.
function map<T, R>(
transform: (value: T, index: number) => R,
): Operator<T, R> {
let currentIndex = 0;
return (source) =>
new Observable((observer) => {
const subscription = source.subscribe(
(value) => {
const mapped = transform(value, currentIndex++);
observer.next(mapped);
},
(err) => {
observer.error(err);
},
() => {
observer.complete();
},
);
return () => subscription.unsubscribe();
});
}
function filter<T>(
predicate: (value: T, index: number) => T,
): Operator<T, T> {
let currentIndex = 0;
return (source) =>
new Observable((observer) => {
const subscription = source.subscribe(
(value) => {
const filtered = predicate(value, currentIndex++);
if (filtered) observer.next(value);
},
(err) => {
observer.error(err);
},
() => {
observer.complete();
},
);
return () => subscription.unsubscribe();
});
}
function reduce<T, R>(
accumulator: (
acc: T | R,
value: T,
index: number,
) => T | R,
init: T | R,
): Operator<T, T | R> {
let currentIndex = 0;
let accumulated = init;
return (source) =>
new Observable((observer) => {
const subscription = source.subscribe(
(value) => {
accumulated = accumulator(
accumulated,
value,
currentIndex++,
);
},
(err) => {
observer.error(err);
},
() => {
observer.next(accumulated);
observer.complete();
},
);
return () => subscription.unsubscribe();
});
}
USE-CASE
const mapSource = map((number) => number + 1);
const filterSource = filter((number) => number % 2);
const reducedSource = reduce(
(acc, number) => acc + number,
0,
);
reducedSource(
mapSource(filterSource(of(1, 2, 3, 4, 5))),
).subscribe((value) => {
console.log(value);
});
// 12
map
,filter
,reduce
연산자는 RxJS에서는 연산자로 구분되지만 반환하는 타입을 살펴보면OperatorFunction
(Observable ⇒ Observable)인 것을 알 수 있다. 따라서 엄밀히 말하면 연산자를 만들어내는 고차 연산자(Higher-Order Operator)라 할 수 있다.
하지만 아직 아쉬운 점이 존재하는 데 함수 합성(compose, pipe)과 마찬가지로 연산자들을 합성할 수 있는 별도의 기법(혹은 함수)가 존재한다면 가독성과 사용성을 높일 수 있을 것이다.
Composing Operator
Dot Chaining
RxJS 5.5까지는 다음처럼 dot chaining을 이용하여 합성을 수행하였으나 6.0부터는 pipe 연산자만 제공한다.
of(1, 2, 3, 4, 5)
.map((number) => { ... })
.filter((number) => { ... })
.reduce((acc, number) => { ... });
이러한 체이닝 방식의 한계는 Observable.prototype
에 필요한 모든 연산자가 정의되어 있어야 한다는 점이다. 이 방식에서는 실제로 사용하지 않는 연산자에 대해서도 체이닝을 위해 prototype
에 정의하므로 파일 사이즈를 증가시킨다는 문제가 존재한다.
pipe operator
of(1, 2, 3, 4, 5)
.pipe(
map((number) => { ... }),
filter((number) => { ... }),
reduce((acc, number) => { ... })
);
Observable.prototype
에는 pipe
연산자만 정의해두고 필요한 연산자들을 합성하는 이 방식을 이용하면 웹팩같은 모듈 번들러를 통해 트리 셰이킹(tree shaking: 사용하지 않는 모듈을 번들링 시 제거)을 수행할 수 있게 된다. 거기에 pipe
연산자에 결합되는 요소는 메서드가 아닌 함수 단위이므로 Observable과의 결합도를 떨어뜨릴 수 있다.
또한 각 연산자가 반환하는 형태를 살펴보면 Transducing에서 살펴본 것처럼 새로운 Observable을 직접 반환하는 것이 아닌 Source Observable을 전달받아 필요한 연산을 수행한 이후 새로운 Observable을 반환하는 함수(OperatorFunction)를 반환한다. reducer(transformer)를 전달받아 새로운 reducer(transformer)를 반환하는 transducer 간의 합성과 유사한 형태를 띈다. 따라서 Lazy Evaluation의 이점과 Early Termination의 가능성을 그대로 이어갈 수 있게 된다.
class Observable<T> {
// ...
pipe(): Observable<T>;
pipe<A>(op1: Operator<T, A>): Observable<A>;
pipe<A, B>(
op1: Operator<T, A>,
op2: Operator<A, B>,
): Observable<B>;
pipe<A, B, C>(
op1: Operator<T, A>,
op2: Operator<A, B>,
op3: Operator<B, C>,
): Observable<C>;
pipe<A, B, C, D>(
op1: Operator<T, A>,
op2: Operator<A, B>,
op3: Operator<B, C>,
op4: Operator<C, D>,
): Observable<D>;
pipe(
...operators: Operator<any, any>[]
): Observable<any> {
let resultObservable: Observable<any> = this;
for (const operator of operators) {
resultObservable = operator(resultObservable);
}
return resultObservable;
}
}
of(1, 2, 3, 4, 5)
.pipe(
filter((number) => number % 2),
map((number) => number + 1),
reduce((acc, number) => acc + number, 0),
)
.subscribe((value) => {
console.log(value);
});
// 12
mergeMap
map
연산자의 경우 Observable에서 전달된 값을 변경(transform)하여 전달하는 역할을 수행하였다. RxJS에서 모든 데이터 소스는 Observable로 만드는 것이 가능하므로 각각의 값을 다시 Observable로 만들어 전달하는 것도 가능할 것이다. 하지만 그 상황에서 실질적으로 필요한 것은 Observable로 감싸진 값이지 Observable 그 자체가 아니다. 비슷한 사례는 Promise에서 flattening을 수행했던 경우를 생각해보면 이해할 수 있다.
const P = new Promise((resolve, reject) => {
setTimeout(() => resolve(0), 0);
});
P.then((res) => {
return Promise.resolve(res + 1);
}).then((res) => {
// 만약 res가 Promise라면 다시 res.then을 이용하여 Promise 내부의 값을
// 참조하는 방식으로 수행해야 한다.
res.then(console.log);
// 하지만 Promise는 기본적으로 Promise flattening을 수행하여 전달하기 때문에
// 여기에서의 res는 Promise가 래핑하고 있는 값 그 자체이다.
console.log(res); // 1
});
마찬가지로 Observable에서도 flattening을 지원하지 않으면 다음과 같은 상황에서 두 번의 subscribe
를 수행해야 할 것이다.
[Observable].subscribe((value) =>
value.subscribe((innerValue) => console.log(innerValue)),
);
function mergeMap<T, R>(
project: (value: T, index: number) => Observable<R>,
): OperatorFunction<T, R> {
let currentIndex = 0;
return (source) =>
new Observable((observer) => {
const subscriptions = new Set<Subscription>();
const sub = source.subscribe(
(value) => {
const projected = project(value, currentIndex++);
const innerSub = projected.subscribe(
(pValue) => {
observer.next(pValue);
},
(err) => {
observer.error(err);
},
() => {
subscriptions.delete(innerSub);
},
);
subscriptions.add(innerSub);
},
(err) => {
observer.error(err);
},
() => {
Array.from(subscriptions).forEach((sub) =>
sub.unsubscribe(),
);
observer.complete();
},
);
subscriptions.add(sub);
return () => {
Array.from(subscriptions).forEach((sub) =>
sub.unsubscribe(),
);
};
});
}
mergeMap
이 기존의 map
과 다른 점은 바로 Subscriptions에 대한 Set
을 유지한다는 점이다. Source Observable에 대한 구독과 project
함수가 반환한 Observable에 대한 Subscription을 모두 추적해야하기 때문이다. 그래야 Source Observable은 물론 중첩된 Observable에 대해서도 모두 unsubscribe
를 수행할 수 있게 된다.
가장 핵심적인 사항은 Source Observable에 대한 subscribe
를 수행한 후, project
함수에서 반환하는 Observable을 다시 subscribe
하여 값을 전달할 수 있다는 것이다.
concatMap
concatMap
은 mergeMap
과 유사하지만 project
함수가 반환하는 Observable을 subscribe
했을 때 현재 subscribe
중인 Observable에서 complete
가 호출되어야 다음 Observable을 subscribe
하게 된다.
Source Observable에서 next
로 전달하는 값은 project
함수가 반환한 Observable이다. 이때 mergeMap
의 경우 project
함수가 반환하는 순서대로 해당 Observable에 대한 subscribe
를 수행하고 이전에 subscribe
한 Observable에서 complete
가 호출되었는지 여부에 관계없이 먼저 next
를 호출하는 Observable을 처리하게 된다.
반면 concatMap
의 경우 자신보다 앞서 subscribe
를 수행한 Observable이 있다면 해당 Observable에서 complete
가 호출되기 전까지는 다음 Observable을 버퍼에 저장해두었다가 complete
가 호출되면 그때서야 버퍼에 있는 Observable을 subscribe
하게 되는 차이가 있다.
function concatMap<T, R>(
project: (value: T, index: number) => Observable<R>,
): OperatorFunction<T, R> {
let currentIndex = 0;
const buffer: Observable<R>[] = [];
const subscribeTo = (
projected: Observable<R>,
observer: Observer<R>,
subscriptions: Set<Subscription>,
): void => {
const subscription = projected.subscribe(
(pValue) => {
observer.next(pValue);
},
(err) => {
observer.error(err);
},
() => {
subscriptions.delete(subscription);
if (subscriptions.size === 0 && buffer.length > 0) {
subscribeTo(
buffer.shift(),
observer,
subscriptions,
);
}
},
);
subscriptions.add(subscription);
};
return (source) =>
new Observable((observer) => {
const subscriptions = new Set<Subscription>();
const outerSub = source.subscribe(
(value) => {
const projected = project(value, currentIndex++);
if (subscriptions.size > 0) {
buffer.push(projected);
return;
}
subscribeTo(projected, observer, subscriptions);
},
(err) => {
observer.error(err);
},
() => {
Array.from(
subscriptions,
).forEach((subscription) =>
subscription.unsubscribe(),
);
observer.complete();
},
);
return () => {
outerSub.unsubscribe();
Array.from(subscriptions).forEach((subscription) =>
subscription.unsubscribe(),
);
};
});
}
mergeMap vs concatMap
function createInnerObservable(x: number) {
return new Observable((observer) => {
(async () => {
for (let i = 0; i < 3; i++) {
if (i > 0) await sleep(500);
observer.next(10 * x);
}
observer.complete();
})();
});
}
const outerObservable = new Observable((observer) => {
(async () => {
observer.next(1);
await sleep(800);
observer.next(3);
await sleep(3000);
observer.complete();
})();
});
outerObservable
.pipe(mergeMap(createInnerObservable))
.subscribe((x) => console.log(`[mergeMap] outer: ${x}`));
// [mergeMap] outer: 10
// [mergeMap] outer: 10
// [mergeMap] outer: 30
// [mergeMap] outer: 10
// [mergeMap] outer: 30
// [mergeMap] outer: 30
[시간]
─────────[500ms]─────[800ms]─────[1000ms]─────────────
[outer]
(1)──────────────────(3)────────────────────────────
│────────┐───────────────────────┐
│ │ │───────────│─────────┐──────┐
│ │ │ │ │ │
(10) (10) (30) (10) (30) (30)
outerObservable
.pipe(concatMap(createInnerObservable))
.subscribe(
(x) => console.log(`[concatMap] outer: ${x}`),
undefined,
() => console.log('[concatMap] done!'),
);
// [concatMap] outer: 10
// [concatMap] outer: 10
// [concatMap] outer: 10
// [concatMap] outer: 30
// [concatMap] outer: 30
// [concatMap] outer: 30
// [concatMap] done!
[시간]
─────────[500ms]─────[800ms]─────[1000ms]─────────────
[outer]
(1)──────────────────(3)────────────────────────────
│ └────────────────┐──────┐────┐
│────────┐───────────────────────┐ │ │ │
│ │ │ │ │ │
(10) (10) (10) (30) (30) (30)