読者です 読者をやめる 読者になる 読者になる

asyncioで実行されるタスクの実行順序の制御について

python asyncio

asyncioで実行されるタスクの実行順序の制御について

asyncioのドキュメント を見ると、asyncio.asyncを使う際には、yield fromを忘れると終了を待たずにつぎの処理が実行される。このため以下の様な不具合があるというようなことが書かれている。

  • 実行順序が保証されない
  • タスクの終了が保証されない

open_content,push_content,close_contentという3つのタスクが存在するような状況を考える。 これらのタスクが順番に実行されて欲しい。

@asyncio.coroutine
def run():
    yield from open_content() # 0
    yield from push_content() # 1
    yield from close_content() # 2
    yield from asyncio.sleep(1)
    loop.stop()

このコードを実行した結果は以下のとおり。ドキュメントの通り、yield fromを付けると期待通りの結果を返してくれる

[0] open
[1] push_content
[2] close
Pending tasks at exit: set()

0,1,2という番号が振られたタスクが記述した通りの順序で行われている。

一方で、これもまた、ドキュメントの通りにyield fromを忘れた場合には順序が保証されない。

@asyncio.coroutine
def run_broken():
    asyncio.async(open_content()) # 0
    asyncio.async(push_content()) # 1
    asyncio.async(close_content()) # 2
    yield from asyncio.sleep(1)
    loop.stop()

結果は以下のとおり

[2] close
[1] push_content
Pending tasks at exit: {Task(<open_content>)<PENDING>}

2,1と実行順序が記述した順序と一致しておらず、終了していないタスクも存在する。 ここまではドキュメントの通り。

実行順序は保証しなくても良いが、並列に実行し実行結果は待ちたいというような状況

ここからが本題。yield fromを連ねることで実行順序を保証し結果を待つようにして実行できることがわかった。 この時の実行時のイメージは各タスクが直列化されて実行される。図にすると以下の様。

3つのタスク、a,b,cが存在して、それぞれ順にyield fromで実行された場合。

|a| -> |b| -> |c|

しかし、1つ前の結果を待つことなく同時並列的に実行しながら結果を待つというようなことが行いたい場合もある。 このような時にどうすれば良いのか分からなかったので考えてみることにした。

先ほどのタスクa,b,cについて同時並列的に実行を行いたい。図にすると以下の様。

|a| -\
|b| ---> dosmething
|c| -/

futureでロックを作って全ての終了を待つ。

futureは結果が指定されるまで待つと言うことが出来るようなオブジェクト。

yield from my_future

という記述があった場合には、別の経路で

my_future.set_result(anything)

というように結果が格納されるまで実行を止めて待つ。

このfutureでロックを作ることで望みの動作が作れるのではないかと思った。

また、futureはadd_done_calllbackなどでタスクの実行が終了した後のコールバックをしてい出来る。これでロックのカウンターを増減させる事ができる。先ほどのタスクa,b,cにおいて、各タスクの実行後に内部的なカウンターをインクリメントし、カウンターの数値が実行したタスクの数と等しくなった時に、ロックとして使っているfutureに結果を格納すると言ったことを行えば良さそう。

書いたコードは以下のようなもの。

@asyncio.coroutine
def run_parallel():
    sentinel = Future(loop=loop)
    tasks = [asyncio.async(open_content()),
             asyncio.async(push_content()),
             asyncio.async(close_content())]
    results = [None] * len(tasks)
    N = 0

    def _done_callback(i, f):
        nonlocal N
        results[i] = f._result
        N += 1
        if N == len(tasks):
            sentinel.set_result(results)

    for i, t in enumerate(tasks):
        t.add_done_callback(partial(_done_callback, i))

    yield from sentinel
    loop.stop()

動作結果

[2] close
[1] push_content
[0] open
Pending tasks at exit: set()

2,1,0と実行順序はバラバラなものの全てのタスクが実行されている。

asyncio.gatherを使う。

実は、asyncio.gather()という関数が存在する

長々と書いてしまったけれど。コレが答え。以下の様に書けば望みの動作を行ってくれる。 内部的にはfutureを使ってロックを作ったものと同様。

@asyncio.coroutine
def run_parallel2():
    yield from asyncio.gather(open_content(),
                              push_content(),
                              close_content())
    loop.stop()

実行結果

[2] close
[1] push_content
[0] open
Pending tasks at exit: set()

動作する完全なコードのgist

https://gist.github.com/podhmo/157307bc90edc35978ae