pythonのslackclientで非同期タスクを実行したらValueErrorになった話

最近「ちょっとslack botでも書くか」と思う機会があった。そのときに色々思うことがあったのでメモ。

slack botを書く機会があった。どのライブラリを使うのが良い?

現状ではslack自身が出しているライブラリをそのまま使うのが良いのではないかと思う。

github.com

昔は以下のようなライブラリにけっこうお世話になることがあったが、いろいろと追いついてなかったりasyncio対応などがされていなかったりするので、もう使わなくて良いと思う。この辺のライブラリを薦める情報があったら、古いので無視すると良い。

ちなみにslackbotはslakerに依存している。そしてslackerはarchiveされている(R.I.P.)。

github.com github.com

ということがあった。というのが発端だった。

slackclientの実行でValueError?

というわけで新しいbotはslackclientの方を利用して作っていきたい。簡単なasyncio用の例のhello worldを動かしてみることにする。READMEのコード例などを整理して、asyncioを使うRTMBotを動かすコードをは以下の様になると思う1

x0hello.py

import os
from slack import RTMClient
from slack.errors import SlackApiError


@RTMClient.run_on(event="message")
async def say_hello(**payload):
    """"Hello" を含んだメッセージに反応して "Hi @<user名>" を返す """
    data = payload["data"]
    web_client = payload["web_client"]

    if "text" in data and "Hello" in data.get("text", []):
        channel_id = data["channel"]
        thread_ts = data["ts"]
        user = data["user"]

        await web_client.chat_postMessage(
            channel=channel_id, text=f"Hi <@{user}>!", thread_ts=thread_ts
        )


if __name__ == "__main__":
    import asyncio

    rtm_client = RTMClient(token=os.environ["SLACK_API_TOKEN"], run_async=True)
    asyncio.run(rtm_client.start(), debug=True)

しかし、このコードは動かない。ValueErrorが発生してしまう。 ValueError: a coroutine was expected, got <Task ..> とは?

$ python x0*.py
Traceback (most recent call last):
  File "x0hello.py", line 33, in <module>
    asyncio.run(rtm_client.start(), debug=True)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 37, in run
    raise ValueError("a coroutine was expected, got {!r}".format(main))
ValueError: a coroutine was expected, got <Task pending name='Task-1' coro=<RTMClient._connect_and_read() running at $HOME/vboxshare/venvs/my/lib/python3.8/site-packages/slack/rtm/client.py:330>>

asyncio.run()に代わりloop.run_until_complete()を使う形の場合は動く。差分は以下。

ちなみにかつてasyncio.run()が無い以前のpythonのバージョンでは後者の形にするのが一般的だった。

diff -u x0* x1*
--- x0hello.py    2020-07-11 01:40:39.000000000 +0900
+++ x1hello.py    2020-07-11 01:44:49.000000000 +0900
@@ -29,4 +29,6 @@
     import asyncio
 
     rtm_client = RTMClient(token=os.environ["SLACK_API_TOKEN"], run_async=True)
-    asyncio.run(rtm_client.start(), debug=True)
+    loop = asyncio.get_event_loop()
+    loop.set_debug(True)
+    loop.run_until_complete(rtm_client.start())

さて、この違いはなんだろう? この違いを整理しておくと便利だと思ったのでメモしておくことにした。

(:warning: 読者への注意)

エラーメッセージで検索してきて、エラーを解決したいだけの人はここまででおしまい。 以後、冒頭でslack botだのと言ってきたが、これ以降slackclientの話が出ることは無い。

asyncio.run()とloop.run_until_complete()の違い

違いを調べてみよう。この辺りはコードを読んで行くのが一番はやい。以下のようなエラーメッセージとの関連も見ていくことにする。

ValueError: a coroutine was expected, got <Task pending name='Task-1' coro=<RTMClient._connect_and_read() running at ...>> 

awaitable? iscorountine?

