goのflagで@<filename>と言う形式ならファイルの中身を利用するValueを作ってみる

ちょっとだけメモ。

最近は、標準ライブラリのflagだけで生活できるような気がしている。

まぁそれはおいておいて、例えば、curlなどで使われている@<filename>と言う表記で、ファイルの中身を取り出すvalueが欲しくなった。その実装のメモ。

flag.Value

flagパッケージでオプションの取扱いかたを変えたいときには flag.Value を実装した型をつくる。これは以下のようなインターフェイスのもの。

type Value interface {
    String() string
    Set(string) error
}

Set()で良い感じに値を更新してくれれば良い。作ったValueは、flag.FlagSet.Var()経由で利用できるようだ。

boolValueを例に実装を覗いてみる

flag.Valueの実装例として、flag.boolValueを覗いてみる。boolValueはboolのnew typeで作られていたらしい。

// -- bool Value
type boolValue bool

func newBoolValue(val bool, p *bool) *boolValue {
    *p = val
    return (*boolValue)(p)
}

func (b *boolValue) Set(s string) error {
    v, err := strconv.ParseBool(s)
    if err != nil {
        err = errParse
    }
    *b = boolValue(v)
    return err
}

func (b *boolValue) Get() interface{} { return bool(*b) }

func (b *boolValue) String() string { return strconv.FormatBool(bool(*b)) }

コレを通常使う場合は以下のようなコードになる。FlagSet.BoolVar() (か FlagSet.Bool()) が使われる。

var option struct {
    Verbose bool
}

fs := flag.NewFlagSet("app", flag.ExitOnError)
fs.BoolVar(&option.Verbose, "verbose", false, "verbose output")

fs.Parse()
fmt.Println(option.Verbose)

この実装は以下の様になっている。

// BoolVar defines a bool flag with specified name, default value, and usage string.
// The argument p points to a bool variable in which to store the value of the flag.
func (f *FlagSet) BoolVar(p *bool, name string, value bool, usage string) {
    f.Var(newBoolValue(value, p), name, usage)
}

まぁそんなわけでいい感じにやっていけば良い。

@付きの実装

基本的には、Set()をどうやって実装するかというだけの話になる。以下のような実装になった。

type FileContenOrLiteralValue string

func (v *FileContenOrLiteralValue) String() string {
    return string(*v)
}

func (v *FileContenOrLiteralValue) Set(s string) error {
    if !strings.HasPrefix(s, "@") {
        *v = FileContenOrLiteralValue(s)
        return nil
    }

    b, err := ioutil.ReadFile(strings.TrimPrefix(s, "@"))
    if err != nil {
        return err
    }
    *v = FileContenOrLiteralValue(string(b))
    return nil
}

こんな感じで使う。

func main() {
    var options struct {
        Target string
    }
    fs := flag.NewFlagSet("app", flag.ExitOnError)
    fs.Var((*FileContenOrLiteralValue)(&options.Target), "target", "literal or @<targetname>")

    fs.Parse(os.Args[1:])
    fmt.Println(options.Target)
}

実際の実行例

$ cat todo.json
{
  "todo": {
    "title": "foo"
  }
}

$ go run 01*/main.go --help
Usage of app:
  -target value
        literal or @<targetname>

ファイルの中身を取り出す

$ go run 01*/main.go --target @todo.json
{
  "todo": {
    "title": "foo"
  }
}

直接JSON文字列として

$ go run 01*/main.go --target '{"status": "ok"}'
{"status": "ok"}

プロセス置換も使える

