Waiting in asyncio

Category: IT Technology · Published: 4 years ago


One of the main appeals of using asyncio is being able to fire off many coroutines and run them concurrently. How many ways do you know for waiting for their results?

There are quite a few of them! The different ways have different properties, and all of them deserve their place. However, I regularly have to look them up to find the right one.

Before I start, a few definitions that I will use throughout this post:

  • coroutine: A running asynchronous function. So if you define a function as async def f(): ... and call it as f(), you get back a coroutine in the sense that the term is used throughout this post.
  • awaitable: Anything that works with await: coroutines, asyncio.Futures, asyncio.Tasks, and objects that have an __await__ method.
  • I will be using two async functions f and g for my examples. It’s not important what they do, only that they are defined as async def f(): ... and async def g(): ... and that they terminate eventually.
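To make these definitions concrete, here is a minimal sketch. The body of f() and the Ready class are assumptions made up for illustration; only the inspect and asyncio calls are standard library.

```python
import asyncio
import inspect


async def f():
    # Hypothetical stand-in for the post's f(); the body is an assumption.
    await asyncio.sleep(0)
    return "f"


class Ready:
    """A hand-written awaitable: all it needs is an __await__ method."""

    def __await__(self):
        # Delegate to the iterator of a fresh coroutine.
        return f().__await__()


async def main():
    coro = f()                         # calling f() runs none of its body yet
    task = asyncio.ensure_future(f())  # a Task wrapping another coroutine
    kinds = {
        "f is an async function": inspect.iscoroutinefunction(f),
        "f() is a coroutine": inspect.iscoroutine(coro),
        "a Task is awaitable": inspect.isawaitable(task),
        "Ready() is awaitable": inspect.isawaitable(Ready()),
    }
    results = [await coro, await task, await Ready()]
    return kinds, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All three objects work with await, even though only the first one is a coroutine in the sense used here.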

await

The simplest case is to simply await your coroutines:

result_f = await f()
result_g = await g()

However:

  1. The coroutines do not run concurrently. g only starts executing after f has finished.
  2. You can’t cancel them once you started awaiting.

A naive approach to the first problem might be something like this:

coro_f = f()
coro_g = g()

result_f = await coro_f
result_g = await coro_g

But that doesn’t do what you might think it does. The execution of g / coro_g doesn’t start before it is awaited, which makes it identical to the first example. For both problems, you need to wrap your coroutines in tasks.
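A small experiment can confirm this. In the sketch below, f() and g() are hypothetical stand-ins that record when they start and finish; the log list and the sleep durations are assumptions for illustration.

```python
import asyncio

log = []


async def f():
    log.append("f start")
    await asyncio.sleep(0.01)
    log.append("f end")


async def g():
    log.append("g start")
    await asyncio.sleep(0.01)
    log.append("g end")


async def main():
    log.clear()
    coro_f = f()
    coro_g = g()
    # Give the event loop time to run anything that is scheduled.
    # Bare coroutine objects are not scheduled, so nothing happens.
    await asyncio.sleep(0.02)
    log.append("awaiting")
    await coro_f
    await coro_g
    return list(log)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The "awaiting" marker lands in the log before "f start", and g() only starts after f() has ended.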

Tasks

asyncio.Tasks wrap your coroutines and get independently scheduled for execution by the event loop. You can create them using asyncio.create_task():

task_f = asyncio.create_task(f())
task_g = asyncio.create_task(g())

await asyncio.sleep(0.1) # <- f() and g() are already running!
result_f = await task_f
result_g = await task_g

Your tasks now run concurrently and if you decide that you don’t want to wait for task_f or task_g to finish, you can cancel them using task_f.cancel() or task_g.cancel() respectively. Please note that you must create both tasks before you await the first one – otherwise you gain nothing. However, the awaits are only needed to collect the results and to clean up resources.
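The following sketch shows both properties at once: concurrent execution and cancellation. The work() helper, its delays, and the log list are assumptions for illustration.

```python
import asyncio

log = []


async def work(name, delay):
    # Hypothetical stand-in for f() and g().
    log.append(f"{name} start")
    await asyncio.sleep(delay)
    log.append(f"{name} end")
    return name


async def main():
    log.clear()
    task_f = asyncio.create_task(work("f", 0.01))
    task_g = asyncio.create_task(work("g", 5))
    await asyncio.sleep(0.1)  # both tasks have started; f is already done
    task_g.cancel()           # we decide not to wait for g
    result_f = await task_f
    try:
        await task_g          # awaiting a canceled task raises CancelledError
    except asyncio.CancelledError:
        log.append("g canceled")
    return result_f, list(log)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that "g end" never shows up in the log, because the cancellation interrupts g while it is sleeping.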

But waiting for each of them like this is not very practical. In real-life code, you often don’t even know how many awaitables you will need to wrangle. What we need is to gather the results of multiple awaitables.

asyncio.gather()

asyncio.gather() takes one or more awaitables as *args, wraps them in tasks if necessary, and waits for all of them to finish. Then it returns the results of all awaitables in the same order as you passed in the awaitables:

result_f, result_g = await asyncio.gather(f(), g())

If f() or g() raises an exception, gather() will raise it immediately, but the other tasks are not affected. However, if gather() itself is canceled, all of the awaitables that it’s gathering and that have not completed yet are also canceled.

You can also pass return_exceptions=True. Exceptions are then returned like normal results, and you have to check yourself whether or not each awaitable was successful (probably using isinstance(result, BaseException)).
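As a minimal sketch of that check, the hypothetical helpers ok() and boom() below stand in for awaitables that succeed and fail; their names and bodies are assumptions.

```python
import asyncio


async def ok(value):
    await asyncio.sleep(0)
    return value


async def boom():
    await asyncio.sleep(0)
    raise ValueError("boom")


async def main():
    results = await asyncio.gather(
        ok(1), boom(), ok(3), return_exceptions=True
    )
    # Results keep the order of the arguments; failures show up as
    # exception instances instead of being raised.
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return values, errors


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Splitting the list this way loses the positions, so real code may prefer to zip the results with the inputs instead.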

Summary

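  • Takes many awaitables as *args.
  • Wraps each awaitable in a task if necessary.
  • Returns the list of results in the same order.
    • With return_exceptions=True, exceptions are returned as results instead of being raised.
    • Otherwise, if one of the awaitables raises an exception, gather() raises it immediately, but the remaining tasks keep running.
  • If gather() itself is canceled, it cancels all unfinished tasks it’s gathering.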

Now we can wait for many awaitables at once! However, well-behaved distributed systems need timeouts. Since gather() doesn’t have an option for that, we need the next helper.

asyncio.wait_for()

asyncio.wait_for() takes two arguments: one awaitable and a timeout in seconds. If the awaitable is a coroutine, it will automatically be wrapped by a task. So the following construct is quite common:

try:
    result_f, result_g = await asyncio.wait_for(
        asyncio.gather(f(), g()),
        timeout=5.0,
    )
except asyncio.TimeoutError:
    print("oops took longer than 5s!")

If the timeout expires, the inner task gets canceled. For gather(), that means that all tasks that it is gathering are canceled too: in this case f() and g().
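The cancellation can be observed directly. In this sketch, slow() is a hypothetical coroutine that records its own cancellation; the names and the short timeout are assumptions for illustration.

```python
import asyncio

canceled = []


async def slow(name):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        canceled.append(name)  # remember that we were canceled
        raise                  # always re-raise so cancellation completes


async def main():
    canceled.clear()
    try:
        await asyncio.wait_for(
            asyncio.gather(slow("f"), slow("g")),
            timeout=0.05,
        )
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return timed_out, sorted(canceled)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both coroutines see a CancelledError before the caller sees the TimeoutError.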

Please note that just replacing create_task() by wait_for() and calling it a day does not work. wait_for() is an async function itself and does not start executing until you await it:

# NOT concurrent!
cf = asyncio.wait_for(f(), timeout=0.5)
cg = asyncio.wait_for(g(), timeout=0.5)

# cf and cg are both COROUTINES, not tasks!
# At THIS point, there's NOTHING to be scheduled by the event loop.

await cf  # g() is NOT executing yet!
await cg  # wait_for creates only HERE the task for g()
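If you do want individual timeouts and concurrency, one possible fix is to wrap each wait_for() call in a task of its own. This is a sketch, not a recommendation from the original post; work() and the log list are assumptions.

```python
import asyncio

log = []


async def work(name):
    # Hypothetical stand-in for f() and g().
    log.append(f"{name} start")
    await asyncio.sleep(0.05)
    log.append(f"{name} end")
    return name


async def main():
    log.clear()
    # create_task() schedules each wait_for() coroutine right away,
    # so both workloads (and both timeouts) run side by side.
    task_f = asyncio.create_task(asyncio.wait_for(work("f"), timeout=1.0))
    task_g = asyncio.create_task(asyncio.wait_for(work("g"), timeout=1.0))
    results = [await task_f, await task_g]
    return results, list(log)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here g starts before f ends, which is exactly what the version with bare wait_for() coroutines fails to achieve.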

If you now think that there would be no need for wait_for() if gather() had a timeout option, we’re thinking the same thing.

Summary

  • Takes one awaitable.
  • Wraps the awaitable in a task if necessary.
  • Takes a timeout that cancels the task if it expires.
  • Unlike create_task() , is a coroutine itself that doesn’t execute until awaited.

Interlude: async-timeout

A more elegant approach to timeouts is the async-timeout package on PyPI. It gives you an asynchronous context manager that allows you to apply a total timeout even if you need to execute the coroutines sequentially:

async with async_timeout.timeout(5.0):
    await f()
    await g()

Sometimes, you don’t want to wait until all awaitables are done. Maybe you want to process them as they finish and report some kind of progress to the user.

asyncio.as_completed()

asyncio.as_completed() takes an iterable of awaitables and returns an iterator that yields asyncio.Futures in the order the awaitables are done:

for fut in asyncio.as_completed([task_f, task_g], timeout=5.0):
    try:
        await fut
        print("one task down!")
    except Exception:
        print("ouch")

There’s no way to find out which awaitable you’re awaiting though.
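One possible workaround is to make each awaitable carry its own label. The tagged() wrapper below is a hypothetical helper, not part of asyncio, and work() with its delays is an assumption as well.

```python
import asyncio


async def work(name, delay):
    # Hypothetical stand-in for f() and g().
    await asyncio.sleep(delay)
    return f"{name} done"


async def tagged(name, awaitable):
    # Return the label together with the result, so the caller can
    # tell which awaitable has just finished.
    return name, await awaitable


async def main():
    jobs = [
        tagged("f", work("f", 0.1)),
        tagged("g", work("g", 0.01)),
    ]
    order = []
    for fut in asyncio.as_completed(jobs, timeout=5.0):
        name, result = await fut
        order.append(name)
    return order


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because g sleeps for a shorter time, its label arrives first even though f was passed in first.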

Summary

  • Takes many awaitables in an iterable.
  • Yields Futures that you have to await as soon as something is done.
  • Does not guarantee to return the original awaitables that you passed in.
  • Does wrap the awaitables in tasks (it actually calls asyncio.ensure_future() on them).
  • Takes an optional timeout.

Finally, you may want more control over waiting and that takes us to the final waiting primitive.

asyncio.wait()

asyncio.wait() is the most unwieldy of the APIs but also the most powerful one. It is a little reminiscent of the venerable select() system call.

Like as_completed(), it takes awaitables in an iterable. It will return two sets: the awaitables that are done and those that are still pending. It’s up to you to await them and to determine which result belongs to what:

done, pending = await asyncio.wait([task_f, task_g])

for t in done:
    try:
        if t is task_f:
            print(f"The result of f() is { await task_f }.")
    except Exception as e:
        print(f"f() failed with { repr(e) }.")

# ...and same for g()

This code would not work if you passed in a coroutine and wait() wrapped it in a task, because the returned awaitable would be different from the one that you passed in and the identity check would always fail. Python 3.8 through 3.10 still wrap it but emit a deprecation warning, because it’s probably a bug; since Python 3.11, wait() rejects bare coroutines with a TypeError.

How can an awaitable be still pending when wait() returns? There are two possibilities:

  1. You can pass a timeout after which wait() will return. Unlike with wait_for(), nothing is done to the awaitables when that timeout expires. The function just returns and sorts the tasks into the done and the pending buckets.
  2. You can tell wait() to not wait until all awaitables are done using the return_when argument. By default it’s set to asyncio.ALL_COMPLETED, which does exactly what it sounds like. You can also set it to asyncio.FIRST_EXCEPTION, which also waits for all awaitables to finish unless one of them raises an exception, in which case wait() returns immediately. Finally, asyncio.FIRST_COMPLETED returns the moment any of the awaitables finishes.

All of this together is a bit complicated but allows you to build powerful dispatcher functions, often using a while loop that runs until all awaitables are done.
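Such a dispatcher loop could look roughly like the sketch below. The work() helper and its delays are assumptions; a real dispatcher would likely also add new tasks to the pending set inside the loop.

```python
import asyncio


async def work(name, delay):
    # Hypothetical stand-in for f() and g().
    await asyncio.sleep(delay)
    return name


async def main():
    # Only pass tasks, so that the objects in `done` are the very
    # objects that were created here.
    pending = {
        asyncio.create_task(work("f", 0.1)),
        asyncio.create_task(work("g", 0.01)),
    }
    finished = []
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            # result() re-raises if the task failed; handle that here
            # in real code.
            finished.append(task.result())
    return finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each pass through the loop handles whatever has finished and feeds the still-pending set back into wait().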

Summary

  • Takes many awaitables in an iterable.
  • Will not return the results, but the passed awaitables sorted into two sets that are returned as a tuple of (done, pending). It’s up to you to await and dispatch.
  • On older Python versions, wraps coroutines in tasks but warns about it, because that means you get different awaitables back than you put in; since Python 3.11, bare coroutines are rejected. Only pass tasks!
  • Gives you fine-grained control when it should sort the tasks into the buckets and return – it never cancels any of the tasks:
    • Pass a timeout to limit the maximum waiting time.
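    • return_when=asyncio.ALL_COMPLETED: As soon as all awaitables are done.
    • return_when=asyncio.FIRST_EXCEPTION: As soon as all are done, or as soon as one raises an exception.
    • return_when=asyncio.FIRST_COMPLETED: As soon as any awaitable is done.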
    • return_when=asyncio.FIRST_EXCEPTION : As soon as all are done, or as soon as one raises an exception.
    • return_when=asyncio.FIRST_COMPLETED : As soon as any awaitable is done.

Next Steps

Of all the available asyncio material, I’m only aware of two sets that are comprehensive, accurate, and written by people with extensive practical asyncio experience:

  1. Łukasz Langa, who did a lot of asyncio work at Facebook and Instagram and is now working for EdgeDB (a database that uses asyncio extensively), started a video series that begins with nothing and works its way up to asyncio. The third episode is particularly pertinent to this post, because it shows the practical use of asyncio.wait().
  2. For more advanced asyncio-in-production advice, Lynn Root of Spotify gave two talks on that topic: asyncio in Practice: We Did It Wrong (2018) and Advanced asyncio: Solving Real-world Production Problems (2019), with an extensive written tutorial.
