dict遊び -- 絵を描く人の余暇のらくがきとコードを書く人の余暇の何か

[poem][python] dict遊び

ポエムです。長いです。

はじめに

絵を描くことが好きだった人が絵を描く仕事につく。絵を描くことを仕事にしている人がたまたま余暇にらくがきの絵を描く。仕事でも絵を描いているのだからもう描かなくても良いじゃないかという声もかけられるが、余暇の落書きは仕事としての絵とは何だか違うような気がして、やっぱり余暇にらくがきをする。

みたいなことがプログラミングにもあるのではないかということを最近良く思う。これは個人的な話でありすごく主観的な捉え方によるものだけれど。絵を描くという領域における余暇の落書きに類するような行為をコードを書くという領域上で行っているような気がした。いろいろ思い返してみると息抜きとしてのプログラミングというものが自分自身の中には存在している気がする。それは面白そうなアプリケーションやライブラリを探すなどのいわゆる情報収集とも違うものではあるし、何か作りたいアプリケーションが存在していてそのアプリケーションを開発するというのとも違う気がする。

コードを書くことで自分の中にあるストレスが消えていくような、いわゆる息抜きとしての行為のプログラミングがどういうものかということを、とりとめもなく書いてみることにした。もちろんすべての人がやっているわけではないし。万人に当てはまるようなものではないけれど。

コードを書くときの楽しさ

コードを書くいった時に思いつくものは以下の様なものだった。

何らかの領域上に何らかの日常的な動作・処理が存在していて、それを機械に実行可能な形に翻訳すること

何らかの動作が存在していてそれをなぞるような形で振る舞いを記述するということがある種のデッサンに似たような感じを受ける。そのような何らかの振る舞いの記述をコーディングと呼んでいるような気がする。そして大抵の場合、コーディングと言った場合にスキルという言葉もセットになってついてくる事が多いが、今回はそのスキルに関しては全く触れるつもりはないし。スキルを向上するためにどうすれば良いかみたいな話はしない。ただただ趣味という意味でコードを書くということをしている事があるようなきがする。

そのコードを書く楽しさとはなんだろうかみたいなことを例をあげてどうにか文章という形で表現してみたい。

dict遊び

pythonのdict

pythonにはdictという組み込みの型が存在する。このdictは他の言語では連想配列と呼ばれたりマップと呼ばれたりすることもあるかもしれない。例えばweb apiのレスポンスがJSONの形式で返されるような時に、そのレスポンスをpython上に持ってきたときにはdictの型の値にデコードされてきたりする。

d = {"x": "a", "y": "b", "z": "c"}
d["y"] # => "b"

基本的にはkeyとvalueのpairになっていて、渡したkeyに対応するvalueが存在する場合にそれが結果として返ってくる。上の例ではdictのkeyのvalueも文字列だけれどもちろんvalueをdictにしたdictを作ることもできる。そしてそのdictのvalueもまたdictになって...というように再帰的にdictを持ったdictはツリー状の構造になる。実際web apiのレスポンスもそのようなツリー構造のデータが返ってくる事が多い。

d = {
    "x": {
        "a": {
            "i": 10,
        },
    },
    "y": {
        "a": {
            "j": 20,
        },
    },
    "z": {
        "a": {
            "k": 30,
        },
    },
}
d["y"]["a"]["j"]  # => 20

dictへのアクセス

任意の深さへのアクセス

どこに何が入っているか完全にわかっている場合にはただただkeyとして渡していけば良い。とは言え深さが異なる位置にアクセスしたくなる事があるかもしれない。ある値は d["a"]["b"] で済むかもしれないし d["x"]["y"]["z"] と3回のアクセスが必要かもしれない。この特定の値を取得するために必要なkeyの列をpathと呼ぶことにする。pathの長さは不定で、あるものは2かもしれないし。またあるものは5などかもしれない。

そのようなパスを受け取って望みの値を取るような処理は書けるかというとたぶん以下の様になる。

def access(d, path):
    for k in path:
        d = d[k]
    return d

access(d, ["x", "y", "z"])  # == d["x"]["y"]["z"]
access(d, ["a", "b"])  # == d["a"]["b"]

任意回のアクセス

このような関数が書けると何が嬉しいかというと、値の位置さえわかっていれば、値の深さによらず望みの値を一気に取得する処理を書くことができるようになる。例えばコストの様な数を取り出してその合計を計算してみる。

def sum_cost(tree, path_list):
    total = 0
    for path in path_list:
        total += access(tree, path)
    return total

今度は、深さが不定なツリーから値を1つ取れるだけでなく、ツリー状のdictから好きな個数の値(上の例ではコスト)を一気に取得する事ができた。あるいはここでちょっと考えてsumという関数があるから使えば短くなると考えて以下のように書くかもしれない。

def sum_cost(tree, path_list):
    return sum(access(tree, path) for path in path_list)

