追記
このプラグインも同じようなことをやっていた. こっちを利用すれば良かったかも...orz (この記事を書いてから気付いた
tl;dr
embulk で S3 に保存されている CloudFront のログを扱うにあたって, HTTP リクエスト https://example.com/test?aa=123&bb=456
の URL クエリパラメータをパースして key
と value
に分けた上で csv に書き出す要件 (要件と言ってもギョームではありません) がありました. 以下のような表を作るイメージです.
日時 | メソッド | リクエストパス | aa | bb |
---|---|---|---|---|
YYYY.MM.DD HH:mm:ss | GET | /test |
xxx1 | xxx2 |
その要件を満たす為, URL クエリパラメータをパースするプラグインを Ruby で作ってみたのでメモしておきます.
要件
以下のようなリクエストがあったとします.
# HTTP リクエスト https://example.com/test?foo=123&bar=abc&baz=456
このリクエストのログは S3 に保存されると, 以下のような感じになります.
#Version: 1.0 #Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end 2020-05-12 11:15:33 SEA19-C2 7190 xx.xxx.xxx.xxx GET d3f7koyt0yburg.cloudfront.net / 200 - Mozilla/5.0%20(X11;%20Linux%20x86_64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20HeadlessChrome/68.0.3440.106%20Safari/537.36 foo=123&bar=abc&baz=456 - Miss x4zs2FrOi-BGIdQNVl6QpSFrQTJDMEl5tcY39BHw7k4453tts60qRQ== oreno.tools http 369 0.387 - - - Miss HTTP/1.1 - - 35234 0.387 Miss text/html 6738 --
ログには cs-uri-query
という列に URL クエリパラメータの foo=123&bar=abc&baz=456
が入ります.
そして, 最終的に csv で以下のように出力したいという要件です.
...,sc-content-type,sc-content-len,sc-range-start,sc-range-end,foo,bar,baz ...,text/html,6378,,,123,abc,456
列の foo
, bar
baz
が追加され, それぞれの値が列にセットされることを想定しています.
Cloudfront のログを吸い上げる
embulk-imput-s3
まずは, S3 に溜まっている Cloudfront のログを吸い上げる必要があります. これは, embulk-imput-s3 を利用することで瞬く間に吸い上げることが出来ます.
吸い上げたログを標準出力に出力するだけであれば, 以下のような設定で吸い上げることが可能です.
in: type: s3 bucket: {{ env.BUCKET_NAME }} path_prefix: {{ env.PATH_PREFIX }} endpoint: s3-ap-northeast-1.amazonaws.com access_key_id: {{ env.ACCESS_KEY }} secret_access_key: {{ env.SECRET_KEY }} decoders: - {type: gzip} parser: charset: UTF-8 newline: CRLF type: csv delimiter: "\t" trim_if_not_quoted: false skip_header_lines: 2 allow_extra_columns: true allow_optional_columns: true null_string: "-" columns: - {name: date, type: string} - {name: time, type: string} - {name: x_edge_location, type: string} - {name: sc_bytes, type: string} - {name: c_ip, type: string} - {name: cs_method, type: string} - {name: cs_host, type: string} - {name: cs_uri_stem, type: string} - {name: sc_status, type: string} - {name: cs_referer, type: string} - {name: cs_user_agent, type: string} - {name: cs_uri_query, type: string} - {name: cs_cookie, type: string} - {name: x_edge_result_type, type: string} - {name: x_edge_request_id, type: string} - {name: x_host_header, type: string} - {name: cs_protocol, type: string} - {name: cs_bytes, type: string} - {name: time_taken, type: string} - {name: x_forwarded_for, type: string} - {name: ssl_protocol, type: string} - {name: ssl_cipher, type: string} - {name: x_edge_response_result_type, type: string} - {name: cs_protocol_version, type: string} - {name: fle_status, type: string} - {name: fle_encrypted_fields, type: string} exec: {} out: type: stdout
embulk preview
でちょっと試してみます.
$ embulk preview -G example.yml ... *************************** 2 *************************** date (string) : 2020-05-10 time (string) : 00:16:43 x_edge_location (string) : SEA19-C1 sc_bytes (string) : 7198 c_ip (string) : xx.xxx.xxx.xxx cs_method (string) : GET cs_host (string) : d3f7koyt0yburg.cloudfront.net cs_uri_stem (string) : / sc_status (string) : 200 cs_referer (string) : cs_user_agent (string) : Go-http-client/1.1 cs_uri_query (string) : cs_cookie (string) : x_edge_result_type (string) : Hit x_edge_request_id (string) : 1TLNWap6yNbcYg_HMDULkLF9tgmkLMkxWLdlWK686espfAQzeP5GkA== x_host_header (string) : oreno.tools cs_protocol (string) : http cs_bytes (string) : 92 time_taken (string) : 0.003 x_forwarded_for (string) : ssl_protocol (string) : ssl_cipher (string) : x_edge_response_result_type (string) : Hit cs_protocol_version (string) : HTTP/1.1 fle_status (string) : fle_encrypted_fields (string) :
クエリパラメータの解析
やりたいこと
やりたいことの再掲.
# HTTP リクエスト https://example.com/test?aa=123&bb=456
このリクエストのログを以下のようなデータ構造にしたいと思います.
日時 | メソッド | リクエストパス | aa | bb |
---|---|---|---|---|
YYYY.MM.DD HH:mm:ss | GET | /test |
xxx1 | xxx2 |
試行錯誤
当初は, 以下のプラグインがめっちゃ要件を満たしていそうだったので試してみました.
ちなみに, 設定は以下のように書きましが...
... filters: - type: query_string query_string_column_name: cs_uri_query expanded_columns: - {name: aa, type: string} - {name: bb, type: string} ...
以下のようなメッセージが出力して期待した挙動が得られずに断念しました.
The column was ignored because it does not seem to be a query string: aa=xxx1&bb=xxx2
どうやら, プラグインの中でクエリパラメータが文字列として認識されない状態になっていて, テストコードを読む限りだと, プラグインに入っている来る文字列は, 以下のようにパスとクエリが同じ文字列になっていることが期待されているようです.
/test?foo=123&bar=abc&baz=456
以下, 該当行です.
ということで
プラグインを作りました.
require 'uri' module Embulk module Filter class QueryStringParseFilterPlugin < FilterPlugin Plugin.register_filter('query_string_parse', self) def self.transaction(config, in_schema, &control) task = { 'query_string_column_name' => config.param('query_string_column_name', :string, default: "column_name"), 'expanded_columns' => config.param('expanded_columns', :array, default: []) } idx = in_schema.size task['expanded_columns'].each do |c| in_schema = in_schema + [Column.new(idx, c['name'], :string)] idx = idx + 1 end yield(task, in_schema) end def initialize(task, in_schema, out_schema, page_builder) super @column = task['query_string_column_name'] @expanded = task['expanded_columns'] end def close end def add(page) page.each do |record| begin record = hash_record(record) # 対象とするカラムに値が入っていない場合の対処 if record[@column].nil? @page_builder.add(record.values + Array.new(@expanded.length, '')) next end parsed_query = parse_query(record[@column]) result = {} parsed_query.each do |pk, pv| @expanded.each do |ec| result[pk] = pv if ec['name'] == pk end end result_array = [] result_array = Array.new(@expanded.length, '') if result.values.empty? result_array = result.values unless result.values.empty? @page_builder.add(record.values + result_array) rescue end end end def finish @page_builder.finish end def hash_record(record) Hash[in_schema.names.zip(record)] end def parse_query(query) query_array = URI::decode_www_form(query) Hash[query_array] end end end end
gem での配布する予定は無いので, シンプルに 1 枚のファイルに記述しました. 設定ファイルは以下のように記述します.
... filters: - type: query_string_parse query_string_column_name: cs_uri_query expanded_columns: - {name: aa, type: string} - {name: bb, type: string} ...
先述の embulk-filter-query_string の設定を踏襲しました.
動かしてみる
プラグインをロードして動かしてみます. お手製プラグインは, 以下のようなパスに保存しています.
$ ls -l plugins/embulk/filter/query_string_parse.rb -rw-r--r-- 1 root root 2100 May 11 23:11 plugins/embulk/filter/query_string_parse.rb
plugins
以下のディレクトリ構成 (embulk/filter/
) は, 上記の通りにしておかないと, --load-path
オプションを指定した場合, プラグインがロードされませんでした.
$ embulk preview sample.yml --load-path plugins $ embulk run sample.yml --load-path plugins
以下のように出力されました.
... 2020-05-10,12:23:58,NRT51-C1,334,xx.xxx.xxx.xxxHEAD,d3f7koyt0yburg.cloudfront.net,/test,403,,curl/7.54.0,aa=xxx1&bb=xxx2,,Error,BLZOJ-4vKN508q4VHFsmDvYCmT7hMEBcob21ROkd0m U4-ennMlc6zA==,oreno.tools,http,96,0.065,,,,Error,HTTP/1.1,,,xxx1,xxx2 ... 2020-05-12,14:13:02,NRT51-C1,334,xxx.xxx.xxx.xxx,HEAD,d3f7koyt0yburg.cloudfront.net,/test,403,,curl/7.54.0,cc=xxx1&dd=xxx2,,Error,2jLGruGZgZsUhDW0sesQBe6z97jbrM8lGxo1NVjUNb rlypM8RZRxAw==,oreno.tools,http,96,0.027,,,,Error,HTTP/1.1,,,, ... 2020-05-10,21:51:14,IAD89-C2,616,xxx.xxx.xxx.xxx,GET,d3f7koyt0yburg.cloudfront.net,/rss.xml,403,https://www.google.com/,Mozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010.13;%20rv:73.0)%20Gecko/20100101%20Firefox/73.0,,,Error,RD3F6dFNkWDelG0xTmPTx5lphN98A6zJF2oM9RArkZS5783hlNO5Hg==,oreno.tools,https,338,0.636,,TLSv1.2,ECDHE-RSA-AES128-GCM-SHA256,Error,HTTP/1.1,,,, ...
URL クエリパラメータ (Cloudfront の場合, cs_uri_query
列に記録される) に aa=xxx1&bb=xxx2
が含まれている場合, ログの末尾に xxx1,xxx2
が追記されています. また, クエリパラメータが存在していないレコードには値が記録されていないことが判ります.
要件を満たされました!
以上
見様見真似で embulk プラグインを作ってみましたが, 要件通りにログデータを加工することが出来ました. 有難うございました.