pythonで複数のhttp requestを同時に投げる方法(requests)について

時折、ちょっとしたスピードアップにhttp(https) requestを並列で投げたい場合がある。aiohttpなどを使っても良いが既存のコードを書き換えるのもめんどくさい。そのようなときの方法のメモ。

concurrent.futuresを使う方法も考えられるけれど。今回はasyncioだけに絞った話。

いわゆるpythonでのhttp requestのデファクトスタンダードrequestsだと思う。ところでこのrequestsはnon blockingではないのでasyncio上で使おうとすると処理がblockしてしまう。run_in_executor()を使うと処理を外部の環境に移せる。

試しに、run_in_executor()を使ってみよう。動作確認自体は直接のhttp requestではなく、requestを模したfakeの関数を使って行う。以下の様な関数。この処理はtime.sleep()自体が同期的な処理なのでblockする。

import time


def req(i):
    print(i, "start")
    time.sleep(0.5)
    print(i, "end")
    return i

これを使ってblockしてしまう処理をasyncio上で呼び出すための実験をしてみる。

run_in_executor()を使わない場合

まずは、run_in_executor()を使わない場合を見ていく。例えば3つのtaskを同時にrequestしてみる(asyncio.gatherは受け取ったtaskが全部終了するまで待ち、結果を渡された順序で返してくれる便利関数。これの目的や説明などについては昔書いた)。

import logging
import asyncio


async def run():
    async def run_req(i):
        return req(i)

    tasks = [run_req(i) for i in range(3)]
    return await asyncio.gather(*tasks)


logging.basicConfig(level=logging.DEBUG)
st = time.time()
print(asyncio.get_event_loop().run_until_complete(run()))
print(time.time() - st)

実際、3つのtaskを実行するので1.5秒掛かる(asyncioを使ったコードを実行するときにはデバッグモードを有効にしておくのがオススメ。PYTHONASYNCIODEBUG=1の部分)。

$ PYTHONASYNCIODEBUG=1 python 00req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
0 end
1 start
1 end
2 start
2 end
[0, 1, 2]
1.5026493072509766

run_in_executor()を使う場合

今度はrun_in_executor()を使った場合を見ていく。run_in_executor()の第一引数にNoneを渡すとデフォルトのexecutorで呼ばれる(設定を変更していない限りconcurrent.futures.ThreadPoolExecutorで実行される)。

import logging
import asyncio


async def run(loop):
    async def run_req(i):
        return await loop.run_in_executor(None, req, i)

    tasks = [run_req(i) for i in range(3)]
    return await asyncio.gather(*tasks)


logging.basicConfig(level=logging.DEBUG)
st = time.time()
loop = asyncio.get_event_loop()
print(loop.run_until_complete(run(loop)))
print(time.time() - st)

並列に実行されるならおおよそ0.5秒で実行が終わるはず。

$ PYTHONASYNCIODEBUG=1 python 01req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
0 end
1 end
2 end
[0, 1, 2]
0.5050966739654541

並列で実行できている。

同時リクエスト数を制限したい場合

なるべく並列で実行したい一方で同時リクエスト数を制限したいことがある。こういうときはasyncio.Semaphoreを使うと良い。

--- 01req.py 2018-09-22 18:08:03.346066271 +0900
+++ 02req.py  2018-09-22 18:16:24.069497610 +0900
@@ -13,10 +13,13 @@
 
 
 async def run(loop):
+    sem = asyncio.Semaphore(5)
+
     async def run_req(i):
-        return await loop.run_in_executor(None, req, i)
+        async with sem:
+            return await loop.run_in_executor(None, req, i)
 
-    tasks = [run_req(i) for i in range(3)]
+    tasks = [run_req(i) for i in range(20)]
     return await asyncio.gather(*tasks)

20個のタスクを同時に5個まで並列で実行しているので2.0秒程度掛かるはず。

$ PYTHONASYNCIODEBUG=1 python 02req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
3 start
4 start
...
19 start
15 end
16 end
17 end
18 end
19 end
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
2.0162127017974854

大丈夫そう。

途中で失敗した場合にすぐに終了して欲しい

これはデフォルトの動作がそのまま。正確にはすぐにではないけれど。途中で中断される。

例えば、途中での失敗を模したコードを追加してみる。

-- 02req.py   2018-09-22 18:16:24.069497610 +0900
+++ 03req.py  2018-09-22 18:22:18.704888305 +0900
@@ -4,6 +4,8 @@
 def req(i):
     print(i, "start")
     time.sleep(0.5)