こういう風に同じ処理を短くあるいはより手軽に書けないかと考えるが楽しいというときもあるかもしれない。上手く短く書けたときには頭の中が整理されたような気がする。

あやふやな存在への対応

誤ったpathを含んだリスト

先程の例では全ての欲しい値の位置が分かっている状態について考えていた。実際には値の位置が分かっていない事が多いかもしれない。

例えば、[["a","b","c","cost"],["x","y","cost"], ["i","cost"]] などというpathのリストが渡されたとして ["x","y","cost"] というpathには値が存在していないかもしれない。そういう場合にも適切に合計を計算したい。

とは言え存在しないpathを指定してしまった場合の対応をすることはそんなに難しくない。accessの部分を存在しなかった場合には何にも影響を与えない値を返すようにして使える特別版に置き換えれば良い。例えば以下の様な関数を作る。

def maybe_access(d, path, default=None):
    for k in d:
        if k not in d:
            return default
        d = d[k]
    return d

この関数をaccessの代わりに使えば誤ったpathを含んだリストを受け取ったとしても正しい合計値を返す事ができる。

def sum_cost(tree, path_list):
    return sum(maybe_access(tree, path, default=0) for path in path_list)

不完全なpathでのアクセス

誤ったpathを含んだリストだけではなく、そのpathが不完全であるかもしれない。例えば、正確には d["x"]["y"]["z"]["cost"] であるかもしれない。ところが分かっているのは必ず "y" が途中にある "cost" で終わる値を取りたいということかもしれない。これはちょっと頭を捻る必要がある。つまり something(d, ["y", "cost"]) で先程の d["x"]["y"]["z"]["cost"] の値を取ってきたい。また別のところでの位置は d["x"]["y"]["z"]["x"]["y"]["z"]["cost"] というような形かもしれない。このような末尾と途中の一部だけが分かるpathから値を取得する関数を考えてみる。

from collections import deque


def gentle_access(d, path):
    r = []
    _gentle_access(d, deque(path), r)
    return r


def _gentle_access(d, path, r):
    if len(path) == 0:
        r.append(d)

    if not hasattr(d, "keys"):
        return

    for k in d.keys():
        if k == path[0]:
            q = path.popleft()
            _gentle_access(d[k], path, r)
            path.appendleft(q)
        else:
            _gentle_access(d[k], path, r)


d = {
    "x": {"y": {"z": {"i": {"cost": 10}}}},
    "i": {"cost": 20},
    "z": {"y": {"z": {"i": {"x": {"y": {"z": {"i": {"cost": 30}}}}}}}},
}

gentle_access(d, ["y", "cost"])  # => [10, 30]

再帰がでてきたが全部取得する事ができた。もちろん頑張りすぎないように探索する深さを制限しても良いし。条件に合致したらすぐに1つだけ返すように変えてみても良い。

値を取得したいのではなくツリーを変更したい場合

毎回探索するのも馬鹿馬鹿しいのでたどり着いた値を浅い場所に保持するようにしてみよう。例えば上の例のdictを渡すと以下のようになる操作を考えてみる。

d = {
    "x": {"y": {"z": {"i": {"cost": 10}}}},
    "i": {"cost": 20},
    "z": {"y": {"z": {"i": {"x": {"y": {"z": {"i": {"cost": 30}}}}}}}},
}

simplify(d, ["y", "cost"])

d2 = {
    "x": {"y": {"z": {"i": {"cost": "$cost/x"}}}},
    "i": {"cost": 20},
    "z": {"y": {"z": {"i": {"x": {"y": {"z": {"i": {"cost": "$cost/z"}}}}}}}},
    "$cost": {
        "x": 10,
        "z": 30,
    },
}

今度のd2の方のtreeの方ではコストが欲しい場合には $cost の中だけ観れば良いようになった。同じkeyのものがあったら衝突してしまうから実はリストにしたほうが良いなどもあるかもしれない。この変更の操作を記述するにはどうしたら良いんだろう。

条件に合致した時に変更を加える関数を渡す場合

関数の引数に関数を渡すこともできる。以前に定義した不完全なpathでのアクセスを行う関数に関数を渡せるように改造することを考えてみる。以下の様になる。

def walk(d, path, fn):
    _walk(d, deque(path), fn)


def _walk(d, path, fn):
    if len(path) == 0:
        return

    if not hasattr(d, "keys"):
        return

    for k in d.keys():
        if k == path[0]:
            q = path.popleft()
            if len(path) == 0:
                fn(d, k)
            else:
                _walk(d[k], path)
            path.appendleft(q)
        else:
            _walk(d[k], path)


def simplify(data, path):
    for top_key in data.keys():
        def modify(d, k):
            data["$cost"][top_key] = d[k]
            d[k] = "$cost/{}".format(top_key)
        walk(data[top_key], path, modify)

上手くいっていそう。

