事件循環?

前言

事件循環是每個 asyncio 應用的核心。 事件循環會運行異步任務和回調,執行網絡 IO 操作,以及運行子進程。

應用開發者通常應當使用高層級的 asyncio 函數,例如 asyncio.run(),應當很少有必要引用循環對象或調用其方法。 本節所針對的主要是低層級代碼、庫和框架的編寫者,他們需要更細致地控制事件循環行為。

獲取事件循環

以下低層級函數可被用于獲取、設置或創建事件循環:

asyncio.get_running_loop()?

返回當前 OS 線程中正在運行的事件循環。

如果沒有正在運行的事件循環則會引發 RuntimeError。 此函數只能由協程或回調來調用。

3.7 新版功能.

asyncio.get_event_loop()?

獲取當前事件循環。

如果當前 OS 線程沒有設置當前事件循環,該 OS 線程為主線程,并且 set_event_loop() 還沒有被調用,則 asyncio 將創建一個新的事件循環并將其設為當前事件循環。

由于此函數具有相當復雜的行為(特別是在使用了自定義事件循環策略的時候),更推薦在協程和回調中使用 get_running_loop() 函數而非 get_event_loop()

應該考慮使用 asyncio.run() 函數而非使用低層級函數來手動創建和關閉事件循環。

asyncio.set_event_loop(loop)?

loop 設置為當前 OS 線程的當前事件循環。

asyncio.new_event_loop()?

創建一個新的事件循環。

請注意 get_event_loop(), set_event_loop() 以及 new_event_loop() 函數的行為可以通過 設置自定義事件循環策略 來改變。

目錄

本文檔包含下列小節:

事件循環方法集?

事件循環有下列 低級 APIs:

運行和停止循環?

loop.run_until_complete(future)?

運行直到 future ( Future 的實例 ) 被完成。

如果參數是 coroutine object ,將被隱式調度為 asyncio.Task 來運行。

返回 Future 的結果 或者引發相關異常。

loop.run_forever()?

運行事件循環直到 stop() 被調用。

如果 stop() 在調用 run_forever() 之前被調用,循環將輪詢一次 I/O 選擇器并設置超時為零,再運行所有已加入計劃任務的回調來響應 I/O 事件(以及已加入計劃任務的事件),然后退出。

如果 stop()run_forever() 運行期間被調用,循環將運行當前批次的回調然后退出。 請注意在此情況下由回調加入計劃任務的新回調將不會運行;它們將會在下次 run_forever()run_until_complete() 被調用時運行。

loop.stop()?

停止事件循環。

loop.is_running()?

返回 True 如果事件循環當前正在運行。

loop.is_closed()?

如果事件循環已經被關閉,返回 True

loop.close()?

關閉事件循環。

當這個函數被調用的時候,循環必須處于非運行狀態。pending狀態的回調將被丟棄。

此方法清除所有的隊列并立即關閉執行器,不會等待執行器完成。

這個方法是冪等的和不可逆的。事件循環關閉后,不應調用其他方法。

coroutine loop.shutdown_asyncgens()?

安排所有當前打開的 asynchronous generator 對象通過 aclose() 調用來關閉。 在調用此方法后,如果有新的異步生成器被迭代事件循環將會發出警告。 這應當被用來可靠地完成所有已加入計劃任務的異步生成器。

運行請注意,當使用 asyncio.run() 時,無需調用此函數。

示例:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

3.6 新版功能.

調度回調?

loop.call_soon(callback, *args, context=None)?

安排在下一次事件循環的迭代中使用 args 參數調用 callback

回調按其注冊順序被調用。每個回調僅被調用一次。

可選鍵值類的參數 context 允許 callback 運行在一個指定的自定義 contextvars.Context 對象中。如果沒有提供 context ,則使用當前上下文。

返回一個能用來取消回調的 asyncio.Handle 實例。

這個方法不是線程安全的。

loop.call_soon_threadsafe(callback, *args, context=None)?

call_soon() 的線程安全變體。必須被用于安排 來自其他線程 的回調。

參見 concurrency and multithreading 部分的文檔。

在 3.7 版更改: 加入鍵值類形參 context。請參閱 PEP 567 查看更多細節。

注解

大多數 asyncio 的調度函數不讓傳遞關鍵字參數。為此,請使用 functools.partial()

# will schedule "print("Hello", flush=True)"
loop.call_soon(
    functools.partial(print, "Hello", flush=True))

使用 partial 對象通常比使用lambda更方便,asyncio 在調試和錯誤消息中能更好的呈現 partial 對象。

調度延遲回調?

