Operator 포스팅을 작성하면서 다음과 같은 코드에서 observer.next()의 호출은 순차적으로 처리된다. 이전 순회에서 next()가 호출된 이후 500ms를 sleep하는 것이 보장되는 상황이다.

function createInnerObservable(x: number) {
  return new Observable((observer) => {
    (async () => {
      for (let i = 0; i < 3; i++) {        if (i > 0) await sleep(500);        observer.next(10 * x);      }      observer.complete();
    })();
  });
}

하지만 기본적으로 for...loopcontinue/break의 사용, 너무 많은 일을 수행하는 중첩 루프 등 다분히 명령적(imperative)이라는 점에서 선언적(declarative) 프로그래밍을 추구하는 관점에서는 지양하는 것이 권장된다. 그렇다면 가장 먼저 떠올릴 수 있는 방식은 Array.prototype.forEach를 사용하는 방식으로 변경해보는 것이다.

function createInnerObservable(x: number) {
  return new Observable((observer) => {
    [0, 1, 2].forEach(async (_, i) => {      if (i > 0) await sleep(500);      observer.next(10 * x);    });    observer.complete();
  });
}

하지만 의도와는 달리 이전 순회의 next()를 대기하지 않고 병렬적으로 sleep을 수행한 이후 각각의 next()가 호출되는 것을 알 수 있는데, async/await 포스팅을 작성하면서 살펴본 async 함수의 동작 방식을 생각해보면 async 함수는 호출자를 block하지 않는다는 특징을 살펴보았다. forEach의 경우 컬렉션(이 경우에는 배열)을 순회하면서 전달받은 함수가 비동기 함수이든 동기 함수이든 호출만 하게 된다. 반면 for...loop에서 각각의 순회를 문으로 나열해보면 다음과 같이 표현될 것이다.

function createInnerObservable(x: number) {
  return new Observable((observer) => {
    (async () => {
      observer.next(10 * x);
      await sleep(500);
      observer.next(10 * x);
      await sleep(500);
      observer.next(10 * x);

      observer.complete();
    })();
  });
}

즉, for...loopforEach와 달리 현재 실행 컨텍스트에서 각각의 Promise가 settled될 때까지 대기하는 형태로 함수 몸체의 문들이 실행되므로 의도한대로 순차적으로 처리될 수 있는 것이다. 그렇다면 “순차 처리를 하기 위해서는 for...loop가 갖는 한계를 감안하고서라도 무조건 사용해야 하는가”라면 분명 아쉬운 부분이 존재할 것이다. 그 대신 컬렉션을 다루는 고차 함수들이 선택적으로 병렬·순차 처리를 제공할 수 있게끔 하는 것이 일관성 측면에서도 더욱 좋은 해결 방안이 될 것이다.

Async functions with reduce

순차 처리에 대한 힌트는 reduce에서 얻을 수 있다. reduce에 전달한 reducer는 이전 연산의 결과와 현재 순회하고 있는 요소를 전달받아 값을 생성하고 이를 다음 reducer에게 전달하는 역할을 수행한다. 즉 다음 연산을 위해 이전 연산의 결과를 필요로 한다는 형태이다.

const arr = [1, 2, 3];

const syncRes = arr.reduce((acc, value) => {
  return acc + value;
}, 0);

syncRes; // 6

이를 비동기 상황에 대해서도 적용해본다면 이전 reducer가 반환한 값이 Promise가 되고 해당 Promise가 settled되면 이후의 연산을 진행할 수 있게끔 설정한다면 동기 상황에서처럼 순차 처리가 가능할 것이다. 그렇다면 이전 연산의 결과가 Promise이므로 다음 연산에서 Promise가 settled될 때까지 대기할 수 있도록 async reducer를 전달하는 형태로 수정해보자.

(async () => {
  const arr = [1, 2, 3];

  const asyncRes = await arr.reduce(async (acc, value) => {
    await sleep(10);
    return (await acc) + value;
  }, 0);

  asyncRes; // 6
})();
[0] ●───────→ [ 0 ][(await 0) + 1]: Promise(1)[(await Promise(1)) + 2]: Promise(3)[(await Promise(3)) + 3] ───────→ ● [Promise(6)]

