追記
github.com
このプラグインも同じようなことをやっていた. こっちを利用すれば良かったかも...orz (この記事を書いてから気付いた
tl;dr
embulk で S3 に保存されている CloudFront のログを扱うにあたって, HTTP リクエスト https://example.com/test?aa=123&bb=456
の URL クエリパラメータをパースして key
と value
に分けた上で csv に書き出す要件 (要件と言ってもギョームではありません) がありました. 以下のような表を作るイメージです.
日時 |
メソッド |
リクエストパス |
aa |
bb |
YYYY.MM.DD HH:mm:ss |
GET |
/test |
xxx1 |
xxx2 |
その要件を満たす為, URL クエリパラメータをパースするプラグインを Ruby で作ってみたのでメモしておきます.
要件
以下のようなリクエストがあったとします.
https://example.com/test?foo=123&bar=abc&baz=456
このリクエストのログは S3 に保存されると, 以下のような感じになります.
#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2020-05-12 11:15:33 SEA19-C2 7190 xx.xxx.xxx.xxx GET d3f7koyt0yburg.cloudfront.net / 200 - Mozilla/5.0%20(X11;%20Linux%20x86_64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20HeadlessChrome/68.0.3440.106%20Safari/537.36 foo=123&bar=abc&baz=456 - Miss x4zs2FrOi-BGIdQNVl6QpSFrQTJDMEl5tcY39BHw7k4453tts60qRQ== oreno.tools http 369 0.387 - - - Miss HTTP/1.1 - - 35234 0.387 Miss text/html 6738 --
ログには cs-uri-query
という列に URL クエリパラメータの foo=123&bar=abc&baz=456
が入ります.
そして, 最終的に csv で以下のように出力したいという要件です.
...,sc-content-type,sc-content-len,sc-range-start,sc-range-end,foo,bar,baz
...,text/html,6378,,,123,abc,456
列の foo
, bar
baz
が追加され, それぞれの値が列にセットされることを想定しています.
Cloudfront のログを吸い上げる
embulk-imput-s3
まずは, S3 に溜まっている Cloudfront のログを吸い上げる必要があります. これは, embulk-imput-s3 を利用することで瞬く間に吸い上げることが出来ます.
github.com
吸い上げたログを標準出力に出力するだけであれば, 以下のような設定で吸い上げることが可能です.
in:
type: s3
bucket: {{ env.BUCKET_NAME }}
path_prefix: {{ env.PATH_PREFIX }}
endpoint: s3-ap-northeast-1.amazonaws.com
access_key_id: {{ env.ACCESS_KEY }}
secret_access_key: {{ env.SECRET_KEY }}
decoders:
- {type: gzip}
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: "\t"
trim_if_not_quoted: false
skip_header_lines: 2
allow_extra_columns: true
allow_optional_columns: true
null_string: "-"
columns:
- {name: date, type: string}
- {name: time, type: string}
- {name: x_edge_location, type: string}
- {name: sc_bytes, type: string}
- {name: c_ip, type: string}
- {name: cs_method, type: string}
- {name: cs_host, type: string}
- {name: cs_uri_stem, type: string}
- {name: sc_status, type: string}
- {name: cs_referer, type: string}
- {name: cs_user_agent, type: string}
- {name: cs_uri_query, type: string}
- {name: cs_cookie, type: string}
- {name: x_edge_result_type, type: string}
- {name: x_edge_request_id, type: string}
- {name: x_host_header, type: string}
- {name: cs_protocol, type: string}
- {name: cs_bytes, type: string}
- {name: time_taken, type: string}
- {name: x_forwarded_for, type: string}
- {name: ssl_protocol, type: string}
- {name: ssl_cipher, type: string}
- {name: x_edge_response_result_type, type: string}
- {name: cs_protocol_version, type: string}
- {name: fle_status, type: string}
- {name: fle_encrypted_fields, type: string}
exec: {}
out:
type: stdout
embulk preview
でちょっと試してみます.
$ embulk preview -G example.yml
...
*************************** 2 ***************************
date (string) : 2020-05-10
time (string) : 00:16:43
x_edge_location (string) : SEA19-C1
sc_bytes (string) : 7198
c_ip (string) : xx.xxx.xxx.xxx
cs_method (string) : GET
cs_host (string) : d3f7koyt0yburg.cloudfront.net
cs_uri_stem (string) : /
sc_status (string) : 200
cs_referer (string) :
cs_user_agent (string) : Go-http-client/1.1
cs_uri_query (string) :
cs_cookie (string) :
x_edge_result_type (string) : Hit
x_edge_request_id (string) : 1TLNWap6yNbcYg_HMDULkLF9tgmkLMkxWLdlWK686espfAQzeP5GkA==
x_host_header (string) : oreno.tools
cs_protocol (string) : http
cs_bytes (string) : 92
time_taken (string) : 0.003
x_forwarded_for (string) :
ssl_protocol (string) :
ssl_cipher (string) :
x_edge_response_result_type (string) : Hit
cs_protocol_version (string) : HTTP/1.1
fle_status (string) :
fle_encrypted_fields (string) :
クエリパラメータの解析
やりたいこと
やりたいことの再掲.
https://example.com/test?aa=123&bb=456
このリクエストのログを以下のようなデータ構造にしたいと思います.
日時 |
メソッド |
リクエストパス |
aa |
bb |
YYYY.MM.DD HH:mm:ss |
GET |
/test |
xxx1 |
xxx2 |
試行錯誤
当初は, 以下のプラグインがめっちゃ要件を満たしていそうだったので試してみました.
github.com
ちなみに, 設定は以下のように書きましが...
...
filters:
- type: query_string
query_string_column_name: cs_uri_query
expanded_columns:
- {name: aa, type: string}
- {name: bb, type: string}
...
以下のようなメッセージが出力して期待した挙動が得られずに断念しました.
The column was ignored because it does not seem to be a query string: aa=xxx1&bb=xxx2
どうやら, プラグインの中でクエリパラメータが文字列として認識されない状態になっていて, テストコードを読む限りだと, プラグインに入っている来る文字列は, 以下のようにパスとクエリが同じ文字列になっていることが期待されているようです.
/test?foo=123&bar=abc&baz=456
以下, 該当行です.
github.com
ということで
プラグインを作りました.
github.com
以下, プラグインのソースコードです.
require 'uri'
module Embulk
module Filter
class QueryStringParseFilterPlugin < FilterPlugin
Plugin.register_filter('query_string_parse', self)
def self.transaction(config, in_schema, &control)
task = {
'query_string_column_name' => config.param('query_string_column_name', :string, default: "column_name"),
'expanded_columns' => config.param('expanded_columns', :array, default: [])
}
idx = in_schema.size
task['expanded_columns'].each do |c|
in_schema = in_schema + [Column.new(idx, c['name'], :string)]
idx = idx + 1
end
yield(task, in_schema)
end
def initialize(task, in_schema, out_schema, page_builder)
super
@column = task['query_string_column_name']
@expanded = task['expanded_columns']
end
def close
end
def add(page)
page.each do |record|
begin
record = hash_record(record)
if record[@column].nil?
@page_builder.add(record.values + Array.new(@expanded.length, ''))
next
end
parsed_query = parse_query(record[@column])
result = {}
parsed_query.each do |pk, pv|
@expanded.each do |ec|
result[pk] = pv if ec['name'] == pk
end
end
result_array = []
result_array = Array.new(@expanded.length, '') if result.values.empty?
result_array = result.values unless result.values.empty?
@page_builder.add(record.values + result_array)
rescue
end
end
end
def finish
@page_builder.finish
end
def hash_record(record)
Hash[in_schema.names.zip(record)]
end
def parse_query(query)
query_array = URI::decode_www_form(query)
Hash[query_array]
end
end
end
end
gem での配布する予定は無いので, シンプルに 1 枚のファイルに記述しました. 設定ファイルは以下のように記述します.
...
filters:
- type: query_string_parse
query_string_column_name: cs_uri_query
expanded_columns:
- {name: aa, type: string}
- {name: bb, type: string}
...
先述の embulk-filter-query_string の設定を踏襲しました.
動かしてみる
プラグインをロードして動かしてみます. お手製プラグインは, 以下のようなパスに保存しています.
$ ls -l plugins/embulk/filter/query_string_parse.rb
-rw-r--r-- 1 root root 2100 May 11 23:11 plugins/embulk/filter/query_string_parse.rb
plugins
以下のディレクトリ構成 (embulk/filter/
) は, 上記の通りにしておかないと, --load-path
オプションを指定した場合, プラグインがロードされませんでした.
$ embulk preview sample.yml --load-path plugins
$ embulk run sample.yml --load-path plugins
以下のように出力されました.
...
2020-05-10,12:23:58,NRT51-C1,334,xx.xxx.xxx.xxxHEAD,d3f7koyt0yburg.cloudfront.net,/test,403,,curl/7.54.0,aa=xxx1&bb=xxx2,,Error,BLZOJ-4vKN508q4VHFsmDvYCmT7hMEBcob21ROkd0m
U4-ennMlc6zA==,oreno.tools,http,96,0.065,,,,Error,HTTP/1.1,,,xxx1,xxx2
...
2020-05-12,14:13:02,NRT51-C1,334,xxx.xxx.xxx.xxx,HEAD,d3f7koyt0yburg.cloudfront.net,/test,403,,curl/7.54.0,cc=xxx1&dd=xxx2,,Error,2jLGruGZgZsUhDW0sesQBe6z97jbrM8lGxo1NVjUNb
rlypM8RZRxAw==,oreno.tools,http,96,0.027,,,,Error,HTTP/1.1,,,,
...
2020-05-10,21:51:14,IAD89-C2,616,xxx.xxx.xxx.xxx,GET,d3f7koyt0yburg.cloudfront.net,/rss.xml,403,https://www.google.com/,Mozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010.13;%20rv:73.0)%20Gecko/20100101%20Firefox/73.0,,,Error,RD3F6dFNkWDelG0xTmPTx5lphN98A6zJF2oM9RArkZS5783hlNO5Hg==,oreno.tools,https,338,0.636,,TLSv1.2,ECDHE-RSA-AES128-GCM-SHA256,Error,HTTP/1.1,,,,
...
URL クエリパラメータ (Cloudfront の場合, cs_uri_query
列に記録される) に aa=xxx1&bb=xxx2
が含まれている場合, ログの末尾に xxx1,xxx2
が追記されています. また, クエリパラメータが存在していないレコードには値が記録されていないことが判ります.
要件を満たされました!
以上
見様見真似で embulk プラグインを作ってみましたが, 要件通りにログデータを加工することが出来ました. 有難うございました.
参考
embulk plugin developer guide (WIP)
github.com
dev.classmethod.jp