ようへいの日々精進XP

よかろうもん

(超メモ)CloudWatch Logs Subscriptions → Lambda → Twitter メモ

超メモシリーズ。

何がしたいのか?

CloudWatch Logs Subscriptions でフィルタされたレコードを Lambda で Twitter に呟かせたい。


メモ

Lambda にはどのような状態でレコードが入ってくるのか?

CloudWatch Logs Subscriptions でフィルタされたレコードは Kinesis ストリームを介して Lambda の event オブジェクトには以下のような JSON で入ってくる。

{
    "awslogs": {
        "data": "H4sIAAAAAAAAAJ2RTY+CMBCG/wrpXgXb0lLgZrJqTPbjoDdjTJWJNlIgpega43/fAbMfpz3sqZN3Ju887/RGLLStPsDq2gDJyfNkNdm+TpfLyXxKRqS+VOBQ5pIxSdM4SShFuawPc1d3DXaKen8CF6Ly0JfegbbYaGunrbYQNqaB0lSw5ZTJkNGQKRxtu127d6bxpq5mpvTgWpKvyYu2u0I/TLb+AuDJZvCdnqHy/ciNmALtY865SngsOE0TJWjMleI8TZhgMhNKZkpKpSjPUFY8EzLFkUT09N5gZq8t4jMhJMNUTCSKj75ugfaLUbD+5k1XNMuFzGMaSUFxY/DEkk2weJu9B2EY5MHR+yYfjxvLZeTROzJVfdJOR/vajk/Xrr1245/00dHbEjngwzu991DMDJQFZruRQnvo8fqT/7m9v2DDcSwfKoZvGPY/cB7g+wqGEhnJ72D/Qb3fN/dPf1Js6SkCAAA="
    }
}

Python で処理しようとする場合には以下のように JSON がパースされた状態で event オブジェクトに入ってくる。

f:id:inokara:20151018195837p:plain

data キーから値を取り出してレコードを解析(Base64 をデコードして解凍)する。解析すると以下のような状態の JSON になる。

{
  "logEvents": [
    {
      "extractedFields": {
        "message": "http://pm25.test.inokara.com/kyusyu/2015-10-17.html",
        "lev": "INFO",
        "lv": "I,",
        "sp1": "--",
        "sp2": ":",
        "datetime": "2015-10-18T09:45:30.540420 #16"
      },
      "message": "I, [2015-10-18T09:45:30.540420 #16] INFO -- : http://pm25.test.inokara.com/kyusyu/2015-10-17.html",
      "timestamp": 1445136614672,
      "id": "32227623420867403277228614159475975577029772729458032640"
    }
  ],
  "subscriptionFilters": [
    "LambdaStream_tweet"
  ],
  "logStream": "soramame-pipeline_2015-10-17",
  "logGroup": "docker-log",
  "owner": "xxxxxxxxxxxxxxx",
  "messageType": "DATA_MESSAGE"
}

最終的には logEventsextractedFields.message にある URL を取り出して Twitter に呟かせたい。

ということで

以下のような Lambda ファンクションとなった。

# -*- coding: utf-8 -*-

import sys, json
import datetime
import ConfigParser
import twitter
import base64
import gzip
from StringIO import StringIO

def extract_url(record):
    # Base64 デコードして解凍してデータを取得する
    decoded_data = record.decode("base64")
    json_data = json.loads(gzip.GzipFile(fileobj=StringIO(decoded_data)).read())
    for data in json_data['logEvents']:
        return data['extractedFields']['message']

def lambda_handler(event, context):
    # config.ini から Twitter API の Credential 情報を取得
    c = ConfigParser.SafeConfigParser()
    c.read("./config.ini")
    # Twitter API を初期化
    api = twitter.Api(
        consumer_key        = c.get('tw','consumer_key'),
        consumer_secret     = c.get('tw','consumer_secret'),
        access_token_key    = c.get('tw','access_token_key'),
        access_token_secret = c.get('tw','access_token_secret'),
        )
    # event オブジェクトからレコードを取得する
    url = extract_url(event['awslogs']['data'])
    d = datetime.date.today() - datetime.timedelta(1)
    d = u"%s 年 %s 月 %s 日 PM2.5 の状況(九州地方のみ): " % (d.year, d.month, d.day)
    print d + url
    print api.PostUpdate(d + url)

早速...呟かせてみる

以下のように呟かせてみた。

#
# テストログを用意
#
% cat test.log
[
  {
    "timestamp": 1445136614672,
    "message": "I, [2015-10-18T09:45:30.540420 #16] INFO -- : http://pm25.test.inokara.com/kyusyu/2015-10-17.html"
  }
]

#
# ログを put する
#
% aws --region ap-northeast-1 logs put-log-events \
--log-group-name docker-log \
--log-stream-name ${stream_name} \
--log-events file://test.log \
--sequence-token ${token}

おお、きた。

f:id:inokara:20151018201043p:plain


後で...

も少し手順をまとめよう。

以上。