d = {
    "x": {"y": {"z": {"i": {"cost": 10}}}},
    "i": {"cost": 20},
    "z": {"y": {"z": {"i": {"x": {"y": {"z": {"i": {"cost": 30}}}}}}}},
}
simplify(d, ["y", "cost"])

# {
#     'x': {'y': {'z': {'i': {'cost': '$cost/x'}}}},
#     'z': {'y': {'z': {'i': {'x': {'y': {'z': {'i': {'cost': '$cost/z'}}}}}}}},
#     'i': {'cost': 20},
#     '$cost': {'x': 10, 'z': 30}
# }

実行した結果x,zの値が$costに移動している。そして元々値があった場所には$costから辿るための道筋が記されている(e.g. "$cost/z"のことなどを指している)。上手くいっていそう。本当に?すこしだけ考えてみると途中の意味を良く決めていない状態のままコードを書いていた。例えば、"y" から始まるdict。こういう値はどうなるべきだろう?

d3 = {"y": {"x": {"cost": 100}}}

途中の意味に開始地点も入るのならこれも $cost にまとめられる対象に含まれて欲しいかもしれない。以下のように。

d3 = {
    "y": {"x": {"cost": 100}},
    "$cost": {"y": 100},
}

少しコードを変えてみる。

def simplify2(data, path):
    path = deque(path)
    data.setdefault("$cost", {})
    for top_key in data.keys():
        def modify(d, k):
            data["$cost"][top_key] = d[k]
            d[k] = "$cost/{}".format(top_key)

        if path[0] == top_key:
            q = path.popleft()
            walk(data[top_key], path, modify)
            path.appendleft(q)
        else:
            walk(data[top_key], path, modify)

d3 = {"y": {"x": {"cost": 100}}}
simplify(d3, ["y", "cost"])

# {'$cost': {'y': 100}, 'y': {'x': {'cost': '$cost/y'}}}

dequeの利用が二重になっている部分が気に入らないところではあるけれど。一応作りたかった答えを作る事はできている。

条件に合致した時、その位置の物を直接返すようにする場合

もしかしたらの話、たとえば先程の処理を以下のような形で書けるとしたらどうだろう。

for top_key, iterator in simplify3(d, ["y", "cost"]):
    for k, sd in iterator:
        if "$cost" not in d:
            d["$cost"] = {}
        d["$cost"][top_key] = sd[k]
        sd[k] = "$cost/{}".format(top_key)

今度は欲しい値がこちらに直接やってくるので関数を受け渡す必要がない。このように書ければもう少し先程の処理なども気軽に掛けたのかもしれない。

最後の値だけではなく途中の値も全て欲しい場合

あるいは最後の値だけではなく途中の値も全て欲しい場合もあるかもしれない。例えば今までpathとして渡していた値は ["y", "cost"] だったが、この "y" の階層でも何か副次的な処理が行いたい場合もあるかもしれない。この場合はどうにかすることができるだろうか? 例えば以下の様になってほしい。

modify(d, ["y", "cost"])

{
    'z': {'y': {'z': {'i': {'x': {'y': {'z': {'i': {'cost': '$cost/z'}}}}}}}},
    'x': {'y': {'z': {'i': {'cost': '$cost/x'}}}},
    'i': {'cost': 20},
    '$cost': {'x': 10, 'z': 30},
    '$ys': [
         {'z': {'i': {'x': {'y': {'z': {'i': {'cost': '$cost/z'}}}}}}},
         {'z': {'i': {'cost': '$cost/x'}}},
    ],
}

今度は$costの他に$ysも現れてそのyの階層以下の値を別途$ysに持ちたい。もちろん先程のwalkを何度も使えばできるが酷い形になる。

for top_key in list(d.keys()):
    def on_y(d1, k1):

        def on_cost(d2, k2):
            if "$cost" not in d:
                d["$cost"] = {}
            if "$ys" not in d:
                d["$ys"] = []
            d["$cost"][top_key] = d2[k2]
            d2[k2] = "$cost/{}".format(top_key)
            d["$ys"].append(d1[k1])

        walk(d1, ["cost"], on_cost)
    walk(d[top_key], ["y"], on_y)

同じ名前の位置に対して同じ処理を行いたい

さらに酷い形を進めてみよう。たとえば、a,b,a,b,a,b,a,b,a,bと続いていく木に対してaのときにだけ何らかの処理を付け加えたい場合にはどうすれば良いだろう?これももちろんアクセス用の関数を何度も書けばこなすことができる。

def on_a0(d0, k0):
    something(d0[k0])

    def on_a1(d1, k1):
        something(d1[k1])

        def on_a2(d2, k2):
            something(d2[k2])

            def on_a3(d3, k3):
                something(d3[k3])

                def on_a4(d4, k4):
                    something(d4[k4])

                    def on_a5(d5, k5):
                        something(d5[k5])

                    walk(d4, path, on_a5)

                walk(d3, path, on_a4)

            walk(d2, path, on_a3)

        walk(d1, path, on_a2)

    walk(d0, path, on_a1)