+    if i == 3:
+        raise Exception("hmm")
     print(i, "end")
     return i

10個のタスクを3つ並列で実行して途中で失敗した場合の結果。もし仮に全てが実行されるなら0~9まで全てのtaskがstartになる。

$ PYTHONASYNCIODEBUG=1 python 03req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
0 end
1 end
2 end
3 start
4 start
5 start
6 start
4 end
5 end
Traceback (most recent call last):
  File "03req.py", line 31, in <module>
    print(loop.run_until_complete(run(loop)))
  File "/usr/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "03req.py", line 25, in run
    return await asyncio.gather(*tasks)
  File "03req.py", line 22, in run_req
    return await loop.run_in_executor(None, req, i)
  File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "03req.py", line 8, in req
    raise Exception("hmm")
Exception: hmm
6 end

6までで終了しているので途中で中断されている。

全部実行したい場合

途中で止まらず全部実行したい場合にはreturn_exceptionsにTrueを指定すれば良い。

--- 03req.py 2018-09-22 18:26:53.361512907 +0900
+++ 04req.py  2018-09-22 18:27:50.385216947 +0900
@@ -22,7 +22,7 @@
             return await loop.run_in_executor(None, req, i)
 
     tasks = [run_req(i) for i in range(10)]
-    return await asyncio.gather(*tasks)
+    return await asyncio.gather(*tasks, return_exceptions=True)
 
 
 logging.basicConfig(level=logging.DEBUG)

全部実行される。

$ PYTHONASYNCIODEBUG=1 python 04req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
0 end
3 start
1 end
4 start
2 end
5 start
6 start
4 end
7 start
5 end
8 start
6 end
9 start
7 end
8 end
9 end
[0, 1, 2, Exception('hmm'), 4, 5, 6, 7, 8, 9]
2.0221383571624756

発生した例外は戻り値として返される。そのまま中断されず全てのタスクが実行されている。

requestの結果を使ってさらにrequest

requestの結果を使ってさらにrequestしたいことがある。特にcrawlerのようなコードを書いているとき(とはいえ、crawlerのようなものを作るときには何らかのライブラリを使った方が良いかもしれない)。 このときにはasyncio.Queueを使えば良い(行いたい操作によってLifoQueueとかPriorityQueueを使い分けることもある)。

import time

# levelが追加された
def req(i, level):
    print(i, level, "start")
    time.sleep(0.5)
    print(i, level, "end")
    return i, level + 1


import logging
import asyncio


async def run(loop):
    q = asyncio.Queue()
    r = []
    for i in range(5):
        q.put_nowait((i, 0))

    async def consume():
        while not q.empty():
            i, level = await q.get()
            x, next_level = await loop.run_in_executor(None, req, i, level)
            r.append((x, next_level))

            # 3階層くらいまで戻り値queueに追加する
            if next_level < 3:
                for j in range(x):
                    q.put_nowait((j, next_level))

    # 並列数はasyncio.waitに渡すtaskの数で調整する
    await asyncio.wait([consume() for _ in range(5)])
    return r

25個のタスクを5個ずつ並列で消費するので2.5秒くらい掛かる。はず。

$ PYTHONASYNCIODEBUG=1 python 05req.py
DEBUG:asyncio:Using selector: EpollSelector
...
[(0, 1), (2, 1), (3, 1), (4, 1), (1, 1), (0, 2), (0, 2), (1, 2), (1, 2), (2, 2), (0, 2), (1, 2), (2, 2), (3, 2), (0, 2), (0, 3), (0, 3), (0, 3), (1, 3), (0, 3), (0, 3), (1, 3), (0, 3), (1, 3), (2, 3)]
3.0186729431152344

ここで実行時の順序を揃えたい場合にはtimestamp的なものを一緒に追加してそれを元にsortする。

--- 05req.py 2018-09-22 19:02:14.581036491 +0900
+++ 06req.py  2018-09-22 19:05:32.868959088 +0900
@@ -16,19 +16,19 @@
     q = asyncio.Queue()
     r = []
     for i in range(5):
-        q.put_nowait((i, 0))
+        q.put_nowait((i, 0, time.time()))
 
     async def consume():
         while not q.empty():
-            i, level = await q.get()
+            i, level, timestamp = await q.get()
             x, next_level = await loop.run_in_executor(None, req, i, level)