事件循環提供安排調度函數在將來某個時刻調用的機制。事件循環使用單調時鐘來跟蹤時間。

loop.call_later(delay, callback, *args, context=None)?

安排 callback 在給定的 delay 秒(可以是 int 或者 float)后被調用。

返回一個 asyncio.TimerHandle 實例,該實例能用于取消回調。

callback 只被調用一次。如果兩個回調被安排在同樣的時間點,執行順序未限定。

可選的位置參數 args 在被調用的時候傳遞給 callback? 。 如果你想把關鍵字參數傳遞給 callback ,請使用 functools.partial()

可選鍵值類的參數 context 允許 callback 運行在一個指定的自定義 contextvars.Context 對象中。如果沒有提供 context ,則使用當前上下文。

在 3.7 版更改: 加入鍵值類形參 context。請參閱 PEP 567 查看更多細節。

在 3.7.1 版更改: 在Python 3.7.0 和更早版本的默認事件循環實現中,delay 不能超過一天。在Python3.7.1中已被修復。

loop.call_at(when, callback, *args, context=None)?

安排 callback 在給定的絕對時間戳的 時間 (一個 int 或者 float)被調用,使用與 loop.time() 同樣的時間參考。

本方法的行為和 call_later() 方法相同。

返回一個 asyncio.TimerHandle 實例,該實例能用于取消回調。

在 3.7 版更改: 加入鍵值類形參 context。請參閱 PEP 567 查看更多細節。

在 3.7.1 版更改: 在Python 3.7.0 和更早版本的默認事件循環實現中, when 與當前時間的差值不能超過一天。在 Python3.7.1中已被修復。

loop.time()?

根據時間循環內部的單調時鐘,返回當前時間, float 值。

注解

在 3.8 版更改: 在 Python 3.7 和更早版本中超時 (相對的 delay 或絕對的 when) 不能超過一天。 這在 Python 3.8 中已被修復。

參見

asyncio.sleep() 函數。

創建 Futures 和 Tasks?

loop.create_future()?

創建一個附加到事件循環中的 asyncio.Future 對象。

這是在asyncio中創建Futures的首選方式。這讓第三方事件循環可以提供Future 對象的替代實現(更好的性能或者功能)。

3.5.2 新版功能.

loop.create_task(coro)?

安排一個 協程 的執行。返回一個 Task 對象。

第三方的事件循環可以使用它們自己的 Task 子類來滿足互操作性。這種情況下結果類型是一個 Task 的子類。

loop.set_task_factory(factory)?

設置一個 task 工廠 , 被用于 loop.create_task()

如果 factoryNone 則將設置默認的任務工廠。 在其他情況下,factory 必須為一個 可調用對象 且簽名匹配 (loop, coro),其中 loop 是對活動事件循環的引用,而 coro 是一個協程對象。 該可調用對象必須返回一個兼容 asyncio.Future 的對象。

loop.get_task_factory()?

返回一個任務工廠,或者如果是使用默認值則返回 None

打開網絡連接?

coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)?

打開一個流式傳輸連接,連接到由 hostport 指定的地址。

套接字族可以是 AF_INETAF_INET6,具體取決于 host (或 family 參數,如果有提供的話)。

套接字類型將為 SOCK_STREAM

protocol_factory 必須為一個返回 asyncio 協議 實現的可調用對象。

這個方法會嘗試在后臺創建連接。當創建成功,返回 (transport, protocol) 組合。

底層操作的大致的執行順序是這樣的:

  1. 創建連接并為其創建一個 傳輸

  2. 不帶參數地調用 protocol_factory 并預期返回一個 協議 實例。

  3. 協議實例通過調用其 connection_made() 方法與傳輸進行配對。

  4. 成功時返回一個 (transport, protocol) 元組。

被創建的傳輸對象是一個實現相關的雙向流。

