tl;dr
今さらジローではあるが, AWS Lambda で実装されている Dead Letter Queue について簡単にチュートリアルしたので, その様子をダイジェストで.
尚, 本記事では, 以下のバージョン Serverless Framework を利用して Lambda ファンクションをはじめ各種リソースを構築することを前提としている.
$ sls version 1.24.1
また, 本記事で利用したコードは Python を利用し. 以下の Github リポジトリにアップしているので, 誤り等あればプルリクエストを頂けると嬉しいでござる.
うんちく
Dead Letter Queue について
Dead Letter Queue とは.
非同期で呼び出されてエラーになってしまった Lambda 関数について, その未処理イベントを Amazon SQS キューや Amazon SNS トピックに送信する
Lambda 関数を非同期で呼び出した場合, エラーとなった関数は 2 回のリトライが行われて, その後, そのイベントは破棄されてしまうが, Dead Letter Queue を Lambda 関数に設定すると, 未処理イベントをキューイング又は通知することが可能になる.
Dead Letter Queue のメッセージには, イベントメッセージ (Lambda ファンクションを呼び出す際に引数 event に渡される値)と, メッセージアトリビュートにはイベントが処理されなかった原因を特定する為に役立つ以下のような内容が記録されている. (以下, ドキュメントより引用.)
名前 | タイプ | 値 |
---|---|---|
RequestID | 文字列 | 一意のリクエスト ID |
ErrorCode | 数値 | 3 桁の HTTP エラーコード |
ErrorMessage | 文字列 | エラーメッセージ (1 KB に切り捨て) |
例えば, 以下のようなコードにて例外が発生した場合, ErrorMessage には Errooooooooooooooooooo!
が入ることになる.
def error_function(): raise Exception('Errooooooooooooooooooo!')
Lambda の呼び出しパターン
Dead Letter Queue を触るにあたって, Lambda を呼び出す場合の呼び出しパターン (Invovation Type) について確認する.
パターン (InvocationType) | 概要 |
---|---|
Event | 非同期呼び出しを行い, 関数はキューインした後に実行される. 関数処理が失敗した場合, 自動的に 2 回のリトライが実施される (初回実行を含めて 3 回実行される) |
RequestResponse | 同期呼び出しで, デフォルトの Invocation Type となる. リトライ処理は SDK に依存しており, 独自に実装する必要がある |
DryRun | 関数は実行せず, 呼び出し元の関数の権限や入力が有効かどうか等の検証を行う |
以下は, Invocation Type の Lambda 関数を実行して, エラーが発生した場合の CloudWatch Logs の一部を抜粋したもの.
$ awslogs get /aws/lambda/dlq-tutorial-debug-dlq_tutorial ALL --aws-region=ap-northeast-1 --start='60m ago' | grep 'START RequestId: 704466d2-5b67-11e8-86fa-9b55620f256a' /aws/lambda/dlq-tutorial-debug-dlq_tutorial 2018/05/19/[$LATEST]1fb5c8a186c044f8bacef1f24d9a8746 START RequestId: 704466d2-5b67-11e8-86fa-9b55620f256a Version: $LATEST /aws/lambda/dlq-tutorial-debug-dlq_tutorial 2018/05/19/[$LATEST]1fb5c8a186c044f8bacef1f24d9a8746 START RequestId: 704466d2-5b67-11e8-86fa-9b55620f256a Version: $LATEST /aws/lambda/dlq-tutorial-debug-dlq_tutorial 2018/05/19/[$LATEST]1fb5c8a186c044f8bacef1f24d9a8746 START RequestId: 704466d2-5b67-11e8-86fa-9b55620f256a Version: $LATEST
先述の通り, リトライを含めて 3 回関数が実行されていることがわかる. (同一 RequestId が 3 回実行されている.)
尚, Invocation Type については, 以下のドキュメント, 記事が参考になった.
- Invoke - AWS Lambda
- 再試行動作について - AWS Lambda
- [AWS]知っておいたほうがいいLambda関数の呼び出しタイプとリトライ方式まとめ | Developers.IO
有難うございました.
ということで, 呼び出しパターン Event の Lambda 関数を利用して Dead Letter Queue をチュートリアルしていく.
Serverless Framework で Dead Letter Queue (以後, DLQ とする)
各種コードについて
サンプルプロジェクトの作成とデプロイ
以下のように Python 3 でプロジェクトを作成する.
sls create --template=aws-python3 --path=dlq-tutorial
デプロイについては,
sls deploy
一発なのが嬉しい限り.
serverless.yml
Dead Letter Queue を設定する場合, serverless-plugin-lambda-dead-letter プラグインを利用すると設定も簡単で便利. 以下, serverless.yml の一部を抜粋.
# file name: serverless.yml ... iamRoleStatements: - Effect: Allow Action: - sqs:SendMessage Resource: - Fn::Join: [ ":", [ "arn:aws:sqs", Ref: "AWS::Region", Ref: "AWS::AccountId", "${self:custom.dlq_tutorial}" ] ] plugins: - serverless-plugin-lambda-dead-letter custom: dlq_tutorial: dlq-tutorial functions: dlq_tutorial: handler: handler.hello memorySize: 128 timeout: 300 deadLetter: sqs: ${self:custom.dlq_tutorial} ...
Lambda ファンクション
以下のように Exception 例外を発生させてみる. Lambda では例外を発生させることで, DLQ にメッセージを送信する.
# file name: handler.py ... def hello(event, context): if event == {}: raise Exception('Noooooooooooooo eventtttttttttt.') else: return world(event) def world(event): v = event.get('hello') if v is not None: return 'hello world.' else: raise Exception('Erroooooooooooooooooooor.') ...
Lambda ファンクションのエラーについては, 以下のドキュメントが参考になる.
ドキュメント読むの大事.
尚, 念の為, 上記の Lambda ファンクションについては, ユニットテストも書いた.
$ python -m unittest tests.test_handler -v test_hello (tests.test_handler.HelloTest) ... ok test_hello_exception (tests.test_handler.HelloTest) ... ok test_hello_exception_message (tests.test_handler.HelloTest) ... ok test_world (tests.test_handler.HelloTest) ... ok test_world_exception (tests.test_handler.HelloTest) ... ok test_world_exception_message (tests.test_handler.HelloTest) ... ok ---------------------------------------------------------------------- Ran 6 tests in 0.001s OK
尚, unittest において例外発生をテストする方法については, 別の記事にまとめたいと思う.
DLQ を処理する Lambda ファンクション
以下のような書いた.こちらについても, 長くなるので一部抜粋. 基本的には SQS に蓄積されているメッセージを取得して何かする感じ.
# file name: handler.py ... def dlq_message_handler(event, context): sqs = boto3.resource('sqs') queue_name = 'dlq-tutorial' q = sqs.get_queue_by_name(QueueName=queue_name) messages = [] while True: q_messages = q.receive_messages(AttributeNames=['All'], MessageAttributeNames=['All'], MaxNumberOfMessages=10) if q_messages: for qm in q_messages: attr = qm.attributes body = qm.body msga = qm.message_attributes sent_time_stamp = attr.get('SentTimestamp') error_message = msga['ErrorMessage']['StringValue'] ...
取得したメッセージは, その後に何をするかで加工の方法は異なると思うが, 改めて再処理の Lambda ファンクションを呼んで処理を完了させることも可能.
今回は, 取得したメッセージから, 以下の要素を取得して csv ファイルに生成するようにしてみた.
- 送信日時
- メッセージ本文
- メッセージアトリビュートからエラーメッセージ
受信メッセージから dict 型のデータに加工して, csv モジュールの DictWriter クラスを用いることで簡単に csv 化することが出来た.
DLQ に蓄積されたメッセージ
エラーメッセージの処理例
ということで, 1 分毎に例外が発生しているので, しばらく放置しておくと SQS にメッセージが蓄積されている.
$ aws sqs receive-message \ --queue-url https://ap-northeast-1.queue.amazonaws.com/012345678912/dlq-tutorial \ --attribute-names All \ --message-attribute-names All \ --max-number-of-messages 10 \ --query Messages[].MessageId [ "b512e862-3e13-4def-8591-732a280fe17b" ]
ひとまず, csv に書き出す処理を実行してみる.
$ sls invoke local --stage debug --function dlq_message_handler --data '' null
csv ファイルが作成されているので, 以下のように確認してみる.
$ cat example.csv SentTimestamp,ErrorMessage,EventBody 1526741198363,Erroooooooooooooooooooor.,{'hell': 'world'} 1526740906810,Erroooooooooooooooooooor.,{'hell': 'world'} 1526741150850,Erroooooooooooooooooooor.,{'hell': 'world'} 1526741089397,Erroooooooooooooooooooor.,{'hell': 'world'} 1526740981380,Erroooooooooooooooooooor.,{'hell': 'world'} 1526741027864,Erroooooooooooooooooooor.,{'hell': 'world'}
この csv ファイルを S3 にアップロードしておいて, Pre-Signed URL を発行して Slack に通知するとかが良さそうな気がしている.
尚, 今回は検証を目的としており, 1 分毎にエラーを吐いている為, とんでもないシステムになってしまいそうだけど, 実際の運用ではこのようなエラーが発生しないように注意を払って実装したい...
エラー検知
DLQ リソースに SQS を利用している場合, 「メッセージがキューに蓄積されていること = Lambda ファンクションにエラーが発生している」になると思うが, この場合, CloudWatch メトリクスの NumberOfMessagesReceived
あたりを監視しておくと良さそうな気がする.
ところが, ドキュメントによると, SQS の CloudWatch では詳細モニタリングは提供されていない為, 5 分よりも短い間隔での監視が要件となる場合には, 今回のような監視を用途とした Lambda 関数 (DLQ を処理する Lambda ファンクション) を別途で用意する必要があると思われる.
以上
「今さらジロー」であるが, Lambda の Dead Letter Queue 機能に触れてみた. エラー自体をキューに放り込むことが出来るということで, 未処理エラーの検知や再処理が比較的簡単に行えるのではと考えている. 今後は積極的に DLQ を利用していきたい.