読者です 読者をやめる 読者になる 読者になる

asyncioのloopをどうするかということについて考えてみた

はじめに

pythonには非同期用の標準ライブラリとして asyncio というモジュールが用意されている。このasyncioを使う際にevent loopの取り扱いに悩むことがあるかもしれない。どうすれば良いのかを少しだけまじめに考えてみた。

ライブラリ or アプリケーション

まず、書こうとしているコードがアプリケーションなのかライブラリなのか判断する必要があるかもしれない。実際に書こうとしているコードが依存関係の終端に位置するつまり現在書かれているコードを使う人が誰もいないというのであればそれはアプリケーションであり、そうでなければライブラリということにして判断して欲しい。

アプリケーションであれば気兼ねなく asyncio.get_event_loop() を使っても良い。一方ライブラリならもう少し気を付けようという方針になるのではないかと思った。

具体的に何が変わるかというと、コードがアプリケーション用のものであるならば以下の様な書き方で良い。つまり取り出したevent loopを明示的に取り扱わなくて良い。

async def do_task():
    yield do_subtask_1()
    yield asyncio.sleep(1)
    yield do_subtask_2()

loop = asyncio.get_event_loop()
loop.run_until_complete(do_task())

一方で、書こうとしているコードがライブラリであれば、取り出したevent loopを明示的に取り扱ったほうが良い。

async def do_task(loop=None):
    if loop is None:
        loop = get_event_loop()
    yield do_subtask_1(loop=loop)
    yield asyncio.sleep(1, loop=loop)
    yield do_subtask_2(loop=loop)
    
loop = asyncio.get_event_loop()
loop.run_until_complete(do_task(loop=loop))

なぜかといえば、asyncio.get_event_loop() で行われているloopはthread localなオブジェクトに保存されている。もちろん、ただのsingletonであるよりは良いけれど、暗黙の状態を保持してそれが使われるということは避けたい。なので明示的にloopを渡して使えるようにするというのが良いのではないかと思う。

loop=None を追加し、loopがなかったら get_event_loop() を呼ぶというイディオムを結構よく目にする。ところでこのままであっても引数としてloopを忘れてしまった際に、問題なく 別の新しく生成されたloop に処理が登録されてしまうという可能性がある。特に、親の非同期関数では明示的にloopを渡しているものの、内部の子となる非同期関数の呼び出しでは渡されたloopオブジェクトを利用していないというようなコードが書かれてしまった時にこのような事態は起きる。(後述するテストで担保するという方法はある)

以下の様なものをイメージしている。

async def do_task(loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    await do_subtask()  # loop=loop を忘れている
    await do_subtask2(loop=loop)

テストの書き方(ライブラリ側)

話はテストに移る。基本的には適切にloopの値を初期化しましょうということに尽きる。以下のようなコードがあれば良い。

class _Tests(unittest.TestCase):
    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.stop()
        self.loop.close()        

まず、 get_event_loop() ではなく new_event_loop() でevent loopを取り出している。この関数は内部に抱えているloopを返すのではなく、新たなevent loopを都度生成する。

次の記述はちょっと特殊かもしれない。これは実際に動くコードの安全弁みたいなものになっている。例えば上の例ででてきた暗黙のloopを利用してしまった記述をしてしまった際にエラーにするために必要になっている。

set_event_loop(None) の意味

もう少し詳細に説明する。 set_event_loop() は 冒頭に登場してきた get_event_loop() と対になる関数で、asyncioが内部に抱える event loopを設定する関数になっている。このevent loopにNoneを入れておくことで、内部で暗黙に event loop を取得しようとした際に以下のような RuntimeError を発生させてくれるようになる。

RuntimeError: There is no current event loop in thread 'MainThread'.

実際のテストコード

実際のテストコードは以下の様な形になっていれば良いのではないかと思う。

class Tests(_Tests):
    def test_it(self):
        from yours import do_task
        self.loop.run_until_complete(do_task(loop=self.loop))

もう少し考えるコト

実際にまじめにコードを書くならもう少し考えるコトがあるかもしれない。例えば以下のようなもの。

  • そもそも asyncio.sleep をそのまま実行したくない。待ちたくない
  • 既存の処理の計算負荷に依存したたまたま動くコードになっているのかもしれない
  • unittestではmockあるいはfakeを使いたい

特に順序や特定のケースでたまたま動くというケースを避けるために、独自のテスト用のloopオブジェクトを作るというのはありなのではないかと思ったりした。

appendix asyncio.test_utils

たまたま asyncioのコードを覗いていたら asyncio.test_utils というモジュールをみつけた。これがそのまま使えることもあるかもしれない。このモジュールは、実際のところ、python自体のasyncioのテストに使われていてる。 https://github.com/python/cpython/tree/master/Lib/test/test_asyncio

追記

ところで loopを明示的に扱うというのであれば、以下の様な形にしても良いのでは無いかと思ったがそのように書いているものは少なそう?

class MyAsyncTask:
    def __init__(self, loop):
        self.loop = loop

    async def do_task(self):
        return await do_subtask(loop=self.loop)