pythonのasyncioでrun_in_executor()を使ってもブロックしてるように見えて上手く処理を逃がせないと感じたとき。

asyncioを使っていて、awaitableな関数群で構成されていないコードセットを無理やり非同期化してお茶を濁す時にrun_in_executor()が使われる。

ただしこのrun_in_executor()を使っても上手く処理を別スレッドに逃がせずという事態に陥ることもある。その理由などについてのメモ。

(実験用のコードのgist。分かっている人はこれを見るだけで良いかもしれない)

(正確には言えば、しっかりと別スレッドで実行されてはいるのだけれど、特定のコードブロックの実行がCPUを占有して、非同期的に動いている他の処理の待ち受けにすら反応を返さない様に見える状態のこと)

run_in_executor()を使ってもブロックしてしまう

とりあえず以下の様なコードを見て欲しい。1秒掛かりそうなタスクを2個、2秒掛かりそうなタスクを1個、並行で動かそうとしているが実際の実行時間は4秒になっている。

import logging
import asyncio
import time
from functools import partial
from mysleep import mysleep

logger = logging.getLogger(__name__)


async def run():
    loop = asyncio.get_event_loop()
    st = time.time()
    logger.info("**start**")
    result = await asyncio.gather(
        *[
            loop.run_in_executor(None, partial(mysleep, "x", 1)),
            loop.run_in_executor(None, partial(mysleep, "y", 1)),
            loop.run_in_executor(None, partial(mysleep, "z", 2)),
        ]
    )
    logger.info("**end** %r %r", result, time.time() - st)


logging.basicConfig(level=logging.INFO, format="%(asctime)s" + logging.BASIC_FORMAT)
asyncio.run(run(), debug=True)

x,y,zのmaxで2秒で終わりそうな気がするのだけれど。4(1+1+2)秒掛かる。

実行結果

$ 2019-03-20 20:54:59,837INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... end mysleep', 'x')
('    ... start mysleep', 'y', 1)
('    ... end mysleep', 'y')
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'z')
2019-03-20 20:55:03,846WARNING:asyncio:Executing <Task pending coro=<run() running at 00use_thread_pool_executor.py:18> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f4d1a05a828>()] created at /usr/lib/python3.7/asyncio/tasks.py:615> cb=[_run_until_complete_cb() at /usr/lib/python3.7/asyncio/base_events.py:158] created at /usr/lib/python3.7/asyncio/base_events.py:563> took 4.009 seconds
2019-03-20 20:55:03,850INFO:__main__:**end** ['x', 'y', 'z'] 4.012997150421143

run_in_executor()は同期的な処理を非同期化するので上手く動いてくれそうなものだけれど(と思う人が多い)。

そもそもrun_in_executor()とは?

実際の所、run_in_executor()は、concurrent.futuresのExecutorに処理を任せているだけ。(defaultではThreadPoolExecutorが使われるので)別スレッドで動くということにはなるものの結局pythonのthreadを立ち上げているというだけなので、pythonの実行コンテキスト上ロックが掛かってしまっては処理がブロックしてしまう。

asyncio/base_events.py

# concurrent.futuresのExecutorに渡しているだけ

class class BaseEventLoop(events.AbstractEventLoop):
# ...
    def run_in_executor(self, executor, func, *args):
        self._check_closed()
        if self._debug:
            self._check_callback(func, 'run_in_executor')
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor()
                self._default_executor = executor
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)

本当にconcurrent.futuresのExecutorを使っているだけ。

run_in_executor() が効かない理由

run_in_executor()が効かないと感じた時にその原因の可能性は複数ある。

  • IOバウンドな処理とCPUバウンドな処理のうちCPUバウンドな処理に該当する場合
  • 内部の処理が雑でGILが掛かってる系のコードになっている(Py_BEGIN_ALLOW_THREADSで囲まれていない部分(Cの話))

ぱっと見た感じ前者と後者は全く別のものなように感じてしまうかもしれないけれど。cpythonがこの両者を混同して解釈してしまうことがある(前者と後者を同じものだと解釈した人は処理系の気持ちが分かっている人かもしれない)。

