完全に自分の為のsqsをpythonで触るときのメモ
自分用のメモなので汚い。やりたいことはSQSのsend/receiveのやり方の例を1つのpythonファイルに押し込めること。
準備
以下が必要。handofcatsやjqfpyは手抜きのためのもの。
requirements.txt
awscli
boto3
handofcats
jqfpy
$ pip install -r requirements.txt
使い方
利用するたびにSQSのqueueを作成・削除し直す。
# queueの作成 $ make setup # <なにかする> # queueの削除 $ make clean
Makefileはこんな感じ。00的なタスクがどんどん増えていく。
# Name of the SQS queue; override with `make NAME=other <target>`.
NAME ?= hello

# None of these targets produce a file, so declare them phony; otherwise a
# stray file called e.g. `clean` or `get` would make the target a no-op.
.PHONY: 00 setup purge clean get

# Run the `run` subcommand of the script whose name starts with the target
# name (00*.py) against the queue URL resolved by the `get` target.
00:
	python $(shell echo $@*.py) run --queue-url $$(make -s get)

# Create the queue.
setup:
	# pip install -r requirements.txt
	aws sqs create-queue --queue-name ${NAME}

# Remove all messages from the queue but keep the queue itself.
purge:
	aws sqs purge-queue --queue-url $$(make -s get)

# Delete the queue.
clean:
	aws sqs delete-queue --queue-url $$(make -s get)

# Print the queue URL (used by the other targets via `make -s get`).
# NOTE: plain `make` is used on purpose instead of $(MAKE); recipe lines that
# mention $(MAKE) are executed even under `make -n`.
get:
	aws sqs get-queue-url --queue-name ${NAME} | jqfpy 'get("QueueUrl")' -r
send/receiveを使ったサンプル
コードはawsdocsのexampleのコードから拝借したものをいろいろいじったもの。
subprocessでsenderとreceiverを立ち上げている。handofcatsを使っているのは単にサブコマンドを手軽に定義したかったから。`sys.executable` と `__file__` のセットは自分自身を動かしたいときに使えるイディオム。
00workers.py
import boto3
import os
from botocore.exceptions import ClientError
from handofcats import as_subcommand


@as_subcommand
def sender(*, queue_url: str) -> None:
    """Send five messages to the queue with a single SendMessageBatch call.

    Every entry is sent with DelaySeconds=10. A ClientError from the call is
    printed rather than raised. All output lines are prefixed with the
    process id so that interleaved sender/receiver output can be told apart.
    """
    pid = os.getpid()
    print(pid, "** sender **")
    sqs_client = boto3.client("sqs")

    entries = [
        {"Id": f"m{i}", "MessageBody": f"SQS message #{i}", "DelaySeconds": 10}
        for i in range(1, 6)
    ]
    if not entries:
        # Explicit check instead of `assert`, which is stripped under -O.
        # The message is the error SQS reports for an empty batch.
        raise ValueError(
            "!! An error occurred (AWS.SimpleQueueService.EmptyBatchRequest) when calling the SendMessageBatch operation: There should be at least one SendMessageBatchRequestEntry in the request."
        )

    try:
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        print(pid, "SEND", response)
    except ClientError as e:
        print(pid, "!!", e)


