コード生成中に、裏側でサービスを立ち上げて、通信してあげればN+1を防げる事に気づいた
コード生成でのN+1?
以前に、コード生成でのN+1問題の発生というような記事を書いた。
この記事では、bulk action的な実行を解として説明をしていた。元々の問題は、process中の処理時間がほぼほぼimport timeで占められるprocessの数が増加することだったので、裏側でサービスを立ち上げてそれと通信するようにしても解決しそう。
過去に書いた以下の記事の内容の一部でもあるかもしれない。
この先はservice化
そして最終的に行き着く先はservice化です。例えばなぜLSP (Language Service Protocol)が生まれたかを考えてみましょう。もちろんエディタ/IDE間での再実装を避けるためという意味もあると思いますが、serviceとして立ち上げてRPC(Remote Procedure Call)的に呼び出すことができるという点が重要だと思っています。
まだ、利用できるバージョンはpypiにあげて居ないけれど、使い方だけメモ。
使い方
例えば、gofmtを実行するようなRPCを考えてみる。hello.goを見てgofmtをした結果hello.formatted.goを作るような操作。こんな感じになる。
definitions.py
from __future__ import annotations from egoist.app import create_app, SettingsDict, parse_args settings: SettingsDict = {"rootdir": "", "here": __file__} app = create_app(settings) app.include("egoist.directives.define_file") @app.define_file("egoist.generators.filekit:walk", suffix=".formatted.go") @app.include_when("egoist.ext.gofmtrpc") def hello__hello(*, filename: str = "./hello/hello.go") -> None: from egoist.generators.filekit import runtime from egoist.ext.gofmtrpc import gofmt with runtime.create_file() as wf: with open(filename) as rf: code = rf.read() formatted_code = gofmt(code) print(formatted_code, file=wf) if __name__ == "__main__": for argv in parse_args(sep="-"): app.run(argv)
しれっと内部で使われているegoist.ext.gofmtrpc:gofmt
は、裏側でgo製のJSONRPCサーバーと通信している(egoist/gofmtrpc以下にある)。これは例なので実際には各自が自分の好きな機能を裏側で立ち上げて使うことになる。
$ python definitions.py generate level:INFO message:register discovery, egoist.ext.gofmtrpc -> http://127.0.0.1:58173 name:egoist.ext.serverprocess.components.discoveryL20 level:INFO message:spawn server process, gofmtrpc -addr :58173 -sentinel /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmph6j9zjpn.sentinel name:egoist.ext.serverprocess.spawnL57 2020/06/06 23:17:57 starting gofmtsrv on :58173 2020/06/06 23:17:57 ip=127.0.0.1:58177 method=.format duration=481.833µs params={"code": "package hello\n\ntype Hello struct {\n\tName string `json:\"name\"`\n\tMessage string `json:\"message\"`\n}\n"} err=<nil> [F] no change ./hello/hello.formatted.go level:INFO message:terminate egoist.ext.gofmtrpc name:egoist.ext.serverprocess.directivesL73
ただ動かすだけならけっこう手軽に実装できるのだけれど、以下の点にこだわってしまったら、意外と大変だった。
- 裏で立ち上げたserver processが通信可能になるまで待ちたい
- scanでは、server processを立ち上げたくない
- 無関係のtaskだけを実行したときには、server processを立ち上げたくない
wait server process
subprocessの起動が瞬時に終わるとは限らない。裏で立ち上げたserver processが通信可能になるまで待つ必要がある。いろいろ考えた結果、一周回って、番兵的なファイルの存在確認でコネクションの確立を確認するのが手軽だろうという結論になった。
例えば、gofmtrpc
は -sentinel
を取る。
gofmtrpc -addr :58173 -sentinel /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmph6j9zjpn.sentinel
親プロセス(つまりegoist)はsentinelファイルを作り、子プロセス(つまりgofmtrpc)はsentinelファイルを削除する。これでコネクションの確立を表現する。完全に同期するような振る舞いではないが、まぁましな動作をするようになった。
scan
scanはdry run的な振る舞いを期待するので、不要な負荷をかけたくない。例えば、依存関係を把握するときなどにscanを使う。ちなみに先程のコード中では省略されているが、実はnoopというtaskも定義されていた。
$ python definitions.py scan level:INFO message:register discovery, egoist.ext.gofmtrpc -> http://127.0.0.1:xxx name:egoist.ext.serverprocess.components.discoveryL20 level:INFO message:dry run, skip starting server process, egoist.ext.gofmtrpc name:egoist.ext.serverprocess.directivesL65 { "noop": { "task": "noop", "depends": [] }, "hello/hello.formatted.go": { "task": "hello__hello", "depends": [ "hello/hello.go" ] } }
このときには、RPCサーバーを立てたくないし、http requestも呼びたくない。
generate
generateを引数無しで実行したときには、全てのtaskが実行される。generateは実行するtaskを指定できて、例えば generate noop
では、hello__hello
は実行されない。このときにRPCサーバーが立ち上がってほしくない。
# 全部 (RPCサーバーが立ち上がる) $ python definitions.py generate # hello__hello のみ (RPCサーバーが立ち上がる) $ python definitions.py generate hello__hello # noop のみ (RPCサーバーが立ち上がらない) $ python definitions.py generate noop [F] no change ./noop
この振る舞いをサポートするために、新たに app.include_when()
というデコレーターを用意することにした。名前はまた変わるかもしれない(最初は app.use()
だった)。
gofmtrpc の実装
gofmtrpcのハンドリング部分の実装がどうなっているかというと、以下の様な形。gofmt()
とget_gofmtrpc_url()
は実行時に使い、includeされたタイミングでincludeme()
が呼ばれる。
大切なのは app.add_server_process()
で、これにより裏側でRPCサーバーが立ち上がる。nameに渡された値でdiscoveryに登録される。登録されたserviceはget_discovery().lookup(<service name>)
を経由してURLが取れる。渡されたsentinel
オプションが、先程説明した番兵的なファイル。
ちなみに、番兵を環境変数として渡したい場合には environ={"SENTINEL": create_sentinel_file}
という形で渡す。
get_http_client()
の実態は概ね requests.Session
。ただ、scanのときにはfakeオブジェクトが返ってくる。ここでもscanのときには実際の通信をしたくない。gofmt()
の実装自体は特に特別なことはないJSONRPCの通信。
import typing as t from egoist.app import App NAME = __name__ def get_gofmtrpc_url() -> t.Optional[str]: from egoist.ext.serverprocess.runtime import get_discovery return get_discovery().lookup(NAME) _id = 0 def gofmt(code: str) -> str: global _id from egoist.ext.serverprocess.runtime import get_http_client url = get_gofmtrpc_url() assert url is not None _id += 1 res = get_http_client().post( url, json={ "jsonrpc": "2.0", "id": _id, "method": "format", "params": {"code": code}, }, ) res.raise_for_status() formatted = res.json()["result"] # type:str return formatted def includeme(app: App) -> None: from egoist.ext.serverprocess.lazyparams import find_free_port, create_sentinel_file app.include("egoist.ext.serverprocess") # for add_server_process app.add_server_process( "gofmtrpc -addr :{port} -sentinel {sentinel}", params=dict(port=find_free_port, sentinel=create_sentinel_file), name=NAME, )
そんなわけで、先程のdefinitions.pyとセットで使うことで、goのコードの整形ができる様になった。同じように何らかのAPIサーバーを書いてあげれば、それを実行時に使うことができる。
毛色の違う使い方
考えてみれば、APIサーバーとの通信ができるということなので、何もコード生成に用途を絞る必要はないかもしれない。少し毛色の違う使い方として、この仕組をそのまま流用してのAPIサーバーのテスト実行というような使い方もできる。
$ python main.py INFO: Started server process [39296] INFO: Uvicorn running on http://127.0.0.1:58221 (Press CTRL+C to quit) INFO: Waiting for application startup. INFO: remove sentinel /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmp9ilwe218.sentinel INFO: Application startup complete. -> http://127.0.0.1:58221 INFO: 127.0.0.1:58223 - "GET / HTTP/1.1" 200 OK <- {'message': 'Hello World'} INFO: Shutting down INFO: Waiting for application shutdown. INFO: Application shutdown complete. INFO: Finished server process [39296]
例えば、以下の様なFastAPIを利用したサーバーがあるとして。
server.py
from fastapi import FastAPI app = FastAPI() @app.on_event("startup") async def startup_event(): import os from egoist.ext.serverprocess.spawn import FileConnectionChecker sentinel = os.environ.get("SENTINEL") if sentinel is not None: checker = FileConnectionChecker(sentinel=sentinel) assert checker.pong() is True @app.get("/") async def root(): return {"message": "Hello World"}
裏側でAPIサーバーを立ち上げて GET /
を呼ぶというような動作を記述できる。
main.py
from egoist.app import create_app, SettingsDict, App settings: SettingsDict = {"rootdir": "output", "here": __file__} app = create_app(settings) def setup_server(app: App) -> None: from egoist.ext.serverprocess.lazyparams import find_free_port, create_sentinel_file app.include("egoist.ext.serverprocess") # for add_server_process app.add_server_process( "uvicorn server:app --port {port}", params=dict(port=find_free_port), name="api-server", env={"SENTINEL": create_sentinel_file}, ) def run() -> None: from egoist.ext.serverprocess.runtime import get_discovery, get_http_client url = get_discovery().lookup("api-server") print("->", url) res = get_http_client().get(url) res.raise_for_status() print("<-", res.json()) if __name__ == "__main__": app.include(setup_server) app.commit(dry_run=False) run()
今回は環境変数を渡す例。理想的にはコネクションの確立の確認のためのイベントは、startup eventのような箇所で実行できる様になっていると嬉しい。
まとめ
- コード生成のN+1を倒したい
- bulk actionの1つの例としてRPCサーバーを立てる方法もある
- 流用してなにか他のものにも使えるかもしれない