Create generator that yields coroutine results as the coroutines finish

asyncio.as_completed() takes an iterable of coroutines or futures and returns an iterable of futures in the order that the input futures complete. Normally, you’d loop over its result and await the members from inside an async function…

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

async def main():
    for future in asyncio.as_completed([first(), second(), third()]):
        print(await future)

# Prints 'second', then 'third', then 'first'
asyncio.run(main())

… but for the purpose of this question, what we want is to be able to yield these results from an ordinary generator, so that normal synchronous code can consume them without ever knowing that async functions are being used under the hood. We can do that by calling loop.run_until_complete() on the futures yielded by our as_completed call…

import asyncio

async def first():
    await asyncio.sleep(5)
    return 'first'

async def second():
    await asyncio.sleep(1)
    return 'second'

async def third():
    await asyncio.sleep(3)
    return 'third'

def ordinary_generator():
    loop = asyncio.get_event_loop()
    for future in asyncio.as_completed([first(), second(), third()]):
        yield loop.run_until_complete(future)

# Prints 'second', then 'third', then 'first'
for element in ordinary_generator():
    print(element)

In this way, we’ve exposed our async code to non-async-land in a manner that doesn’t require callers to define any functions as async, or to even know that ordinary_generator is using asyncio under the hood.

As an alternative implementation of ordinary_generator() that offers more flexibility in some circumstances, we can repeatedly call asyncio.wait() with the FIRST_COMPLETED flag instead of looping over as_completed():

import concurrent.futures

def ordinary_generator():
    loop = asyncio.get_event_loop()
    pending = [first(), second(), third()]
    while pending:
        done, pending = loop.run_until_complete(
            asyncio.wait(
                pending,
                return_when=concurrent.futures.FIRST_COMPLETED
            )
        )
        for job in done:
            yield job.result()

This approach, maintaining a list of pending jobs, has the advantage that we can adapt it to add jobs to the pending list on the fly. This is useful in use cases where our async jobs can add an unpredictable number of further jobs to the queue – like a web spider that follows all links on each page that it visits.

One caveat: the approaches above assume we’re calling the synchronous code from the main thread, in which case get_event_loop is guaranteed to give us a loop and we’ve got no need to .close it. If we want ordinary_generator to be usable from a non-main thread, especially one that may have previously had an event loop created, then life gets harder, because we can’t rely on get_event_loop (it raises a RuntimeError on any non-main thread that doesn’t have an event loop yet). In that case the simplest thing I can think to do is to spin off a new thread to run our asyncio code, and communicate with it via a queue:

def ordinary_generator():
    sentinel = object()
    queue = Queue()

    def thread_entry_point():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        for future in asyncio.as_completed([first(), second(), third()]):
            try:
                queue.put(loop.run_until_complete(future))
            except Exception as e:
                queue.put((sentinel, e))
                break
        loop.close()
        queue.put(sentinel)

    Thread(target=thread_entry_point).start()
    while True:
        val = queue.get()
        if val is sentinel:
            return
        if isinstance(val, tuple) and len(val) == 2 and val[0] is sentinel:
            raise val[1]
        yield val

(Combining the use of run_until_complete from the penultimate example with the use of an extra thread in the final example is left as an exercise for any reader who needs to do so.)

Leave a Comment