Abstract: fsspec went async last summer, meaning that many latency-laden calls could be executed concurrently, dramatically improving the speed of some bulk operations. Early on, I decided to offer both sync and async APIs, but not to copy methods between the two. This required some deep asyncio hacks that I have finally come to regret and revert. This article is for those interested in some inner workings of asyncio.
fsspec is a library for reading from and writing to several storage backends, giving
a common filesystem-like API and many convenience functions. It is used by PyData
libraries like Dask, Pandas and Xarray.
So it turns out that libraries like requests and boto3 are typically not threadsafe and/or
hold the GIL (global interpreter lock) while carrying out their calls. This puts a hard
bottleneck on the number of calls that can be made per second, which particularly affects
the case where each call is very short, e.g., fetching a very small file from the
remote, or performing an operation that needs no feedback, like rm.
Python's asyncio system, centred on the async/await syntax, allows for many IO-bound
tasks to be waiting on sockets or files concurrently on a single thread. This means that you can make
many calls at once, and be waiting on all of them at the same time - you only pay the
latency overhead once, however many calls there are. This is a huge win, and was released
in fsspec v0.8.0, July 2020. Speedups of >100x were seen for some test cases.
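To make the win concrete, here is a minimal, self-contained sketch (illustrative only, not fsspec code): one hundred simulated 100ms requests complete in roughly the time of one, because they all wait concurrently.

    import asyncio
    import time

    async def fetch(i):
        # stand-in for one small network request with ~100ms of latency
        await asyncio.sleep(0.1)
        return i

    async def main():
        # all 100 "requests" wait on the network at the same time
        return await asyncio.gather(*(fetch(i) for i in range(100)))

    t0 = time.perf_counter()
    asyncio.run(main())
    print(f"{time.perf_counter() - t0:.2f}s")  # ~0.1s, not ~10s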
Aside: asyncio is designed such that you can have an event loop in one (main) thread, handing off work to process/thread pools if necessary, and treating that work as awaitable results. This is perfect for a server responding to inbound connection requests! However, in our situation we have the inverse: many threads doing work, all of which might want to do IO.
Two major decisions were made back then, which I have now had to revert. Here is the story of two neat, but ultimately unsuccessful hacks.
This part of the code was based on tested solutions modified from the Dask
distributed
and streamz
projects. It
unquestionably works for them, but it turns out that fsspec
has different needs.
In order to provide a synchronous API to routines that are actually running async,
you need to launch coroutines on an event loop. The original code had one event loop
running in a dedicated thread, and a sync function something like the following.
(Note that f() does not execute anything by itself, but produces a coroutine object; it
depends on its closure to see the function and arguments to run, and to pass results back.)
    import asyncio
    import sys
    import threading

    def sync(loop, func, *args, **kwargs):
        # event used to signal completion back to the calling thread
        e = threading.Event()
        result = [None]
        error = [False]

        async def f():
            try:
                result[0] = await func(*args, **kwargs)
            except Exception:
                error[0] = sys.exc_info()
            finally:
                e.set()

        # schedule the coroutine on the dedicated IO loop, then block until done
        asyncio.run_coroutine_threadsafe(f(), loop=loop)
        e.wait()
        if error[0]:
            typ, exc, tb = error[0]
            raise exc.with_traceback(tb)
        else:
            return result[0]
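For context, this is roughly how such a function would be used; the loop setup and the fetch coroutine below are illustrative assumptions, not fsspec's actual code.

    import asyncio
    import threading

    # a single IO loop running forever in a dedicated daemon thread
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()

    async def fetch(url):
        await asyncio.sleep(0.1)  # stand-in for a real network call
        return url

    # any synchronous thread can now hand work to the shared loop
    print(sync(loop, fetch, "s3://bucket/key"))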
So this solution turns out to a) be a bottleneck, since many threads may be waiting on the one
IO thread, and b) cause hangs. The latter point I don't fully understand, but I think it happens
because of the not-insubstantial CPU work that IO needs to do, and because of cleanup that the
garbage collector does while executing f().
The solution was to have one event loop per thread that does IO, stored in threading.local
data, with that loop stopping and starting as sync calls come in, via loop.run_until_complete.
The "little hack" was removed in this PR.
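A minimal sketch of that per-thread pattern (the names here are illustrative, not the fsspec implementation):

    import asyncio
    import threading

    _local = threading.local()

    def get_loop():
        # each thread lazily creates and keeps its own event loop
        if not hasattr(_local, "loop"):
            _local.loop = asyncio.new_event_loop()
        return _local.loop

    def sync(coro):
        # run_until_complete starts the thread's loop, runs the coroutine
        # to completion, and stops the loop again
        return get_loop().run_until_complete(coro)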
I did not want to copy code between spec.py and asyn.py. I wanted code such as
expand_path
to be defined in one place, and things that call it (any recursive
op) to work whether async or not. The trouble is, the function calls find
, which
is itself async. Attempting to run it would produce "Event loop is already running".
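The failure mode is easy to reproduce in isolation - a running event loop cannot be re-entered:

    import asyncio

    async def inner():
        return 1

    async def outer():
        # trying to re-enter the already-running loop fails:
        # RuntimeError: This event loop is already running
        return asyncio.get_event_loop().run_until_complete(inner())

    asyncio.run(outer())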
How can you have both? nest-asyncio
would
make it work, but that patches a host of internal classes, so importing fsspec would
affect all async code in the process. So instead I hacked:
    def _run_until_done(coro):
        loop = asyncio.get_event_loop()
        task = asyncio.current_task()
        # hide the currently-running task from asyncio, so the loop
        # does not complain about being re-entered
        asyncio.tasks._unregister_task(task)
        del asyncio.tasks._current_tasks[loop]
        runner = loop.create_task(coro)
        # crank the loop by hand until the coroutine has finished
        while not runner.done():
            loop._run_once()
        # restore the task we were hiding
        asyncio.tasks._current_tasks[loop] = task
        return runner.result()
This has the effect of tricking the event loop into ignoring the task that's running
right now, so that it can get on with running the coroutine specified (and any others
waiting). The only problem is knowing when to apply this technique. Since py37, you can
use asyncio.get_running_loop(). It works great! But it stops working as soon as you
have more than one thread in place. No amount of adding locks to the code (which ought to
only affect the current loop on the current thread) saved me from "pop from an empty deque".
In fact, when trying to solve the "little hack" above, this was a common place to see
"future on another loop" - it's hard to reason about what is happening.
So I finally bit the bullet ( PR ) and copied code after all, as I should have done from the start. Now async code only calls async code.
Yes, there is duplicated code in the current version. Some of this could be factored out - the bits that don't call back into functions that could be async. However, the deadlocks and weird exceptions are gone. It is still possible, given that sessions are supposed to be cleaned up only on garbage collection, that there are unclosed sessions/connections at interpreter shutdown. We will try to clear this up, but it's not yet clear to me how to execute async operations at a time when the event loop is no longer runnable.
These changes break async handling of file-like objects (i.e., the results of open()). This can
be fixed with some more work - but it was already somewhat broken, and the sync case works fine.