其他參數:

  • ssl: 如果給定該參數且不為假值,則會創建一個 SSL/TLS 傳輸(默認創建一個純 TCP 傳輸)。 如果 ssl 是一個 ssl.SSLContext 對象,則會使用此上下文來創建傳輸對象;如果 sslTrue,則會使用從 ssl.create_default_context() 返回的默認上下文。

  • server_hostname 設置或重載目標服務器的證書將要匹配的主機名。 應當只在 ssl 不為 None 時傳入。 默認情況下會使用 host 參數的值。 如果 host 為空那就沒有默認值,你必須為 server_hostname 傳入一個值。 如果 server_hostname 為空字符串,則主機名匹配會被禁用(這是一個嚴重的安全風險,使得潛在的中間人攻擊成為可能)。

  • family, proto, flags 是可選的地址族、協議和標志,它們會被傳遞給 getaddrinfo() 來對 host 進行解析。如果要指定的話,這些都應該是來自于 socket 模塊的對應常量。

  • sock,如果指定的話,其應該是一個已經存在,并且已經處于連接狀態的 socket.socket 對象,其會被傳輸對象使用。如果指定了 sock ,那么 host, port, family, proto, flagslocal_addr 就都不應該被指定了。

  • 如果給出 local_addr,它應當是一個用來在本地綁定套接字的 (local_host, local_port) 元組。 local_hostlocal_port 會使用 getaddrinfo() 來查找,這與 hostport 類似。

  • ssl_handshake_timeout 是(用于 TLS 連接的)在放棄連接之前要等待 TLS 握手完成的秒數。 如果參數為 None 則使用 (默認的) 60.0

3.7 新版功能: ssl_handshake_timeout 形參。

在 3.6 版更改: 套接字選項 TCP_NODELAY 默認已為所有 TCP 連接進行了設置。

在 3.5 版更改: ProactorEventLoop 類中添加 SSL/TLS 支持。

參見

open_connection() 函數是一個高層級的替代 API。 它返回一對 (StreamReader, StreamWriter),可在 async/await 代碼中直接使用。

coroutine loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)?

注解

形參 reuse_address 已不再受支持,因為使用 SO_REUSEADDR 會對 UDP 造成顯著的安全問題。 顯式地傳入 reuse_address=True 將會引發異常。

當具有不同 UID 的多個進程將套接字賦給具有 SO_REUSEADDR 的相同 UDP 套接字地址時,傳入的數據包可能會在套接字間隨機分配。

對于受支持的平臺,reuse_port 可以被用作類似功能的替代。 通過 reuse_port 將改用 SO_REUSEPORT,它能夠防止具有不同 UID 的進程將套接字賦給相同的套接字地址。

創建一個數據報連接。

套接字族可以是 AF_INET, AF_INET6AF_UNIX,具體取決于 host (或 family 參數,如果有提供的話)。

socket類型將是 SOCK_DGRAM

protocol_factory 必須為一個返回 協議 實現的可調用對象。

成功時返回一個 (transport, protocol) 元組。

其他參數:

  • local_addr,如果指定的話,就是一個 (local_host, local_port) 元組,用于在本地綁定套接字。 local_hostlocal_port 是使用 getaddrinfo() 來查找的。

  • remote_addr,如果指定的話,就是一個 (remote_host, remote_port) 元組,用于同一個遠程地址連接。remote_hostremote_port 是使用 getaddrinfo() 來查找的。

  • family, proto, flags 是可選的地址族,協議和標志,其會被傳遞給 getaddrinfo() 來完成 host 的解析。如果要指定的話,這些都應該是來自于 socket 模塊的對應常量。

  • reuse_port 告知內核,只要在創建時都設置了這個旗標,就允許此端點綁定到其他現有端點所綁定的相同端口上。 這個選項在 Windows 和某些 Unix 上不受支持。 如果 SO_REUSEPORT 常量未定義則此功能就是不受支持的。

  • allow_broadcast 告知內核允許此端點向廣播地址發送消息。

  • sock 可選擇通過指定此值用于使用一個預先存在的,已經處于連接狀態的 socket.socket 對象,并將其提供給此傳輸對象使用。如果指定了這個值, local_addrremote_addr 就應該被忽略 (必須為 None)。

本方法不支持Windows。Windows下,請使用 ProactorEventLoop

參見 UDP echo 客戶端協議UDP echo 服務端協議 的例子。

在 3.4.4 版更改: 添加了 family, proto, flags, reuse_address, reuse_port, allow_broadcastsock 等參數。

在 3.7.6 版更改: 出于安全考慮,reuse_address 形參已不再受支持。

coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)?

創建Unix 連接

套接字族將為 AF_UNIX;套接字類型將為 SOCK_STREAM

成功時返回一個 (transport, protocol) 元組。

path 是所要求的 Unix 域套接字的名字,除非指定了 sock 形參。 抽象的 Unix 套接字, str, bytesPath 路徑都是受支持的。

請查看 loop.create_connection() 方法的文檔了解有關此方法的參數的信息。

可用性: Unix。

3.7 新版功能: ssl_handshake_timeout 形參。

在 3.7 版更改: path 形參現在可以是 path-like object 對象。

創建網絡服務?

coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)?

創建TCP服務 (socket 類型 SOCK_STREAM ) 監聽 host 地址的 port 端口。

