mypyのdmypyの実装のされ方を覗いてみた

mypyのdmypyの実装の仕方を把握する。知りたいことは裏側でのprocessの管理の仕方とクライアントとサーバー間のデータのやり取りの仕方。

準備

$ git clone --depth 1 git@github.com:python/mypy

エントリーポイントの把握

インストールされるコマンドの確認(setup.py)

setup(name='mypy',
...
      entry_points={'console_scripts': ['mypy=mypy.__main__:console_entry',
                                        'stubgen=mypy.stubgen:main',
                                        'dmypy=mypy.dmypy:main',
                                        ]},
      data_files=data_files,
...

mypy.dmypy:main()がエントリーポイント実際のファイル

main.py

ほとんどargparseを触っているだけ。中の実装もサブコマンドを定義するという上では必要十分そうな感じ。

$ grep -A 2 "@action" dmypy.py
# @action(subparse) decorator.

parser = argparse.ArgumentParser(description="Client for mypy daemon mode",
--
@action(start_parser)
def do_start(args: argparse.Namespace) -> None:
    """Start daemon (it must not already be running).
--
@action(restart_parser)
def do_restart(args: argparse.Namespace, allow_sources: bool = False) -> None:
    """Restart daemon (it may or may not be running; but not hanging).
--
@action(run_parser)
def do_run(args: argparse.Namespace) -> None:
    """Do a check, starting (or restarting) the daemon as necessary
--
@action(status_parser)
def do_status(args: argparse.Namespace) -> None:
    """Print daemon status.
--
@action(stop_parser)
def do_stop(args: argparse.Namespace) -> None:
    """Stop daemon via a 'stop' request."""
--
@action(kill_parser)
def do_kill(args: argparse.Namespace) -> None:
    """Kill daemon process with SIGKILL."""
--
@action(check_parser)
def do_check(args: argparse.Namespace) -> None:
    """Ask the daemon to check a list of files."""
--
@action(recheck_parser)
def do_recheck(args: argparse.Namespace) -> None:
    """Ask the daemon to check the same list of files it checked most recently.
--
@action(hang_parser)
def do_hang(args: argparse.Namespace) -> None:
    """Hang for 100 seconds, as a debug hack."""
--
@action(daemon_parser)
def do_daemon(args: argparse.Namespace) -> None:
    """Serve requests in the foreground."""
--
@action(help_parser)
def do_help(args: argparse.Namespace) -> None:
    """Print full help (same as dmypy --help)."""

興味の有りそうなコマンドを見てみる

  • status
  • start
  • run
  • stop
  • restart
  • kill
  • daemon

ほとんど全部だった。processの開始はstart,run,restart。バックグラウンドジョブではなくフォアグラウンドジョブとして立ち上げるにはdaemon

status

statushの実装はdo_status。単純にまとめると以下

  1. read_status
  2. check_status
  3. request (おそらくバックグラウンドジョブとの通信)
@action(status_parser)
def do_status(args: argparse.Namespace) -> None:
    """Print daemon status.

    This verifies that it is responsive to requests.
    """
    status = read_status()
    if args.verbose:
        show_stats(status)
    # Both check_status() and request() may raise BadStatus,
    # which will be handled by main().
    check_status(status)
    response = request('status', timeout=5)
    if args.verbose or 'error' in response:
        show_stats(response)
    if 'error' in response:
        sys.exit("Daemon is stuck; consider %s kill" % sys.argv[0])
    print("Daemon is up and running")

read_statusがどういう実装なのかと言うのが気になるところ。

def get_status() -> Tuple[int, str]:
    """Read status file and check if the process is alive.

    Return (pid, sockname) on success.

    Raise BadStatus if something's wrong.
    """
    data = read_status()
    return check_status(data)

なるほど。

def read_status() -> Dict[str, object]:
    """Read status file.

    Raise BadStatus if the status file doesn't exist or contains
    invalid JSON or the JSON is not a dict.
    """
    if not os.path.isfile(STATUS_FILE):
        raise BadStatus("No status file found")
    with open(STATUS_FILE) as f:
        try:
            data = json.load(f)
        except Exception as err:
            raise BadStatus("Malformed status file (not JSON)")
    if not isinstance(data, dict):
        raise BadStatus("Invalid status file (not a dict)")
    return data

ちなみに見るファイルは.dmypy.jsonらしい

from mypy.dmypy_util import STATUS_FILE, receive

# dmypy_util.py
STATUS_FILE = '.dmypy.json'

こういう感じのJSONが格納される。結構原始的。

.dmypy.json

{
  "pid": 8995,
  "sockname": "/tmp/tmptadv4ddn/dmypy.sock"
}

終わったらcheck_status()に渡される。

  • JSON中にpidがなかったらBadStatus
  • 取得したpidのprocessに対して0を送っている。
def check_status(data: Dict[str, Any]) -> Tuple[int, str]:
    """Check if the process is alive.

    Return (pid, sockname) on success.

    Raise BadStatus if something's wrong.
    """
    if 'pid' not in data:
        raise BadStatus("Invalid status file (no pid field)")
    pid = data['pid']
    if not isinstance(pid, int):
        raise BadStatus("pid field is not an int")
    try:
        os.kill(pid, 0)
    except OSError as err:
        raise BadStatus("Daemon has died")
    if 'sockname' not in data:
        raise BadStatus("Invalid status file (no sockname field)")
    sockname = data['sockname']
    if not isinstance(sockname, str):
        raise BadStatus("sockname field is not a string")
    return pid, sockname

kill -0

killに渡すsignalの一覧に0はない。

$ kill -l
 1) SIGHUP   2) SIGINT   3) SIGQUIT  4) SIGILL   5) SIGTRAP
 6) SIGABRT  7) SIGBUS   8) SIGFPE   9) SIGKILL 10) SIGUSR1
11) SIGSEGV 12) SIGUSR2 13) SIGPIPE 14) SIGALRM 15) SIGTERM
16) SIGSTKFLT   17) SIGCHLD 18) SIGCONT 19) SIGSTOP 20) SIGTSTP
21) SIGTTIN 22) SIGTTOU 23) SIGURG  24) SIGXCPU 25) SIGXFSZ
26) SIGVTALRM   27) SIGPROF 28) SIGWINCH    29) SIGIO   30) SIGPWR
31) SIGSYS  34) SIGRTMIN    35) SIGRTMIN+1  36) SIGRTMIN+2  37) SIGRTMIN+3
38) SIGRTMIN+4  39) SIGRTMIN+5  40) SIGRTMIN+6  41) SIGRTMIN+7  42) SIGRTMIN+8
43) SIGRTMIN+9  44) SIGRTMIN+10 45) SIGRTMIN+11 46) SIGRTMIN+12 47) SIGRTMIN+13
48) SIGRTMIN+14 49) SIGRTMIN+15 50) SIGRTMAX-14 51) SIGRTMAX-13 52) SIGRTMAX-12
53) SIGRTMAX-11 54) SIGRTMAX-10 55) SIGRTMAX-9  56) SIGRTMAX-8  57) SIGRTMAX-7
58) SIGRTMAX-6  59) SIGRTMAX-5  60) SIGRTMAX-4  61) SIGRTMAX-3  62) SIGRTMAX-2
63) SIGRTMAX-1  64) SIGRTMAX    

ただ、これが何かはmanに書いてある。processの存在確認に使われる。

$ man 2 kill
  If  sig is 0, then no signal is sent, but existence and permission checks are still performed; this can be used to check for the existence of a process ID or process group ID that the caller is per‐
  mitted to signal.

request()の実装

内部的にはunixドメインソケットがあるだけ(.dmypy.jsonのsocket_nameを見ればわかる通り)。これにJSONをsendしている。

def request(command: str, *, timeout: Optional[float] = None,
            **kwds: object) -> Dict[str, Any]:
    """Send a request to the daemon.

    Return the JSON dict with the response.

    Raise BadStatus if there is something wrong with the status file
    or if the process whose pid is in the status file has died.

    Return {'error': <message>} if a socket operation or receive()
    raised OSError.  This covers cases such as connection refused or
    closed prematurely as well as invalid JSON received.
    """
    args = dict(kwds)
    args.update(command=command)
    bdata = json.dumps(args).encode('utf8')
    pid, sockname = get_status()
    sock = socket.socket(socket.AF_UNIX)
    if timeout is not None:
        sock.settimeout(timeout)
    try:
        sock.connect(sockname)
        sock.sendall(bdata)
        sock.shutdown(socket.SHUT_WR)
        response = receive(sock)
    except OSError as err:
        return {'error': str(err)}
    # TODO: Other errors, e.g. ValueError, UnicodeError
    else:
        return response
    finally:
        sock.close()

この辺の定数はお約束感あるのだけれど。どこかで一度ソケットプログラミングとかの本などを読んでみると。

  • socket.AF_UNIX -- unixドメインソケット
  • socket.SHUT_WR -- write portのcloseみたいな感じ

https://docs.python.jp/3/library/socket.html#socket.socket.shutdown

ちなみにここだけみてもどうやってデータが返ってきているかはわからない(戻り値はdictなのに)。receiveも覗かないとダメ。 雑にデータを取り出して取りきったらJSONとしてdeserializeしているだけですね。

mypy/dmypy_util.py

def receive(sock: socket.socket) -> Any:
    """Receive JSON data from a socket until EOF.

    Raise a subclass of OSError if there's a socket exception.

    Raise OSError if the data received is not valid JSON or if it is
    not a dict.
    """
    bdata = bytearray()
    while True:
        more = sock.recv(100000)
        if not more:
            break
        bdata.extend(more)
    if not bdata:
        raise OSError("No data received")
    try:
        data = json.loads(bdata.decode('utf8'))
    except Exception:
        raise OSError("Data received is not valid JSON")
    if not isinstance(data, dict):
        raise OSError("Data received is not a dict (%s)" % str(type(data)))
    return data

start

clientの方を覗いた経験から幾つか振る舞いが予想できる。おそらく以下のことをしている。

  • unixドメインソケットを開いて待ち受ける
  • 自分自身のpidと開いたソケットの情報を.dmypy.jsonに保存する
  • 待受状態になる(入力はsocket)
@action(start_parser)
def do_start(args: argparse.Namespace) -> None:
    """Start daemon (it must not already be running).

    This is where mypy flags are set from the command line.

    Setting flags is a bit awkward; you have to use e.g.:

      dmypy start -- --strict

    since we don't want to duplicate mypy's huge list of flags.
    """
    try:
        get_status()
    except BadStatus as err:
        # Bad or missing status file or dead process; good to start.
        pass
    else:
        sys.exit("Daemon is still alive")
    start_server(args)

存在チェックにBadStatusのelseを使っているのはわかりやすい(既に立ち上がっているならget_statusは通るはずで。ファイルが存在しなければBadStatusになる)。こういう心配りの欠けたコマンドを書いてしまって使いづらいやつになったりする。

内部的にはほぼほぼstart_server。

def start_server(args: argparse.Namespace, allow_sources: bool = False) -> None:
    """Start the server from command arguments and wait for it."""
    # Lazy import so this import doesn't slow down other commands.
    from mypy.dmypy_server import daemonize, Server, process_start_options
    if daemonize(Server(process_start_options(args.flags, allow_sources),
                        timeout=args.timeout).serve,
                 args.log_file) != 0:
        sys.exit(1)
    wait_for_server()

daemonizeも自分で実装しているっぽい。なるほど。lazy importはまぁそうだよねというかんじ(なんでserver,client形式にしてbagroundにserverを立ち上げる形のコマンドにしたのかという話だし)。

wait_for_serverだけ先に見ておく。裏側のdaemonの立ち上がりを確認してから終了したいので、立ち上がるまで待つという感じ。このようにすると、このクライアントのstartコマンドの終了=バックグラウンドプロセスの立ち上がりとなるのでとても具合が良い。

def wait_for_server(timeout: float = 5.0) -> None:
    """Wait until the server is up.

    Exit if it doesn't happen within the timeout.
    """
    endtime = time.time() + timeout
    while time.time() < endtime:
        try:
            data = read_status()
        except BadStatus:
            # If the file isn't there yet, retry later.
            time.sleep(0.1)
            continue
        # If the file's content is bogus or the process is dead, fail.
        pid, sockname = check_status(data)
        print("Daemon started")
        return
    sys.exit("Timed out waiting for daemon to start")

やっていることは簡単でtimeout(5秒)になるまで0.1秒ずつ待ちながらポーリングしている形。正常に立ち上がったら.dmypy.jsonができるはずだし。BadStatusは返ってこなくなるはずなので。

daemonの起動

daemonizeの部分、そういえばこれもけっこうたまに見る慣習(的な知識になってしまっている)。

    if daemonize(Server(process_start_options(args.flags, allow_sources),
                        timeout=args.timeout).serve,
                 args.log_file) != 0:
        sys.exit(1)

終了ステータスをそのまま返しているのかな。なので0以外だったら失敗。

実際に中のコードを覗いてみる。これ説明するのも面倒だけれど。いわゆるdaemonizeの処理。コメントにstackoverflowへのリンクが貼ってあるのが良い。つまりそういうこと。

PEPにもそれ関連の提案があるらしい(PEP3143)。statusはdeferredなので延期されてしまってるけれど(詳しいPEPのstatus遷移はPEP1に)

unixドメインソケットが出てきたときも思ったけれど。そういえばwindowsには対応していなさそう。

def daemonize(func: Callable[[], None], log_file: Optional[str] = None) -> int:
    """Arrange to call func() in a grandchild of the current process.

    Return 0 for success, exit status for failure, negative if
    subprocess killed by signal.
    """
    # See https://stackoverflow.com/questions/473620/how-do-you-create-a-daemon-in-python
    if sys.platform == 'win32':
        raise ValueError('Mypy daemon is not supported on Windows yet')
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid:
        # Parent process: wait for child in case things go bad there.
        npid, sts = os.waitpid(pid, 0)
        sig = sts & 0xff
        if sig:
            print("Child killed by signal", sig)
            return -sig
        sts = sts >> 8
        if sts:
            print("Child exit status", sts)
        return sts
    # Child process: do a bunch of UNIX stuff and then fork a grandchild.
    try:
        os.setsid()  # Detach controlling terminal
        os.umask(0o27)
        devnull = os.open('/dev/null', os.O_RDWR)
        os.dup2(devnull, 0)
        os.dup2(devnull, 1)
        os.dup2(devnull, 2)
        os.close(devnull)
        pid = os.fork()
        if pid:
            # Child is done, exit to parent.
            os._exit(0)
        # Grandchild: run the server.
        if log_file:
            sys.stdout = sys.stderr = open(log_file, 'a', buffering=1)
            fd = sys.stdout.fileno()
            os.dup2(fd, 2)
            os.dup2(fd, 1)
        func()
    finally:
        # Make sure we never get back into the caller.
        os._exit(1)

やっていることはいわゆるdouble forkと呼ばれているもの。このあたりは絶対自分で書いてメンテしたくないので標準ライブラリに入って欲しい。 PEP3143の実装提案自体はpypipython-daemonとして存在するのでしばらくはこちらを使うと良さそう。

Serverの実装

daemonizeに関しては脇道だったので本体の実装の方を見る。これはmypy.dmypy_serverに存在する。とりあえず今までのコードを見てきたイメージとして入力としてJSONを受取りJSONを返すというようなもの。 router的なものが存在して、分岐した後にそれぞれのコマンドに対応したhandler的なものが存在する感じ何じゃないかと思う。

dmypy_server.py

class Server:

    # NOTE: the instance is constructed in the parent process but
    # serve() is called in the grandchild (by daemonize()).

# ...
    def serve(self) -> None:
        """Serve requests, synchronously (no thread or fork)."""
        try:
            sock = self.create_listening_socket()
            if self.timeout is not None:
                sock.settimeout(self.timeout)
            try:
                with open(STATUS_FILE, 'w') as f:
                    json.dump({'pid': os.getpid(), 'sockname': sock.getsockname()}, f)
                    f.write('\n')  # I like my JSON with trailing newline
                while True:
                    try:
                        conn, addr = sock.accept()
                    except socket.timeout:
                        print("Exiting due to inactivity.")
                        reset_global_state()
                        sys.exit(0)
                    try:
                        data = receive(conn)
                    except OSError as err:
                        conn.close()  # Maybe the client hung up
                        continue
                    resp = {}  # type: Dict[str, Any]
                    if 'command' not in data:
                        resp = {'error': "No command found in request"}
                    else:
                        command = data['command']
                        if not isinstance(command, str):
                            resp = {'error': "Command is not a string"}
                        else:
                            command = data.pop('command')
                            try:
                                resp = self.run_command(command, data)
                            except Exception:
                                # If we are crashing, report the crash to the client
                                tb = traceback.format_exception(*sys.exc_info())
                                resp = {'error': "Daemon crashed!\n" + "".join(tb)}
                                conn.sendall(json.dumps(resp).encode('utf8'))
                                raise
                    try:
                        conn.sendall(json.dumps(resp).encode('utf8'))
                    except OSError as err:
                        pass  # Maybe the client hung up
                    conn.close()
                    if command == 'stop':
                        sock.close()
                        reset_global_state()
                        sys.exit(0)
            finally:
                os.unlink(STATUS_FILE)
        finally:
            shutil.rmtree(self.sock_directory)
            exc_info = sys.exc_info()
            if exc_info[0] and exc_info[0] is not SystemExit:
                traceback.print_exception(*exc_info)

至って普通のserverのコード。細かい話は後で書く。

unixドメインソケットの生成

通信用のソケットの生成も先程のServerクラスのcreate_listening_socket()でやっている。ここで返したsocketの情報を元に.dmypy.jsonが作られる(serve()) ついでにtry-finallyで対象のtemporary directoryはけされる(serve()のshutil.rmtree)

class Server:
# ...
    def create_listening_socket(self) -> socket.socket:
        """Create the socket and set it up for listening."""
        self.sock_directory = tempfile.mkdtemp()
        sockname = os.path.join(self.sock_directory, SOCKET_NAME)
        sock = socket.socket(socket.AF_UNIX)
        sock.bind(sockname)
        sock.listen(1)
        return sock

routing部分

routing部分は先程のServerクラスのrun_command()でやっている。単純にcmd_のprefixを付けて、その名前のメソッドを呼ぶだけ。よくある形。 (ちなみにprefix付けずに呼んじゃえば良いじゃんという話があるかもしれないけれど。prefix付けていると「提供しているメソッドの一覧」とか返すのに便利だったりする(他にもこのクラスがメソッドを持っていた場合面倒という話))

class Server:
# ...
    def run_command(self, command: str, data: Mapping[str, object]) -> Dict[str, object]:
        """Run a specific command from the registry."""
        key = 'cmd_' + command
        method = getattr(self.__class__, key, None)
        if method is None:
            return {'error': "Unrecognized command '%s'" % command}
        else:
            return method(self, **data)

stop

stopはと言うと、stopコマンドを呼んでいるだけ。

@action(stop_parser)
def do_stop(args: argparse.Namespace) -> None:
    """Stop daemon via a 'stop' request."""
    # May raise BadStatus, which will be handled by main().
    response = request('stop', timeout=5)
    if response:
        show_stats(response)
        sys.exit("Daemon is stuck; consider %s kill" % sys.argv[0])
    else:
        print("Daemon stopped")

daemon

daemonで最後にする。これはバックグラウンドで通常作られるdaemonをフォアグラウンドで起動したいと言うようなコマンド。中を見たらすぐわかるという感じ。

@action(daemon_parser)
def do_daemon(args: argparse.Namespace) -> None:
    """Serve requests in the foreground."""
    # Lazy import so this import doesn't slow down other commands.
    from mypy.dmypy_server import Server, process_start_options
    Server(process_start_options(args.flags, allow_sources=False), timeout=args.timeout).serve()

疲れたのでおしまい。