協程與任務?
本節將簡述用于協程與任務的高層級 API。
協程?
協程通過 async/await 語法進行聲明,是編寫異步應用的推薦方式。例如,以下代碼段 (需要 Python 3.7+) 打印 "hello",等待 1 秒,然后打印 "world":
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
注意:簡單地調用一個協程并不會將其加入執行日程:
>>> main()
<coroutine object main at 0x1053bb7c8>
要真正運行一個協程,asyncio 提供了三種主要機制:
asyncio.run()函數用來運行最高層級的入口點 "main()" 函數 (參見上面的示例。)等待一個協程。以下代碼段會在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
預期的輸出:
started at 17:13:52 hello world finished at 17:13:55
asyncio.create_task()函數用來并發運行作為 asyncio任務的多個協程。讓我們修改以上示例,并發 運行兩個
say_after協程:async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}")
注意,預期的輸出顯示代碼段的運行時間比之前快了 1 秒:
started at 17:14:32 hello world finished at 17:14:34
可等待對象?
如果一個對象可以在 await 語句中使用,那么它就是 可等待 對象。許多 asyncio API 都被設計為接受可等待對象。
可等待 對象有三種主要類型: 協程, 任務 和 Future.
協程
Python 協程屬于 可等待 對象,因此可以在其他協程中被等待:
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
asyncio 也支持舊式的 基于生成器的 協程。
任務
任務 被用來設置日程以便 并發 執行協程。
當一個協程通過 asyncio.create_task() 等函數被打包為一個 任務,該協程將自動排入日程準備立即運行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Futures
Future 是一種特殊的 低層級 可等待對象,表示一個異步操作的 最終結果。
當一個 Future 對象 被等待,這意味著協程將保持等待直到該 Future 對象在其他地方操作完畢。
在 asyncio 中需要 Future 對象以便允許通過 async/await 使用基于回調的代碼。
通常情況下 沒有必要 在應用層級的代碼中創建 Future 對象。
Future 對象有時會由庫和某些 asyncio API 暴露給用戶,用作可等待對象:
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一個很好的返回對象的低層級函數的示例是 loop.run_in_executor()。
運行 asyncio 程序?
-
asyncio.run(coro, *, debug=False)? 執行 coroutine coro 并返回結果。
此函數運行傳入的協程,負責管理 asyncio 事件循環并 完結異步生成器。
當有其他 asyncio 事件循環在同一線程中運行時,此函數不能被調用。
如果 debug 為
True,事件循環將以調試模式運行。此函數總是會創建一個新的事件循環并在結束時關閉之。它應當被用作 asyncio 程序的主入口點,理想情況下應當只被調用一次。
示例:
async def main(): await asyncio.sleep(1) print('hello') asyncio.run(main())
3.7 新版功能: 重要: 此函數是在 Python 3.7 中加入 asyncio 模塊,處于 暫定基準狀態。
創建任務?
-
asyncio.create_task(coro)? 將 coro 協程 打包為一個
Task排入日程準備執行。返回 Task 對象。該任務會在
get_running_loop()返回的循環中執行,如果當前線程沒有在運行的循環則會引發RuntimeError。此函數 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的
asyncio.ensure_future()函數。async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro()) ...
3.7 新版功能.
休眠?
-
coroutine
asyncio.sleep(delay, result=None, *, loop=None)? 阻塞 delay 指定的秒數。
如果指定了 result,則當協程完成時將其返回給調用者。
sleep()總是會掛起當前任務,以允許其他任務運行。loop 參數已棄用,計劃在 Python 3.10 中移除。
以下協程示例運行 5 秒,每秒顯示一次當前日期:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
并發運行任務?
-
awaitable
asyncio.gather(*aws, loop=None, return_exceptions=False)? 并發 運行 aws 序列中的 可等待對象。
如果 aws 中的某個可等待對象為協程,它將自動作為一個任務加入日程。
如果所有可等待對象都成功完成,結果將是一個由所有返回值聚合而成的列表。結果值的順序與 aws 中可等待對象的順序一致。
如果 return_exceptions 為
False(默認),所引發的首個異常會立即傳播給等待gather()的任務。aws 序列中的其他可等待對象 不會被取消 并將繼續運行。如果 return_exceptions 為
True,異常會和成功的結果一樣處理,并聚合至結果列表。如果
gather()被取消,所有被提交 (尚未完成) 的可等待對象也會 被取消。如果 aws 序列中的任一 Task 或 Future 對象 被取消,它將被當作引發了
CancelledError一樣處理 -- 在此情況下gather()調用 不會 被取消。這是為了防止一個已提交的 Task/Future 被取消導致其他 Tasks/Future 也被取消。示例:
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24
在 3.7 版更改: 如果 gather 本身被取消,則無論 return_exceptions 取值為何,消息都會被傳播。
屏蔽取消操作?
-
awaitable
asyncio.shield(aw, *, loop=None)? -
如果 aw 是一個協程,它將自動作為任務加入日程。
以下語句:
res = await shield(something())
相當于:
res = await something()
不同之處 在于如果包含它的協程被取消,在
something()中運行的任務不會被取消。從something()的角度看來,取消操作并沒有發生。然而其調用者已被取消,因此 "await" 表達式仍然會引發CancelledError。如果通過其他方式取消
something()(例如在其內部操作) 則shield()也會取消。如果希望完全忽略取消操作 (不推薦) 則
shield()函數需要配合一個 try/except 代碼段,如下所示:try: res = await shield(something()) except CancelledError: res = None
超時?
-
coroutine
asyncio.wait_for(aw, timeout, *, loop=None)? 等待 aw 可等待對象 完成,指定 timeout 秒數后超時。
如果 aw 是一個協程,它將自動作為任務加入日程。
timeout 可以為
None,也可以為 float 或 int 型數值表示的等待秒數。如果 timeout 為None,則等待直到完成。如果發生超時,任務將取消并引發
asyncio.TimeoutError.函數將等待直到目標對象確實被取消,所以總等待時間可能超過 timeout 指定的秒數。
如果等待被取消,則 aw 指定的對象也會被取消。
loop 參數已棄用,計劃在 Python 3.10 中移除。
示例:
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
在 3.7 版更改: 當 aw 因超時被取消,
wait_for會等待 aw 被取消。之前版本則將立即引發asyncio.TimeoutError。
簡單等待?
-
coroutine
asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)? 并發運行 aws 指定的 可等待對象 并阻塞線程直到滿足 return_when 指定的條件。
如果 aws 中的某個可等待對象為協程,它將自動作為任務加入日程。直接向
wait()傳入協程對象已棄用,因為這會導致 令人迷惑的行為。返回兩個 Task/Future 集合:
(done, pending)。用法:
done, pending = await asyncio.wait(aws)
loop 參數已棄用,計劃在 Python 3.10 中移除。
如指定 timeout (float 或 int 類型) 則它將被用于控制返回之前等待的最長秒數。
請注意此函數不會引發
asyncio.TimeoutError。當超時發生時,未完成的 Future 或 Task 將在指定秒數后被返回。return_when 指定此函數應在何時返回。它必須為以下常數之一:
常數
描述
FIRST_COMPLETED函數將在任意可等待對象結束或取消時返回。
FIRST_EXCEPTION函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當于
ALL_COMPLETED。ALL_COMPLETED函數將在所有可等待對象結束或取消時返回。
與
wait_for()不同,wait()在超時發生時不會取消可等待對象。注解
wait()會自動將協程作為任務加入日程,以后將以(done, pending)集合形式返回顯式創建的任務對象。因此以下代碼并不會有預期的行為:async def foo(): return 42 coro = foo() done, pending = await asyncio.wait({coro}) if coro in done: # This branch will never be run!
以上代碼段的修正方法如下:
async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now.
直接向
wait()傳入協程對象的方式已棄用。
-
asyncio.as_completed(aws, *, loop=None, timeout=None)? ?并發地運行 aws 集合中的 可等待對象。返回一個
Future對象的迭代器。返回的每個 Future 對象代表來自剩余可等待對象集合的最早結果。如果在所有 Future 對象完成前發生超時則將引發
asyncio.TimeoutError。示例:
for f in as_completed(aws): earliest_result = await f # ...
來自其他線程的日程安排?
-
asyncio.run_coroutine_threadsafe(coro, loop)? 向指定事件循環提交一個協程。線程安全。
返回一個
concurrent.futures.Future以等待來自其他 OS 線程的結果。此函數應該從另一個 OS 線程中調用,而非事件循環運行所在線程。示例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
如果在協程內產生了異常,將會通知返回的 Future 對象。它也可被用來取消事件循環中的任務:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
參見 concurrency and multithreading 部分的文檔。
不同與其他 asyncio 函數,此函數要求顯式地傳入 loop 參數。
3.5.1 新版功能.
內省?
-
asyncio.current_task(loop=None)? 返回當前運行的
Task實例,如果沒有正在運行的任務則返回None。如果 loop 為
None則會使用get_running_loop()獲取當前事件循環。3.7 新版功能.
-
asyncio.all_tasks(loop=None)? 返回事件循環所運行的未完成的
Task對象的集合。如果 loop 為
None,則會使用get_running_loop()獲取當前事件循環。3.7 新版功能.
Task 對象?
-
class
asyncio.Task(coro, *, loop=None)? 一個與
Future 類似的對象,可運行 Python 協程。非線程安全。Task 對象被用來在事件循環中運行協程。如果一個協程在等待一個 Future 對象,Task 對象會掛起該協程的執行并等待該 Future 對象完成。當該 Future 對象 完成,被打包的協程將恢復執行。
事件循環使用協同日程調度: 一個事件循環每次運行一個 Task 對象。而一個 Task 對象會等待一個 Future 對象完成,該事件循環會運行其他 Task、回調或執行 IO 操作。
使用高層級的
asyncio.create_task()函數來創建 Task 對象,也可用低層級的loop.create_task()或ensure_future()函數。不建議手動實例化 Task 對象。要取消一個正在運行的 Task 對象可使用
cancel()方法。調用此方法將使該 Task 對象拋出一個CancelledError異常給打包的協程。如果取消期間一個協程正在等待一個 Future 對象,該 Future 對象也將被取消。cancelled()可被用來檢測 Task 對象是否被取消。如果打包的協程沒有抑制CancelledError異常并且確實被取消,該方法將返回True。asyncio.Task從Future繼承了其除Future.set_result()和Future.set_exception()以外的所有 API。Task 對象支持
contextvars模塊。當一個 Task 對象被創建,它將復制當前上下文,然后在復制的上下文中運行其協程。在 3.7 版更改: 加入對
contextvars模塊的支持。-
cancel()? 請求取消 Task 對象。
這將安排在下一輪事件循環中拋出一個
CancelledError異常給被封包的協程。協程在之后有機會進行清理甚至使用
try... ...except CancelledError...finally代碼塊抑制異常來拒絕請求。不同于Future.cancel(),Task.cancel()不保證 Task 會被取消,雖然抑制完全取消并不常見,也很不鼓勵這樣做。以下示例演示了協程是如何偵聽取消請求的:
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
-
cancelled()? 如果 Task 對象 被取消 則返回
True。當使用
cancel()發出取消請求時 Task 會被 取消,其封包的協程將傳播被拋入的CancelledError異常。
-
done()? 如果 Task 對象 已完成 則返回
True。當 Task 所封包的協程返回一個值、引發一個異常或 Task 本身被取消時,則會被認為 已完成。
-
result()? 返回 Task 的結果。
如果 Task 對象 已完成,其封包的協程的結果會被返回 (或者當協程引發異常時,該異常會被重新引發。)
如果 Task 對象 被取消,此方法會引發一個
CancelledError異常。如果 Task 對象的結果還不可用,此方法會引發一個
InvalidStateError異常。
-
exception()? 返回 Task 對象的異常。
如果所封包的協程引發了一個異常,該異常將被返回。如果所封包的協程正常返回則該方法將返回
None。如果 Task 對象 被取消,此方法會引發一個
CancelledError異常。如果 Task 對象尚未 完成,此方法將引發一個
InvalidStateError異常。
-
add_done_callback(callback, *, context=None)? 添加一個回調,將在 Task 對象 完成 時被運行。
此方法應該僅在低層級的基于回調的代碼中使用。
要了解更多細節請查看
Future.add_done_callback()的文檔。
-
remove_done_callback(callback)? 從回調列表中移除 callback 。
此方法應該僅在低層級的基于回調的代碼中使用。
要了解更多細節請查看
Future.remove_done_callback()的文檔。
-
get_stack(*, limit=None)? 返回此 Task 對象的棧框架列表。
如果所封包的協程未完成,這將返回其掛起所在的棧。如果協程已成功完成或被取消,這將返回一個空列表。如果協程被一個異常終止,這將返回回溯框架列表。
框架總是從按從舊到新排序。
每個被掛起的協程只返回一個??蚣堋?/p>
可選的 limit 參數指定返回框架的數量上限;默認返回所有框架。返回列表的順序要看是返回一個棧還是一個回溯:棧返回最新的框架,回溯返回最舊的框架。(這與 traceback 模塊的行為保持一致。)
-
print_stack(*, limit=None, file=None)? 打印此 Task 對象的?;蚧厮?。
此方法產生的輸出類似于 traceback 模塊通過
get_stack()所獲取的框架。limit 參數會直接傳遞給
get_stack()。file 參數是輸出所寫入的 I/O 流;默認情況下輸出會寫入
sys.stderr。
-
classmethod
all_tasks(loop=None)? 返回一個事件循環中所有任務的集合。
默認情況下將返回當前事件循環中所有任務。如果 loop 為
None,則會使用get_event_loop()函數來獲取當前事件循環。此方法 已棄用 并將在 Python 3.9 中移除。請改用
asyncio.all_tasks()函數。
-
classmethod
current_task(loop=None)? 返回當前運行任務或
None。如果 loop 為
None,則會使用get_event_loop()函數來獲取當前事件循環。此方法 已棄用 并將在 Python 3.9 中移除。請改用
asyncio.current_task()函數。
-
基于生成器的協程?
注解
對基于生成器的協程的支持 已棄用 并計劃在 Python 3.10 中移除。
基于生成器的協程是 async/await 語法的前身。它們是使用 yield from 語句創建的 Python 生成器,可以等待 Future 和其他協程。
基于生成器的協程應該使用 @asyncio.coroutine 裝飾,雖然這并非強制。
-
@asyncio.coroutine? 用來標記基于生成器的協程的裝飾器。
此裝飾器使得舊式的基于生成器的協程能與 async/await 代碼相兼容:
@asyncio.coroutine def old_style_coroutine(): yield from asyncio.sleep(1) async def main(): await old_style_coroutine()
此裝飾器 已棄用 并計劃在 Python 3.10 中移除。
此裝飾器不應該被用于
async def協程。
-
asyncio.iscoroutine(obj)? 如果 obj 是一個 協程對象 則返回
True。此方法不同于
inspect.iscoroutine()因為它對基于生成器的協程返回True。
-
asyncio.iscoroutinefunction(func)? 如果 func 是一個 協程函數 則返回
True。此方法不同于
inspect.iscoroutinefunction()因為它對以@coroutine裝飾的基于生成器的協程函數返回True。
