ようへいの日々精進XP

よかろうもん

embulk で URL クエリパラメータをパースするフィルタプラグインを作ってみた (Cloudfront のログを吸い上げるの巻)

追記

github.com

このプラグインも同じようなことをやっていた. こっちを利用すれば良かったかも...orz (この記事を書いてから気付いた

tl;dr

embulk で S3 に保存されている CloudFront のログを扱うにあたって, HTTP リクエスhttps://example.com/test?aa=123&bb=456 の URL クエリパラメータをパースして keyvalue に分けた上で csv に書き出す要件 (要件と言ってもギョームではありません) がありました. 以下のような表を作るイメージです.

日時 メソッド リクエストパス aa bb
YYYY.MM.DD HH:mm:ss GET /test xxx1 xxx2

その要件を満たす為, URL クエリパラメータをパースするプラグインRuby で作ってみたのでメモしておきます.

要件

以下のようなリクエストがあったとします.

# HTTP リクエスト
https://example.com/test?foo=123&bar=abc&baz=456

このリクエストのログは S3 に保存されると, 以下のような感じになります.

#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2020-05-12      11:15:33        SEA19-C2        7190    xx.xxx.xxx.xxx   GET     d3f7koyt0yburg.cloudfront.net   /       200     -       Mozilla/5.0%20(X11;%20Linux%20x86_64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20HeadlessChrome/68.0.3440.106%20Safari/537.36      foo=123&bar=abc&baz=456       -       Miss    x4zs2FrOi-BGIdQNVl6QpSFrQTJDMEl5tcY39BHw7k4453tts60qRQ== oreno.tools     http    369     0.387   -       -       -       Miss    HTTP/1.1        -       -       35234   0.387   Miss    text/html       6738    --

ログには cs-uri-query という列に URL クエリパラメータの foo=123&bar=abc&baz=456 が入ります.

そして, 最終的に csv で以下のように出力したいという要件です.

...,sc-content-type,sc-content-len,sc-range-start,sc-range-end,foo,bar,baz
...,text/html,6378,,,123,abc,456

列の foo, bar baz が追加され, それぞれの値が列にセットされることを想定しています.

Cloudfront のログを吸い上げる

embulk-imput-s3

まずは, S3 に溜まっている Cloudfront のログを吸い上げる必要があります. これは, embulk-imput-s3 を利用することで瞬く間に吸い上げることが出来ます.

github.com

吸い上げたログを標準出力に出力するだけであれば, 以下のような設定で吸い上げることが可能です.

in:
  type: s3
  bucket: {{ env.BUCKET_NAME }}
  path_prefix: {{ env.PATH_PREFIX }}
  endpoint: s3-ap-northeast-1.amazonaws.com
  access_key_id: {{ env.ACCESS_KEY }}
  secret_access_key: {{ env.SECRET_KEY }}
  decoders:
    - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: "\t"
    trim_if_not_quoted: false
    skip_header_lines: 2
    allow_extra_columns: true
    allow_optional_columns: true
    null_string: "-"
    columns:
      - {name: date, type: string}
      - {name: time, type: string}
      - {name: x_edge_location, type: string}
      - {name: sc_bytes, type: string}
      - {name: c_ip, type: string}
      - {name: cs_method, type: string}
      - {name: cs_host, type: string}
      - {name: cs_uri_stem, type: string}
      - {name: sc_status, type: string}
      - {name: cs_referer, type: string}
      - {name: cs_user_agent, type: string}
      - {name: cs_uri_query, type: string}
      - {name: cs_cookie, type: string}
      - {name: x_edge_result_type, type: string}
      - {name: x_edge_request_id, type: string}
      - {name: x_host_header, type: string}
      - {name: cs_protocol, type: string}
      - {name: cs_bytes, type: string}
      - {name: time_taken, type: string}
      - {name: x_forwarded_for, type: string}
      - {name: ssl_protocol, type: string}
      - {name: ssl_cipher, type: string}
      - {name: x_edge_response_result_type, type: string}
      - {name: cs_protocol_version, type: string}
      - {name: fle_status, type: string}
      - {name: fle_encrypted_fields, type: string}
exec: {}
out:
  type: stdout

embulk preview でちょっと試してみます.

$ embulk preview -G example.yml
...
*************************** 2 ***************************
                       date (string) : 2020-05-10
                       time (string) : 00:16:43
            x_edge_location (string) : SEA19-C1
                   sc_bytes (string) : 7198
                       c_ip (string) : xx.xxx.xxx.xxx
                  cs_method (string) : GET
                    cs_host (string) : d3f7koyt0yburg.cloudfront.net
                cs_uri_stem (string) : /
                  sc_status (string) : 200
                 cs_referer (string) :
              cs_user_agent (string) : Go-http-client/1.1
               cs_uri_query (string) :
                  cs_cookie (string) :
         x_edge_result_type (string) : Hit
          x_edge_request_id (string) : 1TLNWap6yNbcYg_HMDULkLF9tgmkLMkxWLdlWK686espfAQzeP5GkA==
              x_host_header (string) : oreno.tools
                cs_protocol (string) : http
                   cs_bytes (string) : 92
                 time_taken (string) : 0.003
            x_forwarded_for (string) :
               ssl_protocol (string) :
                 ssl_cipher (string) :
x_edge_response_result_type (string) : Hit
        cs_protocol_version (string) : HTTP/1.1
                 fle_status (string) :
       fle_encrypted_fields (string) :

クエリパラメータの解析

やりたいこと

やりたいことの再掲.

# HTTP リクエスト
https://example.com/test?aa=123&bb=456

このリクエストのログを以下のようなデータ構造にしたいと思います.

日時 メソッド リクエストパス aa bb
YYYY.MM.DD HH:mm:ss GET /test xxx1 xxx2

試行錯誤

当初は, 以下のプラグインがめっちゃ要件を満たしていそうだったので試してみました.

github.com

ちなみに, 設定は以下のように書きましが...

...
filters:
  - type: query_string
    query_string_column_name: cs_uri_query
    expanded_columns:
      - {name: aa, type: string}
      - {name: bb, type: string}
...

以下のようなメッセージが出力して期待した挙動が得られずに断念しました.

The column was ignored because it does not seem to be a query string: aa=xxx1&bb=xxx2

どうやら, プラグインの中でクエリパラメータが文字列として認識されない状態になっていて, テストコードを読む限りだと, プラグインに入っている来る文字列は, 以下のようにパスとクエリが同じ文字列になっていることが期待されているようです.

/test?foo=123&bar=abc&baz=456

以下, 該当行です.

github.com

ということで

プラグインを作りました.

github.com

以下, プラグインソースコードです.

require 'uri'

module Embulk
  module Filter

    class QueryStringParseFilterPlugin < FilterPlugin
      Plugin.register_filter('query_string_parse', self)

      def self.transaction(config, in_schema, &control)
        task = {
          'query_string_column_name' => config.param('query_string_column_name', :string, default: "column_name"),
          'expanded_columns' => config.param('expanded_columns', :array, default: [])
        }

        idx = in_schema.size
        task['expanded_columns'].each do |c|
          in_schema = in_schema + [Column.new(idx, c['name'], :string)]
          idx = idx + 1
        end

        yield(task, in_schema)
      end

      def initialize(task, in_schema, out_schema, page_builder)
        super
        @column = task['query_string_column_name']
        @expanded = task['expanded_columns']
      end

      def close
      end

      def add(page)
        page.each do |record|
          begin
            record = hash_record(record)
            # 対象とするカラムに値が入っていない場合の対処
            if record[@column].nil?
              @page_builder.add(record.values + Array.new(@expanded.length, ''))
              next
            end

            parsed_query = parse_query(record[@column])
            result = {}
            parsed_query.each do |pk, pv|
              @expanded.each do |ec|
                result[pk] = pv if ec['name'] == pk
              end
            end
            result_array = []
            result_array = Array.new(@expanded.length, '') if result.values.empty?
            result_array = result.values unless result.values.empty?
            @page_builder.add(record.values + result_array)
          rescue
          end
        end
      end

      def finish
        @page_builder.finish
      end

      def hash_record(record)
        Hash[in_schema.names.zip(record)]
      end

      def parse_query(query)
        query_array = URI::decode_www_form(query)
        Hash[query_array]
      end
    end

  end
end

gem での配布する予定は無いので, シンプルに 1 枚のファイルに記述しました. 設定ファイルは以下のように記述します.

...
filters:
  - type: query_string_parse
    query_string_column_name: cs_uri_query
    expanded_columns:
      - {name: aa, type: string}
      - {name: bb, type: string}
...

先述の embulk-filter-query_string の設定を踏襲しました.

動かしてみる

プラグインをロードして動かしてみます. お手製プラグインは, 以下のようなパスに保存しています.

$ ls -l plugins/embulk/filter/query_string_parse.rb
-rw-r--r--    1 root     root          2100 May 11 23:11 plugins/embulk/filter/query_string_parse.rb

plugins 以下のディレクトリ構成 (embulk/filter/) は, 上記の通りにしておかないと, --load-path オプションを指定した場合, プラグインがロードされませんでした.

$ embulk preview sample.yml --load-path plugins
$ embulk run sample.yml --load-path plugins

以下のように出力されました.

...
2020-05-10,12:23:58,NRT51-C1,334,xx.xxx.xxx.xxxHEAD,d3f7koyt0yburg.cloudfront.net,/test,403,,curl/7.54.0,aa=xxx1&bb=xxx2,,Error,BLZOJ-4vKN508q4VHFsmDvYCmT7hMEBcob21ROkd0m
U4-ennMlc6zA==,oreno.tools,http,96,0.065,,,,Error,HTTP/1.1,,,xxx1,xxx2

...

2020-05-12,14:13:02,NRT51-C1,334,xxx.xxx.xxx.xxx,HEAD,d3f7koyt0yburg.cloudfront.net,/test,403,,curl/7.54.0,cc=xxx1&dd=xxx2,,Error,2jLGruGZgZsUhDW0sesQBe6z97jbrM8lGxo1NVjUNb
rlypM8RZRxAw==,oreno.tools,http,96,0.027,,,,Error,HTTP/1.1,,,,

...
2020-05-10,21:51:14,IAD89-C2,616,xxx.xxx.xxx.xxx,GET,d3f7koyt0yburg.cloudfront.net,/rss.xml,403,https://www.google.com/,Mozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010.13;%20rv:73.0)%20Gecko/20100101%20Firefox/73.0,,,Error,RD3F6dFNkWDelG0xTmPTx5lphN98A6zJF2oM9RArkZS5783hlNO5Hg==,oreno.tools,https,338,0.636,,TLSv1.2,ECDHE-RSA-AES128-GCM-SHA256,Error,HTTP/1.1,,,,
...

URL クエリパラメータ (Cloudfront の場合, cs_uri_query 列に記録される) に aa=xxx1&bb=xxx2 が含まれている場合, ログの末尾に xxx1,xxx2 が追記されています. また, クエリパラメータが存在していないレコードには値が記録されていないことが判ります.

要件を満たされました!

以上

見様見真似で embulk プラグインを作ってみましたが, 要件通りにログデータを加工することが出来ました. 有難うございました.

参考

embulk plugin developer guide (WIP)

github.com

dev.classmethod.jp