返回一個 Server 對象。

參數:

  • protocol_factory 必須為一個返回 協議 實現的可調用對象。

  • host 形參可被設為幾種類型,它確定了服務器所應監聽的位置:

    • 如果 host 是一個字符串,則 TCP 服務器會被綁定到 host 所指明的單一網絡接口。

    • 如果 host 是一個字符串序列,則 TCP 服務器會被綁定到序列所指明的所有網絡接口。

    • 如果 host 是一個空字符串或 None,則會應用所有接口并將返回包含多個套接字的列表(通常是一個 IPv4 的加一個 IPv6 的)。

  • family 可被設為 socket.AF_INETAF_INET6 以強制此套接字使用 IPv4 或 IPv6。 如果未設定,則 family 將通過主機名稱來確定 (默認為 AF_UNSPEC)。

  • flags 是用于 getaddrinfo() 的位掩碼。

  • 可以選擇指定 sock 以便使用預先存在的套接字對象。 如果指定了此參數,則不可再指定 hostport

  • backlog 是傳遞給 listen() 的最大排隊連接的數量(默認為100)。

  • ssl 可被設置為一個 SSLContext 實例以在所接受的連接上啟用 TLS。

  • reuse_address 告知內核要重用一個處于 TIME_WAIT 狀態的本地套接字,而不是等待其自然超時失效。 如果未指定此參數則在 Unix 上將自動設置為 True

  • reuse_port 告知內核,只要在創建的時候都設置了這個標志,就允許此端點綁定到其它端點列表所綁定的同樣的端口上。這個選項在 Windows 上是不支持的。

  • ssl_handshake_timeout 是(用于 TLS 服務器的)在放棄連接之前要等待 TLS 握手完成的秒數。 如果參數為 (默認值) None 則為 60.0 秒。

  • start_serving 設置成 True (默認值) 會導致創建server并立即開始接受連接。設置成 False ,用戶需要等待 Server.start_serving() 或者 Server.serve_forever() 以使server開始接受連接。

3.7 新版功能: 增加了 ssl_handshake_timeoutstart_serving 形參。

在 3.6 版更改: 套接字選項 TCP_NODELAY 默認已為所有 TCP 連接進行了設置。

在 3.5 版更改: ProactorEventLoop 類中添加 SSL/TLS 支持。

在 3.5.1 版更改: host 形參可以是一個字符串的序列。

參見

start_server() 函數是一個高層級的替代 API,它返回一對 StreamReaderStreamWriter,可在 async/await 代碼中使用。

coroutine loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)?

loop.create_server() 類似但是專用于 AF_UNIX 套接字族。

path 是必要的 Unix 域套接字名稱,除非提供了 sock 參數。 抽象的 Unix 套接字, str, bytesPath 路徑都是受支持的。

請查看 loop.create_server() 方法的文檔了解有關此方法的參數的信息。

可用性: Unix。

3.7 新版功能: The ssl_handshake_timeout and start_serving parameters.

在 3.7 版更改: path 形參現在可以是 Path 對象。

coroutine loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None)?

將已被接受的連接包裝成一個傳輸/協議對。

此方法可被服務器用來接受 asyncio 以外的連接,但是使用 asyncio 來處理它們。

參數:

  • protocol_factory 必須為一個返回 協議 實現的可調用對象。

  • sock 是一個預先存在的套接字對象,它是由 socket.accept 返回的。

  • ssl 可被設置為一個 SSLContext 以在接受的連接上啟用 SSL。

  • ssl_handshake_timeout 是(為一個SSL連接)在中止連接前,等待SSL握手完成的時間【單位秒】。如果為 None (缺省) 則是 60.0 秒。

返回一個 (transport, protocol) 對。

3.7 新版功能: ssl_handshake_timeout 形參。

3.5.3 新版功能.

傳輸文件?

coroutine loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)?

file 通過 transport 發送。 返回所發送的字節總數。

如果可用的話,該方法將使用高性能的 os.sendfile()

file 必須是個二進制模式打開的常規文件對象。

offset 指明從何處開始讀取文件。 如果指定了 count,它是要傳輸的字節總數而不再一直發送文件直至抵達 EOF。 文件位置總是會被更新,即使此方法引發了錯誤,并可以使用 file.tell() 來獲取實際發送的字節總數。

fallback 設為 True 會使得 asyncio 在平臺不支持 sendfile 系統調用時手動讀取并發送文件(例如 Windows 或 Unix 上的 SSL 套接字)。

如果系統不支持 sendfile 系統調用且 fallbackFalse 則會引發 SendfileNotAvailableError