-            r.append((x, next_level))
+            r.append((timestamp, x, next_level))
             if next_level < 3:
                 for j in range(x):
-                    q.put_nowait((j, next_level))
+                    q.put_nowait((j, next_level, time.time()))
 
     await asyncio.wait([consume() for _ in range(5)])
-    return r
+    return [x[1:] for x in sorted(r)]

asyncio.Queueの使いかたのもう一つの流派

asyncio.Queueを単なる値の入れ物として使うのではなく、taskを管理するものとして使う流派もある。この場合には、task_done()join()が重要になってくる。ちなみにドキュメントに書かれているのはこちらの流派

考えてみると当たり前で、queueの中にある・無しの他に現在実行中のような状態がある。taskを状態で表すなら、予約中・実行中・完了という3つの状態があり、この実行中の状態を空扱いにして終了されては困る。

それぞれがどういうものかは中のコードを覗いてみるとわかりやすい。_unfinished_tasksというインスタンス変数に現在実行中のタスクの数がカウントされている(ちなみに_unfinished_tasksput()が呼ばれたタイミングでカウントアップされる)。

class Queue:
# ...
    def task_done(self):
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        # タスクが完了したらカウントダウン
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        # 実行中のタスクが存在していたら完了するまでwait
        if self._unfinished_tasks > 0:
            await self._finished.wait()

この流派の書き方で先程のコードを書き換えてみる。違いは以下

  • 無限ループ
  • Queueから取り出した値を使い終わったらtask_done()を呼び出している
  • Queueに対する実行がなくなったら(join()の後)、動かしていたworker task(consume())を明示的に止める
--- 05req.py 2018-09-22 21:18:27.130508295 +0900
+++ 05req.tmp.py  2018-09-22 21:24:55.783094248 +0900
@@ -19,15 +19,19 @@
         q.put_nowait((i, 0))
 
     async def consume():
-        while not q.empty():
+        while True:
             i, level = await q.get()
             x, next_level = await loop.run_in_executor(None, req, i, level)
             r.append((x, next_level))
             if next_level < 3:
                 for j in range(x):
                     q.put_nowait((j, next_level))
+            q.task_done()
 
-    await asyncio.wait([consume() for _ in range(5)])
+    tasks = [loop.create_task(consume()) for _ in range(5)]
+    await q.join()
+    for task in tasks:
+        task.cancel()
     return r

taskをloop経由での実行として登録をしたい(非同期に実行したい)場合にはcreate_task()に渡してあげると良い。以前asyncioでコードを書いていた時に、これには気づかず、「awaitすると待ってしまう。ただ呼ぶだけでは実行されない(coroutineが生成されるだけ)。call_soon()などは同期的な処理」という感じで困っていたことがあった。

非同期操作と同期処理を混ぜて実行する

一部だけ非同期に対応している場合にrun_in_executor()では呼びたくない。asyncio.iscoroutinefunction()で一応調べられる。あんまりオススメはしないけれど。以下の様な関数を定義しておくと、同期・非同期を気にせずtask化できる。

async def run_task_contextually(loop, fn, *args):
    if asyncio.iscoroutinefunction(fn):
        return await fn(*args)
    else:
        return await loop.run_in_executor(None, fn, *args)

同期・非同期を混ぜたコード。

async def req_async(i):
    print(i, "start")
    await asyncio.sleep(0.5)
    print(i, "end")
    return i


async def run(loop):
    sem = asyncio.Semaphore(5)

    async def run_req(fn, i):
        async with sem:
            return await run_task_contextually(loop, fn, i)

    tasks = []
    tasks.extend([run_req(req, i) for i in range(3)])
    tasks.extend([run_req(req_async, i) for i in range(3, 6)])
    tasks.extend([run_req(req, i) for i in range(6, 9)])
    return await asyncio.gather(*tasks)

実行してみる。

$ PYTHONASYNCIODEBUG=1 python 10*.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
3 start
4 start
0 end
DEBUG:asyncio:poll 499.446 ms took 500.131 ms: 1 events
2 end
1 end
3 end
4 end
5 start
6 start
7 start
8 start
5 end
6 end
DEBUG:asyncio:poll took 0.152 ms: 1 events
7 end
8 end
DEBUG:asyncio:poll took 0.125 ms: 1 events
[0, 1, 2, 3, 4, 5, 6, 7, 8]
1.0186710357666016