walk(d, path, on_a0)

しかしこれは馬鹿馬鹿しい。ところで自分自身を返す事はできないのだろうか?例えば操作関数がもう1つ引数を取り、引数自体が自分自身を返すみたいな構造。もしそれができるなら以下の様に書けるはず。

def on_a0(walker, d0, k0):
    something(d0[k0])
    walker(d0[k0])
walk(d, path, on_a0)

あるいはpathの探索をその階層のkeyの一致のみで調べていたが。幾つかのkeyの内のどれか1つと当てはまるのような一致条件の変更などはできるだろうか?もっというと一番最後の再帰の例のように繰り返しの構造を引数として取得して使うことはできないだろうか?あれこれコードをイジった結果自分自身を使う処理をこういう形で書ける様になったりする

import unittest


class WalkerTests(unittest.TestCase):
    def _getTargetClass(self):
        from my import LooseDictWalker
        return LooseDictWalker

    def _makeOne(self, *args, **kwargs):
        return self._getTargetClass()(*args, **kwargs)

    def test_chains2(self):
        from my.walkers import SimpleContext
        from my.operators import ANY

        class RecContext(SimpleContext):
            def __init__(self, qs):
                self.qs = qs[:]

            def __call__(self, walker, fn, value):
                return fn(walker, self, value)

        s = []

        def matched(walker, ctx, value):
            s.append(value)
            walker.walk(ctx.qs[:], value, ctx=ctx)

        d = {"a": {"b": {"a": {"b": {"a": {"b": 10}}}}}}
        qs = [ANY, "b"]
        self._makeOne(on_container=matched).walk(qs, d, ctx=RecContext(qs))

        expected = [
            {'b': {'a': {'b': {'a': {'b': 10}}}}},
            {'b': {'a': {'b': 10}}},
            {'b': 10}
        ]

        self.assertEqual(s, expected)

加えてある程度、変更用の関数の引数の形式も自由にできる。例えば以下のようなPathContextを定義すると、その時点まで辿った状態を(パンくずリストの様な履歴)を受け取る事ができるようになる。

class PathContext(object):
    def __init__(self):
        self.path = []

    def push(self, v):
        self.path.append(v)

    def pop(self):
        self.path.pop()

    def __call__(self, walker, fn, value):
        return fn(self.path, value)

というようなことをしていた。こういうちょっとしたコンパクトな処理に対する何らかのアイデアから端を発して不満を解決しつつ、より広い領域に対しても、意図していた通りの振る舞いさせる記述ができないか考えながら変更を加えるみたいな作業が気晴らしになっているらしい。

おまけ

ちなみに最後の例ででてきたwalkerは現時点ではこういう実装

class SimpleContext(object):
    def push(self, ctx):
        pass

    def pop(self):
        pass

    def __call__(self, fn, walker, value):
        return fn(value)

def apply(q, v):
    if callable(q):
        return q(v)
    else:
        return q == v

class Any(object):
    __repr__ = repr

    def __call__(self, v):
        return True
ANY = Any()

class LooseDictWalker(object):
    context_factory = PathContext

    def __init__(self, on_container=None, on_data=None):
        self.on_container = on_container
        self.on_data = on_data

    def on_found(self, ctx, d, k):
        if self.on_container is not None:
            ctx(self, self.on_container, d)
        if self.on_data is not None:
            ctx(self, self.on_data, d[k])

    def walk(self, qs, d, depth=-1, ctx=None):
        ctx = ctx or self.context_factory()
        return self._walk(ctx, deque(qs), d, depth=depth)

    def _walk(self, ctx, qs, d, depth):
        if depth == 0:
            return

        if not qs:
            return

        if hasattr(d, "keys"):
            for k in list(d.keys()):
                ctx.push(k)
                if apply(qs[0], k):
                    q = qs.popleft()
                    self._walk(ctx, qs, d[k], depth - 1)
                    if len(qs) == 0:
                        self.on_found(ctx, d, k)
                    qs.appendleft(q)
                else:
                    self._walk(ctx, qs, d[k], depth)
                ctx.pop()
            return d
        elif isinstance(d, (list, tuple)):
            ctx.push("[]")
            for e in d:
                self._walk(ctx, qs, e, depth)
            ctx.pop()
            return d
        else:
            return d

invalid recursive type XXX

zero valueを設定する際に循環してしまうと無限に再帰してしまい終わらないという話。

例えば以下のような定義はダメ。

自分自身で再帰

type Tree struct {
    Value int
    Left  Tree
    Right Tree
}

もちろん、ポインターにしてあげれば、nilがzero valueになるので無限に再帰はしなくなる。

