concurrent.futures --- 啟動并行任務?

3.2 新版功能.

源碼: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模塊提供異步執行可調用對象高層接口。

異步執行可以由 ThreadPoolExecutor 使用線程或由 ProcessPoolExecutor 使用單獨的進程來實現。 兩者都是實現抽像類 Executor 定義的接口。

Executor 對象?

class concurrent.futures.Executor?

抽象類提供異步執行調用方法。要通過它的子類調用,而不是直接調用。

submit(fn, *args, **kwargs)?

調度可調用對象 fn,以 fn(*args **kwargs) 方式執行并返回 Future 對像代表可調用對象的執行。:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)?

類似于 map(func, *iterables) 函數,除了以下兩點:

  • iterables 是立即執行而不是延遲執行的;

  • func 是異步執行的,對 func 的多個調用可以并發執行。

如果從原始調用到 Executor.map() 經過 timeout 秒后, __next__() 已被調用且返回的結果還不可用,那么已返回的迭代器將觸發 concurrent.futures.TimeoutErrortimeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None ,則沒有超時限制。

如果 func 調用引發一個異常,當從迭代器中取回它的值時這個異常將被引發。

使用 ProcessPoolExecutor 時,這個方法會將 iterables 分割任務塊并作為獨立的任務并提交到執行池中。這些塊的大概數量可以由 chunksize 指定正整數設置。 對很長的迭代器來說,使用大的 chunksize 值比默認值 1 能顯著地提高性能。 chunksizeThreadPoolExecutor 沒有效果。

在 3.5 版更改: 加入 chunksize 參數。

shutdown(wait=True)?

當待執行的 future 對象完成執行后向執行者發送信號,它就會釋放正在使用的任何資源。 在關閉后調用 Executor.submit()Executor.map() 將會引發 RuntimeError

如果 waitTrue 則此方法只有在所有待執行的 future 對象完成執行且釋放已分配的資源后才會返回。 如果 waitFalse,方法立即返回,所有待執行的 future 對象完成執行后會釋放已分配的資源。 不管 wait 的值是什么,整個 Python 程序將等到所有待執行的 future 對象完成執行后才退出。

如果使用 with 語句,你就可以避免顯式調用這個方法,它將會停止 Executor (就好像 Executor.shutdown() 調用時 wait 設為 True 一樣等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor?

ThreadPoolExecutorExecutor 的子類,它使用線程池來異步執行調用。

當回調已關聯了一個 Future 然后再等待另一個 Future 的結果時就會發產死鎖情況。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

與:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())?

Executor 子類使用最多 max_workers 個線程的線程池來異步執行調用。

initializer 是在每個工作者線程開始處調用的一個可選可調用對象。 initargs 是傳遞給初始化器的元組參數。任何向池提交更多工作的嘗試, initializer 都將引發一個異常,當前所有等待的工作都會引發一個 BrokenThreadPool

在 3.5 版更改: 如果 max_workersNone 或沒有指定,將默認為機器處理器的個數,假如 ThreadPoolExecutor 側重于 I/O 操作而不是 CPU 運算,那么可以乘以 5,同時工作線程的數量可以比 ProcessPoolExecutor 的數量高。

3.6 新版功能: 添加 thread_name_prefix 參數允許用戶控制由線程池創建的 threading.Thread 工作線程名稱以方便調試。

在 3.7 版更改: 加入 initializer 和*initargs* 參數。

ThreadPoolExecutor 例子?

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor?

ProcessPoolExecutorExecutor 的子類,它使用進程池來實現異步執行調用。 ProcessPoolExecutor 使用 multiprocessing 回避 Global Interpreter Lock 但也意味著只可以處理和返回可序列化的對象。

__main__ 模塊必須可以被工作者子進程導入。這意味著 ProcessPoolExecutor 不可以工作在交互式解釋器中。

從可調用對象中調用 ExecutorFuture 的方法提交給 ProcessPoolExecutor 會導致死鎖。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())?

異步執行調用的 Executor 子類使用一個最多有 max_workers 個進程的進程池。 如果 max_workersNone 或未給出,它將默認為機器的處理器個數。 如果 max_workers 小于等于 0,則將引發 ValueError。 在 Windows 上,max_workers 必須小于等于 61,否則將引發 ValueError。 如果 max_workersNone,則所選擇的默認最多為 61,即使存在更多處理器。 mp_context 可以是一個多進程上下文或是 None。 它將被用來啟動工作者。 如果 mp_contextNone 或未給出,將使用默認的多進程上下文。

initializer 是在每個工作者進程開始處調用的一個可選可調用對象。 initargs 是傳遞給初始化器的元組參數。任何向池提交更多工作的嘗試, initializer 都將引發一個異常,當前所有等待的工作都會引發一個 BrokenProcessPool

在 3.3 版更改: 如果其中一個工作進程被突然終止,BrokenProcessPool 就會馬上觸發。 可預計的行為沒有定義,但執行器上的操作或它的 future 對象會被凍結或死鎖。

在 3.7 版更改: 添加 mp_context 參數允許用戶控制由進程池創建給工作者進程的開始方法 。

加入 initializer 和*initargs* 參數。

ProcessPoolExecutor 例子?

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 對象?

