graphqlのdataloader的なもの(bulk query)について考えてみる

そろそろノルマがやばいのでdataloderを支えるような概念について考えてみることにする。

例えば以下のようなコードは3回queryを実行する。

u0 = Users.get(1)
u1 = Users.get(2)
u2 = Users.get(3)

実際のアプリケーションでは、これらのqueryがコード上の到るところに散らばっているようなイメージ。通常はN+1になる。これを一回(あるいは定数回)だけのqueryにしたい。

やりたいことは基本的にはN+1の除去なのだけれど、嬉しいのは(特徴的なのは)、時間的にaggregateしているところ。このおかげでコード上では遠くにあるようなcomponentごとの通信をinterceptした形で、素直なコードになりつつ、N+1を除去したような形で取り扱うことができる。

そしてこの種のgroupingを行うためには非同期処理の機構が整っているとやりやすい。

ただ、特にgraphqlで特別というわけではなく、古くはbackboneあたりのSPAの頃から、promiseをcacheする機構を用意したりなどして自前で実装している人はいたりした。

仕組み

この種の機構の肝になっているのは「時間的にaggregateしている」ということ。

例えばレイヤーをHTTP(HTTPS)の通信に持っていくとしたら、同様のrequestをまとめるmiddlewareが間に挟まった状態と考える感じ。そしてrequestを一定時間バッファリングし、同一条件でのqueryをまとめてbulk queryとしてbackendに投げて、結果を分割した形で返す。

というふうに考えると、複数のクライアントのrequestを考慮したaggregateになる(フロントエンドだけのアーキテクチャとみるのではなく、インフラ用のアーキテクチャと見る事もできる)。つまりいたるるところで同様の仕組みを考えることができたりする。

実装自体はfutureのような機構があればたやすい。基本的には以下の機構があれば良い。

  • future的な機能
  • 取得対象のidを一覧で受けて返すquery(in query)

futureを使ったbulk query

簡単な実験用のコードを書いてみる。usersというテーブルが有り、ここでnameがprimary key(id)だとする。そして、in queryが実装されている。find_all()in_オプションにidとなる値を渡してあげると良い。

(minidbはこの場の実験コードを書くためのちょっとしたラッパー。実装は後述)

また、冒頭のgetに似たようなfind_one()というqueryも実装している。

import minidb
import asyncio


class Users(minidb.AsyncTable):
    pk = "name"


users = Users([{"name": "foo"}, {"name": "bar"}, {"name": "boo"}])


async def run():
    print(await users.find_one(in_=["foo"]))
    # {'name': 'foo'}

    print(await users.find_one(in_=["boo"]))
    # {'name': 'boo'}


asyncio.run(run())

このコードに対するdata loader的な振る舞いを考えてみる。それは、find_one()のようなidを一つだけ取り値をひとつだけ返すような関数を表面上は使っているように見えるものの、内部ではfind_all()のように一度にガバっと値を取るような関数を使ったコードに書き換えて利用されることを目指したものになる。

bulk query

bulk queryの実装を考えてみよう。一定時間待ち、その間に受け取ったrequestをbufferに貯めておく、そしてfind_all()としてまとめたqueryとして投げて、結果を回収する。個別のrequestの結果を返す。

例えばこういう感じ。

async def run_bulk_query():
    bulk_fut = asyncio.Future()
    input_buf = []

    async def find_user(name: str):
        input_buf.append(name)
        for row in await bulk_fut:
            if row["name"] == name:
                return row

    async def do_bulk(n):
        await asyncio.sleep(n)
        bulk_fut.set_result(await users.find_all(in_=input_buf))

    async def do_task(name):
        print(await find_user(name))

    actions = [do_task(name) for name in ["foo", "bar", "boo"]]
    actions.append(do_bulk(0.1))
    await asyncio.gather(*actions)

場合によっては、futureをより細かい粒度で用意する場合もある。そしてこの種のbufferを持ったqueueのような機構を此処の検索の条件毎にdispatchするような機構(単純には連想配列)をもたせるとそれっぽいミドルウェアが完成する。

依存があればfuture同士をつなげていくようにしてあげれば良い。

gist

https://gist.github.com/podhmo/a8d6732958b71132c7635f94643b39ff