type Tree struct {
    Value int
    Left  *Tree
    Right *Tree
}

相互再帰(N=2)

自分自身の型で再帰的に定義してはダメというルールではない。初期化時のiterationで循環参照が起きれば無限再帰が起きる。こういうのもダメ。

type L struct {
    R R
}

type R struct {
    L L
}

どれか1つでもnilになればそこが終端になるので以下はOK。

type L struct {
    R R
}

type R struct {
    L *L
}

N=3

もちろん3つ組の循環でもダメ。

type A struct {
    B B
}

type B struct {
    C C
}

type C struct {
    A A
}

循環しなければただの木なのでいつかは終端に辿り着く。

type T0 struct {
    T1 T1
}

type T1 struct {
    T2 T2
}

type T2 struct{}

httptestでmock server的なものを作る方法のメモ

以下の3つが欲しい

  • get
  • post (form)
  • post (json)
package m

import (
    "bytes"
    "encoding/json"
    "net/http"
    "net/http/httptest"
    "net/url"
    "testing"

    "io/ioutil"

    "github.com/pkg/errors"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestGet(t *testing.T) {
    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        assert.Exactly(t, "/foo", r.URL.Path)
        assert.Exactly(t, "1", r.URL.Query().Get("value"))
    }))
    defer ts.Close()

    _, err := http.Get(ts.URL + "/foo?value=1")
    require.NoError(t, err)
}

func TestPost(t *testing.T) {
    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        assert.Exactly(t, "/foo", r.URL.Path)
        r.ParseForm()
        assert.Exactly(t, "1", r.Form.Get("value"))
    }))
    defer ts.Close()
    values := url.Values{}
    values.Add("value", "1")
    _, err := http.PostForm(ts.URL+"/foo", values)
    require.NoError(t, err)
}

func TestPostJSON(t *testing.T) {
    type data struct {
        Name string `json:"name"`
        Age  int    `json:"int"`
    }

    ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        assert.Exactly(t, "/foo", r.URL.Path)
        var val data
        parseJSONRequest(r, func(b []byte) error {
            return json.Unmarshal(b, &val)
        })
        assert.Exactly(t, "foo", val.Name)
        assert.Exactly(t, 20, val.Age)
    }))
    defer ts.Close()

    dataset := data{
        Name: "foo",
        Age:  20,
    }

    b, err := json.Marshal(dataset)
    require.NoError(t, err)
    req, err := http.NewRequest("POST", ts.URL+"/foo", bytes.NewBuffer(b))
    req.Header.Set("Content-Type", "application/json")

    _, err = (&http.Client{}).Do(req)
    require.NoError(t, err)
}

func parseJSONRequest(r *http.Request, parse func(body []byte) error) error {
    if r.Body == nil {
        return errors.New("missing form body")
    }
    ct := r.Header.Get("Content-Type")
    if ct != "application/json" {
        return errors.Errorf("invalid content type: %v", ct)
    }
    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
        return err
    }
    return parse(b)
}

magicalimportというライブラリを作ってました

magicalimportというライブラリを作ってました。

はじめに

これは何をするライブラリかというと物理的なファイル名を指定して、指定したファイルをpython moduleとしてimportするためのライブラリです。

用途

例えばconfigファイルの読み込みに便利かもしれません。

使い方

例えば、以下のようなファイル構造の時に、以下のようなfoo.pyがあった時に。

.
├── a
│   └── b
│       └── c
│           └── foo.py
└── main.py

a/b/c/foo.py

name = "foo"
_age = "*secret*"

main.pyでは以下の様なコードでfoo.pyを読み込むことができます。

from magicalimport import import_from_physical_path

foo = import_from_physical_path("./a/b/c/foo.py")

ちなみに、importするmodule名を指定する事もできて、 as_ オプションを付けます。sys.modulesに登録されるのでその後は普通にimportできます。

from magicalimport import import_from_physical_path

foo = import_from_physical_path("./a/b/c/foo.py", as_="foo2")
import foo2
# fooとfoo2は同じもの

注意点

moduleの階層構造に関係なくimportしているところがあるので読み込んだ先のファイルでrelative importなどは上手くいかないです。

例えば、以下のような設定ファイルの構造でlocal.py,test.pyがbase.pyの設定を共有したいときなどに。

config
├── base.py
├── test.py
└── local.py

普通にrelative importしたくなりますがこれは動きません。

from .base import *

以下の様に書く必要があります。star importする場合にはexpose_all_membersを使うと便利です。

# 以下はだいたい `from .base import *` と同じ
import magicalimport
import os.path


here = os.path.dirname(os.path.abspath(__file__))
base = magicalimport.import_from_physical_path(os.path.join(here, "./base.py"), as_="base")
magicalimport.expose_all_members(base)

moduleの"_"で始まるものはimportされないですが。

追記

python2もサポートしました

追記

here オプションをサポートしました。以下の様に書ける様になりました。