Future 類將可調用對象封裝為異步執行。Future 實例由 Executor.submit() 創建。

class concurrent.futures.Future?

將可調用對象封裝為異步執行。Future 實例由 Executor.submit() 創建,除非測試,不應直接創建。

cancel()?

嘗試取消調用。 如果調用正在執行或已結束運行不能被取消則該方法將返回 False,否則調用會被取消并且該方法將返回 True

cancelled()?

如果調用成功取消返回 True

running()?

如果調用正在執行而且不能被取消那么返回 True

done()?

如果調用已被取消或正常結束那么返回 True

result(timeout=None)?

返回調用返回的值。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。

如果 futrue 在完成前被取消則 CancelledError 將被觸發。

如果調用引發了一個異常,這個方法也會引發同樣的異常。

exception(timeout=None)?

返回由調用引發的異常。如果調用還沒完成那么這個方法將等待 timeout 秒。如果在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。timeout 可以是整數或浮點數。如果 timeout 沒有指定或為 None,那么等待時間就沒有限制。

如果 futrue 在完成前被取消則 CancelledError 將被觸發。

如果調用正常完成那么返回 None

add_done_callback(fn)?

附加可調用 fn 到 future 對象。當 future 對象被取消或完成運行時,將會調用 fn,而這個 future 對象將作為它唯一的參數。

加入的可調用對象總被屬于添加它們的進程中的線程按加入的順序調用。如果可調用對象引發一個 Exception 子類,它會被記錄下來并被忽略掉。如果可調用對象引發一個 BaseException 子類,這個行為沒有定義。

如果 future 對象已經完成或已取消,fn 會被立即調用。

下面這些 Future 方法用于單元測試和 Executor 實現。

set_running_or_notify_cancel()?

這個方法只可以在執行關聯 Future 工作之前由 Executor 實現調用或由單測試調用。

如果這個方法返回 False 那么 Future 已被取消,即 Future.cancel() 已被調用并返回 True 。等待 Future 完成 (即通過 as_completed()wait()) 的線程將被喚醒。

如果這個方法返回 True 那么 Future 不會被取消并已將它變為正在運行狀態,也就是說調用 Future.running() 時將返回 True

這個方法只可以被調用一次并且不能在調用 Future.set_result()Future.set_exception() 之后再調用。

set_result(result)?

設置將 Future 關聯工作的結果給 result

這個方法只可以由 Executor 實現和單元測試使用。

set_exception(exception)?

設置 Future 關聯工作的結果給 Exception exception

這個方法只可以由 Executor 實現和單元測試使用。

模塊函數?

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)?

等待 fs 指定的 Future 實例(可能由不同的 Executor 實例創建)完成。 返回一個由集合構成的具名 2 元組。 第一個集合名稱為 done,包含在等待完成之前已完成的期程(包括正常結束或被取消的 future 對象)。 第二個集合名稱為 not_done,包含未完成的 future 對象(包括掛起的或正在運行的 future 對象)。

timeout 可以用來控制返回前最大的等待秒數。 timeout 可以為 int 或 float 類型。 如果 timeout 未指定或為 None ,則不限制等待時間。

return_when 指定此函數應在何時返回。它必須為以下常數之一:

常數

描述

FIRST_COMPLETED

函數將在任意可等待對象結束或取消時返回。

FIRST_EXCEPTION

函數將在任意可等待對象因引發異常而結束時返回。當沒有引發任何異常時它就相當于 ALL_COMPLETED

ALL_COMPLETED

函數將在所有可等待對象結束或取消時返回。

concurrent.futures.as_completed(fs, timeout=None)?

返回一個包含 fs 所指定的 Future 實例(可能由不同的 Executor 實例創建)的迭代器,這些實例會在完成時生成 future 對象(包括正常結束或被取消的 future 對象)。 任何由 fs 所指定的重復 future 對象將只被返回一次。 任何在 as_completed() 被調用之前完成的 future 對象將優先被生成。 如果 __next__() 被調用并且在對 as_completed() 的原始調用 timeout 秒之后結果仍不可用,則返回的迭代器將引發 concurrent.futures.TimeoutErrortimeout 可以為整數或浮點數。 如果 timeout 未指定或為 None,則不限制等待時間。

參見

PEP 3148 -- future 對象 - 異步執行指令。

該提案描述了Python標準庫中包含的這個特性。

Exception類?

exception concurrent.futures.CancelledError?

future 對象被取消時會觸發。

exception concurrent.futures.TimeoutError?

future 對象執行超出給定的超時數值時引發。

exception concurrent.futures.BrokenExecutor?

當執行器被某些原因中斷而且不能用來提交或執行新任務時就會被引發派生于 RuntimeError 的異常類。

3.7 新版功能.

exception concurrent.futures.thread.BrokenThreadPool?

ThreadPoolExecutor 中的其中一個工作者初始化失敗時會引發派生于 BrokenExecutor 的異常類。

3.7 新版功能.

exception concurrent.futures.process.BrokenProcessPool?

ThreadPoolExecutor 中的其中一個工作者不完整終止時(比如,被外部殺死)會引發派生于 BrokenExecutor ( 原名 RuntimeError ) 的異常類。

3.3 新版功能.