3.7 新版功能.

TLS 升級?

coroutine loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None)?

將現有基于傳輸的連接升級到 TLS。

返回一個新的傳輸實例,其中 protocol 必須在 await 之后立即開始使用。 傳給 start_tls 方法的 transport 實例應永遠不會被再次使用。

參數:

  • transportprotocol 實例的方法與 create_server()create_connection() 所返回的類似。

  • sslcontext :一個已經配置好的 SSLContext 實例。

  • 當服務端連接已升級時 (如 create_server() 所創建的對象) server_side 會傳入 True

  • server_hostname :設置或者覆蓋目標服務器證書中相對應的主機名。

  • ssl_handshake_timeout 是(用于 TLS 連接的)在放棄連接之前要等待 TLS 握手完成的秒數。 如果參數為 None 則使用 (默認的) 60.0

3.7 新版功能.

監控文件描述符?

loop.add_reader(fd, callback, *args)?

開始監視 fd 文件描述符以獲取讀取的可用性,一旦 fd 可用于讀取,使用指定的參數調用 callback

loop.remove_reader(fd)?

停止對文件描述符 fd 讀取可用性的監視。

loop.add_writer(fd, callback, *args)?

開始監視 fd 文件描述符的寫入可用性,一旦 fd 可用于寫入,使用指定的參數調用 callback

使用 functools.partial() 傳遞關鍵字參數callback.

loop.remove_writer(fd)?

停止對文件描述符 fd 的寫入可用性監視。

另請查看 平臺支持 一節了解以上方法的某些限制。

直接使用 socket 對象?

通常,使用基于傳輸的 API 的協議實現,例如 loop.create_connection()loop.create_server() 比直接使用套接字的實現更快。 但是,在某些應用場景下性能并不非常重要,直接使用 socket 對象會更方便。

coroutine loop.sock_recv(sock, nbytes)?

sock 接收至多 nbytessocket.recv() 的異步版本。

返回接收到的數據【bytes對象類型】。

sock 必須是個非阻塞socket。

在 3.7 版更改: 雖然這個方法總是被記錄為協程方法,但它在 Python 3.7 之前的發行版中會返回一個 Future。 從 Python 3.7 開始它則是一個 async def 方法。

coroutine loop.sock_recv_into(sock, buf)?

sock 接收數據放入 buf 緩沖區。 模仿了阻塞型的 socket.recv_into() 方法。

返回寫入緩沖區的字節數。

sock 必須是個非阻塞socket。

3.7 新版功能.

coroutine loop.sock_sendall(sock, data)?

data 發送到 sock 套接字。 socket.sendall() 的異步版本。

此方法會持續發送數據到套接字直至 data 中的所有數據發送完畢或是有錯誤發生。 當成功時會返回 None。 當發生錯誤時,會引發一個異常。 此外,沒有辦法能確定有多少數據或是否有數據被連接的接收方成功處理。

sock 必須是個非阻塞socket。

在 3.7 版更改: 雖然這個方法一直被標記為協程方法。但是,Python 3.7 之前,該方法返回 Future ,從Python 3.7 開始,這個方法是 async def 方法。

coroutine loop.sock_connect(sock, address)?

sock 連接到位于 address 的遠程套接字。

socket.connect() 的異步版本。

sock 必須是個非阻塞socket。

在 3.5.2 版更改: address 不再需要被解析。 sock_connect 將嘗試檢查 address 是否已通過調用 socket.inet_pton() 被解析。 如果沒有,則將使用 loop.getaddrinfo() 來解析 address

coroutine loop.sock_accept(sock)?

接受一個連接。 模仿了阻塞型的 socket.accept() 方法。

scoket 必須綁定到一個地址上并且監聽連接。返回值是一個 (conn, address) 對,其中 conn 是一個 新*的套接字對象,用于在此連接上收發數據,*address 是連接的另一端的套接字所綁定的地址。

sock 必須是個非阻塞socket。

在 3.7 版更改: 雖然這個方法一直被標記為協程方法。但是,Python 3.7 之前,該方法返回 Future ,從Python 3.7 開始,這個方法是 async def 方法。

coroutine loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)?

在可能的情況下使用高性能的 os.sendfile 發送文件。 返回所發送的字節總數。

socket.sendfile() 的異步版本。

sock 必須為非阻塞型的 socket.SOCK_STREAM socket

file 必須是個用二進制方式打開的常規文件對象。

offset 指明從何處開始讀取文件。 如果指定了 count,它是要傳輸的字節總數而不再一直發送文件直至抵達 EOF。 文件位置總是會被更新,即使此方法引發了錯誤,并可以使用 file.tell() 來獲取實際發送的字節總數。

