pythonのprint()をlogger.infoに変える黒魔術(loggedpy)

github.com

pythonのprint()をlogger.infoに変える黒魔術(loggedpy)を実装してみた(土日)。

発端はこのtweet

意外とやってみると辛かったり奥深さを感じたりした(中のコードを覗くと辛さがわかるかもしれない。暇だったらどこかで発表したい。かも)。

loggedpy

使いかたはpythonのかわりにloggedpyコマンドを使うだけ(日常的に使うと言うよりはデバッグのときに使う感じかもしれない)。するとprint()を使った部分があたかもログ出力のように扱われる。

$ loggedpy hello.py
INFO  2018-09-24 01:06:56,698 hello hello
INFO  2018-09-24 01:06:56,698 bye

# 元々の出力
$ python hello.py
hello hello
bye

現状は logging.BASIC_FORMATにasctimeを追加しただけの状態だけれど。良いデフォルト値を募集中。

ちなみに上の出力結果を返すコードは以下のようなもの

def main():
    print("hello hello")
    print("bye")


if __name__ == "__main__":
    main()

対応しているもの

以下3つ全てにおおよそ対応

  • python <filename>.py での実行
  • setup.pyなどでインストールされるコマンドの実行
  • python -m <module> での実行

詳しくはreadme参照

--flavor

出力結果を変えたくなった場合には--flavorオプションで実装を差し替えられる。例えばもうちょっと出力を豪華にするには以下の様なコードを書く

customized.py

import logging
import loggedpy


class Flavor(loggedpy.Flavor):
    format = "%(levelname)s\t%(asctime)s\t%(name)s\tin\t%(filename)s:%(lineno)s\t%(funcName)s\t%(message)s"
    level = logging.DEBUG

今度は以下の様な出力になる(funcNameやlinenoが追加されてちょっと豪華)

$ loggedpy --flavor=./customized.py:Flavor hello.py
INFO    2018-09-26 22:23:38,377 __main__    in  hello.py:2  main    hello hello
INFO    2018-09-26 22:23:38,377 __main__    in  hello.py:3  main    bye

使える変数はloggingのLogRecord属性にあるもの

JSON出力

昔作ったmonokakiと組み合わせると手軽にJSON出力にできる。

$ loggedpy --flavor=./customized.py:Flavor ../hello.py
{
  "logger": "__main__",
  "levelname": "INFO",
  "asctime": "2018-09-26 22:27:07,258",
  "name": "__main__",
  "filename": "hello.py",
  "lineno": 2,
  "funcName": "main",
  "message": "hello hello"
}
{
  "logger": "__main__",
  "levelname": "INFO",
  "asctime": "2018-09-26 22:27:07,258",
  "name": "__main__",
  "filename": "hello.py",
  "lineno": 3,
  "funcName": "main",
  "message": "bye"
}

詳しくはexamplesを参照

追記

以下ができたらpypiにあげる(たぶん)

  • djangoなどでの動作確認
  • デフォルトの出力を良い感じにする(まだ決めあぐねている)

pythonで複数のhttp requestを同時に投げる方法(requests)について

時折、ちょっとしたスピードアップにhttp(https) requestを並列で投げたい場合がある。aiohttpなどを使っても良いが既存のコードを書き換えるのもめんどくさい。そのようなときの方法のメモ。

concurrent.futuresを使う方法も考えられるけれど。今回はasyncioだけに絞った話。

いわゆるpythonでのhttp requestのデファクトスタンダードrequestsだと思う。ところでこのrequestsはnon blockingではないのでasyncio上で使おうとすると処理がblockしてしまう。run_in_executor()を使うと処理を外部の環境に移せる。

試しに、run_in_executor()を使ってみよう。動作確認自体は直接のhttp requestではなく、requestを模したfakeの関数を使って行う。以下の様な関数。この処理はtime.sleep()自体が同期的な処理なのでblockする。

import time


def req(i):
    print(i, "start")
    time.sleep(0.5)
    print(i, "end")
    return i

これを使ってblockしてしまう処理をasyncio上で呼び出すための実験をしてみる。

