読者です 読者をやめる 読者になる 読者になる

pythonのwsgiref でちょっとしたHTTPを話したい時に

python memo

pythonのwsgiref でちょっとしたHTTPを話したい時に

pythonでちょっとした調査をしたい時に、何かパッケージを入れるのも面倒な場合に wsgiref だけですませたくなる場合がある。その時のためのmemo。

hello world

from wsgiref.simple_server import make_server


def app(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain; charset=utf-8')]
    start_response(status, headers)
    return [b"Hello World"]

httpd = make_server('', 8000, app)
print("Serving on port 8000...")

# Serve until process is killed
httpd.serve_forever()

PATHで分岐

environの PATH_INFO を見る

def app(environ, start_response):
    path = environ["PATH_INFO"]
    if path.startswith("/api"):
        return on_api(environ, start_response)
    else:
        return on_html(environ, start_response)

request methodで分岐(GET/POST)

environの request_method を見る

def on_api(environ, start_response):
    # 直接は関係ないけれど apiならContent-typeも異なる
    headers = [('Content-type', 'application/json; charset=utf-8')]
    request_method = environ["REQUEST_METHOD"]
    if request_method == "POST":
        # post
    else:
        # GET or other method

雑なhtmlを返す

雑で良い。

import os.path

def on_html(environ, start_response):
    path = environ["PATH_INFO"]
    headers = [('Content-type', 'text/html; charset=utf-8')]
    if path in ("", "/"):
        path = "index.html"
    try:
        with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), path.lstrip("/"))) as rf:
            status = '200 OK'
            start_response(status, headers)
            return [rf.read()]
    except Exception as e:
        print(e)
        status = '404 Not Found'
        start_response(status, headers)
        return [bytes(path)]

ファイルをdownloadさせたい

テキトウにheaderを付ける

from wsgiref.headers import Headers


def on_file_download(environ, start_response):
    headers = [('Content-type', 'text/html; charset=utf-8')]
    h = Headers(headers)
    h.add_header('content-disposition', 'attachment', filename='index.html')

    status = '200 OK'
    start_response(status, headers)

    with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), path.lstrip("/"))) as rf:
        return [rf.read()]

GETに付加されたquery stringをparseする

environ の QUERY_STRING を見る

try:
    from urllib.parse import parse_qs
except ImportError:
    from urlparse import parse_qs

def on_get(environ, start_response):
    path = environ["PATH_INFO"]
    print(parse_qs(environ["QUERY_STRING"]))

POSTされたデータをparseする

environ の wsgi.input を見る

import cgi

def on_post(environ, start_respose):
    wsgi_input = environ["wsgi.input"]
    form = cgi.FieldStorage(fp=wsgi_input, environ=environ, keep_blank_values=True)
    # data: Dict[string, List[string]]  重複したparameterがあった場合
    data = {k: form[k].value for k in form}

request.bodyにjsonが付いたrequestをparseする

REST APIなどrequest.bodyにデータを載せるようなものは以下の様にする。 environ の CONTENT_LENGTH を見ないとだめ。

import json

def on_post_rest(environ, start_response):
    wsgi_input = environ["wsgi.input"]
    content_length = int(environ["CONTENT_LENGTH"])
    # data: JSON = Dict[string, Union[string,int,float,bool,JSON,List[JSON]]]
    data = json.loads(wsgi_input.read(content_length))

付録: python2.x,python3.xの両方に対応するにはちょっと大変

python2.xではstrが来るが。python3.xでは確実にbytesを返さないとだめ。 python3.xの場合には適切にbytesを取り扱わないとダメ。

def on_api_post(path, environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'application/json; charset=utf-8')]
    start_response(status, headers)

    wsgi_input = environ["wsgi.input"]
    content_length = int(environ["CONTENT_LENGTH"])
    data = json.loads(wsgi_input.read(content_length)).decode("utf-8")))
    return [json.dumps(data).encode("utf-8"))]

python2.x, python3.xの両方に対応する場合。

import sys
PY3 = sys.version_info[0] == 3
if PY3:
    text_type = str
    binary_type = bytes
else:
    text_type = unicode
    binary_type = str


def bytes_(s, encoding='utf-8', errors='strict'):
    if isinstance(s, text_type):
        return s.encode(encoding, errors)
    return s


def text_(s, encoding='utf-8', errors='strict'):
    if isinstance(s, binary_type):
        return s.decode(encoding, errors)
    return s

def on_api_post(path, environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'application/json; charset=utf-8')]
    start_response(status, headers)

    wsgi_input = environ["wsgi.input"]
    content_length = int(environ["CONTENT_LENGTH"])
    data = json.loads(text_(wsgi_input.read(content_length))))
    return [bytes_(json.dumps(data))]