ここでasyncio.runの実装は以下の様になっている(コメントなどは省略)。

asyncio/runners.py

def run(main, *, debug=False):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

エラーはこの辺の行で出ていた。

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

つまり、冒頭のbotのstart()メソッドは asyncio.iscoroutine()を満たさない。

より整理すると、こういう違いがある。後者は前者を満たすが、前者は後者ではない。

  • awaitableであるもの
  • asyncio.iscorountine()がTrueになるもの

ちなみにTaskオブジェクト(asyncioが内部的にloopに委ねるときにwrapするオブジェクト)一般でそう。例えば、asyncio.gatherなんかもawaitableではあるが同様のエラーになるはず。

x2check.py

import asyncio


async def hello():
    await asyncio.sleep(0.1)
    print("hello")


# これは OK
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
# これも OK
# asyncio.run(hello())

# これは OK
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(hello(), hello()))
# # これは NG
# asyncio.run(asyncio.gather(hello(), hello()))
# # ValueError: a coroutine was expected, got <_GatheringFuture pending>

エラーになるのは同様の理屈でawaitableではあってもiscorountineはTrueではないから。

ちなみに、これを治すのは簡単で、以下の様にasync関数で包むように書き換えてあげれば良い。

async main():
    await asyncio.gather(hello(), hello())

# これは OK
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# これも OK
asyncio.run(main())

asyncio.run()の方が手軽なのでなるべくこちらを使っていきたい。

current loop? new loop?

さて、awaitableであってもiscourntineでは無かった、それじゃ、冒頭のhelloのコードを以下の様に直せば良いよね?めでたしめでたしになるかと思いきや、まだ違いがある。落ち着こう。

diff -u x0* x3*
--- x0hello.py    2020-07-11 01:40:39.000000000 +0900
+++ x3hello.py    2020-07-11 02:08:47.000000000 +0900
@@ -29,4 +29,8 @@
     import asyncio
 
     rtm_client = RTMClient(token=os.environ["SLACK_API_TOKEN"], run_async=True)
-    asyncio.run(rtm_client.start(), debug=True)
+
+    async def run():
+        await rtm_client.start()
+
+    asyncio.run(run(), debug=True)

今度は以下のようなエラーが出る。

RuntimeError: Task <Task pending name='Task-1' coro=<run() running at x3hello.py:34> cb=[_run_until_complete_cb() at /opt/local/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py:184] created at /opt/local/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py:595> got Future <Task pending name='Task-2' coro=<RTMClient._connect_and_read() running at $HOME/vboxshare/venvs/my/lib/python3.8/site-packages/slack/rtm/client.py:330>> attached to a different loop

エラーメッセージが長すぎて分かりづらいので見やすくする。

RuntimeError: Task <Task 1> got Future <Task 2> attached to a different loop

これはrtm_client.start()が内部で使っている処理と外側の処理(先程定義したrun())のloopが違っているよ!というエラー。異なるloopで実行されると困るのでエラーということ。

実際、slackclientのRTMClientは以下のようなコードになっている。Clientのコンストラクタで初期化するタイミングでloopが渡っていなかったら、asyncio.get_event_loop()を呼んで取り出している。

import asyncio


class Client:
    def __init__(self, loop=None):
        self.loop = loop or asyncio.get_event_loop()

    async def start(self):
        await asyncio.sleep(0.1, loop=self.loop)  # テキトーな処理
        print("started!")

# 同様のエラーを返す got Future <Future pending> attached to a different loop
asyncio.run(Client().start())

なぜ別のloopになっているのか?もう一度asyncio.run()のコードを見直してみよう。こういうコードが含まれている。

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        ...

ぱっと見ただけなら「全然問題なさそうじゃん、違いがわからない」と思うかもしれない。よく見ると違っている。

そう、new_event_loop() を呼んでいる。get_event_loop() ではなく。そしてこれで作られた新たなloopでrun_until_complete()を呼んでいる。結果として別のloopが利用されていた。ということだった。

