pythonで複数のhttp requestを同時に投げる方法(requests)について
時折、ちょっとしたスピードアップにhttp(https) requestを並列で投げたい場合がある。aiohttpなどを使っても良いが既存のコードを書き換えるのもめんどくさい。そのようなときの方法のメモ。
concurrent.futuresを使う方法も考えられるけれど。今回はasyncioだけに絞った話。
いわゆるpythonでのhttp requestのデファクトスタンダードはrequestsだと思う。ところでこのrequestsはnon blockingではないのでasyncio上で使おうとすると処理がblockしてしまう。run_in_executor()
を使うと処理を外部の環境に移せる。
試しに、run_in_executor()
を使ってみよう。動作確認自体は直接のhttp requestではなく、requestを模したfakeの関数を使って行う。以下の様な関数。この処理はtime.sleep()
自体が同期的な処理なのでblockする。
import time def req(i): print(i, "start") time.sleep(0.5) print(i, "end") return i
これを使ってblockしてしまう処理をasyncio上で呼び出すための実験をしてみる。
run_in_executor()
を使わない場合
まずは、run_in_executor()
を使わない場合を見ていく。例えば3つのtaskを同時にrequestしてみる(asyncio.gatherは受け取ったtaskが全部終了するまで待ち、結果を渡された順序で返してくれる便利関数。これの目的や説明などについては昔書いた)。
import logging import asyncio async def run(): async def run_req(i): return req(i) tasks = [run_req(i) for i in range(3)] return await asyncio.gather(*tasks) logging.basicConfig(level=logging.DEBUG) st = time.time() print(asyncio.get_event_loop().run_until_complete(run())) print(time.time() - st)
実際、3つのtaskを実行するので1.5秒掛かる(asyncioを使ったコードを実行するときにはデバッグモードを有効にしておくのがオススメ。PYTHONASYNCIODEBUG=1の部分)。
$ PYTHONASYNCIODEBUG=1 python 00req.py DEBUG:asyncio:Using selector: EpollSelector 0 start 0 end 1 start 1 end 2 start 2 end [0, 1, 2] 1.5026493072509766
run_in_executor()
を使う場合
今度はrun_in_executor()
を使った場合を見ていく。run_in_executor()
の第一引数にNoneを渡すとデフォルトのexecutorで呼ばれる(設定を変更していない限りconcurrent.futures.ThreadPoolExecutorで実行される)。
import logging import asyncio async def run(loop): async def run_req(i): return await loop.run_in_executor(None, req, i) tasks = [run_req(i) for i in range(3)] return await asyncio.gather(*tasks) logging.basicConfig(level=logging.DEBUG) st = time.time() loop = asyncio.get_event_loop() print(loop.run_until_complete(run(loop))) print(time.time() - st)
並列に実行されるならおおよそ0.5秒で実行が終わるはず。
$ PYTHONASYNCIODEBUG=1 python 01req.py DEBUG:asyncio:Using selector: EpollSelector 0 start 1 start 2 start 0 end 1 end 2 end [0, 1, 2] 0.5050966739654541
並列で実行できている。
同時リクエスト数を制限したい場合
なるべく並列で実行したい一方で同時リクエスト数を制限したいことがある。こういうときはasyncio.Semaphoreを使うと良い。
--- 01req.py 2018-09-22 18:08:03.346066271 +0900 +++ 02req.py 2018-09-22 18:16:24.069497610 +0900 @@ -13,10 +13,13 @@ async def run(loop): + sem = asyncio.Semaphore(5) + async def run_req(i): - return await loop.run_in_executor(None, req, i) + async with sem: + return await loop.run_in_executor(None, req, i) - tasks = [run_req(i) for i in range(3)] + tasks = [run_req(i) for i in range(20)] return await asyncio.gather(*tasks)
20個のタスクを同時に5個まで並列で実行しているので2.0秒程度掛かるはず。
$ PYTHONASYNCIODEBUG=1 python 02req.py DEBUG:asyncio:Using selector: EpollSelector 0 start 1 start 2 start 3 start 4 start ... 19 start 15 end 16 end 17 end 18 end 19 end [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] 2.0162127017974854
大丈夫そう。
途中で失敗した場合にすぐに終了して欲しい
これはデフォルトの動作がそのまま。正確にはすぐにではないけれど。途中で中断される。
例えば、途中での失敗を模したコードを追加してみる。
-- 02req.py 2018-09-22 18:16:24.069497610 +0900 +++ 03req.py 2018-09-22 18:22:18.704888305 +0900 @@ -4,6 +4,8 @@ def req(i): print(i, "start") time.sleep(0.5) + if i == 3: + raise Exception("hmm") print(i, "end") return i
10個のタスクを3つ並列で実行して途中で失敗した場合の結果。もし仮に全てが実行されるなら0~9まで全てのtaskがstartになる。
$ PYTHONASYNCIODEBUG=1 python 03req.py DEBUG:asyncio:Using selector: EpollSelector 0 start 1 start 2 start 0 end 1 end 2 end 3 start 4 start 5 start 6 start 4 end 5 end Traceback (most recent call last): File "03req.py", line 31, in <module> print(loop.run_until_complete(run(loop))) File "/usr/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete return future.result() File "03req.py", line 25, in run return await asyncio.gather(*tasks) File "03req.py", line 22, in run_req return await loop.run_in_executor(None, req, i) File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run result = self.fn(*self.args, **self.kwargs) File "03req.py", line 8, in req raise Exception("hmm") Exception: hmm 6 end
6までで終了しているので途中で中断されている。
全部実行したい場合
途中で止まらず全部実行したい場合にはreturn_exceptionsにTrueを指定すれば良い。
--- 03req.py 2018-09-22 18:26:53.361512907 +0900 +++ 04req.py 2018-09-22 18:27:50.385216947 +0900 @@ -22,7 +22,7 @@ return await loop.run_in_executor(None, req, i) tasks = [run_req(i) for i in range(10)] - return await asyncio.gather(*tasks) + return await asyncio.gather(*tasks, return_exceptions=True) logging.basicConfig(level=logging.DEBUG)
全部実行される。
$ PYTHONASYNCIODEBUG=1 python 04req.py DEBUG:asyncio:Using selector: EpollSelector 0 start 1 start 2 start 0 end 3 start 1 end 4 start 2 end 5 start 6 start 4 end 7 start 5 end 8 start 6 end 9 start 7 end 8 end 9 end [0, 1, 2, Exception('hmm'), 4, 5, 6, 7, 8, 9] 2.0221383571624756
発生した例外は戻り値として返される。そのまま中断されず全てのタスクが実行されている。
requestの結果を使ってさらにrequest
requestの結果を使ってさらにrequestしたいことがある。特にcrawlerのようなコードを書いているとき(とはいえ、crawlerのようなものを作るときには何らかのライブラリを使った方が良いかもしれない)。 このときにはasyncio.Queueを使えば良い(行いたい操作によってLifoQueueとかPriorityQueueを使い分けることもある)。
import time # levelが追加された def req(i, level): print(i, level, "start") time.sleep(0.5) print(i, level, "end") return i, level + 1 import logging import asyncio async def run(loop): q = asyncio.Queue() r = [] for i in range(5): q.put_nowait((i, 0)) async def consume(): while not q.empty(): i, level = await q.get() x, next_level = await loop.run_in_executor(None, req, i, level) r.append((x, next_level)) # 3階層くらいまで戻り値queueに追加する if next_level < 3: for j in range(x): q.put_nowait((j, next_level)) # 並列数はasyncio.waitに渡すtaskの数で調整する await asyncio.wait([consume() for _ in range(5)]) return r
25個のタスクを5個ずつ並列で消費するので2.5秒くらい掛かる。はず。
$ PYTHONASYNCIODEBUG=1 python 05req.py DEBUG:asyncio:Using selector: EpollSelector ... [(0, 1), (2, 1), (3, 1), (4, 1), (1, 1), (0, 2), (0, 2), (1, 2), (1, 2), (2, 2), (0, 2), (1, 2), (2, 2), (3, 2), (0, 2), (0, 3), (0, 3), (0, 3), (1, 3), (0, 3), (0, 3), (1, 3), (0, 3), (1, 3), (2, 3)] 3.0186729431152344
ここで実行時の順序を揃えたい場合にはtimestamp的なものを一緒に追加してそれを元にsortする。
--- 05req.py 2018-09-22 19:02:14.581036491 +0900 +++ 06req.py 2018-09-22 19:05:32.868959088 +0900 @@ -16,19 +16,19 @@ q = asyncio.Queue() r = [] for i in range(5): - q.put_nowait((i, 0)) + q.put_nowait((i, 0, time.time())) async def consume(): while not q.empty(): - i, level = await q.get() + i, level, timestamp = await q.get() x, next_level = await loop.run_in_executor(None, req, i, level) - r.append((x, next_level)) + r.append((timestamp, x, next_level)) if next_level < 3: for j in range(x): - q.put_nowait((j, next_level)) + q.put_nowait((j, next_level, time.time())) await asyncio.wait([consume() for _ in range(5)]) - return r + return [x[1:] for x in sorted(r)]
asyncio.Queueの使いかたのもう一つの流派
asyncio.Queueを単なる値の入れ物として使うのではなく、taskを管理するものとして使う流派もある。この場合には、task_done()
とjoin()
が重要になってくる。ちなみにドキュメントに書かれているのはこちらの流派。
考えてみると当たり前で、queueの中にある・無しの他に現在実行中のような状態がある。taskを状態で表すなら、予約中・実行中・完了という3つの状態があり、この実行中の状態を空扱いにして終了されては困る。
それぞれがどういうものかは中のコードを覗いてみるとわかりやすい。_unfinished_tasks
というインスタンス変数に現在実行中のタスクの数がカウントされている(ちなみに_unfinished_tasks
はput()
が呼ばれたタイミングでカウントアップされる)。
class Queue: # ... def task_done(self): if self._unfinished_tasks <= 0: raise ValueError('task_done() called too many times') # タスクが完了したらカウントダウン self._unfinished_tasks -= 1 if self._unfinished_tasks == 0: self._finished.set() async def join(self): # 実行中のタスクが存在していたら完了するまでwait if self._unfinished_tasks > 0: await self._finished.wait()
この流派の書き方で先程のコードを書き換えてみる。違いは以下
- 無限ループ
- Queueから取り出した値を使い終わったら
task_done()
を呼び出している - Queueに対する実行がなくなったら(
join()
の後)、動かしていたworker task(consume()
)を明示的に止める
--- 05req.py 2018-09-22 21:18:27.130508295 +0900 +++ 05req.tmp.py 2018-09-22 21:24:55.783094248 +0900 @@ -19,15 +19,19 @@ q.put_nowait((i, 0)) async def consume(): - while not q.empty(): + while True: i, level = await q.get() x, next_level = await loop.run_in_executor(None, req, i, level) r.append((x, next_level)) if next_level < 3: for j in range(x): q.put_nowait((j, next_level)) + q.task_done() - await asyncio.wait([consume() for _ in range(5)]) + tasks = [loop.create_task(consume()) for _ in range(5)] + await q.join() + for task in tasks: + task.cancel() return r
taskをloop経由での実行として登録をしたい(非同期に実行したい)場合にはcreate_task()
に渡してあげると良い。以前asyncioでコードを書いていた時に、これには気づかず、「awaitすると待ってしまう。ただ呼ぶだけでは実行されない(coroutineが生成されるだけ)。call_soon()
などは同期的な処理」という感じで困っていたことがあった。
非同期操作と同期処理を混ぜて実行する
一部だけ非同期に対応している場合にrun_in_executor()
では呼びたくない。asyncio.iscoroutinefunction()
で一応調べられる。あんまりオススメはしないけれど。以下の様な関数を定義しておくと、同期・非同期を気にせずtask化できる。
async def run_task_contextually(loop, fn, *args): if asyncio.iscoroutinefunction(fn): return await fn(*args) else: return await loop.run_in_executor(None, fn, *args)
同期・非同期を混ぜたコード。
async def req_async(i): print(i, "start") await asyncio.sleep(0.5) print(i, "end") return i async def run(loop): sem = asyncio.Semaphore(5) async def run_req(fn, i): async with sem: return await run_task_contextually(loop, fn, i) tasks = [] tasks.extend([run_req(req, i) for i in range(3)]) tasks.extend([run_req(req_async, i) for i in range(3, 6)]) tasks.extend([run_req(req, i) for i in range(6, 9)]) return await asyncio.gather(*tasks)
実行してみる。
$ PYTHONASYNCIODEBUG=1 python 10*.py DEBUG:asyncio:Using selector: EpollSelector 0 start 1 start 2 start 3 start 4 start 0 end DEBUG:asyncio:poll 499.446 ms took 500.131 ms: 1 events 2 end 1 end 3 end 4 end 5 start 6 start 7 start 8 start 5 end 6 end DEBUG:asyncio:poll took 0.152 ms: 1 events 7 end 8 end DEBUG:asyncio:poll took 0.125 ms: 1 events [0, 1, 2, 3, 4, 5, 6, 7, 8] 1.0186710357666016