import magicalimport


base = magicalimport.import_from_physical_path("base.py", as_="base", here=__file__)
magicalimport.expose_all_members(base)

flaskでconfigファイルの指定に物理的なパスを指定できるようにしたくなったという話

はじめに

flaskを使い始めたのだけれど。flaskのconfigファイルの指定が対象のflaskアプリの存在するモジュールの位置からの相対パスを指定するらしく使いづらい。

例えば、fooパッケージの中で、以下の様なコードがあった場合には、fooパッケージからの相対位置を指定する必要がある。

from flask import Flask
app = Flask(__name__)
app.config.from_python("<config path>")
app.run()

この __name__ を使っているパッケージの位置からの相対位置を指定する必要がある。だるい。これを止めて直接特定のファイルの位置を指定できるようにしようという話。

Appのconfig_classを変更すれば良い

はじめに手抜きをするために以下のmagicalimportというパッケージをインストールしておく*1

$ pip install magicalimport

configオブジェクトを作るファクトリーの設定を変更する必要がある。以下の様な形で作れば良い。

from flask import Flask, Config
import logging
import magicalimport
logger = logging.getLogger(__name__)


class CustomConfig(Config):
    def from_pyfile(self, path):
        return self.from_object(magicalimport.import_from_physical_path(path))


class App(Flask):
    config_class = CustomConfig

# or monkey patching
# Flask.config_class = CustomConfig


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", default=None, required=True)
    args = parser.parse_args()

    app = App(__name__)
    if args.config:
        app.config.from_pyfile(args.config)

    logging.basicConfig(
        level=app.config["LOGGING_LEVEL"].upper(),
        format=app.config["LOGGING_FORMAT"]
    )
    logger.info("running.. (port=%s)", app.config["PORT"])
    app.run(port=app.config["PORT"])

あとは以下のように好きな位置のファイルを指定してアプリを起動する事ができる。

$ tree config
config
├── base.py
└── local.py

1 directory, 2 files

$ python foo/app.py --config=./config/local.py
asctime2016-10-02 00:52:40,828  loglevel:INFO   message:running.. (port=8888)
asctime2016-10-02 00:52:40,847  loglevel:INFO   message: * Running on http://127.0.0.1:8888/ (Press CTRL+C to quit)

やりましたね。

設定ファイルの内容

設定ファイルの内容は以下

config/base.py

PLAIN_LOGGING_FORMAT = "%(asctime)s %(levelname)s %(message)s"
LTSV_LOGGING_FORMAT = "asctime%(asctime)s\tloglevel:%(levelname)s\tmessage:%(message)s"

config/local.py

import magicalimport
import os.path
here = os.path.dirname(os.path.abspath(__file__))
base = magicalimport.import_from_physical_path(os.path.join(here, "./base.py"), as_="base")

LOGGING_LEVEL = "info"
LOGGING_FORMAT = base.LTSV_LOGGING_FORMAT


PORT = 8888 

*1:実はこのためについ先程作りました

makefileわりと便利だよという話

はじめに

makeはmacにもlinuxにもデフォルトであって便利*1。 とは言え、色々とわかりづらいところがある。 便利なところだけ見繕って使うと便利。

個人的なmakefileの使い方は2種類

個人的には以下のような2つの目的でmakefileを使っている。

  • 複数の目的のためのタスクを記述して手軽に実行できるようにする
  • 1つの目的のためにまじめに依存関係を定義して使う成果物を生成する

後者がたぶん本命だけれど。前者で使うことも多い。

複数の目的のためのタスクを記述して手軽に実行できるようにする

利用するコマンドのオプションを省略したい場合に使う。 脳みそのリソースの削減。細かなオプションの意味など後で見返した時に覚えていないことが多いのでメモがわりにmakefileを書く。

依存関係の書き方は以下の様な感じ。

<タスク名や生成されるファイル>: <依存するタスクや依存するファイル> ...
    <何か実行したい処理>

たいていcleanとdefaultを定義する。記述部分がごちゃっとしてきたら変数にする

SRC = x.txt y.txt

clean:
  rm -f *.out
default:
  python script.py ${SRC} > x.out

デフォルトの設定値(ただし実行時に変えたい)

以下のように実行時に値を渡したい場合がある。

$ make
10!!
# 10ではなく1にしたい
$ FOO=1 make
1!!

このような場合には ?= を使って設定する。実行するコマンド自体のエコー出力を消したい場合には @ をつける。

FOO ?= 10

default:
   @echo ${FOO}!!

phony

makefileにただclean,defaultと書くとそれが生成物としてのファイル名を表しているのか。タスク名を表しているのかわからない。同名のファイルなどがあると困る。なのでphonyを書いておく。

.PHONY clean default

1つの目的のためにまじめに依存関係を定義して使う