run_in_executor()を使わない場合

まずは、run_in_executor()を使わない場合を見ていく。例えば3つのtaskを同時にrequestしてみる(asyncio.gatherは受け取ったtaskが全部終了するまで待ち、結果を渡された順序で返してくれる便利関数。これの目的や説明などについては昔書いた)。

import logging
import asyncio


async def run():
    async def run_req(i):
        return req(i)

    tasks = [run_req(i) for i in range(3)]
    return await asyncio.gather(*tasks)


logging.basicConfig(level=logging.DEBUG)
st = time.time()
print(asyncio.get_event_loop().run_until_complete(run()))
print(time.time() - st)

実際、3つのtaskを実行するので1.5秒掛かる(asyncioを使ったコードを実行するときにはデバッグモードを有効にしておくのがオススメ。PYTHONASYNCIODEBUG=1の部分)。

$ PYTHONASYNCIODEBUG=1 python 00req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
0 end
1 start
1 end
2 start
2 end
[0, 1, 2]
1.5026493072509766

run_in_executor()を使う場合

今度はrun_in_executor()を使った場合を見ていく。run_in_executor()の第一引数にNoneを渡すとデフォルトのexecutorで呼ばれる(設定を変更していない限りconcurrent.futures.ThreadPoolExecutorで実行される)。

import logging
import asyncio


async def run(loop):
    async def run_req(i):
        return await loop.run_in_executor(None, req, i)

    tasks = [run_req(i) for i in range(3)]
    return await asyncio.gather(*tasks)


logging.basicConfig(level=logging.DEBUG)
st = time.time()
loop = asyncio.get_event_loop()
print(loop.run_until_complete(run(loop)))
print(time.time() - st)

並列に実行されるならおおよそ0.5秒で実行が終わるはず。

$ PYTHONASYNCIODEBUG=1 python 01req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
0 end
1 end
2 end
[0, 1, 2]
0.5050966739654541

並列で実行できている。

同時リクエスト数を制限したい場合

なるべく並列で実行したい一方で同時リクエスト数を制限したいことがある。こういうときはasyncio.Semaphoreを使うと良い。

--- 01req.py 2018-09-22 18:08:03.346066271 +0900
+++ 02req.py  2018-09-22 18:16:24.069497610 +0900
@@ -13,10 +13,13 @@
 
 
 async def run(loop):
+    sem = asyncio.Semaphore(5)
+
     async def run_req(i):
-        return await loop.run_in_executor(None, req, i)
+        async with sem:
+            return await loop.run_in_executor(None, req, i)
 
-    tasks = [run_req(i) for i in range(3)]
+    tasks = [run_req(i) for i in range(20)]
     return await asyncio.gather(*tasks)

20個のタスクを同時に5個まで並列で実行しているので2.0秒程度掛かるはず。

$ PYTHONASYNCIODEBUG=1 python 02req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
3 start
4 start
...
19 start
15 end
16 end
17 end
18 end
19 end
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
2.0162127017974854

大丈夫そう。

途中で失敗した場合にすぐに終了して欲しい

これはデフォルトの動作がそのまま。正確にはすぐにではないけれど。途中で中断される。

例えば、途中での失敗を模したコードを追加してみる。

-- 02req.py   2018-09-22 18:16:24.069497610 +0900
+++ 03req.py  2018-09-22 18:22:18.704888305 +0900
@@ -4,6 +4,8 @@
 def req(i):
     print(i, "start")
     time.sleep(0.5)
+    if i == 3:
+        raise Exception("hmm")
     print(i, "end")
     return i

10個のタスクを3つ並列で実行して途中で失敗した場合の結果。もし仮に全てが実行されるなら0~9まで全てのtaskがstartになる。

$ PYTHONASYNCIODEBUG=1 python 03req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
0 end
1 end
2 end
3 start
4 start
5 start
6 start
4 end
5 end
Traceback (most recent call last):
  File "03req.py", line 31, in <module>
    print(loop.run_until_complete(run(loop)))
  File "/usr/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "03req.py", line 25, in run
    return await asyncio.gather(*tasks)
  File "03req.py", line 22, in run_req
    return await loop.run_in_executor(None, req, i)
  File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "03req.py", line 8, in req
    raise Exception("hmm")
