concurrent.futures を使って非同期処理(並列化)を実装してみたので、その時のメモをここに残します。
使い方
Executor
並列化は、以下の2つのクラスで実装することができます。
- マルチプロセス(ProcessPoolExecutor)
- マルチスレッド(ThreadPoolExecutor)
max_workers
max_workers で同時に実行可能なタスク数を指定できます。
マルチプロセス(ProcessPoolExecutor)
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
マルチスレッド(ThreadPoolExecutor)
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
プロセスとスレッドについて
違いは以下の通りです。
プロセス
- 独立のメモリ空間を保有している処理の単位
- 1つ以上のスレッドから構成
スレッド
- 1つのプロセスに割り当てられたメモリ内で動作する処理の単位
- スレッド間ではメモリが共有
実装
サンプルコード(ThreadPoolExecutor)
def parallel_excute_task(store, client, task_name, config, logger): import traceback import concurrent.futures as confu results = store.get_results(task_name) num = 0 futures = [] results = [] recv_max_workers = config.get("recv_max_workers") or 1 recv_timeout = config.get("recv_timeout") or 600 try: with confu.ThreadPoolExecutor(max_workers=recv_max_workers) as executor: for i in results: cid = str(i["cid"]) future = executor.submit( store.get_resource, client, cid, task_name, config ) futures.append(future) for future in confu.as_completed(futures, recv_timeout): result = future.result() results.append(result) return results except Exception: for future in futures: if not future.running(): future.cancel() for process in executor._processes.values(): process.kill() raise