一連のタスク依存関係を持ちつつ連動して実行されて欲しい場合にまじめに依存関係を定義する。 対象となるものがファイルの場合には、成果物としてのファイルのutimeよりも依存先のファイルのutimeの方が新しい場合にのみ実行される。便利。

ただしこの目的の為に書く場合には、わりとまじめに使う場合が多いのでその場合はそれぞれの環境のツールを使うという感じでも良い気もする。とは言え、簡単なレベルならmakefileでもわりとどうにかなるので便利なことは便利。

以下のような謎の定義を書いておくと、依存関係にあるファイル拡張子によって実行する処理を記述できる。例えば、.pyの拡張子を持つファイルのpythonを実行した結果をリダイレクトした結果を.py.outに出力するなど。

%.py.out: %.py
  python $< > $@
  
hello.py.out: hello.py

clean:
  rm -f *.py.out

make -p

おもむろに make -p と実行してみると良い。内部で利用できる関係性を見る事ができる。

$ make -p # 一部だけ表示

%.py.out: %.py
#   commands to execute (from `Makefile', line 2):
    python $< > $@

ちなみに 無引数で実行した時に実行されるタスクは DEFAULT_GOALとして表示される。

$ make -p    # 一部だけ表示

# makefile
.DEFAULT_GOAL := hello.py.out

