processを立ち上げまくる機会を避けるための簡単なRPCのメモ
時折、特定の前処理の一部として、1つのファイルを受け取る1つのスクリプトが何個も連鎖することがある。このようなときに都度都度processが立ち上がると、ほとんどの時間がimport timeで辛いということが起きる事がある。
以前書いた記事の中では、この辺が近いかもしれない。
まぁ、まじめにパイプラインを作っても良いけれど、そこまで頑張らなくても良い気がする。
最初はいちいちwebAPIを定義しようと思ったが、operationId1を指定してのアクセスができれば良いだけなので、それはもうRPCだよなーという考えになった。それ用の作業のためにweb APIを作るのも良いが、まぁ手軽なのはRPCだよね。ということでメモ。
例の紹介
これからの例では以下の関数を公開している。
- pow() -- べき乗
- add() -- 和
- mul() -- 積
ただし、使い所は、build scriptの一部という想定なので、以下の点が特殊かもしれない。
- clientがsub processでserverを立ち上げる
- clientの作業が全部終わったら、serverを終了させる
- server, clientはお互いが通信可能になるまで待つ
通信可能かのチェックはファイルの存在の有無でやっている。これが一番手軽そうだった。
xmlrpc
標準ライブラリだけの範囲でやるならxmlrpcがある。とはいえ、複数のclientが同一のserverに同時にアクセスするとエラーが起きえたり、ドキュメントにも書いてあるとおりに、悪意を持ったアクセスに対しては安全ではない。
まぁ一応標準ライブラリの範囲なので。
01server.py
import sys import pathlib import time from xmlrpc.server import SimpleXMLRPCServer from xmlrpc.server import SimpleXMLRPCRequestHandler from handofcats import as_command # Restrict to a particular path. class RequestHandler(SimpleXMLRPCRequestHandler): rpc_paths = ("/RPC2",) # Create server @as_command def run(*, port: int = 8888, sentinel: str = "") -> None: if sentinel: p = pathlib.Path(sentinel) ok = False for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]: if not p.exists(): print(f" wait ... {wait_time}", file=sys.stderr) time.sleep(wait_time) continue print(f"ack {sentinel}", file=sys.stderr) p.unlink() ok = True break if not ok: raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}") print(f"listen ... {port}", file=sys.stderr) with SimpleXMLRPCServer( ("localhost", port), requestHandler=RequestHandler ) as server: server.register_introspection_functions() # Register pow() function; this will use the value of # pow.__name__ as the name, which is just 'pow'. server.register_function(pow) # Register a function under a different name def adder_function(x, y): return x + y server.register_function(adder_function, "add") # Register an instance; all the methods of the instance are # published as XML-RPC methods (in this case, just 'mul'). class MyFuncs: def mul(self, x, y): return x * y server.register_instance(MyFuncs()) # Run the server's main loop server.serve_forever()
02use.py
import sys import tempfile import pathlib import time import subprocess import logging import xmlrpc.client import contextlib from handofcats import as_command logger = logging.getLogger(__name__) def _find_free_port(host: str = "0.0.0.0") -> int: import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind((host, 0)) sock.listen(1) name, port = sock.getsockname() # type: t.Tuple[str, int] return port @contextlib.contextmanager def connect_server(*, sentinel: str, port: int): cmd = [sys.executable, "01server.py", "--port", str(port), "--sentinel", sentinel] server_process = subprocess.Popen(cmd) try: p = pathlib.Path(sentinel) with p.open("w"): pass ok = False st = time.time() for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]: if p.exists(): print(f" wait ... {wait_time}", file=sys.stderr) time.sleep(wait_time) continue ok = True break if not ok: raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}") yield server_process finally: try: server_process.terminate() server_process.wait() except Exception as e: logger.info(str(e), exc_info=True) @as_command def run() -> None: sentinel = tempfile.mktemp() port = _find_free_port() with connect_server(sentinel=sentinel, port=port): print(f"connect ... {port}", file=sys.stderr) s = xmlrpc.client.ServerProxy(f"http://localhost:{port}") print(s.pow(2, 3)) # Returns 2**3 = 8 print(s.add(2, 3)) # Returns 5 print(s.mul(5, 2)) # Returns 5*2 = 10 # Print list of available methods print(s.system.listMethods())
実行してみる。
python 02use.py ** handofcats.customize: DEBUG=1, activate logging ** wait ... 0.1 wait ... 0.2 ** handofcats.customize: DEBUG=1, activate logging ** ack /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmp9bdn0obj listen ... 52158 connect ... 52158 127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 - 8 127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 - 5 127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 - 10 127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 -
はい。
tinyrpc
asyncioに対応しているわけではないが、同期的なrequest/responseで良いのであればtinyrpcが便利かもしれない。
なんとなくzmqを使っている。
03server.py
import sys import pathlib import time import zmq from tinyrpc.server import RPCServer from tinyrpc.dispatch import RPCDispatcher from tinyrpc.protocols.jsonrpc import JSONRPCProtocol from tinyrpc.transports.zmq import ZmqServerTransport from handofcats import as_command ctx = zmq.Context() dispatcher = RPCDispatcher() @dispatcher.public(name="add") def adder_function(x, y): return x + y dispatcher.public(pow) @dispatcher.public def mul(x, y): return x * y @dispatcher.public def list_methods(): return list(dispatcher.method_map.keys()) @as_command def run(*, port: int = 8888, sentinel: str = "") -> None: if sentinel: p = pathlib.Path(sentinel) ok = False for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]: if not p.exists(): print(f" wait ... {wait_time}", file=sys.stderr) time.sleep(wait_time) continue print(f"ack {sentinel}", file=sys.stderr) p.unlink() ok = True break if not ok: raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}") print(f"listen ... {port}", file=sys.stderr) transport = ZmqServerTransport.create(ctx, f"tcp://127.0.0.1:{port}") rpc_server = RPCServer(transport, JSONRPCProtocol(), dispatcher) rpc_server.serve_forever()
clientはこんな感じ。clientというかserverを利用するスクリプト。
x03client.py
from __future__ import annotations import typing as t import sys import tempfile import pathlib import time import subprocess import logging import contextlib import zmq from tinyrpc import RPCClient from tinyrpc.protocols.jsonrpc import JSONRPCProtocol from tinyrpc.transports.zmq import ZmqClientTransport from handofcats import as_command logger = logging.getLogger(__name__) def _find_free_port(host: str = "0.0.0.0") -> int: import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind((host, 0)) sock.listen(1) name, port = sock.getsockname() # type: t.Tuple[str, int] return port @contextlib.contextmanager def connect_server( cmd: t.List[str], *, sentinel: str, port: int ) -> t.Iterator[t.subprocess.Popen]: assert sentinel in cmd assert str(port) in cmd server_process = subprocess.Popen(cmd) try: p = pathlib.Path(sentinel) with p.open("w"): pass ok = False st = time.time() for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]: if p.exists(): print(f" wait ... {wait_time}", file=sys.stderr) time.sleep(wait_time) continue ok = True break if not ok: raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}") yield server_process finally: try: server_process.terminate() server_process.wait() except Exception as e: logger.info(str(e), exc_info=True) @as_command def run() -> None: ctx = zmq.Context() port = _find_free_port() sentinel = tempfile.mktemp() cmd = [sys.executable, "03server.py", "--port", str(port), "--sentinel", sentinel] with connect_server(cmd, sentinel=sentinel, port=port): print(f"connect ... {port}", file=sys.stderr) rpc_client = RPCClient( JSONRPCProtocol(), ZmqClientTransport.create(ctx, f"tcp://127.0.0.1:{port}") ) s = rpc_client.get_proxy() print("Returns 2**3 = 8", "->", s.pow(2, 3)) print("Returns 5", "->", s.add(2, 3)) print("Returns 5*2 = 10", "->", s.mul(5, 2)) # Print list of available methods print(s.list_methods())
実行結果は同様。
** handofcats.customize: DEBUG=1, activate logging ** wait ... 0.1 wait ... 0.2 ** handofcats.customize: DEBUG=1, activate logging ** ack /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmp9iwr0h0u listen ... 52196 connect ... 52196 Returns 2**3 = 8 -> 8 Returns 5 -> 5 Returns 5*2 = 10 -> 10 ['add', 'pow', 'mul', 'list_methods']
なぜgrpcなどにしないか?
まぁしても良いのだけど、なぜgrpcなどにしないかというと、なんでだろうな? 一番近いのは、提供したいサービスの型が決まっていないようなものだからかもしれない。例えば、あるpandasのdataframeがあったとして、それに対してgroupbyをするようなサービスを提供したい。inputとなるデータの構造は決まっていない。
例えば、以下のようなよくあるデータセットを元に考えるイメージ2
このようなときに、grpcはあまり適切ではない。
本当に使えない?
いや、まぁファイル名だけを返すような形にして使っても良いかもしれない。それなら使えるかも? 加えて、変換処理などの場合には、変換結果自体に興味があることは少ない。inputもoutputもurlを含んだ何か程度で十分なのではないか?それだったらgrpcを使っても良いかもしれない。まぁ何でも良いかも。
クライアントは変換結果自体に興味はないので、message packを使ったmprpcなどもありはするのだけれど、こちらを使う価値はあんまり無さそう。そんなわけでtinyrpcくらいでちょうど良い気もしている。
まとめ
- xmlrpc, tinyrpcを使っていい感じにprocessをまとめる方法のメモ
- ハンドシェイク的なチェックはファイルの存在有無が手軽 (複数インスタンスは不可能)
- 今回の意味では、クライアントはrequest/responseのインターフェイスとして変換結果に興味がない