加えて本番用のコードを書く前の実験で書いた仮のfakeのコードがtime.sleep()などを使っていた場合には、fakeの方のコードでは上手く動いているのに。。と感じてしまったりする。こういうときなどにpythonへの負の感情を募らせてしまうということがあるかもしれない。

結局、非同期処理と言っても、1つのCPUで動いているだけなので、息継ぎ的な部分がないコードがずっとCPUを掴んだらasyncioのloop自体も止まる。

(例えばlifetime的なものを定期的に更新するheartbeatを兼ねたループを実行したい場合などに全体の実行が止まっては困る。こういうときのためにどうして想定とは異なる挙動をするのかと言うことが把握できていると後々便利)

つまり非同期に優しいと言うのは2段階の意味があり、例えば、awaitable ready(asyncioのツールセットで構成されたコード)とrun_in_executor friendly(GILを気にして書かれたコード)と呼び分けることはできるかもしれない。この後者ですら無い場合にはrun_in_executor()で処理を逃したつもりになっても他のtaskの実行も止めてしまうかのように動くことがある。

最後の砦としてのProcessPoolExecutor

run_in_executor()では実行するExecutorを指定することができる。そして、concurent.futuresのExecutorにはThreadPoolExecutor(defaultで使われる)の他にもう1つProcessPoolExecutorが存在する。

こちらはprocess自体を分けて実行するので処理を別の実行コンテキストに逃がせるかもしれない(正確に言うとforkしたprocessPool内のprocessとpickleで通信する。詳しいアーキテクチャは直接コードを見るとコメントにがあるので知りたい人は覗いてみると良い)。

これを使ってあげるとprocessが別なのでGILの影響から逃れられる(逃れてはいないけれど、他には影響を与えないという意味で)。

--- 00use_thread_pool_executor.py    2019-03-20 20:53:38.629294940 +0900
+++ 01use_process_pool_executor.py    2019-03-20 20:53:38.615961610 +0900
@@ -1,6 +1,7 @@
 import logging
 import asyncio
 import time
+from concurrent.futures import ProcessPoolExecutor
 from functools import partial
 from mysleep import mysleep
 
@@ -11,11 +12,12 @@
     loop = asyncio.get_event_loop()
     st = time.time()
     logger.info("**start**")
+    executor = ProcessPoolExecutor(max_workers=3)
     result = await asyncio.gather(
         *[
-            loop.run_in_executor(None, partial(mysleep, "x", 1)),
-            loop.run_in_executor(None, partial(mysleep, "y", 1)),
-            loop.run_in_executor(None, partial(mysleep, "z", 2)),
+            loop.run_in_executor(executor, partial(mysleep, "x", 1)),
+            loop.run_in_executor(executor, partial(mysleep, "y", 1)),
+            loop.run_in_executor(executor, partial(mysleep, "z", 2)),
         ]
     )
     logger.info("**end** %r %r", result, time.time() - st)

2秒(max(1,1,2))で終わった。

PYTHONPATH=. python 01use_process_pool_executor.py
2019-03-20 20:55:03,959INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... start mysleep', 'y', 1)
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'x')
('    ... end mysleep', 'y')
2019-03-20 20:55:04,967INFO:asyncio:poll took 1001.933 ms: 1 events
('    ... end mysleep', 'z')
2019-03-20 20:55:05,969INFO:__main__:**end** ['x', 'y', 'z'] 2.0095720291137695

なので最後の手段としてprocessを分けるという方法は使えるかもしれない。

このようなときにはなるべくmainのloopには実際のタスクの実行をさせないようにしてあげることが大切(逆に言うと、他が非同期でも一部の心ない同期的な処理で全体を壊すということは普通にありえる)。

詳細:実際にmysleepを作る

(ここからは詳細の話なので興味がない人はProcessPoolExecutorで解決することもあるよで納得しちゃっても良いです)

ここまでの説明だけでは、「ある特殊な状態のときにrun_in_executor()に処理を逃しても全体としてブロックしてしまったかのように他の処理の実行が遅れてしまう」ということが分かっただけの状態。そもそもどういう時にこのような事態に陥るかかと言うことがわからないまま。

なので実際に再現するようなコードを作ってみる。

