コード生成中に、裏側でサービスを立ち上げて、通信してあげればN+1を防げる事に気づいた

github.com

コード生成でのN+1?

以前に、コード生成でのN+1問題の発生というような記事を書いた。

この記事では、bulk action的な実行を解として説明をしていた。元々の問題は、process中の処理時間がほぼほぼimport timeで占められるprocessの数が増加することだったので、裏側でサービスを立ち上げてそれと通信するようにしても解決しそう。

過去に書いた以下の記事の内容の一部でもあるかもしれない。

この先はservice化

そして最終的に行き着く先はservice化です。例えばなぜLSP (Language Service Protocol)が生まれたかを考えてみましょう。もちろんエディタ/IDE間での再実装を避けるためという意味もあると思いますが、serviceとして立ち上げてRPC(Remote Procedure Call)的に呼び出すことができるという点が重要だと思っています。

まだ、利用できるバージョンはpypiにあげて居ないけれど、使い方だけメモ。

使い方

例えば、gofmtを実行するようなRPCを考えてみる。hello.goを見てgofmtをした結果hello.formatted.goを作るような操作。こんな感じになる。

definitions.py

from __future__ import annotations
from egoist.app import create_app, SettingsDict, parse_args

settings: SettingsDict = {"rootdir": "", "here": __file__}
app = create_app(settings)

app.include("egoist.directives.define_file")

@app.define_file("egoist.generators.filekit:walk", suffix=".formatted.go")
@app.include_when("egoist.ext.gofmtrpc")
def hello__hello(*, filename: str = "./hello/hello.go") -> None:
    from egoist.generators.filekit import runtime
    from egoist.ext.gofmtrpc import gofmt

    with runtime.create_file() as wf:
        with open(filename) as rf:
            code = rf.read()

        formatted_code = gofmt(code)
        print(formatted_code, file=wf)


if __name__ == "__main__":
    for argv in parse_args(sep="-"):
        app.run(argv)

しれっと内部で使われているegoist.ext.gofmtrpc:gofmtは、裏側でgo製のJSONRPCサーバーと通信している(egoist/gofmtrpc以下にある)。これは例なので実際には各自が自分の好きな機能を裏側で立ち上げて使うことになる。

$ python definitions.py generate
level:INFO  message:register discovery, egoist.ext.gofmtrpc -> http://127.0.0.1:58173    name:egoist.ext.serverprocess.components.discoveryL20
level:INFO  message:spawn server process, gofmtrpc -addr :58173 -sentinel /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmph6j9zjpn.sentinel name:egoist.ext.serverprocess.spawnL57
2020/06/06 23:17:57 starting gofmtsrv on :58173
2020/06/06 23:17:57 ip=127.0.0.1:58177 method=.format duration=481.833µs params={"code": "package hello\n\ntype Hello struct {\n\tName string `json:\"name\"`\n\tMessage string `json:\"message\"`\n}\n"} err=<nil>
[F] no change   ./hello/hello.formatted.go
level:INFO  message:terminate egoist.ext.gofmtrpc               name:egoist.ext.serverprocess.directivesL73

ただ動かすだけならけっこう手軽に実装できるのだけれど、以下の点にこだわってしまったら、意外と大変だった。

  • 裏で立ち上げたserver processが通信可能になるまで待ちたい
  • scanでは、server processを立ち上げたくない
  • 無関係のtaskだけを実行したときには、server processを立ち上げたくない

wait server process

subprocessの起動が瞬時に終わるとは限らない。裏で立ち上げたserver processが通信可能になるまで待つ必要がある。いろいろ考えた結果、一周回って、番兵的なファイルの存在確認でコネクションの確立を確認するのが手軽だろうという結論になった。

例えば、gofmtrpc-sentinel を取る。

gofmtrpc -addr :58173 -sentinel /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmph6j9zjpn.sentinel

親プロセス(つまりegoist)はsentinelファイルを作り、子プロセス(つまりgofmtrpc)はsentinelファイルを削除する。これでコネクションの確立を表現する。完全に同期するような振る舞いではないが、まぁましな動作をするようになった。

scan

scanはdry run的な振る舞いを期待するので、不要な負荷をかけたくない。例えば、依存関係を把握するときなどにscanを使う。ちなみに先程のコード中では省略されているが、実はnoopというtaskも定義されていた。

$ python definitions.py scan
level:INFO  message:register discovery, egoist.ext.gofmtrpc -> http://127.0.0.1:xxx  name:egoist.ext.serverprocess.components.discoveryL20
level:INFO  message:dry run, skip starting server process, egoist.ext.gofmtrpc  name:egoist.ext.serverprocess.directivesL65
{
  "noop": {
    "task": "noop",
    "depends": []
  },
  "hello/hello.formatted.go": {
    "task": "hello__hello",
    "depends": [
      "hello/hello.go"
    ]
  }
}

このときには、RPCサーバーを立てたくないし、http requestも呼びたくない。

generate

generateを引数無しで実行したときには、全てのtaskが実行される。generateは実行するtaskを指定できて、例えば generate noop では、hello__hello は実行されない。このときにRPCサーバーが立ち上がってほしくない。

