Operator 포스팅을 작성하면서 다음과 같은 코드에서 observer.next()
의 호출은 순차적으로 처리된다. 이전 순회에서 next()
가 호출된 이후 500ms를 sleep
하는 것이 보장되는 상황이다.
function createInnerObservable(x: number) {
return new Observable((observer) => {
(async () => {
for (let i = 0; i < 3; i++) { if (i > 0) await sleep(500); observer.next(10 * x); } observer.complete();
})();
});
}
하지만 기본적으로 for...loop
는 continue/break
의 사용, 너무 많은 일을 수행하는 중첩 루프 등 다분히 명령적(imperative)이라는 점에서 선언적(declarative) 프로그래밍을 추구하는 관점에서는 지양하는 것이 권장된다. 그렇다면 가장 먼저 떠올릴 수 있는 방식은 Array.prototype.forEach
를 사용하는 방식으로 변경해보는 것이다.
function createInnerObservable(x: number) {
return new Observable((observer) => {
[0, 1, 2].forEach(async (_, i) => { if (i > 0) await sleep(500); observer.next(10 * x); }); observer.complete();
});
}
하지만 의도와는 달리 이전 순회의 next()
를 대기하지 않고 병렬적으로 sleep
을 수행한 이후 각각의 next()
가 호출되는 것을 알 수 있는데, async/await 포스팅을 작성하면서 살펴본 async
함수의 동작 방식을 생각해보면 async
함수는 호출자를 block하지 않는다는 특징을 살펴보았다. forEach
의 경우 컬렉션(이 경우에는 배열)을 순회하면서 전달받은 함수가 비동기 함수이든 동기 함수이든 호출만 하게 된다. 반면 for...loop
에서 각각의 순회를 문으로 나열해보면 다음과 같이 표현될 것이다.
function createInnerObservable(x: number) {
return new Observable((observer) => {
(async () => {
observer.next(10 * x);
await sleep(500);
observer.next(10 * x);
await sleep(500);
observer.next(10 * x);
observer.complete();
})();
});
}
즉, for...loop
는 forEach
와 달리 현재 실행 컨텍스트에서 각각의 Promise가 settled될 때까지 대기하는 형태로 함수 몸체의 문들이 실행되므로 의도한대로 순차적으로 처리될 수 있는 것이다. 그렇다면 “순차 처리를 하기 위해서는 for...loop
가 갖는 한계를 감안하고서라도 무조건 사용해야 하는가”라면 분명 아쉬운 부분이 존재할 것이다. 그 대신 컬렉션을 다루는 고차 함수들이 선택적으로 병렬·순차 처리를 제공할 수 있게끔 하는 것이 일관성 측면에서도 더욱 좋은 해결 방안이 될 것이다.
Async functions with reduce
순차 처리에 대한 힌트는 reduce
에서 얻을 수 있다. reduce
에 전달한 reducer는 이전 연산의 결과와 현재 순회하고 있는 요소를 전달받아 값을 생성하고 이를 다음 reducer에게 전달하는 역할을 수행한다. 즉 다음 연산을 위해 이전 연산의 결과를 필요로 한다는 형태이다.
const arr = [1, 2, 3];
const syncRes = arr.reduce((acc, value) => {
return acc + value;
}, 0);
syncRes; // 6
이를 비동기 상황에 대해서도 적용해본다면 이전 reducer가 반환한 값이 Promise가 되고 해당 Promise가 settled되면 이후의 연산을 진행할 수 있게끔 설정한다면 동기 상황에서처럼 순차 처리가 가능할 것이다. 그렇다면 이전 연산의 결과가 Promise이므로 다음 연산에서 Promise가 settled될 때까지 대기할 수 있도록 async reducer
를 전달하는 형태로 수정해보자.
(async () => {
const arr = [1, 2, 3];
const asyncRes = await arr.reduce(async (acc, value) => {
await sleep(10);
return (await acc) + value;
}, 0);
asyncRes; // 6
})();
[0] ●───────→ [ 0 ]
↓
[(await 0) + 1]: Promise(1)
↓
[(await Promise(1)) + 2]: Promise(3)
↓
[(await Promise(3)) + 3] ───────→ ● [Promise(6)]
async
함수는 무조건 Promise를 반환하기 때문에 reduce가 최종적으로 반환한 값 역시 Promise가 된다.
Timing
하지만 현재 상태로는 이전 연산의 결과가 settled될 때까지 대기한다는 점은 보장하지만 여전히 sleep
은 각각의 async reducer
에서 병렬적으로 수행된다. 그렇다면 우선 간략하게나마 reduce
의 구현을 먼저 살펴보도록 하자.
function reduce(array, reducer, init) {
let acc = init;
for (const value of array) {
acc = reducer(acc, value); }
return acc;
}
reduce
에 async reducer
를 전달한 경우 reducer를 호출하면 각각의 async reducer
는 호출자를 block하는 대신 Promise를 반환하게 된다. 다만 이전 연산의 결과에 종속이 존재하는 부분이 각각의 reducer의 어느 위치에 존재하느냐에 따라 다른 동작을 보이게 되는데 하나씩 살펴보도록 하자.
await acc를 나중에 수행하기
다음의 코드에서 sleep
은 이전 연산의 결과에 대해 독립적인 작업이고, await acc
는 이전 연산의 결과가 resolve
되어야 참조할 수 있게 된다. 따라서 async reducer
를 호출하면 함수 몸체에서 처음으로 만난 await
우측의 표현식이 평가되므로 각각의 reducer에서 모두 sleep(10)
은 그 즉시 수행하게 될 것이다. 그 이후 이전 연산의 결과(acc
)를 평가하게 되는데 각각의 acc
는 이전 연산에 종속적이므로 연쇄적으로 Promise가 resolve
될 때까지 대기하게 될 것이다.
(async function () {
const arr = [1, 2, 3];
const startTime = new Date().getTime();
const asyncRes = await arr.reduce(async (acc, value) => {
await sleep(10);
return (await acc) + value;
}, 0);
console.log(`${new Date().getTime() - startTime} ms`);
// 약 12ms 정도가 소요된다.
})();
[reduce]
● ● ● ●
┆───────→┌┴┐ ┆ ┆
┆────────│ │──────→┌┴┐ ┆
┆────────│ │───────│ │──────→┌┴┐
┆ [ sleep(10) ]
┆ │ │ │ │←──────│ │
┆ │ │←──────│ │ await │ │
┆ │ │ await │ │ │ │
┆ └┬┘──────→│ │ │ │
┆ ┆ 1 └┬┘──────→│ │
┆ ┆ ┆ 3 └┬┘
┆←────────────────────────────┘ 6
await acc를 먼저 수행하기
하지만 문의 순서를 조금 바꾸어서 이전 연산의 결과를 먼저 await
하는 형태로 변경하면 각각의 async reducer
는 sleep
을 병렬적으로 수행하지 않고, 순차적으로 수행하게 된다.
(async function () {
const arr = [1, 2, 3];
const startTime = new Date().getTime();
const asyncRes = await arr.reduce(async (acc, value) => {
const resolved = await acc;
await sleep(10);
return resolved + value;
}, 0);
console.log(`${new Date().getTime() - startTime} ms`);
// 약 36ms 정도가 소요된다.
})();
[reduce]
● ● ● ●
┆───────→┌┴┐ ┆ ┆
┆────────│ │──────→┌┴┐ ┆
┆────────│ │───────│ │──────→┌┴┐
┆ │ │ │ │←──────│ │
┆ │ │←──────│ │ await │ │
┆ │ │ await │ │ │ │
┆ [sleep(10)] │ │ │ │
┆ └┬┘──────→│ │ │ │
┆ ┆ 1 [sleep(10)] │ │
┆ ┆ └┬┘──────→│ │
┆ ┆ ┆ 3 [sleep(10)]
┆ ┆ ┆ └┬┘
┆←────────────────────────────┘ 6
따라서 이제 async reducer
의 설계에 따라 순차 처리와 병렬 처리를 선택할 수 있게끔 하는 인터페이스를 제공할 수 있는 실마리를 찾았다.
자원을 많이 소비하기는 하지만 그만큼 속도가 중요한 상황이라면 병렬적으로, 실행 순서 혹은 자원 절약이 더욱 중요한 상황이라면 순차 처리를 수행하게끔 reducer를 설계하면 된다.
Async functions with map
map
역시 일반적으로 컬렉션을 다룰 때 사용하는 대표적인 함수이다. 비동기 상황에서 map
을 이용하려면 고려해야 할 사항이 크게 두 가지가 존재한다. 첫 번째는 모든 요소들을 새로운 값을 갖는 Promise로 래핑해야 한다. 두 번째는 컬렉션 내의 모든 Promise가 settled될 때까지 기다릴 수 있어야 한다. 이러한 두 가지 요구 사항을 충족시키는 API는 Promise.all
이 있다. 따라서 일반적으로 다음과 같은 패턴으로 비동기 상황에서 map
을 수행하게 된다.
Promise.all(arr.map(async (...) => { ... })
(async function () {
const arr = [1, 2, 3];
const asyncRes = await Promise.all(
arr.map(async (value) => {
console.log(`before: ${value}`);
await sleep(10);
console.log(`after: ${value}`);
return value + 1;
}),
);
// before 1
// before 2
// before 3
// after 1
// after 2
// after 3
asyncRes; // [2, 3, 4]
})();
[map]
● ● ● ●
┆1──────→┌┴┐ ┆ ┆
┆2───────│ │──────→┌┴┐ ┆
┆3───────│ │───────│ │──────→┌┴┐
┆ [ sleep(10) ]
┆ │ │ │ │ │ │
┆←──── 2 └┬┘───────└┬┘ └┬┘
┆←────────┆────── 3 ┘ │
┆←────────┆──────────────── 4 ┘
여기서도 sleep
은 병렬적으로 수행된다. 하지만 처음의 의도는 순차 처리와 병렬 처리를 선택적으로 제공할 수 있게끔 하는 것이 목적이었으므로 앞서 살펴본 reduce를 이용하면 이전 연산의 결과가 resolve
될 때까지 다음 async reducer
에서 대기하게끔 설계할 수 있을 것이다.
(async function () {
const arr = [1, 2, 3];
const asyncRes = await arr.reduce(async (acc, value) => {
const resolved = await acc;
console.log(`before: ${value}`);
await sleep(10);
console.log(`after: ${value}`);
return [...acc, value + 1];
}, []);
// before: 1
// after: 1
// before: 2
// after: 2
// before: 3
// after: 3
asyncRes; // [2, 3, 4]
})();
[reduce]
● ● ● ●
┆[]─────→┌┴┐ ┆ ┆
┆←──────2└┬┘ ┆ ┆
┆[2]──────┆───────→┌┴┐ ┆
┆←────────┆───────3└┬┘ ┆
┆[2,3]────┆─────────┆───────→┌┴┐
┆←────────┆─────────┆───────4└┬┘
Async functions with forEach
forEach
역시 map
과 유사하지만 값에 함수를 적용하고 함수가 반환한 값을 요소로 갖는 새로운 컬렉션을 생성하는 대신 forEach
는 각각의 요소에 대해 함수를 호출하기만 하고 결과는 버린다는 차이가 있다.
const arr = [1, 2, 3];
arr.forEach((value) => {
console.log(value);
});
// 1
// 2
// 3
console.log('Finish in Sync');
// Finish in Sync
그렇다면 비동기 상황에 대해서도 위와 동일한 결과를 얻기 위해 대부분이 시도하는 방법은 글의 서두에서처럼 forEach
의 callbackFn
에 async
함수를 전달하는 방법일 것이다.
const arr = [1, 2, 3];
arr.forEach(async (value) => {
await sleep(10 - value);
console.log(value);
});
console.log('Finish in async');
// Finish in async
// 3
// 2
// 1
하지만 콘솔에 출력된 결과를 살펴보면 우선 forEach
에 전달한 callbackFn
이 호출되더라도 block이 발생하지 않는다. 거기에 sleep
역시 병렬적으로 수행되어 가장 짧은 대기 시간을 갖는 3번째 요소가 가장 먼저 출력되었다.
[forEach]
● ● ● ●
┌┴┐──────→┌┴┐ ┆ ┆
│ │───────│ │──────→┌┴┐ ┆
│ │───────│ │───────│ │──────→┌┴┐
done └┬┘ │ │ │ │ └┬┘3
┆ │ │ └┬┘2 ┆
┆ └┬┘1 ┆ ┆
그렇다면 이번에도 역시 reduce
를 이용한다면 비동기 상황에서도 순차적인 처리를 보장할 수 있게 될 것이다.
(async function () {
const arr = [1, 2, 3];
await arr.reduce(async (acc, value) => {
await acc;
await sleep(10 - value);
console.log(value);
}, undefined);
console.log('Finish in async');
// 1
// 2
// 3
// Finish in async
})();
[reduce]
● ● ● ●
┌┴┐──────→┌┴┐ ┆ ┆
│ │←──────└┬┘1 ┆ ┆
│ │────────┆───────→┌┴┐ ┆
│ │ ┆ └┬┘2 ┆
│ │────────┆─────────┆───────→┌┴┐
│ │←───────┆─────────┆────────└┬┘3
done └┬┘ ┆ ┆ ┆
Async functions with filter
filter
의 경우는 조금 복잡한데, 기본적으로 Promise는 **“truthy”**하기 때문에 map
을 이용하여 true
또는 false
를 갖는 배열을 만든 후, 원본 컬렉션에 filter
를 적용하여 값이 true
인 요소와 동일한 인덱스에 위치한 요소만 취해야 한다.
const asyncFilter = async (arr, predicate) => {
const results = await Promise.all(arr.map(predicate));
return arr.filter((_, idx) => results[idx]);
};
하지만 filter
역시 reduce
를 통해 수행하는 것이 가능하므로 우선 reduce
를 이용한 병렬 처리부터 살펴보도록 하자.
const asyncFilter = async (arr, predicate) =>
arr.reduce(
async (acc, value) =>
(await predicate(value))
? [...(await acc), value]
: acc,
[],
);
asyncFilter([1, 2, 3], (value) => value % 2 === 0);
[filter] [reduce]
● ● ● ● ●
┆[1,2,3]─→┆───────→┌┴┐ ┆ ┆
┆ ┆────────│ │──────→┌┴┐ ┆
┆ ┆────────│ │───────│ │──────→┌┴┐
┆ ┆ [ await predicate(value) ]
┆ ┆ │ │ │ │←await─│ │
┆ ┆ │ │←await─│ │ │ │
┆ ┆ └┬┘[]────→│ │ │ │
┆ ┆ ┆ └┬┘[2]───→│ │
┆←[2]─────┆←[2]─────┆─────────┆────────└┬┘
await acc
를 수행하기 전에 await predicate(value)
가 위치한 상태이므로 각각의 predicate(value)
는 병렬적으로 수행된다.
그렇다면 이번에는 순차 처리의 경우를 생각해보자. 간단하게 문의 순서를 바꿔주면 된다. 즉 await acc
를 수행한 이후에 predicate
를 수행하게끔 바꾸는 것이다.
const asyncFilter = async (arr, predicate) =>
arr.reduce(
async (acc, value) => [
...(await acc),
...((await predicate(value)) ? [value] : []),
],
[],
);
[filter] [reduce]
● ● ● ● ●
┆[1,2,3]─→┆───────→┌┴┐ ┆ ┆
┆ ┆────────│ │──────→┌┴┐ ┆
┆ ┆────────│ │───────│ │──────→┌┴┐
┆ ┆ │ │ │ │← acc ─│ │
┆ ┆ │ │← acc ─│ │ │ │
┆ ┆ [predicate(1)] │ │ │ │
┆ ┆ └┬┘[]────→│ │ │ │
┆ ┆ ┆ [predicate(2)] │ │
┆ ┆ ┆ └┬┘[2]───→│ │
┆ ┆ ┆ ┆ [predicate(3)]
┆←[2]─────┆←[2]─────┆─────────┆────────└┬┘