fallback 被設為 True 時,會使用 asyncio 在平臺不支持 sendfile 系統調用時手動讀取并發送文件(例如 Windows 或 Unix 上的 SSL 套接字)。

如果系統不支持 sendfile 并且 fallbackFalse ,引發 SendfileNotAvailableError 異常。

sock 必須是個非阻塞socket。

3.7 新版功能.

DNS?

coroutine loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)?

異步版的 socket.getaddrinfo()

coroutine loop.getnameinfo(sockaddr, flags=0)?

異步版的 socket.getnameinfo()

在 3.7 版更改: getaddrinfogetnameinfo 方法一直被標記返回一個協程,但是Python 3.7之前,實際返回的是 asyncio.Future 對象。從Python 3.7 開始,這兩個方法是協程。

使用管道?

coroutine loop.connect_read_pipe(protocol_factory, pipe)?

在事件循環中注冊 pipe 的讀取端。

protocol_factory 必須為一個返回 asyncio 協議 實現的可調用對象。

pipe 是個 類似文件型對象.

返回一對 (transport, protocol),其中 transport 支持 ReadTransport 接口而 protocol 是由 protocol_factory 所實例化的對象。

使用 SelectorEventLoop 事件循環, pipe 被設置為非阻塞模式。

coroutine loop.connect_write_pipe(protocol_factory, pipe)?

在事件循環中注冊 pipe 的寫入端。

protocol_factory 必須為一個返回 asyncio 協議 實現的可調用對象。

pipe 是個 類似文件型對象.

返回一對 (transport, protocol),其中 transport 支持 WriteTransport 接口而 protocol 是由 protocol_factory 所實例化的對象。

使用 SelectorEventLoop 事件循環, pipe 被設置為非阻塞模式。

注解

在 Windows 中 SelectorEventLoop 不支持上述方法。 對于 Windows 請改用 ProactorEventLoop

Unix 信號?

loop.add_signal_handler(signum, callback, *args)?

設置 callback 作為 signum 信號的處理程序。

此回調將與該事件循環中其他加入隊列的回調和可運行協程一起由 loop 發起調用。 不同與使用 signal.signal() 注冊的信號處理程序,使用此函數注冊的回調可以與事件循環進行交互。

如果信號數字非法或者不可捕獲,就拋出一個 ValueError 。如果建立處理器的過程中出現問題,會拋出一個 RuntimeError

使用 functools.partial() 傳遞關鍵字參數callback.

signal.signal() 一樣,這個函數只能在主線程中調用。

loop.remove_signal_handler(sig)?

移除 sig 信號的處理程序。

如果信號處理程序被移除則返回 True,否則如果給定信號未設置處理程序則返回 False

可用性: Unix。

參見

signal 模塊。

在線程或者進程池中執行代碼。?

awaitable loop.run_in_executor(executor, func, *args)?

安排在指定的執行器中調用 func

The executor argument should be an concurrent.futures.Executor instance. The default executor is used if executor is None.

示例:

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

這個方法返回一個 asyncio.Future 對象。

使用 functools.partial() 傳遞關鍵字參數func

在 3.5.3 版更改: loop.run_in_executor() 不會再配置它所創建的線程池執行器的 max_workers,而是將其留給線程池執行器 (ThreadPoolExecutor) 來設置默認值。

loop.set_default_executor(executor)?

executor 設為 run_in_executor() 所使用的默認執行器。 executor 應當是 ThreadPoolExecutor 的實例。

3.7 版后已移除: 使用不是Using an executor that is not an instance of ThreadPoolExecutor 實例的執行器的做法已被棄用并將在 Python 3.9 中引起錯誤。

executor 必須是個 concurrent.futures.ThreadPoolExecutor 的實例。

錯誤處理API?

允許自定義事件循環中如何去處理異常。

loop.set_exception_handler(handler)?

handler 設置為新的事件循環異常處理器。

如果 handlerNone,將設置默認的異常處理程序。 在其他情況下,handler 必須是一個可調用對象且簽名匹配 (loop, context),其中 loop 是對活動事件循環的引用,而 context 是一個包含異常詳情的 dict (請查看 call_exception_handler() 文檔來獲取關于上下文的更多信息)。

loop.get_exception_handler()?

返回當前的異常處理器,如果沒有設置異常處理器,則返回 None

3.5.2 新版功能.

loop.default_exception_handler(context)?

默認的異常處理器。