mysleep

time.sleep()run_in_executor()フレンドリーと考えると、そうではないsleep関数を作るのが一番手軽。cでのsleepを直接呼ぶようなモジュールを作ってしまうのが手軽そう。

めんどくさいのでcythonを使う。man 3 sleepなどを実行してみるとunistd.hにposixのsleepは入っているようなのでこれを使う(実際インポート用のglueコードが用意されているのでcimportで直接取り出せる)。

以下のようなmysleep.pyxを作る。

mysleep.pyx

from posix.unistd cimport sleep

def mysleep(x, unsigned int n):
    print("    ... start mysleep", x, n)
    sleep(n)
    print("    ... end mysleep", x)
    return x

ビルドは以下のようなsetup.pyを書く

setup.py

from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext

setup(
    cmdclass={"build_ext": build_ext},
    ext_modules=[Extension("mysleep", sources=["mysleep.pyx"])],
)

実際にbuildして使ってみる。

$ python setup.py build_ext --inplace
$ time PYTHONPATH=. python -c 'import mysleep; print(mysleep.mysleep("OK", 1))'
('    ... start mysleep', 'OK', 1)
('    ... end mysleep', 'OK')
OK

real    0m1.068s
user    0m0.051s
sys 0m0.017s

完成。このmysleepモジュールを使ったコードが冒頭のコード。

実行結果を再掲してみると、2秒を期待している所で4秒掛かっている(WARNINGでログが出ているのである程度は気づけるかもしれない?)。

$ 2019-03-20 20:54:59,837INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... end mysleep', 'x')
('    ... start mysleep', 'y', 1)
('    ... end mysleep', 'y')
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'z')
2019-03-20 20:55:03,846WARNING:asyncio:Executing <Task pending coro=<run() running at 00use_thread_pool_executor.py:18> wait_for=<_GatheringFuture pending cb=[<TaskWakeupMethWrapper object at 0x7f4d1a05a828>()] created at /usr/lib/python3.7/asyncio/tasks.py:615> cb=[_run_until_complete_cb() at /usr/lib/python3.7/asyncio/base_events.py:158] created at /usr/lib/python3.7/asyncio/base_events.py:563> took 4.009 seconds
2019-03-20 20:55:03,850INFO:__main__:**end** ['x', 'y', 'z'] 4.012997150421143

GILを一時的に開放する

本来Cで拡張モジュールを使う場合にはマクロでゴニョゴニョしないとダメなのだけれど、cythonは便利なのでnogilのcontext managerで包んであげればその間のコードはGILの制限から解放される。

diff --git a/daily/20190320/example_run_in_executor/mysleep.pyx b/daily/20190320/example_run_in_executor/mysleep.pyx
index c27e4da..bbf6764 100644
--- a/daily/20190320/example_run_in_executor/mysleep.pyx
+++ b/daily/20190320/example_run_in_executor/mysleep.pyx
@@ -2,6 +2,7 @@ from posix.unistd cimport sleep
 
 def mysleep(x, unsigned int n):
     print("    ... start mysleep", x, n)
-    sleep(n)
+    with nogil:
+        sleep(n)
     print("    ... end mysleep", x)
     return x

今度はThreadPoolExecutorでも上手く動く。

$ $ PYTHONPATH=. python 00use_thread_pool_executor.py
2019-03-21 15:12:40,999INFO:__main__:**start**
('    ... start mysleep', 'x', 1)
('    ... start mysleep', 'y', 1)
('    ... start mysleep', 'z', 2)
('    ... end mysleep', 'x')
('    ... end mysleep', 'y')
2019-03-21 15:12:42,004INFO:asyncio:poll took 1000.853 ms: 1 events
('    ... end mysleep', 'z')
2019-03-21 15:12:43,005INFO:__main__:**end** ['x', 'y', 'z'] 2.00624680519104

やりましたね。

gist

(実験用のコードのgist。分かっている人はこれを見るだけで良いかもしれない)

see also

uvloopとasyncioとの違いはみたいな話が気になった人はこの記事を見るのも良いかも

asyncioのもう少し外側の話という意味では、trioやcruioの入門でこのあたりを見るのも良いかも