pythonでSSE (server side events) の例を作ってみて遊んでみた

今回は、以下の記事の実行をwsgirefだけを使って試してみようと言う趣旨のメモ。

お試しで遊ぶ程度の実装なので真面目な実装ではない。

SSE

SSEは雑に言うと以下のようなもの

  • コネクション張りっぱなし
  • コネクションの再接続をブラウザがやってくれる1
  • ただのHTTP ( Content-Type=text/event-stream )

ブラウザ側からは以下の様なイベントが受け取れる

  • open -- (default) -- コネクションが確立されたとき
  • close -- (default) -- コネクションが切断(終了)されたとき。状態はreadyStateを見る2
  • message -- (default) -- イベント名を付けなかった場合

(もちろんその他に自分で名前をつけたイベントも受け取れる (名前付きイベント))

出力例

server側の出力例

  • : で始まるメッセージはコメント
  • data: で始まる行はメッセージ
  • \n\n でメッセージを区切る

https://developer.mozilla.org/ja/docs/Web/API/Server-sent_events/Using_server-sent_events#examples から引用

: this is a test stream

data: some text

data: another message
data: with two lines

上の出力例は、LDJSONで考えると以下のようなものとして解釈される。

{"data": "some text"}
{"data": "another message\nwith two lines"}

JS側ではこのような形で待ち受ける。

eventSource.addEventListener("message", function(e){
  console.log(`message: ${e.data}`);
});

eventで名前付きのメッセージが送れる。

event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

data: Here's a system message of some kind that will get used
data: to accomplish some task.

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

たとえば、userconnectというイベントはaddEventListener()で待ち受ける

eventSource.addEventListener("userconnect", function(e){
    ...
});

動くコード

以下の様に実行して http://localhost:8080 を開くと、MDNの実行例が動くようになる。

$ PORT=8080 python 02ssedemo.py
Serving on port 8080...
127.0.0.1 - - [26/May/2021 19:52:08] "GET /sse HTTP/1.1" 200 616
127.0.0.1 - - [26/May/2021 19:53:22] "GET / HTTP/1.1" 200 1310
127.0.0.1 - - [26/May/2021 19:53:27] "GET /sse HTTP/1.1" 200 612
127.0.0.1 - - [26/May/2021 19:53:37] "GET /sse HTTP/1.1" 200 615
127.0.0.1 - - [26/May/2021 19:53:47] "GET /sse HTTP/1.1" 200 617
127.0.0.1 - - [26/May/2021 19:53:58] "GET /sse HTTP/1.1" 200 615
...
  • /sse SSE用の出力
  • それ以外 index.htmlのようなもの (MDNの例に似た物が動く)

ブラウザーで開いた画面

/sse 自体はこういう出力を返している。

$ http -S -v :8080/sse
GET /sse HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Host: localhost:8080
User-Agent: HTTPie/2.4.0



HTTP/1.0 200 OK
Cache-Control: no-cache
Content-type: text/event-stream; charset=utf-8
Date: Wed, 26 May 2021 10:52:03 GMT
Server: WSGIServer/0.2 CPython/3.9.2

: this is a test stream
event: clock
data: {"time": "1622026323.7402098", "c": 0 }

event: clock
data: {"time": "1622026324.242613", "c": 1 }

event: clock
data: {"time": "1622026324.744423", "c": 2 }

event: clock
data: {"time": "1622026325.246079", "c": 3 }

event: clock
data: {"time": "1622026325.74704", "c": 4 }

event: clock
data: {"time": "1622026326.252667", "c": 5 }

event: clock
data: {"time": "1622026326.7538052", "c": 6 }

event: clock
data: {"time": "1622026327.255278", "c": 7 }

event: clock
data: {"time": "1622026327.760777", "c": 8 }

event: clock
data: {"time": "1622026328.2637691", "c": 9 }

pythonのコード

02ssedemo.py

  • wsgi appはgeneratorを返すとstream responseとして処理される3
  • PATH_INFOを見ればpathでの分岐はできる

(f-stringを使うとキモいので微妙だったかもしれない)

import os
import sys
from wsgiref.simple_server import make_server

PORT = int(os.getenv("PORT") or "8080")


def app(environ, start_response):
    path = environ["PATH_INFO"]
    if path == "/sse":
        yield from sse(environ, start_response)
    else:
        yield from index(environ, start_response)


def index(environ, start_response):
    status = "200 OK"
    headers = [
        ("Content-type", "text/html; charset=utf-8"),
    ]
    start_response(status, headers)
    yield f"""
<body>
  <script>
    // const evtSource = new EventSource("//localhost:{PORT}/sse", {{ withCredentials: true }} ); 
    const evtSource = new EventSource("//localhost:{PORT}/sse", {{ withCredentials: false }} ); 
    evtSource.onmessage = function(e) {{
      var newElement = document.createElement("li");
      var eventList = document.getElementById('list');

      newElement.innerHTML = "message: " + e.data;
      eventList.appendChild(newElement);
    }};

    evtSource.addEventListener("clock",  function(e){{
      var newElement = document.createElement("li");
      var eventList = document.getElementById('list');

      newElement.innerHTML = "clock: " + e.data;
      eventList.appendChild(newElement);
    }});

    // default events
    evtSource.addEventListener("open",  function(e){{
      console.log("open")
    }});
    evtSource.addEventListener('error', function(event) {{
        switch (event.target.readyState) {{
            case EventSource.CONNECTING:
                console.log('Reconnecting...');
                break;
            case EventSource.CLOSED:
                console.log('Connection failed, will not reconnect');
                break;
        }}
      }});

    // if evtSource.close() is not called, reset connection
  </script>
<body>
  <div id="list"></div>
</body>
</body>
""".encode(
        "utf-8"
    )


def sse(environ, start_response):
    import time

    status = "200 OK"
    headers = [
        ("Content-type", "text/event-stream; charset=utf-8"),
        ("Cache-Control", "no-cache"),
    ]
    start_response(status, headers)
    yield ": this is a test stream".encode("utf-8")  # this is comment
    yield "\n".encode("utf-8")

    for i in range(10):

        yield "event: clock\n".encode("utf-8")
        yield f'data: {{"time": "{time.time()}", "c": {i} }}\n'.encode("utf-8")
        yield "\n".encode("utf-8")
        time.sleep(0.5)


httpd = make_server("", PORT, app)
print(f"Serving on port {PORT}...", file=sys.stderr)

# Serve until process is killed
httpd.serve_forever()

ただ、CLIでやり取りするとしたらブラウザが実装してくれる挙動を実装しないとダメなのだよなー。

gist

参考


  1. 詳しい仕様はこのあたり https://triple-underscore.github.io/HTML-server-sent-events-ja.html#sse-processing-model

  2. EventSourceのIDLはこのあたり https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface

  3. というかwsgiの仕様的には何らかの1個以上の文字列を返せれば何でも良い。通常は長さ1のリストであることが多いけれど。 https://www.python.org/dev/peps/pep-0333/#buffering-and-streaming