$ go run 01*/main.go --target @<(echo "{\"ans\": \"$(echo 1 + 1 | bc -l)\"")
{"ans": "2"

はい。

それが良いかは別として、同じような理屈で、file://<path> のような形式にも対応できそうですね。

gist

processを立ち上げまくる機会を避けるための簡単なRPCのメモ

時折、特定の前処理の一部として、1つのファイルを受け取る1つのスクリプトが何個も連鎖することがある。このようなときに都度都度processが立ち上がると、ほとんどの時間がimport timeで辛いということが起きる事がある。

以前書いた記事の中では、この辺が近いかもしれない。

まぁ、まじめにパイプラインを作っても良いけれど、そこまで頑張らなくても良い気がする。

最初はいちいちwebAPIを定義しようと思ったが、operationId1を指定してのアクセスができれば良いだけなので、それはもうRPCだよなーという考えになった。それ用の作業のためにweb APIを作るのも良いが、まぁ手軽なのはRPCだよね。ということでメモ。

例の紹介

これからの例では以下の関数を公開している。

  • pow() -- べき乗
  • add() -- 和
  • mul() -- 積

ただし、使い所は、build scriptの一部という想定なので、以下の点が特殊かもしれない。

  • clientがsub processでserverを立ち上げる
  • clientの作業が全部終わったら、serverを終了させる
  • server, clientはお互いが通信可能になるまで待つ

通信可能かのチェックはファイルの存在の有無でやっている。これが一番手軽そうだった。

xmlrpc

標準ライブラリだけの範囲でやるならxmlrpcがある。とはいえ、複数のclientが同一のserverに同時にアクセスするとエラーが起きえたり、ドキュメントにも書いてあるとおりに、悪意を持ったアクセスに対しては安全ではない。

まぁ一応標準ライブラリの範囲なので。

01server.py

import sys
import pathlib
import time
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCRequestHandler
from handofcats import as_command


# Restrict to a particular path.
class RequestHandler(SimpleXMLRPCRequestHandler):
    rpc_paths = ("/RPC2",)


# Create server
@as_command
def run(*, port: int = 8888, sentinel: str = "") -> None:
    if sentinel:
        p = pathlib.Path(sentinel)
        ok = False
        for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]:
            if not p.exists():
                print(f"  wait ... {wait_time}", file=sys.stderr)
                time.sleep(wait_time)
                continue

            print(f"ack {sentinel}", file=sys.stderr)
            p.unlink()
            ok = True
            break
        if not ok:
            raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}")

    print(f"listen ... {port}", file=sys.stderr)
    with SimpleXMLRPCServer(
        ("localhost", port), requestHandler=RequestHandler
    ) as server:
        server.register_introspection_functions()

        # Register pow() function; this will use the value of
        # pow.__name__ as the name, which is just 'pow'.
        server.register_function(pow)

        # Register a function under a different name
        def adder_function(x, y):
            return x + y

        server.register_function(adder_function, "add")

        # Register an instance; all the methods of the instance are
        # published as XML-RPC methods (in this case, just 'mul').
        class MyFuncs:
            def mul(self, x, y):
                return x * y

        server.register_instance(MyFuncs())

        # Run the server's main loop
        server.serve_forever()

02use.py

import sys
import tempfile
import pathlib
import time
import subprocess
import logging
import xmlrpc.client
import contextlib
from handofcats import as_command

logger = logging.getLogger(__name__)


def _find_free_port(host: str = "0.0.0.0") -> int:
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        sock.listen(1)
        name, port = sock.getsockname()  # type: t.Tuple[str, int]
    return port


@contextlib.contextmanager
def connect_server(*, sentinel: str, port: int):
    cmd = [sys.executable, "01server.py", "--port", str(port), "--sentinel", sentinel]
    server_process = subprocess.Popen(cmd)
    try:
        p = pathlib.Path(sentinel)
        with p.open("w"):
            pass

        ok = False
        st = time.time()
        for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]:
            if p.exists():
                print(f"  wait ... {wait_time}", file=sys.stderr)
                time.sleep(wait_time)
                continue
            ok = True
            break
        if not ok:
            raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}")
        yield server_process
    finally:
        try:
            server_process.terminate()
            server_process.wait()
        except Exception as e:
            logger.info(str(e), exc_info=True)


@as_command
def run() -> None:
    sentinel = tempfile.mktemp()
    port = _find_free_port()

    with connect_server(sentinel=sentinel, port=port):
        print(f"connect ... {port}", file=sys.stderr)
        s = xmlrpc.client.ServerProxy(f"http://localhost:{port}")
        print(s.pow(2, 3))  # Returns 2**3 = 8
        print(s.add(2, 3))  # Returns 5
        print(s.mul(5, 2))  # Returns 5*2 = 10

        # Print list of available methods
        print(s.system.listMethods())

実行してみる。

python 02use.py
** handofcats.customize: DEBUG=1, activate logging **
    wait ... 0.1
    wait ... 0.2
** handofcats.customize: DEBUG=1, activate logging **
ack /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmp9bdn0obj
listen ... 52158
connect ... 52158
127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 -
8
127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 -
5
127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 -
10
127.0.0.1 - - [07/Nov/2020 18:36:33] "POST /RPC2 HTTP/1.1" 200 -

はい。

tinyrpc

asyncioに対応しているわけではないが、同期的なrequest/responseで良いのであればtinyrpcが便利かもしれない。

github.com

なんとなくzmqを使っている。

03server.py

import sys
import pathlib
import time
import zmq
from tinyrpc.server import RPCServer
from tinyrpc.dispatch import RPCDispatcher
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
from tinyrpc.transports.zmq import ZmqServerTransport
from handofcats import as_command

ctx = zmq.Context()
dispatcher = RPCDispatcher()


@dispatcher.public(name="add")
def adder_function(x, y):
    return x + y


dispatcher.public(pow)


@dispatcher.public
def mul(x, y):
    return x * y


@dispatcher.public
def list_methods():
    return list(dispatcher.method_map.keys())


