ものづくりのブログ

うちのネコを題材にしたものづくりができたらいいなと思っていろいろ奮闘してます。

【Python】concurrent.futures を使った非同期処理

concurrent.futures を使って非同期処理(並列化)を実装してみたので、その時のメモをここに残します。

使い方

Executor

並列化は、以下の2つのクラスで実装することができます。

  • マルチプロセス(ProcessPoolExecutor)
  • マルチスレッド(ThreadPoolExecutor)

max_workers

max_workers で同時に実行可能なタスク数を指定できます。

マルチプロセス(ProcessPoolExecutor)
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
マルチスレッド(ThreadPoolExecutor)
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)

プロセスとスレッドについて

違いは以下の通りです。

プロセス

  • 独立のメモリ空間を保有している処理の単位
  • 1つ以上のスレッドから構成

スレッド

  • 1つのプロセスに割り当てられたメモリ内で動作する処理の単位
  • スレッド間ではメモリが共有

実装

サンプルコード(ThreadPoolExecutor)

def parallel_excute_task(store, client, task_name, config, logger):
    import traceback
    import concurrent.futures as confu
    
    results = store.get_results(task_name)
    num = 0
    futures = []
    results = []
    recv_max_workers = config.get("recv_max_workers") or 1
    recv_timeout = config.get("recv_timeout") or 600

    try:
        with confu.ThreadPoolExecutor(max_workers=recv_max_workers) as executor:
            for i in results:
                cid = str(i["cid"])
                future = executor.submit(
                    store.get_resource,
                    client,
                    cid,
                    task_name,
                    config
                )
                futures.append(future)

            for future in confu.as_completed(futures, recv_timeout):
                result = future.result()
                results.append(result)
        return results

    except Exception:
        for future in futures:
            if not future.running():
                future.cancel()
        for process in executor._processes.values():
            process.kill()
        raise