async 함수는 무조건 Promise를 반환하기 때문에 reduce가 최종적으로 반환한 값 역시 Promise가 된다.

Timing

하지만 현재 상태로는 이전 연산의 결과가 settled될 때까지 대기한다는 점은 보장하지만 여전히 sleep은 각각의 async reducer에서 병렬적으로 수행된다. 그렇다면 우선 간략하게나마 reduce의 구현을 먼저 살펴보도록 하자.

function reduce(array, reducer, init) {
  let acc = init;

  for (const value of array) {
    acc = reducer(acc, value);  }

  return acc;
}

reduceasync reducer를 전달한 경우 reducer를 호출하면 각각의 async reducer는 호출자를 block하는 대신 Promise를 반환하게 된다. 다만 이전 연산의 결과에 종속이 존재하는 부분이 각각의 reducer의 어느 위치에 존재하느냐에 따라 다른 동작을 보이게 되는데 하나씩 살펴보도록 하자.

await acc를 나중에 수행하기

다음의 코드에서 sleep은 이전 연산의 결과에 대해 독립적인 작업이고, await acc는 이전 연산의 결과가 resolve되어야 참조할 수 있게 된다. 따라서 async reducer를 호출하면 함수 몸체에서 처음으로 만난 await 우측의 표현식이 평가되므로 각각의 reducer에서 모두 sleep(10)은 그 즉시 수행하게 될 것이다. 그 이후 이전 연산의 결과(acc)를 평가하게 되는데 각각의 acc는 이전 연산에 종속적이므로 연쇄적으로 Promise가 resolve 될 때까지 대기하게 될 것이다.

(async function () {
  const arr = [1, 2, 3];

  const startTime = new Date().getTime();

  const asyncRes = await arr.reduce(async (acc, value) => {
    await sleep(10);
    return (await acc) + value;
  }, 0);

  console.log(`${new Date().getTime() - startTime} ms`);
  // 약 12ms 정도가 소요된다.
})();
[reduce]
    ●         ●         ●         ●
    ┆───────→┌┴┐        ┆         ┆
    ┆────────│ │──────→┌┴┐        ┆
    ┆────────│ │───────│ │──────→┌┴┐
    ┆       [       sleep(10)       ]
    ┆        │ │       │ │←──────│ │
    ┆        │ │←──────│ │ await │ │
    ┆        │ │ await │ │       │ │
    ┆        └┬┘──────→│ │       │ │
    ┆         ┆ 1      └┬┘──────→│ │
    ┆         ┆         ┆ 3      └┬┘
    ┆←────────────────────────────┘ 6

await acc를 먼저 수행하기

하지만 문의 순서를 조금 바꾸어서 이전 연산의 결과를 먼저 await하는 형태로 변경하면 각각의 async reducersleep을 병렬적으로 수행하지 않고, 순차적으로 수행하게 된다.

(async function () {
  const arr = [1, 2, 3];

  const startTime = new Date().getTime();

  const asyncRes = await arr.reduce(async (acc, value) => {
    const resolved = await acc;
    await sleep(10);
    return resolved + value;
  }, 0);

  console.log(`${new Date().getTime() - startTime} ms`);
  // 약 36ms 정도가 소요된다.
})();
[reduce]
    ●         ●         ●         ●
    ┆───────→┌┴┐        ┆         ┆
    ┆────────│ │──────→┌┴┐        ┆
    ┆────────│ │───────│ │──────→┌┴┐
    ┆        │ │       │ │←──────│ │
    ┆        │ │←──────│ │ await │ │
    ┆        │ │ await │ │       │ │
    ┆    [sleep(10)]   │ │       │ │
    ┆        └┬┘──────→│ │       │ │
    ┆         ┆ 1  [sleep(10)]   │ │
    ┆         ┆        └┬┘──────→│ │
    ┆         ┆         ┆ 3  [sleep(10)]
    ┆         ┆         ┆        └┬┘
    ┆←────────────────────────────┘ 6

따라서 이제 async reducer의 설계에 따라 순차 처리와 병렬 처리를 선택할 수 있게끔 하는 인터페이스를 제공할 수 있는 실마리를 찾았다.