此方法會在發生異常且未設置異常處理程序時被調用。 此方法也可以由想要具有不同于默認處理程序的行為的自定義異常處理程序來調用。

context 參數和 call_exception_handler() 中的同名參數完全相同。

loop.call_exception_handler(context)?

調用當前事件循環的異常處理器。

context 是個包含下列鍵的 dict 對象(未來版本的Python可能會引入新鍵):

  • 'message': 錯誤消息;

  • 'exception' (可選): 異常對象;

  • 'future' (可選): asyncio.Future 實例;

  • 'handle' (可選): asyncio.Handle 實例;

  • 'protocol' (可選): Protocol 實例;

  • 'transport' (可選): Transport 實例;

  • 'socket' (可選): socket.socket 實例。

注解

此方法不應在子類化的事件循環中被重載。 對于自定義的異常處理,請使用 set_exception_handler() 方法。

開啟調試模式?

loop.get_debug()?

獲取事件循環調試模式設置(bool)。

如果環境變量 PYTHONASYNCIODEBUG 是一個非空字符串,就返回 True ,否則就返回 False

loop.set_debug(enabled: bool)?

設置事件循環的調試模式。

在 3.7 版更改: The new -X dev command line option can now also be used to enable the debug mode.

運行子進程?

本小節所描述的方法都是低層級的。 在常規 async/await 代碼中請考慮改用高層級的 asyncio.create_subprocess_shell()asyncio.create_subprocess_exec() 便捷函數。

注解

Windows 中默認的 asyncio 事件循環不支持子進程。 詳情參見 Windows 上的子進程支持

coroutine loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)?

args 指定的一個或者多個字符串型參數創建一個子進程。

args 必須是個由下列形式的字符串組成的列表:

第一個字符串指定可執行程序,其余的字符串指定其參數。 所有字符串參數共同組成了程序的 argv

此方法類似于調用標準庫 subprocess.Popen 類,設置 shell=False 并將字符串列表作為第一個參數傳入;但是,Popen 只接受一個單獨的字符串列表參數,而 subprocess_exec 接受多個字符串參數。

protocol_factory 必須為一個返回 asyncio.SubprocessProtocol 類的子類的可調用對象。

其他參數:

  • stdin: either a file-like object representing a pipe to be connected to the subprocess's standard input stream using connect_write_pipe(), or the subprocess.PIPE constant (default). By default a new pipe will be created and connected.

  • stdout: either a file-like object representing the pipe to be connected to the subprocess's standard output stream using connect_read_pipe(), or the subprocess.PIPE constant (default). By default a new pipe will be created and connected.

  • stderr: either a file-like object representing the pipe to be connected to the subprocess's standard error stream using connect_read_pipe(), or one of subprocess.PIPE (default) or subprocess.STDOUT constants.

    By default a new pipe will be created and connected. When subprocess.STDOUT is specified, the subprocess' standard error stream will be connected to the same pipe as the standard output stream.

  • All other keyword arguments are passed to subprocess.Popen without interpretation, except for bufsize, universal_newlines and shell, which should not be specified at all.

其他參數的文檔,請參閱 subprocess.Popen 類的構造函數。

返回一對 (transport, protocol),其中 transport 來自 asyncio.SubprocessTransport 基類而 protocol 是由 protocol_factory 所實例化的對象。

coroutine loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)?

基于 cmd 創建一個子進程,該參數可以是一個 str 或者按 文件系統編碼格式 編碼得到的 bytes ,使用平臺的 "shell" 語法。

這類似與用 shell=True 調用標準庫的 subprocess.Popen 類。

protocol_factory 必須為一個返回 SubprocessProtocol 類的子類的可調用對象。

請參閱 subprocess_exec() 了解有關其余參數的詳情。

返回一對 (transport, protocol),其中 transport 來自 SubprocessTransport 基類而 protocol 是由 protocol_factory 所實例化的對象。

注解

應用程序要負責確保正確地轉義所有空白字符和特殊字符以防止 shell 注入 漏洞。 shlex.quote() 函數可以被用來正確地轉義字符串中可能被用來構造 shell 命令的空白字符和特殊字符。

回調處理?

class asyncio.Handle?

loop.call_soon(), loop.call_soon_threadsafe() 所返回的回調包裝器對象。

cancel()?

取消回調。 如果此回調已被取消或已被執行,此方法將沒有任何效果。

cancelled()?

如果此回調已被取消則返回 True

3.7 新版功能.

class asyncio.TimerHandle?

loop.call_later()loop.call_at() 所返回的回調包裝器對象。

這個類是 Handle 的子類。

when()?

返回加入計劃任務的回調時間,以 float 值表示的秒數。