# 全部 (RPCサーバーが立ち上がる)
$ python definitions.py generate

# hello__hello のみ (RPCサーバーが立ち上がる)
$ python definitions.py generate hello__hello

# noop のみ (RPCサーバーが立ち上がらない)
$ python definitions.py generate noop
[F] no change   ./noop

この振る舞いをサポートするために、新たに app.include_when() というデコレーターを用意することにした。名前はまた変わるかもしれない(最初は app.use() だった)。

gofmtrpc の実装

gofmtrpcのハンドリング部分の実装がどうなっているかというと、以下の様な形。gofmt()get_gofmtrpc_url()は実行時に使い、includeされたタイミングでincludeme()が呼ばれる。

大切なのは app.add_server_process() で、これにより裏側でRPCサーバーが立ち上がる。nameに渡された値でdiscoveryに登録される。登録されたserviceはget_discovery().lookup(<service name>)を経由してURLが取れる。渡されたsentinelオプションが、先程説明した番兵的なファイル。

ちなみに、番兵を環境変数として渡したい場合には environ={"SENTINEL": create_sentinel_file} という形で渡す。

get_http_client()の実態は概ね requests.Session。ただ、scanのときにはfakeオブジェクトが返ってくる。ここでもscanのときには実際の通信をしたくない。gofmt()の実装自体は特に特別なことはないJSONRPCの通信。

import typing as t
from egoist.app import App

NAME = __name__


def get_gofmtrpc_url() -> t.Optional[str]:
    from egoist.ext.serverprocess.runtime import get_discovery

    return get_discovery().lookup(NAME)


_id = 0


def gofmt(code: str) -> str:
    global _id
    from egoist.ext.serverprocess.runtime import get_http_client

    url = get_gofmtrpc_url()
    assert url is not None
    _id += 1
    res = get_http_client().post(
        url,
        json={
            "jsonrpc": "2.0",
            "id": _id,
            "method": "format",
            "params": {"code": code},
        },
    )
    res.raise_for_status()
    formatted = res.json()["result"]  # type:str
    return formatted


def includeme(app: App) -> None:
    from egoist.ext.serverprocess.lazyparams import find_free_port, create_sentinel_file

    app.include("egoist.ext.serverprocess")  # for add_server_process

    app.add_server_process(
        "gofmtrpc -addr :{port} -sentinel {sentinel}",
        params=dict(port=find_free_port, sentinel=create_sentinel_file),
        name=NAME,
    )

そんなわけで、先程のdefinitions.pyとセットで使うことで、goのコードの整形ができる様になった。同じように何らかのAPIサーバーを書いてあげれば、それを実行時に使うことができる。

毛色の違う使い方

考えてみれば、APIサーバーとの通信ができるということなので、何もコード生成に用途を絞る必要はないかもしれない。少し毛色の違う使い方として、この仕組をそのまま流用してのAPIサーバーのテスト実行というような使い方もできる。

$ python main.py
INFO:     Started server process [39296]
INFO:     Uvicorn running on http://127.0.0.1:58221 (Press CTRL+C to quit)
INFO:     Waiting for application startup.
INFO:     remove sentinel /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmp9ilwe218.sentinel
INFO:     Application startup complete.
-> http://127.0.0.1:58221
INFO:     127.0.0.1:58223 - "GET / HTTP/1.1" 200 OK
<- {'message': 'Hello World'}
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [39296]

例えば、以下の様なFastAPIを利用したサーバーがあるとして。

server.py

from fastapi import FastAPI

app = FastAPI()


@app.on_event("startup")
async def startup_event():
    import os
    from egoist.ext.serverprocess.spawn import FileConnectionChecker

    sentinel = os.environ.get("SENTINEL")
    if sentinel is not None:
        checker = FileConnectionChecker(sentinel=sentinel)
        assert checker.pong() is True


@app.get("/")
async def root():
    return {"message": "Hello World"}

裏側でAPIサーバーを立ち上げて GET / を呼ぶというような動作を記述できる。

main.py

from egoist.app import create_app, SettingsDict, App

settings: SettingsDict = {"rootdir": "output", "here": __file__}
app = create_app(settings)


def setup_server(app: App) -> None:
    from egoist.ext.serverprocess.lazyparams import find_free_port, create_sentinel_file

    app.include("egoist.ext.serverprocess")  # for add_server_process
    app.add_server_process(
        "uvicorn server:app --port {port}",
        params=dict(port=find_free_port),
        name="api-server",
        env={"SENTINEL": create_sentinel_file},
    )


def run() -> None:
    from egoist.ext.serverprocess.runtime import get_discovery, get_http_client

    url = get_discovery().lookup("api-server")
    print("->", url)
    res = get_http_client().get(url)
    res.raise_for_status()
    print("<-", res.json())


if __name__ == "__main__":
    app.include(setup_server)
    app.commit(dry_run=False)
    run()

今回は環境変数を渡す例。理想的にはコネクションの確立の確認のためのイベントは、startup eventのような箇所で実行できる様になっていると嬉しい。

まとめ

  • コード生成のN+1を倒したい
  • bulk actionの1つの例としてRPCサーバーを立てる方法もある
  • 流用してなにか他のものにも使えるかもしれない

gist