Observer Pattern
Observable
이 무엇인지 이해하기 위해서는 우선 옵서버 패턴(Observer Pattern)에 대한 이해가 필요하다.
Loosely Coupling
옵서버 패턴에서는 시간이 경과함에 따라 상태가 변경되는 대상을 Subject라 한다. 또 이때 Subject의 상태 변화를 관찰 혹은 상태 변화에 대한 알림을 받는 대상을 Observer라 한다. Subject와 Observer는 느슨한 결합(Loosely Coupling) 상태로 이는 서로 상호작용은 하는 상태이지만 각자에 대해서는 잘 알지 못한다는 것을 의미한다.
Subject가 Observer에 대해 알고 있는 사실은 Observer가 특정 인터페이스를 구현한다는 것 뿐이다.
class Subject {
observers = [];
subscribe(observer) {
this.observers = [...this.observers, observer];
}
publish(data) {
this.observers.forEach((observer) => observer(data));
}
}
const subject = new Subject();
const observer1 = (data) => {
console.log(`received data: ${data}`);
};
subject.subscribe(observer1);
setTimeout(() => {
subject.publish('published!');
}, 1000);
자동 상태 전파
또한 Observer에게 상태 변경이 통지되는 순간은 전역 실행 컨텍스트가 콜 스택에서 pop 된 이후이므로 전역 실행 컨텍스트에서 Subject의 상태 변화를 감지할 방법(PULL)은 존재하지 않는다. 그 대신 Subject의 상태가 변경되면 Observer의 실행 컨텍스트가 콜 스택에 push 되면서 데이터를 전달(PUSH)받을 수 있게 되는 것이다.
[Producer]╭─────────[Consumer]
│ [PULL] ↑
╰────────╯
[Producer]──[PUSH]─→[Consumer]
한계
-
상태 변화의 종료 시점
-
상태 변화에 대한 에러 처리
-
Bidirectional Data flow
RxJS에서는 1번과 2번의 경우에는 인터페이스를 확장하여 처리하고, 3번의 경우에는 Observable을 read-only하게 만듦으로써 해결하게 된다.
Observer
옵서버 패턴의 Subject는 RxJS의 Observable과 유사하지만, 엄밀히 정의하면 RxJS의 Subject와 일치한다. Subject는 다수의 Observer에게 공통된 데이터를 전달하고, update와 같은 메서드를 보유하고 있기 때문에 데이터를 변경하는 것이 가능한 반면, Observable은 단지 하나의 Observer에게 독립적인 데이터를 전달한다는 차이가 존재한다.
Observable이 하나의 Observer에게 값을 전달하는 방식은 Observable이 Observer의 함수(메서드)를 호출하여 필요한 값이나 이벤트를 전달하게 된다. 그러려면 Observable이 호출할 Observer의 인터페이스가 결정되어야 할 것이다.
interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
옵서버 패턴에서는 하나의 메서드를 갖는 Observer를 사용한 반면, RxJS의 Observer는 next
, complete
, error
총 3개의 메서드를 갖게 된다.
Observable
RxJS 공식 문서의 Observable 설명에 따르면 다음과 같은 개념들이 등장한다.
Single | Multiple | |
---|---|---|
풀(Pull) | 함수(Function) | 이터레이터(Iterator) |
푸시(Push) | 프로미스(Promise) | Observable |
- Single: 하나의 값이나 이벤트를 다룬다.
- Multiple: 여러 개의 값이나 이벤트를 다룬다.
- 풀(Pull): 데이터를 받을지 결정하는 방식
- 푸시(Push): 데이터를 보낼 지 결정하는 방식
Observable은 데이터를 만드는 생산자(Producer)와 데이터를 소비하는 소비자(Consumer)의 관계로도 설명이 가능하다. 이때 위의 표에 기술된 함수, 이터레이터, 프로미스, 옵저버블은 생산자이다. 반대로 이를 사용(소비)하는 function.call
, iterator.next
, promise.then
, Observable과 연결된 Observer는 소비자의 역할을 수행한다.
Pull 방식은 데이터를 소비하는 측이 능동적으로 데이터를 호출하고 데이터를 생산하는 측도 데이터를 소비하는 측의 영향을 받는다. 함수의 경우 선언만으로는 아무런 작업을 수행하지 않고, 소비자가 함수를 호출해야 값을 생산하게 되고, 데이터를 전달받을 수 있게 된다. 이터레이터의 경우 역시 next
메서드를 호출할 때마다 값을 생산하게 되므로 소비자가 능동적인 입장이 된다.
Push 방식은 데이터를 생산하는 측이 주체로, 이벤트나 값 같은 데이터는 생산자 측에서 준비가 되면 소비자에게 알려주는 방식으로 수행된다. 따라서 Promise, Observable 모두 생산자가 데이터를 생산하면 알림을 전달받게 된다. Promise는 함수와는 달리 객체를 생성하는 동시에 값을 생산하는 동작을 실행시킨다는 점에서 차이가 있고, Observable은 subscribe
라는 메서드에 Observer를 미리 전달하여 호출하면 값 또는 이벤트를 소비할 수 있는 시점에 Observer에게 전달된다. 이는 소비자가 생산자를 제어할 수 없음을 의미한다.
Promise가 Single인 이유는 Observable과는 다르게 정의부가 생성 시 딱 한 번 호출되고, 상태가 결정(settled)되면 절대 변경되지 않기 때문이다. 따라서 Promise는 이미 결정된 상태의 값만 반환하기 때문에 동일한 값(단일 값)을 계속 받을 수 있는 것이다.
interface Subscription {
unsubscribe: () => void;
}
class Observable<T> {
constructor(
private observe: (
observer: Observer<T>,
) => (() => void) | void,
) {}
subscribe(
next?: (value: T) => void,
error?: (err: any) => void,
complete?: () => void,
): Subscription {
const observer: Observer<T> = {
next: (value: T) => {
if (next) next(value);
},
error: (err: any) => {
if (error) error(err);
},
complete: () => {
if (complete) complete();
},
};
const teardown = this.observe(observer);
return {
unsubscribe: () => {
if (teardown) teardown();
},
};
}
}
teardown
이란 자원을 해제하는 함수를 일컫는 용어이다.
먼저 Observable을 생성할 때 중요한 점은 Promise에서는 인스턴스를 생성하는 그 즉시 executor
를 실행시키지만, Observable은 인스턴스를 생성할 때 전달은 받되 실행은 시키지 않는다는 차이가 있다. 대신 Observable은 subscribe
메서드가 호출 될때 전달받은 함수를 실행시킨다.
이때 subscribe
의 반환 타입은 Subscription 타입으로 이는 자원을 해제할 수 있는 unsubscribe
메서드를 보유하고 있는 객체이다.
USE-CASE
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.next(3);
observer.complete();
}, 1000);
});
console.log('BEFORE SUBSCRIBE');
const subscription = observable.subscribe(
(data) => {
console.log(data);
},
(err) => {
console.error(err);
},
() => {
console.log('complete');
},
);
console.log('AFTER SUBSCRIBE');
subscription.unsubscribe();
// BEFORE SUBSCRIBE
// 1
// 2
// AFTER SUBSCRIBE
// 3
// "complete"
현재는 핵심적인 부분에 집중하기 위해 new 키워드를 이용한 instantiation만 가능한 상태이지만 다양한 데이터 소스에 대해 Observable을 만드는 작업을 데이터의 형태에 따라 정의해 둔 팩토리 함수를 활용하면 현재 다루고 있는 데이터 소스가 어떠한 형태인지 식별할 수 있고 Observable 생성 시에 고려해야 할 것들에 대한 처리는 팩토리에 위임할 수 있다.
of
각각의 단일 데이터를 전달하는 Observable을 만들 때 사용하는 함수로는 of
라는 함수가 있다. 이때 모든 값이 전달되면 complete를 호출하고 자동으로 구독을 해제한다.
function of<T>(...values: T[]): Observable<T> {
return new Observable((observer) => {
for (const value of values) {
observer.next(value);
}
observer.complete();
});
}
of(1, 2, 3).subscribe((value) => console.log(value));
// 1
// 2
// 3
from
배열, ArrayLike, iterable, Promise 등의 데이터를 Observable로 만들 때 사용하는 함수이다.
function from<T>(
source: Promise<T> | ArrayLike<T>,
): Observable<T> {
if (typeof (source as ArrayLike<T>).length === 'number') {
const arrayLike = source as ArrayLike<T>;
return new Observable((observer) => {
for (let i = 0; i < arrayLike.length; i++) {
observer.next(arrayLike[i]);
}
observer.complete();
});
}
const promise = source as Promise<T>;
return new Observable((observer) => {
promise
.then((value) => {
observer.next(value);
observer.complete();
})
.catch((err) => observer.error(err));
});
}
ArrayLike, Iterable의 경우 모든 요소를 전달하면
complete
를 호출하고 구독을 자동으로 해제하지만, Promise는resolve
되었을 때 데이터를 전달하고complete
호출 및 구독 해제를 진행한다.reject
된 경우에는error
를 호출하고 구독을 해제한다.
fromEvent
Node.js의 EventEmitter
와 조합하거나 브라우저에서 발생하는 이벤트를 Observable로 만들 때 사용하는 함수이다. 이때 fromEvent
로 만든 Observable은 이벤트 핸들러를 등록하여 자원을 계속 사용하기 때문에 이벤트 핸들러를 제거하지 않으면 메모리를 계속 낭비하게 된다. 즉, 사용자가 의도적으로 자원을 해제하는 작업을 수행하지 않는 이상 of
, from
처럼 구독을 해제해주는 작업을 자동으로 수행해주지 않는다. 따라서 subscribe
메서드가 반환한 Subscription 객체의 unsubscribe
를 호출하여 자원을 해제해 주어야 한다.
function fromEvent<T extends Event = Event>(
target: EventTarget,
eventName: string,
): Observable<T> {
return new Observable((observer) => {
const listener: EventListener = (event: T) => {
observer.next(event);
};
target.addEventListener(eventName, listener);
// teardown
return () =>
target.removeEventListener(eventName, listener);
});
}
fromEvent($btn, 'click').subscribe(() => {
console.log('click');
});