How can I Interleave / merge async iterables?

There is no way to write this with a loop statement. async/await code always executes sequentially, to do things concurrently you need to use promise combinators directly. For plain promises, there’s Promise.all, for async iterators there is nothing (yet) so we need to write it on our own:

async function* combine(iterable) {
    const asyncIterators = Array.from(iterable, o => o[Symbol.asyncIterator]());
    const results = [];
    let count = asyncIterators.length;
    const never = new Promise(() => {});
    function getNext(asyncIterator, index) {
        return asyncIterator.next().then(result => ({
            index,
            result,
        }));
    }
    const nextPromises = asyncIterators.map(getNext);
    try {
        while (count) {
            const {index, result} = await Promise.race(nextPromises);
            if (result.done) {
                nextPromises[index] = never;
                results[index] = result.value;
                count--;
            } else {
                nextPromises[index] = getNext(asyncIterators[index], index);
                yield result.value;
            }
        }
    } finally {
        for (const [index, iterator] of asyncIterators.entries())
            if (nextPromises[index] != never && iterator.return != null)
                iterator.return();
        // no await here - see https://github.com/tc39/proposal-async-iteration/issues/126
    }
    return results;
}

Notice that combine does not support passing values into next or cancellation through .throw or .return.

You can call it like

(async () => {
  for await (const x of combine([a, b, c])) {
    console.log(x);
  }
})().catch(console.error);

Leave a Comment