というわけでasyncio.run()で冒頭のhelloのbotを動かすには以下のような変更を加えれば良い。

diff -u x0* x5*
--- x0hello.py    2020-07-11 01:40:39.000000000 +0900
+++ x5hello.py    2020-07-11 02:22:56.000000000 +0900
@@ -28,5 +28,11 @@
 if __name__ == "__main__":
     import asyncio
 
-    rtm_client = RTMClient(token=os.environ["SLACK_API_TOKEN"], run_async=True)
-    asyncio.run(rtm_client.start(), debug=True)
+    async def run():
+        loop = asyncio.get_event_loop()
+        rtm_client = RTMClient(
+            token=os.environ["SLACK_API_TOKEN"], loop=loop, run_async=True
+        )
+        await rtm_client.start()
+
+    asyncio.run(run(), debug=True)

今度は大丈夫。

なぜasyncio.run()ではnew loopをする?

しかし、こう思った人もいるかもしれない。「なぜasyncio.run()ではasyncio.get_event_loop()で取り出したloopをそのまま使わず、わざわざnew_event_loop()を呼ぶ必要があるの?」と。たしかに不要そうに感じることがあるかもしれない。

これは、逆に考えてみると良いかもしれない「なぜ別のloopで実行されたことに気づかなければいけないんだろう?」。

ここでも内部の実装を覗いて見たほうが理解は捗る。

asyncio/events.py

def get_event_loop():
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()

loopを取り出すといっても、このloop自体は実は特定の機能を持った具体的な実装ではなくpolicyを満たしたオブジェクト。なので、例えばuvloopのような高速な実装などが存在できる(あるいはtwistedで実装されたようなコードも動き続ける)2

あるいは遭遇しうる地獄

その上で、同じloopが使われるとは限らない。

それこそ、特定のAPIなどへのクライントライブラリなどでは、クライアントの初期化のタイミングと、使われているタイミングが異なる場合がある。例えばクライアントは一度初期化された後に使い回されるかもしれない。そのようなときに意図せず違うloopを利用することが発生しうる。このとき既に止まってしまった別のloopに渡したら、原理上は無限に待ち続けたりエラーになる余地が生まれてしまう。

とくに特定の条件で暗黙に初期化されて使われるloopの存在は厄介。get_event_loop()されたloopならまだパラノイアと言われるかもしれないが、どこかで使われキャッシュされてしまったloopが渡されてしまったときはかなりつらい。その上で、この変なloopの参照がある特定の条件でしか発生し得ないというような状況を作ってしまったら、なぜかあるタイミングでだけ不具合を起こすしかしいつ起きるかはわからないという地獄のような状況が発生しうる。その上そのときpanicすればまだマシで、静かに永遠と待ち続けるというような症状になることもありうる3

これを始点で 新しい世代 のloopを作って実行ということにしてあげると、一種のmarkerとして機能させることができる(mark & sweep的な心境)。

それ以外にもテストのときなどで別のloopを使いたいということはあるのでnew_event_loop()存在自体は便利4

まとめ

loop.run_until_complete() で動くものが asyncio.run() では動かない。それは短くまとめると以下の通り。

  • awaitableなものとiscouroutineがTrueを返すものは違う
  • 異なるloopで実行される余地があるものは良くない?いいね?

そして、slackerなどはR.I.P.

gist


  1. 見やすくするためにREADMEの例からさらに例外処理なども省いている。

  2. 実際のところ相互運用性の話自体はpepで提案された頃からけっこう気にされていたはず。 https://www.python.org/dev/peps/pep-3156/

  3. そしてpythonはシングルコア。待つことで状況が改善することはない。

  4. 例えば、loopを使ったテスト自体を並行で実行したいときだとか、fakeな実装で高速化するだとか頑張れる余地がある。そうでなくてもフレッシュな何の状態も持たないloopを利用したいとき