完全に自分の為のsqsをpythonで触るときのメモ

自分用のメモなので汚い。やりたいことはSQSのsend/receiveのやり方の例を1つのpythonファイルに押し込めること。

準備

以下が必要。handofcatsやjqfpyは手抜きのためのもの。

requirements.txt

awscli
boto3
handofcats
jqfpy
$ pip install -r requirements.txt

使い方

利用するたびにSQSのqueueを作成・削除し直す。

# queueの作成
$ make setup

# <なにかする>

# queueの削除
$ make clean

Makefileはこんな感じ。00的なタスクがどんどん増えていく。

# Name of the SQS queue every target operates on; `?=` lets the caller
# override it from the environment or command line (e.g. `make NAME=foo setup`).
# NOTE(review): the recipe lines below are shown indented with spaces; make
# normally requires a leading tab on recipe lines -- confirm before copying.
NAME ?= hello

# Run the script whose filename starts with the target name ("00*.py"),
# invoking its `run` subcommand with the queue URL printed by the `get` target.
00:
  python $(shell echo $@*.py) run --queue-url $$(make -s get)

# Create the queue. The dependency install step is left commented out.
setup:
#  pip install -r requirements.txt
    aws sqs create-queue --queue-name ${NAME}
# Purge the queue (aws sqs purge-queue).
purge:
  aws sqs purge-queue --queue-url $$(make -s get)
# Delete the queue itself.
clean:
  aws sqs delete-queue --queue-url $$(make -s get)
# Print the queue URL: jqfpy extracts "QueueUrl" from the JSON response
# (`-r` presumably means raw, unquoted output -- confirm against jqfpy).
get:
  aws sqs get-queue-url --queue-name ${NAME} | jqfpy 'get("QueueUrl")' -r

send/receiveを使ったサンプル

コードはawsdocsのexampleのコードから拝借したものをいろいろいじったもの。

subprocessでsenderとreceiverを立ち上げている。handofcatsを使っているのは単にサブコマンドを手軽に定義したかったから。sys.executable と __file__ のセットは自分自身を動かしたいときに使えるイディオム。

00workers.py

import boto3
import os
from botocore.exceptions import ClientError
from handofcats import as_subcommand


@as_subcommand
def sender(*, queue_url: str) -> None:
    """Send one batch of five numbered messages to the SQS queue.

    Every entry is submitted with ``DelaySeconds`` set to 10. Progress lines
    are prefixed with this process's pid so interleaved output from several
    processes can be told apart.

    Args:
        queue_url: URL of the target SQS queue.

    A ``ClientError`` from the batch call is printed instead of propagating.
    """
    print(os.getpid(), "** sender **")
    client = boto3.client("sqs")

    batch = [
        {"Id": f"m{n}", "MessageBody": f"SQS message #{n}", "DelaySeconds": 10}
        for n in range(1, 6)
    ]

    try:
        # The message mirrors the error text SQS reports for an empty batch.
        assert batch, "!! An error occurred (AWS.SimpleQueueService.EmptyBatchRequest) when calling the SendMessageBatch operation: There should be at least one SendMessageBatchRequestEntry in the request."

        result = client.send_message_batch(QueueUrl=queue_url, Entries=batch)
    except ClientError as err:
        print(os.getpid(), "!!", err)
    else:
        print(os.getpid(), "SEND", result)


@as_subcommand
def receiver(*, queue_url: str) -> None:
    """Poll the SQS queue three times, printing and deleting what arrives.

    Each poll asks for up to 10 messages with ``WaitTimeSeconds=5`` and
    ``VisibilityTimeout=1``. When the response carries no ``"Messages"`` key
    the poll is reported as ``EMPTY``; otherwise every message is printed and
    the whole set is removed with one ``delete_message_batch`` call.

    Args:
        queue_url: URL of the SQS queue to read from.

    A ``ClientError`` raised during a poll is printed and the loop moves on
    to the next attempt.
    """
    print(os.getpid(), "** receiver **")

    attempts = 3
    client = boto3.client("sqs")
    for _ in range(attempts):
        try:
            reply = client.receive_message(
                QueueUrl=queue_url,
                AttributeNames=["SentTimestamp"],
                MaxNumberOfMessages=10,
                MessageAttributeNames=["All"],
                WaitTimeSeconds=5,
                VisibilityTimeout=1,
            )
            print(os.getpid(), "RECEIVE", reply)
            print(os.getpid(), "----------------------------------------")

            # The response omits "Messages" when nothing was received.
            if "Messages" not in reply:
                print(os.getpid(), "EMPTY")
                continue

            received = reply["Messages"]
            print(os.getpid(), len(received))

            delete_entries = []
            for message in received:
                print(os.getpid(), f"{message['MessageId']}: {message['Body']}")
                # The message id doubles as the per-entry id of the delete batch.
                delete_entries.append(
                    dict(
                        Id=message["MessageId"],
                        ReceiptHandle=message["ReceiptHandle"],
                    )
                )

            deleted = client.delete_message_batch(
                QueueUrl=queue_url, Entries=delete_entries
            )
            print(os.getpid(), "DELETE", deleted)
        except ClientError as err:
            print(os.getpid(), "!!", err)