$ make clean
rm -f *.py.out
$ make
python hello.py > hello.py.out
# 2度目の実行はskip
$ make
make: `hello.py.out' is up to date.

シェルの実行結果をつかって定義を記述

シェルの実行結果をつかって定義を記述することもできる。 $(shell ...) を使う。

例えば以下のような状況で。

$ ls
Makefile  bye.py      hello.py

make を実行すると hello.py.outbye.py.out が生成されるようなmakefileは以下。

%.py.out: %.py
  python $< > $@

default: hello.py.out bye.py.out

hello.py.out: hello.py
bye.py.out: bye.py

clean:
  rm -f *.py.out
.PHONY: clean

これが面倒な場合には以下のようにも書ける。関係性が自明なのでhello.py.outなどの記述は省略できる。

%.py.out: %.py
  python $< > $@

default: hello.py.out bye.py.out

clean:
  rm -f *.py.out
.PHONY: clean

さらに依存先部分のhello.py.outなども書きたくない場合には $(shell ...) を使って以下の様にも書ける。 これはシェルスクリプトの実行結果に置き換えるというような意味。

%.py.out: %.py
  python $< > $@

default: $(shell ls *.py | sed 's/$$/.out/g')

clean:
  rm -f *.py.out
.PHONY: clean

注意点として $ は2つ必要(sedの記述は sed 's/$/.out/g' で良いところを)。 markdownで書いたreadmeなどを作ったりするのに便利。

よく分からない自動変数

今まで放置してきた謎の変数について紹介。以下の様になっている。

  • $@ 出力対象(生成されるファイル)の名前
  • $^ 入力対象(依存されるファイル)の名前全部
  • $< 入力対象の内1つ目のものの名前

おそらくよく使うのはこの辺の3つ。詳細を知りたければ このあたり

上の変数などを使って、出力結果をmarkdownでまとめてみたreadmeを作ってみる。

%.py.out: %.py
  python $< > $@

readme.md: $(shell ls *.py | sed 's/$$/.out/g')
  rm -f tmp
  touch tmp
  for i in $^; do echo $$i >> tmp; echo '```' >> tmp; cat $$i >> tmp; echo '```' >> tmp; done
  mv tmp $@

clean:
  rm -f *.py.out
  rm -f readme.md
  
.PHONY: clean

実行結果をまとめたreadmeを生成。

$ make
python bye.py > bye.py.out
python hello.py > hello.py.out
rm -f tmp
touch tmp
for i in bye.py.out hello.py.out; do echo $i >> tmp; echo '```' >> tmp; cat $i >> tmp; echo '```' >> tmp; done
mv tmp readme.md

$ cat readme.md
 bye.py.out
 ```
 bye
 ```
 hello.py.out
 ```
 hello
 ```

makefileわりと便利だよという話。

*1:macもdefaultでgnu makeっぽい

[python][logging] pythonのloggingモジュールの呼び出し元のファイル名:行番号を出力する機能が残念という話

はじめに

pythonの標準ライブラリのloggingモジュールを利用すると、ログ出力の中に呼び出し元の位置に関する情報(ファイル名や関数名など)を含める事ができる。 具体的にはloggerに渡すフォーマットに以下を含めることで、ログ出力時に呼び出した位置の情報が出力される。

  • %(pathname)s ログ出力が行わる実行コードが存在するファイルのパス名。
  • %(funcName)s ログ出力が行われた際のコンテキスト上の関数名。
  • %(lineno)s ログ出力が行われたファイルの行番号

便利で素晴らしい。一方で、ちょっとloggingモジュールのloggerに細工をしようと思った時にこの機能が上手く動作しなくなって悲しかったという話。

loggingモジュールので呼び出し元の位置をログに含める事ができる

例えば以下のようなファイル構造の時に、hello.pyで定義された hello() 上のログ出力にはhello.py中で呼び出されたということが分かる。

$ tree .
.
├── hello.py
└── main.py

hello.py

import logging
logger = logging.getLogger("*")


def hello():
    logger.info("hello")

main.py

import logging
import hello

format = "level:%(levelname)s\tmsg:%(message)s\tlocation:%(pathname)s(%(lineno)s)\tfn:%(funcName)s"
logging.basicConfig(level=logging.DEBUG, format=format)

hello.hello()

具体的には以下のようなログが出力される。

level:INFO   msg:hello   location:~/work/hello.py(6) fn:hello

期待通りhello.py上で実行されたものだということが分かる。一方で、loggerに対してもう少し機能を付加したいとloggerオブジェクトをwrapするようなオブジェクトを定義したときにこの機能が上手く動作しなくなる

元のloggerをwrapしたオブジェクトを作ってみる

例えば、何らかの処理を追加したいと思い。直接的にloggingモジュールのloggerを使うのではなく、loggerを内部に保持したオブジェクトを作りたいと思うことがある。以下のようなもの。

wrap.pyでLoggerWrapperを定義し、hello.pyではこのオブジェクトで包んだオブジェクトを利用してログ出力をしようとするように、コードを変えてみる。

wrap.py

class LoggerWrapper:
    def __init__(self, logger):
        self.logger = logger

    def info(self, *args, **kwargs):
        # do something
        self.logger.info(*args, **kwargs)

hello.py

import logging
import wrap


logger = logging.getLogger(__name__)
wlogger = wrap.LoggerWrapper(logger) 


def hello2():
    wlogger.info("hello")

この時にも、hello2() 上のログ出力の位置としてファイル名がhello.pyになってほしい。ところがそうはならない。LoggerWrapper上のLoggerWrapper.info() の位置がログ出力されてしまう。

level:INFO   msg:hello   location:~/work/wrap.py(7)  fn:info

原因

実行時の位置情報について期待していない結果が出力されてしまう原因はloggerの定義を見てみると分かる。例えばpython3.5.2上のloggingモジュールのコードを見ると以下の様になっている。

if hasattr(sys, '_getframe'):
    currentframe = lambda: sys._getframe(3)
else: #pragma: no cover
    def currentframe():
        """Return the frame object for the caller's stack frame."""
        try:
            raise Exception
        except Exception:
            return sys.exc_info()[2].tb_frame.f_back


## snip..
class Logger(Filterer):

    ## snip..
    def findCaller(self, stack_info=False):
        """
        Find the stack frame of the caller so that we can note the source
        file name, line number and function name.
        """
        f = currentframe()
        #On some versions of IronPython, currentframe() returns None if
        #IronPython isn't run with -X:Frames.
        if f is not None:
            f = f.f_back
        rv = "(unknown file)", 0, "(unknown function)", None
        while hasattr(f, "f_code"):
            co = f.f_code
            filename = os.path.normcase(co.co_filename)
            if filename == _srcfile:
                f = f.f_back
                continue
            sinfo = None
            if stack_info:
                sio = io.StringIO()
                sio.write('Stack (most recent call last):\n')
                traceback.print_stack(f, file=sio)
                sinfo = sio.getvalue()
                if sinfo[-1] == '\n':
                    sinfo = sinfo[:-1]
                sio.close()
            rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
            break
        return rv

内部の詳細は省略するとして、ログ出力の位置の構造を取得するのに、実行時のstackframeの情報を利用している。 この際に取得するstackframeのレベルが3で固定になってしまっているので、実行時に呼び出される logger.findCaller() の深さが変わってしまうようなコードを書いてしまうと上手く位置が取得できなくなってしまう(loggingモジュールのloggerは、何かのオブジェクトにwrapしたりすることはせずに直接使う必要が有るということ)。

かなしい。

もちろん以下のようにログ出力部分を関数で包んでしまっても上手く行かない。

def log_with_additional_info(message, *args, **kwargs):
    logger.info("additional: {}".format(message), *args, **kwargs)


def hello3():
    log_with_additional_info("hello")

hello3() ではなく log_with_additional_info() 上で使われたということになってしまう。

level:INFO   msg:additional: hello   location:~/work/hello.py(14)    fn:log_with_additional_info

もちろん、両者は共に loggingモジュールのloggerの logger.info() が呼ばれた位置 であることには違いないのだけれど。

以下の様なことができたらなーという感じ。

# 位置情報を取得する際にsys._getframe(3) ではなく sys._getframe(4)が呼ばれるようになる
logger.setFindCallerDepth(4)

gist https://gist.github.com/podhmo/056213a4cd51c2e611b963b36ae9cdbb