Exception: hmm
6 end

6までで終了しているので途中で中断されている。

全部実行したい場合

途中で止まらず全部実行したい場合にはreturn_exceptionsにTrueを指定すれば良い。

--- 03req.py 2018-09-22 18:26:53.361512907 +0900
+++ 04req.py  2018-09-22 18:27:50.385216947 +0900
@@ -22,7 +22,7 @@
             return await loop.run_in_executor(None, req, i)
 
     tasks = [run_req(i) for i in range(10)]
-    return await asyncio.gather(*tasks)
+    return await asyncio.gather(*tasks, return_exceptions=True)
 
 
 logging.basicConfig(level=logging.DEBUG)

全部実行される。

$ PYTHONASYNCIODEBUG=1 python 04req.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
0 end
3 start
1 end
4 start
2 end
5 start
6 start
4 end
7 start
5 end
8 start
6 end
9 start
7 end
8 end
9 end
[0, 1, 2, Exception('hmm'), 4, 5, 6, 7, 8, 9]
2.0221383571624756

発生した例外は戻り値として返される。そのまま中断されず全てのタスクが実行されている。

requestの結果を使ってさらにrequest

requestの結果を使ってさらにrequestしたいことがある。特にcrawlerのようなコードを書いているとき(とはいえ、crawlerのようなものを作るときには何らかのライブラリを使った方が良いかもしれない)。 このときにはasyncio.Queueを使えば良い(行いたい操作によってLifoQueueとかPriorityQueueを使い分けることもある)。

import time

# levelが追加された
def req(i, level):
    print(i, level, "start")
    time.sleep(0.5)
    print(i, level, "end")
    return i, level + 1


import logging
import asyncio


async def run(loop):
    q = asyncio.Queue()
    r = []
    for i in range(5):
        q.put_nowait((i, 0))

    async def consume():
        while not q.empty():
            i, level = await q.get()
            x, next_level = await loop.run_in_executor(None, req, i, level)
            r.append((x, next_level))

            # 3階層くらいまで戻り値queueに追加する
            if next_level < 3:
                for j in range(x):
                    q.put_nowait((j, next_level))

    # 並列数はasyncio.waitに渡すtaskの数で調整する
    await asyncio.wait([consume() for _ in range(5)])
    return r

25個のタスクを5個ずつ並列で消費するので2.5秒くらい掛かる。はず。

$ PYTHONASYNCIODEBUG=1 python 05req.py
DEBUG:asyncio:Using selector: EpollSelector
...
[(0, 1), (2, 1), (3, 1), (4, 1), (1, 1), (0, 2), (0, 2), (1, 2), (1, 2), (2, 2), (0, 2), (1, 2), (2, 2), (3, 2), (0, 2), (0, 3), (0, 3), (0, 3), (1, 3), (0, 3), (0, 3), (1, 3), (0, 3), (1, 3), (2, 3)]
3.0186729431152344

ここで実行時の順序を揃えたい場合にはtimestamp的なものを一緒に追加してそれを元にsortする。

--- 05req.py 2018-09-22 19:02:14.581036491 +0900
+++ 06req.py  2018-09-22 19:05:32.868959088 +0900
@@ -16,19 +16,19 @@
     q = asyncio.Queue()
     r = []
     for i in range(5):
-        q.put_nowait((i, 0))
+        q.put_nowait((i, 0, time.time()))
 
     async def consume():
         while not q.empty():
-            i, level = await q.get()
+            i, level, timestamp = await q.get()
             x, next_level = await loop.run_in_executor(None, req, i, level)
-            r.append((x, next_level))
+            r.append((timestamp, x, next_level))
             if next_level < 3:
                 for j in range(x):
-                    q.put_nowait((j, next_level))
+                    q.put_nowait((j, next_level, time.time()))
 
     await asyncio.wait([consume() for _ in range(5)])
-    return r
+    return [x[1:] for x in sorted(r)]