@as_command
def run(*, port: int = 8888, sentinel: str = "") -> None:
    if sentinel:
        p = pathlib.Path(sentinel)
        ok = False
        for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]:
            if not p.exists():
                print(f"  wait ... {wait_time}", file=sys.stderr)
                time.sleep(wait_time)
                continue

            print(f"ack {sentinel}", file=sys.stderr)
            p.unlink()
            ok = True
            break
        if not ok:
            raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}")

    print(f"listen ... {port}", file=sys.stderr)
    transport = ZmqServerTransport.create(ctx, f"tcp://127.0.0.1:{port}")
    rpc_server = RPCServer(transport, JSONRPCProtocol(), dispatcher)
    rpc_server.serve_forever()

clientはこんな感じ。clientというかserverを利用するスクリプト

x03client.py

from __future__ import annotations
import typing as t
import sys
import tempfile
import pathlib
import time
import subprocess
import logging
import contextlib
import zmq
from tinyrpc import RPCClient
from tinyrpc.protocols.jsonrpc import JSONRPCProtocol
from tinyrpc.transports.zmq import ZmqClientTransport
from handofcats import as_command

logger = logging.getLogger(__name__)


def _find_free_port(host: str = "0.0.0.0") -> int:
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        sock.listen(1)
        name, port = sock.getsockname()  # type: t.Tuple[str, int]
    return port


@contextlib.contextmanager
def connect_server(
    cmd: t.List[str], *, sentinel: str, port: int
) -> t.Iterator[t.subprocess.Popen]:
    assert sentinel in cmd
    assert str(port) in cmd

    server_process = subprocess.Popen(cmd)
    try:
        p = pathlib.Path(sentinel)
        with p.open("w"):
            pass

        ok = False
        st = time.time()
        for wait_time in [0.1, 0.2, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4]:
            if p.exists():
                print(f"  wait ... {wait_time}", file=sys.stderr)
                time.sleep(wait_time)
                continue
            ok = True
            break
        if not ok:
            raise RuntimeError(f"timeout, f{sentinel}, {time.time()-st}")
        yield server_process
    finally:
        try:
            server_process.terminate()
            server_process.wait()
        except Exception as e:
            logger.info(str(e), exc_info=True)


@as_command
def run() -> None:
    ctx = zmq.Context()

    port = _find_free_port()
    sentinel = tempfile.mktemp()

    cmd = [sys.executable, "03server.py", "--port", str(port), "--sentinel", sentinel]
    with connect_server(cmd, sentinel=sentinel, port=port):
        print(f"connect ... {port}", file=sys.stderr)
        rpc_client = RPCClient(
            JSONRPCProtocol(), ZmqClientTransport.create(ctx, f"tcp://127.0.0.1:{port}")
        )
        s = rpc_client.get_proxy()

        print("Returns 2**3 = 8", "->", s.pow(2, 3))
        print("Returns 5", "->", s.add(2, 3))
        print("Returns 5*2 = 10", "->", s.mul(5, 2))

        # Print list of available methods
        print(s.list_methods())

実行結果は同様。

** handofcats.customize: DEBUG=1, activate logging **
    wait ... 0.1
    wait ... 0.2
** handofcats.customize: DEBUG=1, activate logging **
ack /var/folders/b7/2rk7xp2d0hb2r21zbzjwxb_m0000gn/T/tmp9iwr0h0u
listen ... 52196
connect ... 52196
Returns 2**3 = 8 -> 8
Returns 5 -> 5
Returns 5*2 = 10 -> 10
['add', 'pow', 'mul', 'list_methods']

なぜgrpcなどにしないか?

まぁしても良いのだけど、なぜgrpcなどにしないかというと、なんでだろうな? 一番近いのは、提供したいサービスの型が決まっていないようなものだからかもしれない。例えば、あるpandasのdataframeがあったとして、それに対してgroupbyをするようなサービスを提供したい。inputとなるデータの構造は決まっていない。

例えば、以下のようなよくあるデータセットを元に考えるイメージ2

このようなときに、grpcはあまり適切ではない。

本当に使えない?

いや、まぁファイル名だけを返すような形にして使っても良いかもしれない。それなら使えるかも? 加えて、変換処理などの場合には、変換結果自体に興味があることは少ない。inputもoutputもurlを含んだ何か程度で十分なのではないか?それだったらgrpcを使っても良いかもしれない。まぁ何でも良いかも。

クライアントは変換結果自体に興味はないので、message packを使ったmprpcなどもありはするのだけれど、こちらを使う価値はあんまり無さそう。そんなわけでtinyrpcくらいでちょうど良い気もしている。

まとめ

  • xmlrpc, tinyrpcを使っていい感じにprocessをまとめる方法のメモ
  • ハンドシェイク的なチェックはファイルの存在有無が手軽 (複数インスタンスは不可能)
  • 今回の意味では、クライアントはrequest/responseのインターフェイスとして変換結果に興味がない

gist


  1. openAPIのoperationIDをイメージしていた

  2. irisとかそういうやつら