To Nest Asyncio?

written by Martin Durant on 2021-03-25


Abstract: fsspec went async last summer, meaning that many latency-laden calls could be executed concurrently, dramatically improving the speed of some bulk operations. Early on, I decided to offer both sync and async APIs, but without copying methods between the two. This required some deep asyncio hacks that I have finally come to regret and revert. This article is for those interested in some inner workings of asyncio.

Introduction

fsspec

fsspec is a library for reading and writing to several storage backends, giving a common filesystem-like API and many convenience functions. It is used by PyData libraries such as Dask, Pandas and Xarray.

The problem

It turns out that libraries like requests and boto3 are typically not threadsafe, and/or hold the GIL (global interpreter lock) while carrying out their calls. This puts a hard ceiling on the number of calls that can be made per second, which particularly hurts when each call is very short, e.g., fetching a very small file from the remote store, or performing an operation that needs no feedback, like rm.

asyncio

Python's asyncio system, centred on the async/await syntax, allows for many IO-bound tasks to be waiting on sockets or files concurrently on a single thread. This means that you can make many calls at once, and be waiting on all of them at the same time - you only pay the latency overhead once, however many calls there are. This is a huge win, and was released in fsspec v0.8.0, July 2020. Speedups of >100x were seen for some test cases.
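As a minimal illustration of that win (fetch_one here is a stand-in for a small network call, not fsspec API):

```python
import asyncio
import time

async def fetch_one(i):
    # Stand-in for a latency-bound call (e.g., a tiny GET): it waits, but uses no CPU
    await asyncio.sleep(0.01)
    return i

async def fetch_many(n):
    # All n coroutines wait concurrently, so the latency is paid once, not n times
    return await asyncio.gather(*(fetch_one(i) for i in range(n)))

start = time.perf_counter()
results = asyncio.run(fetch_many(100))
elapsed = time.perf_counter() - start  # far less than the ~1 s a serial loop would take
```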

Aside: asyncio is designed such that you can have an event loop in one (main) thread, handing off work to process/thread pools if necessary, and treating that work as awaitable results. This is perfect for a server responding to inbound connection requests! However, in our situation we have the inverse: many threads doing work, all of which might want to do IO.

Hacks

Two major decisions were made back then, which I have now had to revert. Here is the story of two neat, but ultimately unsuccessful hacks.

The little hack

This part of the code was based on tested solutions modified from the Dask distributed and streamz projects. It unquestionably works for them, but it turns out that fsspec has different needs.

In order to provide a synchronous API to routines that are actually running async, you need to launch coroutines on an event loop. The original code had one event loop running in a dedicated thread, and a sync function something like the following. (Note that calling f() does not execute anything; it produces a coroutine object. It relies on its closure to see the function and arguments to run, and to pass results back.)

import asyncio
import sys
import threading

def sync(loop, func, *args, **kwargs):
    # Called from a worker thread; `loop` runs in the dedicated IO thread
    e = threading.Event()
    result = [None]
    error = [False]

    async def f():
        try:
            result[0] = await func(*args, **kwargs)
        except Exception:
            error[0] = sys.exc_info()
        finally:
            e.set()  # wake the waiting thread, whether we succeeded or not

    # Schedule the coroutine on the IO thread's loop and block until it signals
    asyncio.run_coroutine_threadsafe(f(), loop=loop)
    e.wait()
    if error[0]:
        typ, exc, tb = error[0]
        raise exc.with_traceback(tb)
    else:
        return result[0]

This solution turned out to a) be a bottleneck, since many threads may be waiting on the one IO thread, and b) cause hangs. The latter point I don't fully understand, but I believe it happens because of the not-insubstantial CPU work that IO needs to do, and because of cleanup that the garbage collector performs while f() is executing.

The fix was to have one event loop per thread that does IO, stored in threading.local data; each loop stops and starts as sync calls come in, via loop.run_until_complete. The "little hack" was removed in this PR.
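A sketch of that per-thread arrangement (the names are hypothetical; fsspec's real code handles loop reuse and cleanup more carefully):

```python
import asyncio
import threading

_local = threading.local()  # each thread gets its own attribute namespace

def get_loop():
    # Lazily create one private event loop per calling thread
    loop = getattr(_local, "loop", None)
    if loop is None:
        loop = asyncio.new_event_loop()
        _local.loop = loop
    return loop

def sync(func, *args, **kwargs):
    # The thread's loop is stopped between calls, so run_until_complete
    # can start it, run this one coroutine to completion, and stop it again
    return get_loop().run_until_complete(func(*args, **kwargs))
```

Because every worker thread drives its own loop, no thread ever blocks waiting for a shared IO thread.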

The big hack

I did not want to copy code between spec.py and asyn.py. I wanted code such as expand_path to be defined in one place, and things that call it (any recursive operation) to work whether async or not. The trouble is that expand_path calls find, which is itself async; attempting to run it would produce "Event loop is already running". How can you have both? nest-asyncio would make it work, but it patches a host of internal classes, so importing fsspec would affect all async code in the process. So instead I hacked:
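The failure mode can be reproduced in a few lines (the function names here are illustrative, not fsspec's actual code):

```python
import asyncio

async def find():
    # The async primitive that the shared helper needs to call
    return ["file1", "file2"]

async def expand_path():
    # Blocking on find() from inside a running loop is not allowed:
    loop = asyncio.get_running_loop()
    coro = find()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError as err:
        coro.close()  # suppress the "never awaited" warning
        return str(err)

message = asyncio.run(expand_path())  # e.g. "This event loop is already running"
```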

import asyncio

def _run_until_done(coro):
    """Execute a coroutine from within a coroutine already being run by this loop"""
    loop = asyncio.get_event_loop()
    task = asyncio.current_task()
    # Hide the current task from the loop's bookkeeping...
    asyncio.tasks._unregister_task(task)
    del asyncio.tasks._current_tasks[loop]
    runner = loop.create_task(coro)
    while not runner.done():
        # ...so we can drive the loop by hand until the inner coroutine finishes
        loop._run_once()
    asyncio.tasks._current_tasks[loop] = task
    return runner.result()

This has the effect of tricking the event loop into ignoring the task that's running right now, so that it can get on with running the coroutine specified (and any others waiting). The only problem is knowing when to apply this technique. Since py37, you can use asyncio.get_running_loop(). It works great - until you have more than one thread in play. No amount of adding locks to the code (which ought only to affect the current loop on the current thread) saved me from "pop from an empty deque". In fact, while trying to solve the "little hack" above, this was also a common place to see "future on another loop" - it's hard to reason about what is happening.
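For reference, the single-threaded detection itself is simple enough - a sketch of the idea:

```python
import asyncio

def in_async_context():
    # get_running_loop() (Python 3.7+) raises RuntimeError when no loop is
    # running in the current thread, which tells us which code path to take
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False

sync_side = in_async_context()  # no loop running in this thread

async def check():
    return in_async_context()

async_side = asyncio.run(check())  # called from inside a running loop
```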

So I finally bit the bullet ( PR ) and copied the code after all, as I should have done from the start. Now async code only calls async code.

Current state

Yes, there is duplicated code in the current version. Some of this could be factored out - the bits that don't call back into functions that could be async. However, the deadlocks and weird exceptions are gone. It is still possible, given that sessions are only cleaned up on garbage collection, that there are unclosed sessions/connections at interpreter shutdown. We will try to clear this up, but it's not yet clear to me how to execute async operations at a time when the event loop is no longer runnable.

Plans

These changes break async handling of file-like objects (i.e., the results of open()). This can be fixed with some more work - but it was already somewhat broken, and it works fine for the sync case.