mypyのdmypyの実装のされ方を覗いてみた
mypyのdmypyの実装の仕方を把握する。知りたいことは裏側でのprocessの管理の仕方とクライアントとサーバー間のデータのやり取りの仕方。
準備
$ git clone --depth 1 git@github.com:python/mypy
エントリーポイントの把握
インストールされるコマンドの確認(setup.py)
setup(name='mypy', ... entry_points={'console_scripts': ['mypy=mypy.__main__:console_entry', 'stubgen=mypy.stubgen:main', 'dmypy=mypy.dmypy:main', ]}, data_files=data_files, ...
mypy.dmypy:main()
がエントリーポイント実際のファイル。
main.py
ほとんどargparseを触っているだけ。中の実装もサブコマンドを定義するという上では必要十分そうな感じ。
$ grep -A 2 "@action" dmypy.py # @action(subparse) decorator. parser = argparse.ArgumentParser(description="Client for mypy daemon mode", -- @action(start_parser) def do_start(args: argparse.Namespace) -> None: """Start daemon (it must not already be running). -- @action(restart_parser) def do_restart(args: argparse.Namespace, allow_sources: bool = False) -> None: """Restart daemon (it may or may not be running; but not hanging). -- @action(run_parser) def do_run(args: argparse.Namespace) -> None: """Do a check, starting (or restarting) the daemon as necessary -- @action(status_parser) def do_status(args: argparse.Namespace) -> None: """Print daemon status. -- @action(stop_parser) def do_stop(args: argparse.Namespace) -> None: """Stop daemon via a 'stop' request.""" -- @action(kill_parser) def do_kill(args: argparse.Namespace) -> None: """Kill daemon process with SIGKILL.""" -- @action(check_parser) def do_check(args: argparse.Namespace) -> None: """Ask the daemon to check a list of files.""" -- @action(recheck_parser) def do_recheck(args: argparse.Namespace) -> None: """Ask the daemon to check the same list of files it checked most recently. -- @action(hang_parser) def do_hang(args: argparse.Namespace) -> None: """Hang for 100 seconds, as a debug hack.""" -- @action(daemon_parser) def do_daemon(args: argparse.Namespace) -> None: """Serve requests in the foreground.""" -- @action(help_parser) def do_help(args: argparse.Namespace) -> None: """Print full help (same as dmypy --help)."""
興味の有りそうなコマンドを見てみる
- status
- start
- run
- stop
- restart
- kill
- daemon
ほとんど全部だった。processの開始はstart,run,restart。バックグラウンドジョブではなくフォアグラウンドジョブとして立ち上げるにはdaemon。
status
statushの実装はdo_status。単純にまとめると以下
- read_status
- check_status
- request (おそらくバックグラウンドジョブとの通信)
@action(status_parser) def do_status(args: argparse.Namespace) -> None: """Print daemon status. This verifies that it is responsive to requests. """ status = read_status() if args.verbose: show_stats(status) # Both check_status() and request() may raise BadStatus, # which will be handled by main(). check_status(status) response = request('status', timeout=5) if args.verbose or 'error' in response: show_stats(response) if 'error' in response: sys.exit("Daemon is stuck; consider %s kill" % sys.argv[0]) print("Daemon is up and running")
read_statusがどういう実装なのかと言うのが気になるところ。
def get_status() -> Tuple[int, str]: """Read status file and check if the process is alive. Return (pid, sockname) on success. Raise BadStatus if something's wrong. """ data = read_status() return check_status(data)
なるほど。
def read_status() -> Dict[str, object]: """Read status file. Raise BadStatus if the status file doesn't exist or contains invalid JSON or the JSON is not a dict. """ if not os.path.isfile(STATUS_FILE): raise BadStatus("No status file found") with open(STATUS_FILE) as f: try: data = json.load(f) except Exception as err: raise BadStatus("Malformed status file (not JSON)") if not isinstance(data, dict): raise BadStatus("Invalid status file (not a dict)") return data
ちなみに見るファイルは.dmypy.jsonらしい
from mypy.dmypy_util import STATUS_FILE, receive # dmypy_util.py STATUS_FILE = '.dmypy.json'
こういう感じのJSONが格納される。結構原始的。
.dmypy.json
{ "pid": 8995, "sockname": "/tmp/tmptadv4ddn/dmypy.sock" }
終わったらcheck_status()に渡される。
- JSON中にpidがなかったらBadStatus
- 取得したpidのprocessに対して0を送っている。
def check_status(data: Dict[str, Any]) -> Tuple[int, str]: """Check if the process is alive. Return (pid, sockname) on success. Raise BadStatus if something's wrong. """ if 'pid' not in data: raise BadStatus("Invalid status file (no pid field)") pid = data['pid'] if not isinstance(pid, int): raise BadStatus("pid field is not an int") try: os.kill(pid, 0) except OSError as err: raise BadStatus("Daemon has died") if 'sockname' not in data: raise BadStatus("Invalid status file (no sockname field)") sockname = data['sockname'] if not isinstance(sockname, str): raise BadStatus("sockname field is not a string") return pid, sockname
kill -0
killに渡すsignalの一覧に0はない。
$ kill -l 1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL 5) SIGTRAP 6) SIGABRT 7) SIGBUS 8) SIGFPE 9) SIGKILL 10) SIGUSR1 11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM 16) SIGSTKFLT 17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP 21) SIGTTIN 22) SIGTTOU 23) SIGURG 24) SIGXCPU 25) SIGXFSZ 26) SIGVTALRM 27) SIGPROF 28) SIGWINCH 29) SIGIO 30) SIGPWR 31) SIGSYS 34) SIGRTMIN 35) SIGRTMIN+1 36) SIGRTMIN+2 37) SIGRTMIN+3 38) SIGRTMIN+4 39) SIGRTMIN+5 40) SIGRTMIN+6 41) SIGRTMIN+7 42) SIGRTMIN+8 43) SIGRTMIN+9 44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13 48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12 53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9 56) SIGRTMAX-8 57) SIGRTMAX-7 58) SIGRTMAX-6 59) SIGRTMAX-5 60) SIGRTMAX-4 61) SIGRTMAX-3 62) SIGRTMAX-2 63) SIGRTMAX-1 64) SIGRTMAX
ただ、これが何かはmanに書いてある。processの存在確認に使われる。
$ man 2 kill
If sig is 0, then no signal is sent, but existence and permission checks are still performed; this can be used to check for the existence of a process ID or process group ID that the caller is per‐ mitted to signal.
request()の実装
内部的にはunixドメインソケットがあるだけ(.dmypy.jsonのsocket_nameを見ればわかる通り)。これにJSONをsendしている。
def request(command: str, *, timeout: Optional[float] = None, **kwds: object) -> Dict[str, Any]: """Send a request to the daemon. Return the JSON dict with the response. Raise BadStatus if there is something wrong with the status file or if the process whose pid is in the status file has died. Return {'error': <message>} if a socket operation or receive() raised OSError. This covers cases such as connection refused or closed prematurely as well as invalid JSON received. """ args = dict(kwds) args.update(command=command) bdata = json.dumps(args).encode('utf8') pid, sockname = get_status() sock = socket.socket(socket.AF_UNIX) if timeout is not None: sock.settimeout(timeout) try: sock.connect(sockname) sock.sendall(bdata) sock.shutdown(socket.SHUT_WR) response = receive(sock) except OSError as err: return {'error': str(err)} # TODO: Other errors, e.g. ValueError, UnicodeError else: return response finally: sock.close()
この辺の定数はお約束感あるのだけれど。どこかで一度ソケットプログラミングとかの本などを読んでみると。
https://docs.python.jp/3/library/socket.html#socket.socket.shutdown
ちなみにここだけみてもどうやってデータが返ってきているかはわからない(戻り値はdictなのに)。receiveも覗かないとダメ。 雑にデータを取り出して取りきったらJSONとしてdeserializeしているだけですね。
mypy/dmypy_util.py
def receive(sock: socket.socket) -> Any: """Receive JSON data from a socket until EOF. Raise a subclass of OSError if there's a socket exception. Raise OSError if the data received is not valid JSON or if it is not a dict. """ bdata = bytearray() while True: more = sock.recv(100000) if not more: break bdata.extend(more) if not bdata: raise OSError("No data received") try: data = json.loads(bdata.decode('utf8')) except Exception: raise OSError("Data received is not valid JSON") if not isinstance(data, dict): raise OSError("Data received is not a dict (%s)" % str(type(data))) return data
start
clientの方を覗いた経験から幾つか振る舞いが予想できる。おそらく以下のことをしている。
@action(start_parser) def do_start(args: argparse.Namespace) -> None: """Start daemon (it must not already be running). This is where mypy flags are set from the command line. Setting flags is a bit awkward; you have to use e.g.: dmypy start -- --strict since we don't want to duplicate mypy's huge list of flags. """ try: get_status() except BadStatus as err: # Bad or missing status file or dead process; good to start. pass else: sys.exit("Daemon is still alive") start_server(args)
存在チェックにBadStatusのelseを使っているのはわかりやすい(既に立ち上がっているならget_statusは通るはずで。ファイルが存在しなければBadStatusになる)。こういう心配りの欠けたコマンドを書いてしまって使いづらいやつになったりする。
内部的にはほぼほぼstart_server。
def start_server(args: argparse.Namespace, allow_sources: bool = False) -> None: """Start the server from command arguments and wait for it.""" # Lazy import so this import doesn't slow down other commands. from mypy.dmypy_server import daemonize, Server, process_start_options if daemonize(Server(process_start_options(args.flags, allow_sources), timeout=args.timeout).serve, args.log_file) != 0: sys.exit(1) wait_for_server()
daemonizeも自分で実装しているっぽい。なるほど。lazy importはまぁそうだよねというかんじ(なんでserver,client形式にしてbagroundにserverを立ち上げる形のコマンドにしたのかという話だし)。
wait_for_serverだけ先に見ておく。裏側のdaemonの立ち上がりを確認してから終了したいので、立ち上がるまで待つという感じ。このようにすると、このクライアントのstartコマンドの終了=バックグラウンドプロセスの立ち上がりとなるのでとても具合が良い。
def wait_for_server(timeout: float = 5.0) -> None: """Wait until the server is up. Exit if it doesn't happen within the timeout. """ endtime = time.time() + timeout while time.time() < endtime: try: data = read_status() except BadStatus: # If the file isn't there yet, retry later. time.sleep(0.1) continue # If the file's content is bogus or the process is dead, fail. pid, sockname = check_status(data) print("Daemon started") return sys.exit("Timed out waiting for daemon to start")
やっていることは簡単でtimeout(5秒)になるまで0.1秒ずつ待ちながらポーリングしている形。正常に立ち上がったら.dmypy.jsonができるはずだし。BadStatusは返ってこなくなるはずなので。
daemonの起動
daemonizeの部分、そういえばこれもけっこうたまに見る慣習(的な知識になってしまっている)。
if daemonize(Server(process_start_options(args.flags, allow_sources), timeout=args.timeout).serve, args.log_file) != 0: sys.exit(1)
終了ステータスをそのまま返しているのかな。なので0以外だったら失敗。
実際に中のコードを覗いてみる。これ説明するのも面倒だけれど。いわゆるdaemonizeの処理。コメントにstackoverflowへのリンクが貼ってあるのが良い。つまりそういうこと。
PEPにもそれ関連の提案があるらしい(PEP3143)。statusはdeferredなので延期されてしまってるけれど(詳しいPEPのstatus遷移はPEP1に)
unixドメインソケットが出てきたときも思ったけれど。そういえばwindowsには対応していなさそう。
def daemonize(func: Callable[[], None], log_file: Optional[str] = None) -> int: """Arrange to call func() in a grandchild of the current process. Return 0 for success, exit status for failure, negative if subprocess killed by signal. """ # See https://stackoverflow.com/questions/473620/how-do-you-create-a-daemon-in-python if sys.platform == 'win32': raise ValueError('Mypy daemon is not supported on Windows yet') sys.stdout.flush() sys.stderr.flush() pid = os.fork() if pid: # Parent process: wait for child in case things go bad there. npid, sts = os.waitpid(pid, 0) sig = sts & 0xff if sig: print("Child killed by signal", sig) return -sig sts = sts >> 8 if sts: print("Child exit status", sts) return sts # Child process: do a bunch of UNIX stuff and then fork a grandchild. try: os.setsid() # Detach controlling terminal os.umask(0o27) devnull = os.open('/dev/null', os.O_RDWR) os.dup2(devnull, 0) os.dup2(devnull, 1) os.dup2(devnull, 2) os.close(devnull) pid = os.fork() if pid: # Child is done, exit to parent. os._exit(0) # Grandchild: run the server. if log_file: sys.stdout = sys.stderr = open(log_file, 'a', buffering=1) fd = sys.stdout.fileno() os.dup2(fd, 2) os.dup2(fd, 1) func() finally: # Make sure we never get back into the caller. os._exit(1)
やっていることはいわゆるdouble forkと呼ばれているもの。このあたりは絶対自分で書いてメンテしたくないので標準ライブラリに入って欲しい。 PEP3143の実装提案自体はpypiにpython-daemonとして存在するのでしばらくはこちらを使うと良さそう。
Serverの実装
daemonizeに関しては脇道だったので本体の実装の方を見る。これはmypy.dmypy_serverに存在する。とりあえず今までのコードを見てきたイメージとして入力としてJSONを受取りJSONを返すというようなもの。 router的なものが存在して、分岐した後にそれぞれのコマンドに対応したhandler的なものが存在する感じ何じゃないかと思う。
dmypy_server.py
class Server: # NOTE: the instance is constructed in the parent process but # serve() is called in the grandchild (by daemonize()). # ... def serve(self) -> None: """Serve requests, synchronously (no thread or fork).""" try: sock = self.create_listening_socket() if self.timeout is not None: sock.settimeout(self.timeout) try: with open(STATUS_FILE, 'w') as f: json.dump({'pid': os.getpid(), 'sockname': sock.getsockname()}, f) f.write('\n') # I like my JSON with trailing newline while True: try: conn, addr = sock.accept() except socket.timeout: print("Exiting due to inactivity.") reset_global_state() sys.exit(0) try: data = receive(conn) except OSError as err: conn.close() # Maybe the client hung up continue resp = {} # type: Dict[str, Any] if 'command' not in data: resp = {'error': "No command found in request"} else: command = data['command'] if not isinstance(command, str): resp = {'error': "Command is not a string"} else: command = data.pop('command') try: resp = self.run_command(command, data) except Exception: # If we are crashing, report the crash to the client tb = traceback.format_exception(*sys.exc_info()) resp = {'error': "Daemon crashed!\n" + "".join(tb)} conn.sendall(json.dumps(resp).encode('utf8')) raise try: conn.sendall(json.dumps(resp).encode('utf8')) except OSError as err: pass # Maybe the client hung up conn.close() if command == 'stop': sock.close() reset_global_state() sys.exit(0) finally: os.unlink(STATUS_FILE) finally: shutil.rmtree(self.sock_directory) exc_info = sys.exc_info() if exc_info[0] and exc_info[0] is not SystemExit: traceback.print_exception(*exc_info)
至って普通のserverのコード。細かい話は後で書く。
unixドメインソケットの生成
通信用のソケットの生成も先程のServerクラスのcreate_listening_socket()でやっている。ここで返したsocketの情報を元に.dmypy.jsonが作られる(serve()) ついでにtry-finallyで対象のtemporary directoryはけされる(serve()のshutil.rmtree)
class Server: # ... def create_listening_socket(self) -> socket.socket: """Create the socket and set it up for listening.""" self.sock_directory = tempfile.mkdtemp() sockname = os.path.join(self.sock_directory, SOCKET_NAME) sock = socket.socket(socket.AF_UNIX) sock.bind(sockname) sock.listen(1) return sock
routing部分
routing部分は先程のServerクラスのrun_command()でやっている。単純にcmd_のprefixを付けて、その名前のメソッドを呼ぶだけ。よくある形。 (ちなみにprefix付けずに呼んじゃえば良いじゃんという話があるかもしれないけれど。prefix付けていると「提供しているメソッドの一覧」とか返すのに便利だったりする(他にもこのクラスがメソッドを持っていた場合面倒という話))
class Server: # ... def run_command(self, command: str, data: Mapping[str, object]) -> Dict[str, object]: """Run a specific command from the registry.""" key = 'cmd_' + command method = getattr(self.__class__, key, None) if method is None: return {'error': "Unrecognized command '%s'" % command} else: return method(self, **data)
stop
stopはと言うと、stopコマンドを呼んでいるだけ。
@action(stop_parser) def do_stop(args: argparse.Namespace) -> None: """Stop daemon via a 'stop' request.""" # May raise BadStatus, which will be handled by main(). response = request('stop', timeout=5) if response: show_stats(response) sys.exit("Daemon is stuck; consider %s kill" % sys.argv[0]) else: print("Daemon stopped")
daemon
daemonで最後にする。これはバックグラウンドで通常作られるdaemonをフォアグラウンドで起動したいと言うようなコマンド。中を見たらすぐわかるという感じ。
@action(daemon_parser) def do_daemon(args: argparse.Namespace) -> None: """Serve requests in the foreground.""" # Lazy import so this import doesn't slow down other commands. from mypy.dmypy_server import Server, process_start_options Server(process_start_options(args.flags, allow_sources=False), timeout=args.timeout).serve()
疲れたのでおしまい。