tl;dr
先日作った、そらまめ君のデータを拝借させて作った可視化サイト(今後は「PM2.5 in 九州」という名前で暫定的に呼ぶ)を更新したのでメモ。
更新内容
概要
- 観測地点全ての情報が網羅されていなかった問題を一部解決
- 観測データを JSON っぽい感じでも出力するようにした(とりあえず PM 2.5 のみ)
- サイトが作成されたタイミングで Twitter に URL を投稿するようにした
以下、詳細。
1. 観測地点全ての情報が網羅されていなかった問題を一部解決
原因
そもそも致命的な問題だと思うのだが、観測地点毎にグラフを生成しているはずが生成されたグラフの数と観測地点の数(205 地点)が大きく乖離している状態だった。原因は DynamoDB から観測地点のコード一覧を取得する際にちゃんと全レコードが返ってきていなかったという初歩的なミス。
詳細
1 回の Query または Scan オペレーションで、最大 1 MB のデータを取得できます。この制限は、結果への任意のフィルタ式の適用前に適用されます。
DynamoDB は上記のように Scan や Query 時に返ってくる結果のサイズに制限があるとのことで、今回、観測地点のコード一覧を取得する際にこの制限を超えた可能性があり半分の結果しか返ってきていなかった(※地点コード 8 桁 x 205 地点 x 24 という計算からすると 1MB を超えることは無さそうなんだけど...)。
改修
詳細は上記のドキュメントが詳しいが、ザックリと書くと以下の通り。
- 全件獲得が複数ページに分かれてしまう場合に
scan
メソッドのレスポンスにlast_evaluated_key
が含まれる last_evaluated_key
には、該当のページの最後の Hash キーと Range キーが含まれるので、このlast_evaluated_key
が存在する場合にはレコードが残っているので次のスキャンを行う必要があると判断する- 次にスキャンする場合には
exclusive_start_key
にlast_evaluated_key
に含まれている Hash キーと Range キーを利用してスキャンする - 尚、レコードが最後まで到達すると
last_evaluated_key
はnull
が入る。
ということで、雑に以下のように観測地点コードを取得する部分を修正した。
def get_mon_st_codes(table_name, check_date_time) # 最終的な結果を放り込む配列を初期化 result = [] # 一回目の scan result1 = dynamodb.scan( table_name: table_name, select: "SPECIFIC_ATTRIBUTES", attributes_to_get: ["mon_st_code"], ) # もし last_evaluated_key があったらもう一回 scan if result1.last_evaluated_key then result2 = dynamodb.scan( table_name: table_name, select: "SPECIFIC_ATTRIBUTES", attributes_to_get: ["mon_st_code"], # キモ exclusive_start_key: { "CHECK_DATE_TIME"=>result1.last_evaluated_key['CHECK_DATE_TIME'], "mon_st_code" => result1.last_evaluated_key['mon_st_code'] } ) result = result1.items.concat(result2.items) else result = result1.items end # 後は観測地点コードをのみを取得する codes = [] result.each do |item| codes << item['mon_st_code'] end return codes.uniq end
雑にというのは、result1
と result2
というような変数のもたせ方が場当たり的過ぎるから。このあたりは引き続き改善。
2. 観測データを JSON っぽい感じでも出力するようにした(とりあえず PM 2.5 のみ)
モチベーション
- 観測地点のデータとグラフの出力に関して整合性が取れているかの確認をサクッとやれないかなあと思ったから
- 将来的に API っぽい感じのものを提供出来ないかなあと思ったから
こんな感じ
グラフをクリックすると...
上記のように表示される。また、curl 等で叩くと以下のように。
% curl -s http://pm25.test.inokara.com/20150921/20150921-46201520-PM2.5.json | jq . { "24": 21, "23": 15, "22": 37, "21": 41, "20": 25, "19": 41, "18": 33, "17": 33, "08": 26, "07": 20, "06": 33, "05": 23, "04": 34, "03": 23, "02": 32, "01": 23, "09": 26, "10": 18, "11": 30, "12": 33, "13": 25, "14": 44, "15": 35, "16": 52 }
一応、JSON っぽい。
3. サイトが作成されたタイミングで Twitter に URL を投稿するようにした
モチベーション
- Data Pipeline → SNS → Lambda → Twitter という流れで Lambda を使ってみたかった
- サイトの情報を Twitter でつぶやくなんていう Web サービスっぽいことを自動でしてみたかった
純粋な Data Pipeline 連携はとりあえず挫折
- スクリプトの標準出力に出力される URL を Data Pipeline で拾って SNS に飛ばす方法 = 純粋な Data Pipeline 連携
- Data Pipeline から SNS にどのような情報を飛ばすことが出来るのかが理解出来ていない為、純粋な Data Pipeline 連携はとりあえず諦める
擬似的な Data Pipeline 連携
- Data Pipeline 連携を諦めきれない
- とりあえず ShellCommandActivity オブジェクトの OnSuccess で送信するメッセージに直接 URL を書いておく
- 動的に変わる部分(pm25.test.inokara.com/YYYY-MM-DD.html)を Data Pipeline の日付関数を使って動的に出力する
こんな感じ
イメージとしては以下のような感じ。
Lambda Function は以下の通り。
Data Pipeline 側の設定を以下のように。
OnSuccess オブジェクトでアクションを定義して...
アクション側で SNS の Topic を指定して、Message に Data Pipeline の日付関数を使って動的に出力するようにする。後は SNS 側で Subscribe 先に Tweet する Lambda Function を定義しておく。利用出来る関数については以下のドキュメントが詳しい。
そして、以下のようにつぶやかれる。
ひとまずはテストアカウントで呟かせておいてから実運用に乗っけてみる。