協程與任務?

本節將簡述用于協程與任務的高層級 API。

協程?

協程通過 async/await 語法進行聲明,是編寫異步應用的推薦方式。例如,以下代碼段 (需要 Python 3.7+) 打印 "hello",等待 1 秒,然后打印 "world":

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意:簡單地調用一個協程并不會將其加入執行日程:

>>> main()
<coroutine object main at 0x1053bb7c8>

要真正運行一個協程,asyncio 提供了三種主要機制:

  • asyncio.run() 函數用來運行最高層級的入口點 "main()" 函數 (參見上面的示例。)

  • 等待一個協程。以下代碼段會在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    預期的輸出:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • asyncio.create_task() 函數用來并發運行作為 asyncio 任務 的多個協程。

    讓我們修改以上示例,并發 運行兩個 say_after 協程:

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    注意,預期的輸出顯示代碼段的運行時間比之前快了 1 秒:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    

可等待對象?

如果一個對象可以在 await 語句中使用,那么它就是 可等待 對象。許多 asyncio API 都被設計為接受可等待對象。

可等待 對象有三種主要類型: 協程, 任務Future.

協程

Python 協程屬于 可等待 對象,因此可以在其他協程中被等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在本文檔中 "協程" 可用來表示兩個緊密關聯的概念:

  • 協程函數: 定義形式為 async def 的函數;

  • 協程對象: 調用 協程函數 所返回的對象。

asyncio 也支持舊式的 基于生成器的 協程。

任務

任務 被用來設置日程以便 并發 執行協程。

當一個協程通過 asyncio.create_task() 等函數被打包為一個 任務,該協程將自動排入日程準備立即運行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future 是一種特殊的 低層級 可等待對象,表示一個異步操作的 最終結果。

當一個 Future 對象 被等待,這意味著協程將保持等待直到該 Future 對象在其他地方操作完畢。

在 asyncio 中需要 Future 對象以便允許通過 async/await 使用基于回調的代碼。

通常情況下 沒有必要 在應用層級的代碼中創建 Future 對象。

Future 對象有時會由庫和某些 asyncio API 暴露給用戶,用作可等待對象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一個很好的返回對象的低層級函數的示例是 loop.run_in_executor()。

運行 asyncio 程序?

asyncio.run(coro, *, debug=False)?

執行 coroutine coro 并返回結果。

此函數運行傳入的協程,負責管理 asyncio 事件循環并 完結異步生成器。

當有其他 asyncio 事件循環在同一線程中運行時,此函數不能被調用。

如果 debugTrue,事件循環將以調試模式運行。

此函數總是會創建一個新的事件循環并在結束時關閉之。它應當被用作 asyncio 程序的主入口點,理想情況下應當只被調用一次。

示例:

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

3.7 新版功能: 重要: 此函數是在 Python 3.7 中加入 asyncio 模塊,處于 暫定基準狀態。

創建任務?

asyncio.create_task(coro)?

coro 協程 打包為一個 Task 排入日程準備執行。返回 Task 對象。

該任務會在 get_running_loop() 返回的循環中執行,如果當前線程沒有在運行的循環則會引發 RuntimeError。

此函數 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的 asyncio.ensure_future() 函數。

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

3.7 新版功能.

休眠?

coroutine asyncio.sleep(delay, result=None, *, loop=None)?

阻塞 delay 指定的秒數。

如果指定了 result,則當協程完成時將其返回給調用者。

sleep() 總是會掛起當前任務,以允許其他任務運行。

loop 參數已棄用,計劃在 Python 3.10 中移除。

以下協程示例運行 5 秒,每秒顯示一次當前日期:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

并發運行任務?

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)?

并發 運行 aws 序列中的 可等待對象。

如果 aws 中的某個可等待對象為協程,它將自動作為一個任務加入日程。

如果所有可等待對象都成功完成,結果將是一個由所有返回值聚合而成的列表。結果值的順序與 aws 中可等待對象的順序一致。

如果 return_exceptionsFalse (默認),所引發的首個異常會立即傳播給等待 gather() 的任務。aws 序列中的其他可等待對象 不會被取消 并將繼續運行。

如果 return_exceptionsTrue,異常會和成功的結果一樣處理,并聚合至結果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待對象也會 被取消

如果 aws 序列中的任一 Task 或 Future 對象 被取消,它將被當作引發了 CancelledError 一樣處理 -- 在此情況下 gather() 調用 不會 被取消。這是為了防止一個已提交的 Task/Future 被取消導致其他 Tasks/Future 也被取消。

示例:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

在 3.7 版更改: 如果 gather 本身被取消,則無論 return_exceptions 取值為何,消息都會被傳播。

屏蔽取消操作?

awaitable asyncio.shield(aw, *, loop=None)?

保護一個 可等待對象 防止其被 取消。

如果 aw 是一個協程,它將自動作為任務加入日程。

以下語句:

res = await shield(something())

相當于:

res = await something()

不同之處 在于如果包含它的協程被取消,在 something() 中運行的任務不會被取消。從 something() 的角度看來,取消操作并沒有發生。然而其調用者已被取消,因此 "await" 表達式仍然會引發 CancelledError。