@as_subcommand
def run(*, queue_url: str):
    """Start the sender and the receiver as child processes and wait for both.

    Each child re-invokes this very file with the current interpreter
    (``sys.executable`` + ``__file__``), passing the subcommand name and the
    queue URL on the command line.

    Args:
        queue_url: URL of the SQS queue handed to both children.
    """
    import subprocess
    import sys

    # Sender is launched first, then receiver; both run concurrently.
    children = [
        subprocess.Popen([sys.executable, __file__, role, "--queue-url", queue_url])
        for role in ("sender", "receiver")
    ]
    for child in children:
        child.wait()


# Script entry point: hand control to handofcats, which presumably picks the
# subcommand (sender / receiver / run) named on the command line -- see the
# `python 00workers.py run --queue-url ...` invocation.
as_subcommand.run()

実行結果

実行結果は汚いが雰囲気は分かるはず(未来の自分へ)

python 00workers.py run --queue-url $(make -s get)
56370 ** sender **
56371 ** receiver **
56370 SEND {'Successful': [{'Id': 'm1', 'MessageId': '36f5a371-7b6a-4bd2-b3ce-c08c57d7715f', 'MD5OfMessageBody': '8a69e233effa0e11be98d3866700bc00'}, {'Id': 'm2', 'MessageId': 'e2bf9104-191a-4fa8-af23-f3483d13ab4b', 'MD5OfMessageBody': 'df4abd2ff167c2f564494cf50be9afbe'}, {'Id': 'm3', 'MessageId': '972c2dd5-5a75-4993-b5d4-63a9d457c47d', 'MD5OfMessageBody': 'f6db23f357fb15af3060dd19f95f12b9'}, {'Id': 'm4', 'MessageId': '3e7fc5b7-6057-489c-bcba-e4284b70933b', 'MD5OfMessageBody': '6f1d6f7921a093398b7816335f9b30dd'}, {'Id': 'm5', 'MessageId': 'b6a33bc8-fc53-448a-bf37-ba6876f679e8', 'MD5OfMessageBody': 'b0acfb636ba5c9b0210a8be552470695'}], 'ResponseMetadata': {'RequestId': '534b22a5-e8d5-5812-b41e-7411aafb4172', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '534b22a5-e8d5-5812-b41e-7411aafb4172', 'date': 'Wed, 05 Feb 2020 04:09:18 GMT', 'content-type': 'text/xml', 'content-length': '1260'}, 'RetryAttempts': 0}}
56371 RECEIVE {'ResponseMetadata': {'RequestId': 'e51f8cf0-64ad-5e1f-92b9-da4426834dfb', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e51f8cf0-64ad-5e1f-92b9-da4426834dfb', 'date': 'Wed, 05 Feb 2020 04:09:23 GMT', 'content-type': 'text/xml', 'content-length': '240'}, 'RetryAttempts': 0}}
56371 ----------------------------------------
56371 EMPTY
56371 RECEIVE {'Messages': [{'MessageId': '36f5a371-7b6a-4bd2-b3ce-c08c57d7715f', 'ReceiptHandle': 'AQEBiWLCO2HzXvNxHcmhI6P+OPgINFwF89DRYNkCwdZztlC6o0GmLo0/bHS1G+EMqwUtQf1eCFG4Kuc6tbCoMxkeTgsvrznW4ldax5xa5pqMVj3EW8oKZVsV54XaqXXhEnzQTpCdBJoa6tWtVIsNhc5ezdwaJiz2FJILZ5uKNBKKf/cG6iyok/+pfUyRAqORFNddiZfN00vYhjXwGtspu1h320wuol5M5v5jOaabbDikmIKr8iNWCJ5akkizpPs5fgQRpGnd0MaazHYX5z6ck3Qf2/2Yhz/GQxKPu/YFDsa26Xvo4ZgjGO4nsBlt11+yHTSbR3FU3L7YPyAFz/8sicJiKGwLBoQ7cJ/xqaoKVMVot0wCLHk4mdbgPSEPPaskfcYBa8H8l7YIC8HSpwNU5S+G4Q==', 'MD5OfBody': '8a69e233effa0e11be98d3866700bc00', 'Body': 'SQS message #1', 'Attributes': {'SentTimestamp': '1580875758721'}}, {'MessageId': 'b6a33bc8-fc53-448a-bf37-ba6876f679e8', 'ReceiptHandle': 'AQEBpLoh2x4Mxc54CEOv51kxyNWyoLlOHq5Ewi65hQQ0+X4KZseeqnioZvAovYzjkkkL9d6grZ5Q9KJGynil5XwhmM4fl7rJqcsflZkXvIX+HAaIYUBTdmc5uUhG+qZgl5UAzdDySRsWy4zzCUgG34nTpbbak0isW+EKzxCKeHmF5E8ng1GCA+A8t3Ria6+A4h2tus76pWg1hnQiA+0+mmuyzT5u+TvQ4HRIql7SY25JAFme3a+sq4Vemuf3ekX0O2nIowNIF7shK4RWVORrxhU+rbaR2SgGpD/nTJDkriiOK84UCaDR18bwycSxQP7a2cn6GASGlAceS4F20Xhm+QqsfH/QVjOiPtlNFKo5d+wsn2ZtEQtnxg78jrzG75Y8lTTSiVcnI9lye0iDlY1hPkeP/Q==', 'MD5OfBody': 'b0acfb636ba5c9b0210a8be552470695', 'Body': 'SQS message #5', 'Attributes': {'SentTimestamp': '1580875758722'}}], 'ResponseMetadata': {'RequestId': 'a848c26d-afeb-5b5b-9a05-94d9edd808bf', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'a848c26d-afeb-5b5b-9a05-94d9edd808bf', 'date': 'Wed, 05 Feb 2020 04:09:28 GMT', 'content-type': 'text/xml', 'content-length': '1622'}, 'RetryAttempts': 0}}
56371 ----------------------------------------
56371 2
56371 36f5a371-7b6a-4bd2-b3ce-c08c57d7715f: SQS message #1
56371 b6a33bc8-fc53-448a-bf37-ba6876f679e8: SQS message #5
56371 DELETE {'Successful': [{'Id': '36f5a371-7b6a-4bd2-b3ce-c08c57d7715f'}, {'Id': 'b6a33bc8-fc53-448a-bf37-ba6876f679e8'}], 'ResponseMetadata': {'RequestId': 'c71b1431-2e60-5dc8-b1b8-42856c58ce2c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c71b1431-2e60-5dc8-b1b8-42856c58ce2c', 'date': 'Wed, 05 Feb 2020 04:09:28 GMT', 'content-type': 'text/xml', 'content-length': '494'}, 'RetryAttempts': 0}}
56371 RECEIVE {'Messages': [{'MessageId': 'e2bf9104-191a-4fa8-af23-f3483d13ab4b', 'ReceiptHandle': 'AQEBREIOXdU94TI84w1v1LAJ3AhB6ybL1ViXbvRqvwVWByVFEjWzQqvjZZiNQVNYHBKinMwDKSfaUfbvnawcxbPwBBhPqo3g4Ubr7pJ/Ky8Q6SVn/vric1zZFisUDmrZO6olQHSf4nwQ1kJHDLKaH2QujQuGOaWoW0KxDI+e7itz9uNY4zsCernhgfAVWwA/LnCqPK3fzFI47NNUwa7UeDhc1Rt0IWxe8Dw/MABM/chRL9e72WnW1MeMp7b5ixd+K2Gyq3DeB82Cf5Q9Sqk9Or+vAlf5FSSz+cBRaqxBGF6MgpBbOhZGjpMYyIqqPe8XhEHFkHT/mPPeR8qs/YPDrqx3vuKjoZdpJhFI5BOtcQELTOo6BITnWRMbxIwO2GHp0ynNLVflN3sKUmioM86Vq3R5Zg==', 'MD5OfBody': 'df4abd2ff167c2f564494cf50be9afbe', 'Body': 'SQS message #2', 'Attributes': {'SentTimestamp': '1580875758861'}}], 'ResponseMetadata': {'RequestId': 'd90e512a-5e39-5f0d-bbdd-079e1c012ce9', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd90e512a-5e39-5f0d-bbdd-079e1c012ce9', 'date': 'Wed, 05 Feb 2020 04:09:28 GMT', 'content-type': 'text/xml', 'content-length': '942'}, 'RetryAttempts': 0}}
56371 ----------------------------------------
56371 1
56371 e2bf9104-191a-4fa8-af23-f3483d13ab4b: SQS message #2
56371 DELETE {'Successful': [{'Id': 'e2bf9104-191a-4fa8-af23-f3483d13ab4b'}], 'ResponseMetadata': {'RequestId': '1cab75f5-3e3e-5f9b-8b97-23417c07bc7f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '1cab75f5-3e3e-5f9b-8b97-23417c07bc7f', 'date': 'Wed, 05 Feb 2020 04:09:29 GMT', 'content-type': 'text/xml', 'content-length': '386'}, 'RetryAttempts': 0}}

gist

gist

minitask (experimental)

どこにも説明がないが、最近書いているminitaskを使うと現時点では以下の様なコードになり、以下の様に動作する(ここでは詳しい説明は一切しない)。

https://github.com/podhmo/minitask

この記事自体もminitaskのための情報整理を念頭に置いていたりはした。此処から先はメモ程度のもの。現時点のスナップショット(つまり変わりうる)。加えて動作自体は同じものではないのでこれを使えば綺麗に書けるとかそういう話もしない。

01workers.py

from __future__ import annotations
import time
from botocore.exceptions import ClientError
from handofcats import as_command
from minitask.worker.sqsworker import Manager, Config


def receiver(m: Manager, uid: str) -> None:
    """Print every item read from the reader queue opened for ``uid``.

    Args:
        m: Manager used to open the reader queue.
        uid: Identifier passed to ``open_reader_queue`` (the caller in this
            file passes the queue URL).
    """
    with m.open_reader_queue(uid) as queue:
        for received in queue:
            # Debugging aid, left disabled: print("@", queue.latest)
            print("<-", received)


def sender(m: Manager, uid: str) -> None:
    """Put nine numbered messages onto the writer queue opened for ``uid``.

    A 0.1-second pause follows each put. A ``ClientError`` raised while
    sending is printed and ends the loop early.

    Args:
        m: Manager used to open the writer queue.
        uid: Identifier passed to ``open_writer_queue`` (the caller in this
            file passes the queue URL).
    """
    with m.open_writer_queue(uid) as queue:
        try:
            for number in range(1, 10):
                # The payload is built inline so no local rebinds the ``m`` parameter.
                queue.put({"message": f"SQS message #{number}"})
                time.sleep(0.1)
        except ClientError as err:
            print("!!", err)


@as_command
def run(*, queue_url: str):
    """Spawn the sender and receiver workers under a single Manager.

    The Manager is configured with ``MessageAttributeNames=["All"]`` and both
    workers receive the queue URL as their ``uid``.

    Args:
        queue_url: URL of the SQS queue shared by both workers.
    """
    config = Config(MessageAttributeNames=["All"])
    with Manager(config) as manager:
        # Sender is spawned first, then receiver.
        for worker in (sender, receiver):
            manager.spawn(worker, uid=queue_url)

実行結果

python 01workers.py --queue-url $(make -s get)
<- {'message': 'SQS message #1'}
<- {'message': 'SQS message #2'}
<- {'message': 'SQS message #3'}
<- {'message': 'SQS message #4'}
<- {'message': 'SQS message #5'}
<- {'message': 'SQS message #6'}
<- {'message': 'SQS message #7'}
<- {'message': 'SQS message #8'}
<- {'message': 'SQS message #9'}