時間值是一個絕對時間戳,使用與 loop.time() 相同的時間引用。

3.7 新版功能.

Server 對象?

Server 對象可使用 loop.create_server(), loop.create_unix_server(), start_server()start_unix_server() 等函數來創建。

請不要直接實例化該類。

class asyncio.Server?

Server 對象是異步上下文管理器。當用于 async with 語句時,異步上下文管理器可以確保 Server 對象被關閉,并且在 async with 語句完成后,不接受新的連接。

srv = await loop.create_server(...)

async with srv:
    # some code

# At this point, srv is closed and no longer accepts new connections.

在 3.7 版更改: Python3.7 開始,Server 對象是一個異步上下文管理器。

close()?

停止服務:關閉監聽的套接字并且設置 sockets 屬性為 None

用于表示已經連進來的客戶端連接會保持打開的狀態。

服務器是被異步關閉的,使用 wait_closed() 協程來等待服務器關閉。

get_loop()?

返回與服務器對象相關聯的事件循環。

3.7 新版功能.

coroutine start_serving()?

開始接受連接。

這個方法是冪等的【相同參數重復執行,能獲得相同的結果】,所以此方法能在服務已經運行的時候調用。

傳給 loop.create_server()asyncio.start_server()start_serving 僅限關鍵字形參允許創建不接受初始連接的 Server 對象。 在此情況下可以使用 Server.start_serving()Server.serve_forever() 讓 Server 對象開始接受連接。

3.7 新版功能.

coroutine serve_forever()?

開始接受連接,直到協程被取消。 serve_forever 任務的取消將導致服務器被關閉。

如果服務器已經在接受連接了,這個方法可以被調用。每個 Server 對象,僅能有一個 serve_forever 任務。

示例:

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))

3.7 新版功能.

is_serving()?

如果服務器正在接受新連接的狀態,返回 True

3.7 新版功能.

coroutine wait_closed()?

等待 close() 方法執行完畢。

sockets?

List of socket.socket objects the server is listening on, or None if the server is closed.

在 3.7 版更改: 在 Python 3.7 之前 Server.sockets 會直接返回內部的服務器套接字列表。 在 3.7 版則會返回該列表的副本。

事件循環實現?

asyncio 帶有兩種不同的事件循環實現: SelectorEventLoopProactorEventLoop

By default asyncio is configured to use SelectorEventLoop on all platforms.

class asyncio.SelectorEventLoop?

基于 selectors 模塊的事件循環。

使用給定平臺中最高效的可用 selector。 也可以手動配置要使用的特定 selector:

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

可用性: Unix, Windows。

class asyncio.ProactorEventLoop?

用 "I/O Completion Ports" (IOCP) 構建的專為Windows 的事件循環。

可用性: Windows。

An example how to use ProactorEventLoop on Windows:

import asyncio
import sys

if sys.platform == 'win32':
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
class asyncio.AbstractEventLoop?

asyncio 兼容事件循環的抽象基類。

事件循環方法 一節列出了 AbstractEventLoop 的替代實現應當定義的所有方法。

例子?

請注意本節中的所有示例都 有意地 演示了如何使用低層級的事件循環 API,例如 loop.run_forever()loop.call_soon()。 現代的 asyncio 應用很少需要以這樣的方式編寫;請考慮使用高層級的函數例如 asyncio.run()

call_soon() 的 Hello World 示例。?

一個使用 loop.call_soon() 方法來安排回調的示例。 回調會顯示 "Hello World" 然后停止事件循環:

import asyncio

def hello_world(loop):
    """A callback to print 'Hello World' and stop the event loop"""
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

參見

一個類似的 Hello World 示例,使用協程和 run() 函數創建。

使用 call_later() 來展示當前的日期?

一個每秒刷新顯示當前日期的示例。 回調使用 loop.call_later() 方法在 5 秒后將自身重新加入計劃日程,然后停止事件循環:

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

參見

一個類似的 current date 示例,使用協程和 run() 函數創建。

監控一個文件描述符的讀事件?

使用 loop.add_reader() 方法,等到文件描述符收到一些數據,然后關閉事件循環:

import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

loop = asyncio.get_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())

    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)

    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

try:
    # Run the event loop
    loop.run_forever()
finally:
    # We are done. Close sockets and the event loop.
    rsock.close()
    wsock.close()
    loop.close()

參見

為SIGINT和SIGTERM設置信號處理器?

(這個 signals 示例只適用于 Unix。)

使用 loop.add_signal_handler() 方法為信號 SIGINTSIGTERM 注冊處理程序:

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())