如果通過其他方式取消 something() (例如在其內部操作) 則 shield() 也會取消。

如果希望完全忽略取消操作 (不推薦) 則 shield() 函數需要配合一個 try/except 代碼段,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

超時?

coroutine asyncio.wait_for(aw, timeout, *, loop=None)?

等待 aw 可等待對象 完成,指定 timeout 秒數后超時。

如果 aw 是一個協程,它將自動作為任務加入日程。

timeout 可以為 None,也可以為 float 或 int 型數值表示的等待秒數。如果 timeoutNone,則等待直到完成。

如果發生超時,任務將取消并引發 asyncio.TimeoutError.

要避免任務 取消,可以加上 shield()

函數將等待直到目標對象確實被取消,所以總等待時間可能超過 timeout 指定的秒數。

如果等待被取消,則 aw 指定的對象也會被取消。

loop 參數已棄用,計劃在 Python 3.10 中移除。

示例:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

在 3.7 版更改: aw 因超時被取消,wait_for 會等待 aw 被取消。之前版本則將立即引發 asyncio.TimeoutError

簡單等待?

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)?

并發運行 aws 指定的 可等待對象 并阻塞線程直到滿足 return_when 指定的條件。

如果 aws 中的某個可等待對象為協程,它將自動作為任務加入日程。直接向 wait() 傳入協程對象已棄用,因為這會導致 令人迷惑的行為。

返回兩個 Task/Future 集合: (done, pending)。

用法:

done, pending = await asyncio.wait(aws)

loop 參數已棄用,計劃在 Python 3.10 中移除。

如指定 timeout (float 或 int 類型) 則它將被用于控制返回之前等待的最長秒數。

請注意此函數不會引發 asyncio.TimeoutError。當超時發生時,未完成的 Future 或 Task 將在指定秒數后被返回。

return_when 指定此函數應在何時返回。它必須為以下常數之一:

常數

描述

FIRST_COMPLETED

函數將在任意可等待對象結束或取消時返回。

FIRST_EXCEPTION

函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當于 ALL_COMPLETED

ALL_COMPLETED

函數將在所有可等待對象結束或取消時返回。

wait_for() 不同,wait() 在超時發生時不會取消可等待對象。

注解

wait() 會自動將協程作為任務加入日程,以后將以 (done, pending) 集合形式返回顯式創建的任務對象。因此以下代碼并不會有預期的行為:

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!

以上代碼段的修正方法如下:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

直接向 wait() 傳入協程對象的方式已棄用。

asyncio.as_completed(aws, *, loop=None, timeout=None)?

?并發地運行 aws 集合中的 可等待對象。返回一個 Future 對象的迭代器。返回的每個 Future 對象代表來自剩余可等待對象集合的最早結果。

如果在所有 Future 對象完成前發生超時則將引發 asyncio.TimeoutError。

示例:

for f in as_completed(aws):
    earliest_result = await f
    # ...

來自其他線程的日程安排?

asyncio.run_coroutine_threadsafe(coro, loop)?

向指定事件循環提交一個協程。線程安全。

返回一個 concurrent.futures.Future 以等待來自其他 OS 線程的結果。

此函數應該從另一個 OS 線程中調用,而非事件循環運行所在線程。示例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在協程內產生了異常,將會通知返回的 Future 對象。它也可被用來取消事件循環中的任務:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

參見 concurrency and multithreading 部分的文檔。

不同與其他 asyncio 函數,此函數要求顯式地傳入 loop 參數。

3.5.1 新版功能.

內省?

asyncio.current_task(loop=None)?

返回當前運行的 Task 實例,如果沒有正在運行的任務則返回 None。

如果 loopNone 則會使用 get_running_loop() 獲取當前事件循環。

3.7 新版功能.

asyncio.all_tasks(loop=None)?

返回事件循環所運行的未完成的 Task 對象的集合。

如果 loopNone,則會使用 get_running_loop() 獲取當前事件循環。

3.7 新版功能.

Task 對象?

class asyncio.Task(coro, *, loop=None)?

一個與 Future 類似 的對象,可運行 Python 協程。非線程安全。

Task 對象被用來在事件循環中運行協程。如果一個協程在等待一個 Future 對象,Task 對象會掛起該協程的執行并等待該 Future 對象完成。當該 Future 對象 完成,被打包的協程將恢復執行。

事件循環使用協同日程調度: 一個事件循環每次運行一個 Task 對象。而一個 Task 對象會等待一個 Future 對象完成,該事件循環會運行其他 Task、回調或執行 IO 操作。

使用高層級的 asyncio.create_task() 函數來創建 Task 對象,也可用低層級的 loop.create_task()ensure_future() 函數。不建議手動實例化 Task 對象。

要取消一個正在運行的 Task 對象可使用 cancel() 方法。調用此方法將使該 Task 對象拋出一個 CancelledError 異常給打包的協程。如果取消期間一個協程正在等待一個 Future 對象,該 Future 對象也將被取消。