asyncio.Queueの使いかたのもう一つの流派

asyncio.Queueを単なる値の入れ物として使うのではなく、taskを管理するものとして使う流派もある。この場合には、task_done()join()が重要になってくる。ちなみにドキュメントに書かれているのはこちらの流派

考えてみると当たり前で、queueの中にある・無しの他に現在実行中のような状態がある。taskを状態で表すなら、予約中・実行中・完了という3つの状態があり、この実行中の状態を空扱いにして終了されては困る。

それぞれがどういうものかは中のコードを覗いてみるとわかりやすい。_unfinished_tasksというインスタンス変数に現在実行中のタスクの数がカウントされている(ちなみに_unfinished_tasksput()が呼ばれたタイミングでカウントアップされる)。

class Queue:
# ...
    def task_done(self):
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        # タスクが完了したらカウントダウン
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        # 実行中のタスクが存在していたら完了するまでwait
        if self._unfinished_tasks > 0:
            await self._finished.wait()

この流派の書き方で先程のコードを書き換えてみる。違いは以下

  • 無限ループ
  • Queueから取り出した値を使い終わったらtask_done()を呼び出している
  • Queueに対する実行がなくなったら(join()の後)、動かしていたworker task(consume())を明示的に止める
--- 05req.py 2018-09-22 21:18:27.130508295 +0900
+++ 05req.tmp.py  2018-09-22 21:24:55.783094248 +0900
@@ -19,15 +19,19 @@
         q.put_nowait((i, 0))
 
     async def consume():
-        while not q.empty():
+        while True:
             i, level = await q.get()
             x, next_level = await loop.run_in_executor(None, req, i, level)
             r.append((x, next_level))
             if next_level < 3:
                 for j in range(x):
                     q.put_nowait((j, next_level))
+            q.task_done()
 
-    await asyncio.wait([consume() for _ in range(5)])
+    tasks = [loop.create_task(consume()) for _ in range(5)]
+    await q.join()
+    for task in tasks:
+        task.cancel()
     return r

taskをloop経由での実行として登録をしたい(非同期に実行したい)場合にはcreate_task()に渡してあげると良い。以前asyncioでコードを書いていた時に、これには気づかず、「awaitすると待ってしまう。ただ呼ぶだけでは実行されない(coroutineが生成されるだけ)。call_soon()などは同期的な処理」という感じで困っていたことがあった。

非同期操作と同期処理を混ぜて実行する

一部だけ非同期に対応している場合にrun_in_executor()では呼びたくない。asyncio.iscoroutinefunction()で一応調べられる。あんまりオススメはしないけれど。以下の様な関数を定義しておくと、同期・非同期を気にせずtask化できる。

async def run_task_contextually(loop, fn, *args):
    if asyncio.iscoroutinefunction(fn):
        return await fn(*args)
    else:
        return await loop.run_in_executor(None, fn, *args)

同期・非同期を混ぜたコード。

async def req_async(i):
    print(i, "start")
    await asyncio.sleep(0.5)
    print(i, "end")
    return i


async def run(loop):
    sem = asyncio.Semaphore(5)

    async def run_req(fn, i):
        async with sem:
            return await run_task_contextually(loop, fn, i)

    tasks = []
    tasks.extend([run_req(req, i) for i in range(3)])
    tasks.extend([run_req(req_async, i) for i in range(3, 6)])
    tasks.extend([run_req(req, i) for i in range(6, 9)])
    return await asyncio.gather(*tasks)

実行してみる。

$ PYTHONASYNCIODEBUG=1 python 10*.py
DEBUG:asyncio:Using selector: EpollSelector
0 start
1 start
2 start
3 start
4 start
0 end
DEBUG:asyncio:poll 499.446 ms took 500.131 ms: 1 events
2 end
1 end
3 end
4 end
5 start
6 start
7 start
8 start
5 end
6 end
DEBUG:asyncio:poll took 0.152 ms: 1 events
7 end
8 end
DEBUG:asyncio:poll took 0.125 ms: 1 events
[0, 1, 2, 3, 4, 5, 6, 7, 8]
1.0186710357666016