자원을 많이 소비하기는 하지만 그만큼 속도가 중요한 상황이라면 병렬적으로, 실행 순서 혹은 자원 절약이 더욱 중요한 상황이라면 순차 처리를 수행하게끔 reducer를 설계하면 된다.

Async functions with map

map 역시 일반적으로 컬렉션을 다룰 때 사용하는 대표적인 함수이다. 비동기 상황에서 map을 이용하려면 고려해야 할 사항이 크게 두 가지가 존재한다. 첫 번째는 모든 요소들을 새로운 값을 갖는 Promise로 래핑해야 한다. 두 번째는 컬렉션 내의 모든 Promise가 settled될 때까지 기다릴 수 있어야 한다. 이러한 두 가지 요구 사항을 충족시키는 API는 Promise.all이 있다. 따라서 일반적으로 다음과 같은 패턴으로 비동기 상황에서 map을 수행하게 된다.

Promise.all(arr.map(async (...) => { ... })
(async function () {
  const arr = [1, 2, 3];

  const asyncRes = await Promise.all(
    arr.map(async (value) => {
      console.log(`before: ${value}`);
      await sleep(10);
      console.log(`after: ${value}`);
      return value + 1;
    }),
  );
  // before 1
  // before 2
  // before 3
  // after 1
  // after 2
  // after 3
  asyncRes; // [2, 3, 4]
})();
  [map]
    ●         ●         ●         ●
    ┆1──────→┌┴┐        ┆         ┆
    ┆2───────│ │──────→┌┴┐        ┆
    ┆3───────│ │───────│ │──────→┌┴┐
    ┆       [       sleep(10)       ]
    ┆        │ │       │ │       │ │
    ┆←──── 2 └┬┘───────└┬┘       └┬┘
    ┆←────────┆────── 3 ┘         │
    ┆←────────┆──────────────── 4

여기서도 sleep은 병렬적으로 수행된다. 하지만 처음의 의도는 순차 처리와 병렬 처리를 선택적으로 제공할 수 있게끔 하는 것이 목적이었으므로 앞서 살펴본 reduce를 이용하면 이전 연산의 결과가 resolve될 때까지 다음 async reducer에서 대기하게끔 설계할 수 있을 것이다.

(async function () {
  const arr = [1, 2, 3];

  const asyncRes = await arr.reduce(async (acc, value) => {
    const resolved = await acc;
    console.log(`before: ${value}`);
    await sleep(10);
    console.log(`after: ${value}`);
    return [...acc, value + 1];
  }, []);
  // before: 1
  // after: 1
  // before: 2
  // after: 2
  // before: 3
  // after: 3

  asyncRes; // [2, 3, 4]
})();
[reduce]
    ●         ●         ●         ●
    ┆[]─────→┌┴┐        ┆         ┆
    ┆←──────2└┬┘        ┆         ┆
    ┆[2]──────┆───────→┌┴┐        ┆
    ┆←────────┆───────3└┬┘        ┆
    ┆[2,3]────┆─────────┆───────→┌┴┐
    ┆←────────┆─────────┆───────4└┬┘

Async functions with forEach

forEach 역시 map과 유사하지만 값에 함수를 적용하고 함수가 반환한 값을 요소로 갖는 새로운 컬렉션을 생성하는 대신 forEach는 각각의 요소에 대해 함수를 호출하기만 하고 결과는 버린다는 차이가 있다.

const arr = [1, 2, 3];

arr.forEach((value) => {
  console.log(value);
});
// 1
// 2
// 3
console.log('Finish in Sync');
// Finish in Sync

그렇다면 비동기 상황에 대해서도 위와 동일한 결과를 얻기 위해 대부분이 시도하는 방법은 글의 서두에서처럼 forEachcallbackFnasync 함수를 전달하는 방법일 것이다.

const arr = [1, 2, 3];
arr.forEach(async (value) => {
  await sleep(10 - value);
  console.log(value);
});
console.log('Finish in async');
// Finish in async
// 3
// 2
// 1

하지만 콘솔에 출력된 결과를 살펴보면 우선 forEach에 전달한 callbackFn이 호출되더라도 block이 발생하지 않는다. 거기에 sleep 역시 병렬적으로 수행되어 가장 짧은 대기 시간을 갖는 3번째 요소가 가장 먼저 출력되었다.

[forEach]
      ●         ●         ●         ●
     ┌┴┐──────→┌┴┐        ┆         ┆
     │ │───────│ │──────→┌┴┐        ┆
     │ │───────│ │───────│ │──────→┌┴┐
done └┬┘       │ │       │ │       └┬┘3
      ┆        │ │       └┬┘2       ┆
      ┆        └┬┘1       ┆         ┆

그렇다면 이번에도 역시 reduce를 이용한다면 비동기 상황에서도 순차적인 처리를 보장할 수 있게 될 것이다.

(async function () {
  const arr = [1, 2, 3];

  await arr.reduce(async (acc, value) => {
    await acc;
    await sleep(10 - value);
    console.log(value);
  }, undefined);

  console.log('Finish in async');
  // 1
  // 2
  // 3
  // Finish in async
})();
[reduce]
      ●         ●         ●         ●
     ┌┴┐──────→┌┴┐        ┆         ┆
     │ │←──────└┬┘1       ┆         ┆
     │ │────────┆───────→┌┴┐        ┆
     │ │        ┆        └┬┘2       ┆
     │ │────────┆─────────┆───────→┌┴┐
     │ │←───────┆─────────┆────────└┬┘3
done └┬┘        ┆         ┆         ┆

Async functions with filter

filter의 경우는 조금 복잡한데, 기본적으로 Promise는 **“truthy”**하기 때문에 map을 이용하여 true 또는 false를 갖는 배열을 만든 후, 원본 컬렉션에 filter를 적용하여 값이 true인 요소와 동일한 인덱스에 위치한 요소만 취해야 한다.

const asyncFilter = async (arr, predicate) => {
  const results = await Promise.all(arr.map(predicate));
  return arr.filter((_, idx) => results[idx]);
};

하지만 filter 역시 reduce를 통해 수행하는 것이 가능하므로 우선 reduce를 이용한 병렬 처리부터 살펴보도록 하자.

const asyncFilter = async (arr, predicate) =>
  arr.reduce(
    async (acc, value) =>
      (await predicate(value))
        ? [...(await acc), value]
        : acc,
    [],
  );

asyncFilter([1, 2, 3], (value) => value % 2 === 0);
  [filter]  [reduce]
      ●         ●         ●         ●         ●
      ┆[1,2,3]─→┆───────→┌┴┐        ┆         ┆
      ┆         ┆────────│ │──────→┌┴┐        ┆
      ┆         ┆────────│ │───────│ │──────→┌┴┐
      ┆         ┆       [ await predicate(value) ]
      ┆         ┆        │ │       │ │←await─│ │
      ┆         ┆        │ │←await─│ │       │ │
      ┆         ┆        └┬┘[]────→│ │       │ │
      ┆         ┆         ┆        └┬┘[2]───→│ │
      ┆←[2]─────┆←[2]─────┆─────────┆────────└┬┘

await acc를 수행하기 전에 await predicate(value)가 위치한 상태이므로 각각의 predicate(value)는 병렬적으로 수행된다.

그렇다면 이번에는 순차 처리의 경우를 생각해보자. 간단하게 문의 순서를 바꿔주면 된다. 즉 await acc를 수행한 이후에 predicate를 수행하게끔 바꾸는 것이다.

const asyncFilter = async (arr, predicate) =>
  arr.reduce(
    async (acc, value) => [
      ...(await acc),
      ...((await predicate(value)) ? [value] : []),
    ],
    [],
  );
  [filter]  [reduce]
      ●         ●         ●         ●         ●
      ┆[1,2,3]─→┆───────→┌┴┐        ┆         ┆
      ┆         ┆────────│ │──────→┌┴┐        ┆
      ┆         ┆────────│ │───────│ │──────→┌┴┐
      ┆         ┆        │ │       │ │← acc ─│ │
      ┆         ┆        │ │← acc ─│ │       │ │
      ┆         ┆ [predicate(1)]   │ │       │ │
      ┆         ┆        └┬┘[]────→│ │       │ │
      ┆         ┆         ┆   [predicate(2)] │ │
      ┆         ┆         ┆        └┬┘[2]───→│ │
      ┆         ┆         ┆         ┆   [predicate(3)]
      ┆←[2]─────┆←[2]─────┆─────────┆────────└┬┘

Reference