@as_subcommand
def receiver(*, queue_url: str) -> None:
    """Poll the queue three times, printing and deleting whatever arrives.

    Each poll is a ReceiveMessage call with WaitTimeSeconds=5 and
    MaxNumberOfMessages=10. Received messages are printed and then removed
    with one DeleteMessageBatch call. A ClientError is printed and the loop
    moves on to the next poll.
    """
    pid = os.getpid()
    print(pid, "** receiver **")
    life = 3  # number of polls before the receiver gives up
    sqs_client = boto3.client("sqs")

    for _ in range(life):
        try:
            response = sqs_client.receive_message(
                QueueUrl=queue_url,
                AttributeNames=["SentTimestamp"],
                MaxNumberOfMessages=10,
                MessageAttributeNames=["All"],
                WaitTimeSeconds=5,
                VisibilityTimeout=1,
            )
            print(pid, "RECEIVE", response)
            print(pid, "----------------------------------------")

            # Treat both a missing "Messages" key and an empty list as
            # "nothing received"; an empty delete batch would be rejected.
            messages = response.get("Messages")
            if not messages:
                print(pid, "EMPTY")
                continue

            print(pid, len(messages))
            for msg in messages:
                print(pid, f"{msg['MessageId']}: {msg['Body']}")
            will_be_deleted = [
                {"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
                for msg in messages
            ]
            print(
                pid,
                "DELETE",
                sqs_client.delete_message_batch(
                    QueueUrl=queue_url, Entries=will_be_deleted
                ),
            )
        except ClientError as e:
            print(pid, "!!", e)


@as_subcommand
def run(*, queue_url: str) -> None:
    """Start the sender and receiver subcommands as child processes.

    The children are launched with sys.executable and __file__, i.e. this
    same script is re-invoked with a different subcommand. After both have
    finished, the process exits with status 1 if either child failed.
    """
    import sys
    import subprocess

    ps = [
        subprocess.Popen(
            [sys.executable, __file__, subcommand, "--queue-url", queue_url]
        )
        for subcommand in ("sender", "receiver")
    ]
    for p in ps:
        p.wait()

    # Surface a crashed worker instead of silently reporting success.
    if any(p.returncode != 0 for p in ps):
        sys.exit(1)


as_subcommand.run()
実行結果
実行結果は汚いが雰囲気は分かるはず(未来の自分へ)
python 00workers.py run --queue-url $(make -s get) 56370 ** sender ** 56371 ** receiver ** 56370 SEND {'Successful': [{'Id': 'm1', 'MessageId': '36f5a371-7b6a-4bd2-b3ce-c08c57d7715f', 'MD5OfMessageBody': '8a69e233effa0e11be98d3866700bc00'}, {'Id': 'm2', 'MessageId': 'e2bf9104-191a-4fa8-af23-f3483d13ab4b', 'MD5OfMessageBody': 'df4abd2ff167c2f564494cf50be9afbe'}, {'Id': 'm3', 'MessageId': '972c2dd5-5a75-4993-b5d4-63a9d457c47d', 'MD5OfMessageBody': 'f6db23f357fb15af3060dd19f95f12b9'}, {'Id': 'm4', 'MessageId': '3e7fc5b7-6057-489c-bcba-e4284b70933b', 'MD5OfMessageBody': '6f1d6f7921a093398b7816335f9b30dd'}, {'Id': 'm5', 'MessageId': 'b6a33bc8-fc53-448a-bf37-ba6876f679e8', 'MD5OfMessageBody': 'b0acfb636ba5c9b0210a8be552470695'}], 'ResponseMetadata': {'RequestId': '534b22a5-e8d5-5812-b41e-7411aafb4172', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '534b22a5-e8d5-5812-b41e-7411aafb4172', 'date': 'Wed, 05 Feb 2020 04:09:18 GMT', 'content-type': 'text/xml', 'content-length': '1260'}, 'RetryAttempts': 0}} 56371 RECEIVE {'ResponseMetadata': {'RequestId': 'e51f8cf0-64ad-5e1f-92b9-da4426834dfb', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'e51f8cf0-64ad-5e1f-92b9-da4426834dfb', 'date': 'Wed, 05 Feb 2020 04:09:23 GMT', 'content-type': 'text/xml', 'content-length': '240'}, 'RetryAttempts': 0}} 56371 ---------------------------------------- 56371 EMPTY 56371 RECEIVE {'Messages': [{'MessageId': '36f5a371-7b6a-4bd2-b3ce-c08c57d7715f', 'ReceiptHandle': 'AQEBiWLCO2HzXvNxHcmhI6P+OPgINFwF89DRYNkCwdZztlC6o0GmLo0/bHS1G+EMqwUtQf1eCFG4Kuc6tbCoMxkeTgsvrznW4ldax5xa5pqMVj3EW8oKZVsV54XaqXXhEnzQTpCdBJoa6tWtVIsNhc5ezdwaJiz2FJILZ5uKNBKKf/cG6iyok/+pfUyRAqORFNddiZfN00vYhjXwGtspu1h320wuol5M5v5jOaabbDikmIKr8iNWCJ5akkizpPs5fgQRpGnd0MaazHYX5z6ck3Qf2/2Yhz/GQxKPu/YFDsa26Xvo4ZgjGO4nsBlt11+yHTSbR3FU3L7YPyAFz/8sicJiKGwLBoQ7cJ/xqaoKVMVot0wCLHk4mdbgPSEPPaskfcYBa8H8l7YIC8HSpwNU5S+G4Q==', 'MD5OfBody': '8a69e233effa0e11be98d3866700bc00', 'Body': 'SQS message #1', 'Attributes': 
{'SentTimestamp': '1580875758721'}}, {'MessageId': 'b6a33bc8-fc53-448a-bf37-ba6876f679e8', 'ReceiptHandle': 'AQEBpLoh2x4Mxc54CEOv51kxyNWyoLlOHq5Ewi65hQQ0+X4KZseeqnioZvAovYzjkkkL9d6grZ5Q9KJGynil5XwhmM4fl7rJqcsflZkXvIX+HAaIYUBTdmc5uUhG+qZgl5UAzdDySRsWy4zzCUgG34nTpbbak0isW+EKzxCKeHmF5E8ng1GCA+A8t3Ria6+A4h2tus76pWg1hnQiA+0+mmuyzT5u+TvQ4HRIql7SY25JAFme3a+sq4Vemuf3ekX0O2nIowNIF7shK4RWVORrxhU+rbaR2SgGpD/nTJDkriiOK84UCaDR18bwycSxQP7a2cn6GASGlAceS4F20Xhm+QqsfH/QVjOiPtlNFKo5d+wsn2ZtEQtnxg78jrzG75Y8lTTSiVcnI9lye0iDlY1hPkeP/Q==', 'MD5OfBody': 'b0acfb636ba5c9b0210a8be552470695', 'Body': 'SQS message #5', 'Attributes': {'SentTimestamp': '1580875758722'}}], 'ResponseMetadata': {'RequestId': 'a848c26d-afeb-5b5b-9a05-94d9edd808bf', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'a848c26d-afeb-5b5b-9a05-94d9edd808bf', 'date': 'Wed, 05 Feb 2020 04:09:28 GMT', 'content-type': 'text/xml', 'content-length': '1622'}, 'RetryAttempts': 0}} 56371 ---------------------------------------- 56371 2 56371 36f5a371-7b6a-4bd2-b3ce-c08c57d7715f: SQS message #1 56371 b6a33bc8-fc53-448a-bf37-ba6876f679e8: SQS message #5 56371 DELETE {'Successful': [{'Id': '36f5a371-7b6a-4bd2-b3ce-c08c57d7715f'}, {'Id': 'b6a33bc8-fc53-448a-bf37-ba6876f679e8'}], 'ResponseMetadata': {'RequestId': 'c71b1431-2e60-5dc8-b1b8-42856c58ce2c', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c71b1431-2e60-5dc8-b1b8-42856c58ce2c', 'date': 'Wed, 05 Feb 2020 04:09:28 GMT', 'content-type': 'text/xml', 'content-length': '494'}, 'RetryAttempts': 0}} 56371 RECEIVE {'Messages': [{'MessageId': 'e2bf9104-191a-4fa8-af23-f3483d13ab4b', 'ReceiptHandle': 
'AQEBREIOXdU94TI84w1v1LAJ3AhB6ybL1ViXbvRqvwVWByVFEjWzQqvjZZiNQVNYHBKinMwDKSfaUfbvnawcxbPwBBhPqo3g4Ubr7pJ/Ky8Q6SVn/vric1zZFisUDmrZO6olQHSf4nwQ1kJHDLKaH2QujQuGOaWoW0KxDI+e7itz9uNY4zsCernhgfAVWwA/LnCqPK3fzFI47NNUwa7UeDhc1Rt0IWxe8Dw/MABM/chRL9e72WnW1MeMp7b5ixd+K2Gyq3DeB82Cf5Q9Sqk9Or+vAlf5FSSz+cBRaqxBGF6MgpBbOhZGjpMYyIqqPe8XhEHFkHT/mPPeR8qs/YPDrqx3vuKjoZdpJhFI5BOtcQELTOo6BITnWRMbxIwO2GHp0ynNLVflN3sKUmioM86Vq3R5Zg==', 'MD5OfBody': 'df4abd2ff167c2f564494cf50be9afbe', 'Body': 'SQS message #2', 'Attributes': {'SentTimestamp': '1580875758861'}}], 'ResponseMetadata': {'RequestId': 'd90e512a-5e39-5f0d-bbdd-079e1c012ce9', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd90e512a-5e39-5f0d-bbdd-079e1c012ce9', 'date': 'Wed, 05 Feb 2020 04:09:28 GMT', 'content-type': 'text/xml', 'content-length': '942'}, 'RetryAttempts': 0}} 56371 ---------------------------------------- 56371 1 56371 e2bf9104-191a-4fa8-af23-f3483d13ab4b: SQS message #2 56371 DELETE {'Successful': [{'Id': 'e2bf9104-191a-4fa8-af23-f3483d13ab4b'}], 'ResponseMetadata': {'RequestId': '1cab75f5-3e3e-5f9b-8b97-23417c07bc7f', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '1cab75f5-3e3e-5f9b-8b97-23417c07bc7f', 'date': 'Wed, 05 Feb 2020 04:09:29 GMT', 'content-type': 'text/xml', 'content-length': '386'}, 'RetryAttempts': 0}}
gist
gist
minitask (experimental)
どこにも説明がないが最近書いているminitaskを使うと現時点では以下の様なコードと動作をする(ここでは詳しい説明は一切しない)。
https://github.com/podhmo/minitask
この記事自体もminitaskのための情報整理を念頭に入れていたりはしていた。此処から先はメモ程度のもの。現時点のスナップショット(つまり変わりうる)。加えて動作自体は同じものではないのでこれを使えば綺麗に書けるとかそういう話もしない。
01workers.py
from __future__ import annotations
import time
from botocore.exceptions import ClientError
from handofcats import as_command
from minitask.worker.sqsworker import Manager, Config


def receiver(m: Manager, uid: str) -> None:
    """Print every item read from the reader queue opened for ``uid``."""
    with m.open_reader_queue(uid) as q:
        for item in q:
            # debugging aid, left disabled:
            # print("@", q.latest)
            print("<-", item)


def sender(m: Manager, uid: str) -> None:
    """Put nine messages on the writer queue opened for ``uid``.

    Messages are sent 0.1 seconds apart. A ClientError is printed rather
    than raised and stops the remaining sends.
    """
    with m.open_writer_queue(uid) as q:
        try:
            for i in range(1, 10):
                # Named `body` so the Manager parameter `m` is not shadowed.
                body = f"SQS message #{i}"
                q.put({"message": body})
                time.sleep(0.1)
        except ClientError as e:
            print("!!", e)


@as_command
def run(*, queue_url: str):
    """Spawn the sender and the receiver against the same queue URL.

    NOTE(review): presumably the Manager context waits for the spawned
    workers before exiting; confirm against minitask.
    """
    with Manager(Config(MessageAttributeNames=["All"])) as m:
        m.spawn(sender, uid=queue_url)
        m.spawn(receiver, uid=queue_url)
実行結果
python 01workers.py --queue-url $(make -s get) <- {'message': 'SQS message #1'} <- {'message': 'SQS message #2'} <- {'message': 'SQS message #3'} <- {'message': 'SQS message #4'} <- {'message': 'SQS message #5'} <- {'message': 'SQS message #6'} <- {'message': 'SQS message #7'} <- {'message': 'SQS message #8'} <- {'message': 'SQS message #9'}