concurrent.futures --- 啟動并行任務?
3.2 新版功能.
源碼: Lib/concurrent/futures/thread.py 和 Lib/concurrent/futures/process.py
concurrent.futures 模塊提供異步執行可調用對象高層接口。
異步執行可以由 ThreadPoolExecutor 使用線程或由 ProcessPoolExecutor 使用單獨的進程來實現。 兩者都是實現抽像類 Executor 定義的接口。
Executor 對象?
-
class
concurrent.futures.Executor? 抽象類提供異步執行調用方法。要通過它的子類調用,而不是直接調用。
-
submit(fn, *args, **kwargs)? 調度可調用對象 fn,以
fn(*args **kwargs)方式執行并返回Future對像代表可調用對象的執行。:with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
-
map(func, *iterables, timeout=None, chunksize=1)? 類似于
map(func, *iterables)函數,除了以下兩點:iterables 是立即執行而不是延遲執行的;
func 是異步執行的,對 func 的多個調用可以并發執行。
如果從原始調用到
Executor.map()經過 timeout 秒后,__next__()已被調用且返回的結果還不可用,那么已返回的迭代器將觸發concurrent.futures.TimeoutError。 timeout 可以是整數或浮點數。如果 timeout 沒有指定或為None,則沒有超時限制。如果 func 調用引發一個異常,當從迭代器中取回它的值時這個異常將被引發。
使用
ProcessPoolExecutor時,這個方法會將 iterables 分割任務塊并作為獨立的任務并提交到執行池中。這些塊的大概數量可以由 chunksize 指定正整數設置。 對很長的迭代器來說,使用大的 chunksize 值比默認值 1 能顯著地提高性能。 chunksize 對ThreadPoolExecutor沒有效果。在 3.5 版更改: 加入 chunksize 參數。
-
shutdown(wait=True)? 當待執行的 future 對象完成執行后向執行者發送信號,它就會釋放正在使用的任何資源。 在關閉后調用
Executor.submit()和Executor.map()將會引發RuntimeError。如果 wait 為
True則此方法只有在所有待執行的 future 對象完成執行且釋放已分配的資源后才會返回。 如果 wait 為False,方法立即返回,所有待執行的 future 對象完成執行后會釋放已分配的資源。 不管 wait 的值是什么,整個 Python 程序將等到所有待執行的 future 對象完成執行后才退出。如果使用
with語句,你就可以避免顯式調用這個方法,它將會停止Executor(就好像Executor.shutdown()調用時 wait 設為True一樣等待):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
-
ThreadPoolExecutor?
ThreadPoolExecutor 是 Executor 的子類,它使用線程池來異步執行調用。
當回調已關聯了一個 Future 然后再等待另一個 Future 的結果時就會發產死鎖情況。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
與:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
-
class
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())? Executor子類使用最多 max_workers 個線程的線程池來異步執行調用。initializer 是在每個工作者線程開始處調用的一個可選可調用對象。 initargs 是傳遞給初始化器的元組參數。任何向池提交更多工作的嘗試, initializer 都將引發一個異常,當前所有等待的工作都會引發一個
BrokenThreadPool。在 3.5 版更改: 如果 max_workers 為
None或沒有指定,將默認為機器處理器的個數,假如ThreadPoolExecutor側重于 I/O 操作而不是 CPU 運算,那么可以乘以5,同時工作線程的數量可以比ProcessPoolExecutor的數量高。3.6 新版功能: 添加 thread_name_prefix 參數允許用戶控制由線程池創建的
threading.Thread工作線程名稱以方便調試。在 3.7 版更改: 加入 initializer 和*initargs* 參數。
ThreadPoolExecutor 例子?
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor?
ProcessPoolExecutor 是 Executor 的子類,它使用進程池來實現異步執行調用。 ProcessPoolExecutor 使用 multiprocessing 回避 Global Interpreter Lock 但也意味著只可以處理和返回可序列化的對象。
__main__ 模塊必須可以被工作者子進程導入。這意味著 ProcessPoolExecutor 不可以工作在交互式解釋器中。
從可調用對象中調用 Executor 或 Future 的方法提交給 ProcessPoolExecutor 會導致死鎖。
-
class
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())? 異步執行調用的
Executor子類使用一個最多有 max_workers 個進程的進程池。 如果 max_workers 為None或未給出,它將默認為機器的處理器個數。 如果 max_workers 小于等于0,則將引發ValueError。 在 Windows 上,max_workers 必須小于等于61,否則將引發ValueError。 如果 max_workers 為None,則所選擇的默認最多為61,即使存在更多處理器。 mp_context 可以是一個多進程上下文或是 None。 它將被用來啟動工作者。 如果 mp_context 為None或未給出,將使用默認的多進程上下文。initializer 是在每個工作者進程開始處調用的一個可選可調用對象。 initargs 是傳遞給初始化器的元組參數。任何向池提交更多工作的嘗試, initializer 都將引發一個異常,當前所有等待的工作都會引發一個
BrokenProcessPool。在 3.3 版更改: 如果其中一個工作進程被突然終止,
BrokenProcessPool就會馬上觸發。 可預計的行為沒有定義,但執行器上的操作或它的 future 對象會被凍結或死鎖。在 3.7 版更改: 添加 mp_context 參數允許用戶控制由進程池創建給工作者進程的開始方法 。
加入 initializer 和*initargs* 參數。
ProcessPoolExecutor 例子?
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Future 對象?
Future 類將可調用對象封裝為異步執行。Future 實例由 Executor.submit() 創建。
-
class
concurrent.futures.Future? 將可調用對象封裝為異步執行。
Future實例由Executor.submit()創建,除非測試,不應直接創建。-
cancel()? 嘗試取消調用。 如果調用正在執行或已結束運行不能被取消則該方法將返回
False,否則調用會被取消并且該方法將返回True。
-
cancelled()? 如果調用成功取消返回
True。
-
running()? 如果調用正在執行而且不能被取消那么返回
True。
-
done()? 如果調用已被取消或正常結束那么返回
True。
-
result(timeout=None)? 返回調用返回的值。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,
concurrent.futures.TimeoutError將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為None,那么等待時間就沒有限制。如果 futrue 在完成前被取消則
CancelledError將被觸發。如果調用引發了一個異常,這個方法也會引發同樣的異常。
-
exception(timeout=None)? 返回由調用引發的異常。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,
concurrent.futures.TimeoutError將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為None,那么等待時間就沒有限制。如果 futrue 在完成前被取消則
CancelledError將被觸發。如果調用正常完成那么返回
None。
-
add_done_callback(fn)? 附加可調用 fn 到 future 對象。當 future 對象被取消或完成運行時,將會調用 fn,而這個 future 對象將作為它唯一的參數。
加入的可調用對象總被屬于添加它們的進程中的線程按加入的順序調用。如果可調用對象引發一個
Exception子類,它會被記錄下來并被忽略掉。如果可調用對象引發一個BaseException子類,這個行為沒有定義。如果 future 對象已經完成或已取消,fn 會被立即調用。
下面這些
Future方法用于單元測試和Executor實現。-
set_running_or_notify_cancel()? 這個方法只可以在執行關聯
Future工作之前由Executor實現調用或由單測試調用。如果這個方法返回
False那么Future已被取消,即Future.cancel()已被調用并返回True。等待Future完成 (即通過as_completed()或wait()) 的線程將被喚醒。如果這個方法返回
True那么Future不會被取消并已將它變為正在運行狀態,也就是說調用Future.running()時將返回 True。這個方法只可以被調用一次并且不能在調用
Future.set_result()或Future.set_exception()之后再調用。
-
模塊函數?
-
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)? 等待 fs 指定的
Future實例(可能由不同的Executor實例創建)完成。 返回一個由集合構成的具名 2 元組。 第一個集合名稱為done,包含在等待完成之前已完成的期程(包括正常結束或被取消的 future 對象)。 第二個集合名稱為not_done,包含未完成的 future 對象(包括掛起的或正在運行的 future 對象)。timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為
None,則不限制等待時間。return_when 指定此函數應在何時返回。它必須為以下常數之一:
常數
描述
FIRST_COMPLETED函數將在任意可等待對象結束或取消時返回。
FIRST_EXCEPTION函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當于
ALL_COMPLETED。ALL_COMPLETED函數將在所有可等待對象結束或取消時返回。
-
concurrent.futures.as_completed(fs, timeout=None)? 返回一個包含 fs 所指定的
Future實例(可能由不同的Executor實例創建)的迭代器,這些實例會在完成時生成 future 對象(包括正常結束或被取消的 future 對象)。 任何由 fs 所指定的重復 future 對象將只被返回一次。 任何在as_completed()被調用之前完成的 future 對象將優先被生成。 如果__next__()被調用并且在對as_completed()的原始調用 timeout 秒之后結果仍不可用,則返回的迭代器將引發concurrent.futures.TimeoutError。 timeout 可以為整數或浮點數。 如果 timeout 未指定或為None,則不限制等待時間。
參見
- PEP 3148 -- future 對象 - 異步執行指令。
該提案描述了Python標準庫中包含的這個特性。
Exception類?
-
exception
concurrent.futures.CancelledError? future 對象被取消時會觸發。
-
exception
concurrent.futures.TimeoutError? future 對象執行超出給定的超時數值時引發。
-
exception
concurrent.futures.BrokenExecutor? 當執行器被某些原因中斷而且不能用來提交或執行新任務時就會被引發派生于
RuntimeError的異常類。3.7 新版功能.
-
exception
concurrent.futures.thread.BrokenThreadPool? 當
ThreadPoolExecutor中的其中一個工作者初始化失敗時會引發派生于BrokenExecutor的異常類。3.7 新版功能.
-
exception
concurrent.futures.process.BrokenProcessPool? 當
ThreadPoolExecutor中的其中一個工作者不完整終止時(比如,被外部殺死)會引發派生于BrokenExecutor( 原名RuntimeError) 的異常類。3.3 新版功能.