cancelled() 可被用來檢測 Task 對象是否被取消。如果打包的協程沒有抑制 CancelledError 異常并且確實被取消,該方法將返回 True。

asyncio.TaskFuture 繼承了其除 Future.set_result()Future.set_exception() 以外的所有 API。

Task 對象支持 contextvars 模塊。當一個 Task 對象被創建,它將復制當前上下文,然后在復制的上下文中運行其協程。

在 3.7 版更改: 加入對 contextvars 模塊的支持。

cancel()?

請求取消 Task 對象。

這將安排在下一輪事件循環中拋出一個 CancelledError 異常給被封包的協程。

協程在之后有機會進行清理甚至使用 try ... ... except CancelledError ... finally 代碼塊抑制異常來拒絕請求。不同于 Future.cancel(),Task.cancel() 不保證 Task 會被取消,雖然抑制完全取消并不常見,也很不鼓勵這樣做。

以下示例演示了協程是如何偵聽取消請求的:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()?

如果 Task 對象 被取消 則返回 True。

當使用 cancel() 發出取消請求時 Task 會被 取消,其封包的協程將傳播被拋入的 CancelledError 異常。

done()?

如果 Task 對象 已完成 則返回 True。

當 Task 所封包的協程返回一個值、引發一個異常或 Task 本身被取消時,則會被認為 已完成

result()?

返回 Task 的結果。

如果 Task 對象 已完成,其封包的協程的結果會被返回 (或者當協程引發異常時,該異常會被重新引發。)

如果 Task 對象 被取消,此方法會引發一個 CancelledError 異常。

如果 Task 對象的結果還不可用,此方法會引發一個 InvalidStateError 異常。

exception()?

返回 Task 對象的異常。

如果所封包的協程引發了一個異常,該異常將被返回。如果所封包的協程正常返回則該方法將返回 None。

如果 Task 對象 被取消,此方法會引發一個 CancelledError 異常。

如果 Task 對象尚未 完成,此方法將引發一個 InvalidStateError 異常。

add_done_callback(callback, *, context=None)?

添加一個回調,將在 Task 對象 完成 時被運行。

此方法應該僅在低層級的基于回調的代碼中使用。

要了解更多細節請查看 Future.add_done_callback() 的文檔。

remove_done_callback(callback)?

從回調列表中移除 callback 。

此方法應該僅在低層級的基于回調的代碼中使用。

要了解更多細節請查看 Future.remove_done_callback() 的文檔。

get_stack(*, limit=None)?

返回此 Task 對象的棧框架列表。

如果所封包的協程未完成,這將返回其掛起所在的棧。如果協程已成功完成或被取消,這將返回一個空列表。如果協程被一個異常終止,這將返回回溯框架列表。

框架總是從按從舊到新排序。

每個被掛起的協程只返回一個??蚣堋?/p>

可選的 limit 參數指定返回框架的數量上限;默認返回所有框架。返回列表的順序要看是返回一個棧還是一個回溯:棧返回最新的框架,回溯返回最舊的框架。(這與 traceback 模塊的行為保持一致。)

print_stack(*, limit=None, file=None)?

打印此 Task 對象的?;蚧厮?。

此方法產生的輸出類似于 traceback 模塊通過 get_stack() 所獲取的框架。

limit 參數會直接傳遞給 get_stack()。

file 參數是輸出所寫入的 I/O 流;默認情況下輸出會寫入 sys.stderr。

classmethod all_tasks(loop=None)?

返回一個事件循環中所有任務的集合。

默認情況下將返回當前事件循環中所有任務。如果 loopNone,則會使用 get_event_loop() 函數來獲取當前事件循環。

此方法 已棄用 并將在 Python 3.9 中移除。請改用 asyncio.all_tasks() 函數。

classmethod current_task(loop=None)?

返回當前運行任務或 None。

如果 loopNone,則會使用 get_event_loop() 函數來獲取當前事件循環。

此方法 已棄用 并將在 Python 3.9 中移除。請改用 asyncio.current_task() 函數。

基于生成器的協程?

注解

對基于生成器的協程的支持 已棄用 并計劃在 Python 3.10 中移除。

基于生成器的協程是 async/await 語法的前身。它們是使用 yield from 語句創建的 Python 生成器,可以等待 Future 和其他協程。

基于生成器的協程應該使用 @asyncio.coroutine 裝飾,雖然這并非強制。

@asyncio.coroutine?

用來標記基于生成器的協程的裝飾器。

此裝飾器使得舊式的基于生成器的協程能與 async/await 代碼相兼容:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

此裝飾器 已棄用 并計劃在 Python 3.10 中移除。

此裝飾器不應該被用于 async def 協程。

asyncio.iscoroutine(obj)?

如果 obj 是一個 協程對象 則返回 True

此方法不同于 inspect.iscoroutine() 因為它對基于生成器的協程返回 True。

asyncio.iscoroutinefunction(func)?

如果 func 是一個 協程函數 則返回 True

此方法不同于 inspect.iscoroutinefunction() 因為它對以 @coroutine 裝飾的基于生成器的協程函數返回 True