ようへいの日々精進XP

よかろうもん

stream2es で大量データをサクッと Amazon Elasticsearch Service に放り込む 〜 もう bulk API を操作しなくても良いかもしれない 〜

検証の為に

大量データを Amazon Elasticsearch Service に放り込みたいです。

データは

github.com

こちらを利用させて頂きます。

また、

qiita.com

上記の記事を参考にさせて頂きました。ありがとうございます。

放り込む

放り込む前に csv から json データにする

上記の記事を参考にさせて頂いて Python で書いてみました。標準入力から csv を読み込もうと試行錯誤しましたが、とりあえず csv ファイルを読み込む体でいきます。

# -*- coding: utf-8 -*-

import csv
import json
import random
import sys

INDEX = "ldgourmet"
TYPE  = "restaurant"

#
# http://racchai.hatenablog.com/entry/2016/04/16/173000 を参考にさせて頂きました.
# ありがとうございます.
#
def random_string(length, seq='0123456789abcdefghijklmnopqrstuvwxyz'):
    sr = random.SystemRandom()
    return ''.join([sr.choice(seq) for i in xrange(length)])

with open('restaurants.csv', 'r') as f:
    reader = csv.reader(f)
    header = next(reader)

    for row in reader:
        # index = { 'index': { '_index': INDEX, '_type': TYPE, '_id': random_string(16) }} # bulk API だと Index を付けなきゃいけないけど...
        # print json.dumps(index)
        print json.dumps(dict(zip(header, row)))

レストランデータを展開して、上記のスクリプトを以下のように実行します。

$ tar zxvf ldgourmet.tar.gz
$ ls -l restaurants.csv
-rw-r--r-- 1 cloudpack cloudpack 59311909 Apr 22  2011 restaurants.csv
$ python csv2json.py > restaurants.json

インデックスの作成

レストランデータを放り込むインデックスを作成する。

ひとまず放り込むだけなので、マッピングは超シンプルに。

$ cat mapping.json
{
  "mappings": {
    "restaurant": {
      "properties": {
        "restaurant_id": {
          "type": "integer"
        },
        "name": {
          "type": "string",
        },
        "name_alphabet": {
          "type": "string",
        },
        "name_kana": {
          "type": "string",
        },
        "address": {
          "type": "string",
        },
        "description": {
          "type": "string",
        },
        "purpose": {
          "type": "string",
        },
        "category": {
          "type": "string",
          "analyzer": "whitespace"
        },
        "photo_count": {
          "type": "integer"
        },
        "menu_count": {
          "type": "integer"
        },
        "access_count": {
          "type": "integer"
        },
        "closed": {
          "type": "boolean"
        },
        "location": {
          "type": "geo_point",
          "store": "yes"
        }
      }
    }
  }
}

以下のように実行する。

#
# インデックスを作成
#
_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='ldgourmet'
_TARGET=${_ESS_ENDPOINT}/${_INDEX}
curl -XPUT ${_TARGET} -d @mapping.json

ここで真打ち stream2es の登場

stream2es については @johtani さんの以下の記事がバイブルです。

blog.johtani.info

stream2es を動かすには Java8 以上が必要ですので、事前に Java8 をインストールしておきましょう。

$ java -version
openjdk version "1.8.0_121"
OpenJDK Runtime Environment (build 1.8.0_121-b13)
OpenJDK 64-Bit Server VM (build 25.121-b13, mixed mode)

あとは、以下のようにダウンロードして実行権限を付与した後にデータの投入を行います。

#
# stream2es を取得
#
curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es

#
# データの投入
#
_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='ldgourmet'
_TYPE='restaurant'
_TARGET=${_ESS_ENDPOINT}/${_INDEX}/${_TYPE}
cat restaurants.json | ./stream2es stdin --target ${_TARGET}

実行すると以下のように出力されます。

$ cat restaurants.json | ./stream2es stdin --target ${_TARGET}
2017-03-31T08:56:18.797+0000 INFO  01:36.489 2220.3d/s 1848.3K/s (174.2mb) indexed 214236 streamed 214236 errors 0
2017-03-31T08:56:18.823+0000 INFO  done

ひとまず

ちゃんと放り込めたかを Count API を使って確認します。

_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='/ldgourmet/restaurant'
_TARGET=${_ESS_ENDPOINT}${_INDEX}
$ curl -s -XGET ${_TARGET}/_count | jq .
{
  "count": 214236,
  "_shards": {
    "total": 2,
    "successful": 2,
    "failed": 0
  }
}

問題なさそうです。

これで検証を進められそうです。

以上

stream2es を使えば

bulk API を駆使しなくても(bulk API を利用する為にデータを整形しなくても)大量データをサクッと放り込んでくれました。また、各ドキュメントの ID についても自動で採番してくれるのも嬉しい限りです。

stream2es ヘルプ

単に標準入力から Elasticsearch にバルクでデータを放り込んでくれるだけでは無さそうです。

$ ./stream2es --help
2017-03-31T12:22:27.534+0000 INFO  Copyright 2013 Elasticsearch

Usage: stream2es [CMD] [OPTS]

Available commands: wiki, twitter, stdin, es

Common opts:
   --authinfo   Stored stream credentials (default: "/home/cloudpack/.authinfo.stream2es")
--clobber       Use elasticsearch 'index' operation, clobbering existing documents, no-clobber uses 'create' which will skip/error existing documents (default: false)
-h --help       Display help (default: false)
--http-insecure Don't verify peer cert (default: false)
   --http-keystore /path/to/keystore (default: null)
   --http-keystore-pass Keystore password (default: null)
   --http-trust-store /path/to/keystore (default: null)
   --http-trust-store-pass Truststore password (default: null)
--indexing      Whether to actually send data to ES (default: true)
   --log        Log level (trace debug info warn error fatal report) (default: "info")
   --mappings   Index mappings (default: null)
-d --max-docs   Number of docs to index (default: -1)
--offset        Add __s2e_offset__ field TO EACH DOCUMENT with the sequence offset of the stream (default: false)
-q --queue-size Size of the internal bulk queue (default: 40)
--replace       Delete index before streaming (default: false)
   --settings   Index settings (default: null)
-s --skip       Skip this many docs before indexing (default: 0)
   --stream-buffer Buffer up to this many pages (default: 50)
   --stream-timeout Wait seconds for data on the stream (default: -1)
   --target     ES location (default: "http://localhost:9200")
   --tee        Save json request payloads as files in path (default: null)
   --tee-bulk   Save bulk request payloads as files in path (default: null)
--tee-errors    Create error-{id} files (default: true)
-v --version    Print version (default: false)
-w --workers    Number of indexing threads (default: 2)

ElasticsearchStream opts:
-b --bulk-bytes Bulk size in bytes (default: 1048576)
   --query      Query to _scan from source (default: "{"query":{"match_all":{}}}")
-q --queue-size Size of the internal bulk queue (default: 1000)
   --scroll-size Source scroll size (default: 500)
   --scroll-time Source scroll context TTL (default: "60s")
   --source     Source ES url (default: null)
--source-http-insecure Don't verify peer cert (default: false)
   --source-http-keystore /path/to/keystore (default: null)
   --source-http-keystore-pass Keystore password (default: null)
   --source-http-trust-store /path/to/keystore (default: null)
   --source-http-trust-store-pass Truststore password (default: null)
   --target     Target ES url (default: null)

GeneratorStream opts:
-b --bulk-bytes Bulk size in bytes (default: 102400)
   --dictionary Dictionary location (default: "/usr/share/dict/words")
   --fields     Field template (str, string, dbl, double, int, integer) (default: "f1:str:1")
-q --queue-size Size of the internal bulk queue (default: 40)
   --stream-buffer Buffer up to this many docs (default: 100000)
   --target     ES index (default: "http://localhost:9200/foo/t")

StdinStream opts:
-b --bulk-bytes Bulk size in bytes (default: 102400)
-q --queue-size Size of the internal bulk queue (default: 40)
   --stream-buffer Buffer up to this many docs (default: 100)
   --target     Target ES http://host:port/index/type (default: "http://localhost:9200/foo/t")

TwitterStream opts:
--authorize     Create oauth credentials (default: false)
-b --bulk-bytes Bulk size in bytes (default: 102400)
   --key        Twitter app consumer key, only for --authorize (default: null)
-q --queue-size Size of the internal bulk queue (default: 1000)
   --secret     Twitter app consumer secret, only for --authorize (default: null)
   --stream-buffer Buffer up to this many tweets (default: 1000)
   --target     Target ES http://host:port/index/type (default: "http://localhost:9200/twitter/status")
   --track      %%-separated list of strings to filter the stream (default: null)

WikiStream opts:
-b --bulk-bytes Bulk size in bytes (default: 3145728)
-q --queue-size Size of the internal bulk queue (default: 40)
   --source     Wiki dump location (default: "https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2")
   --stream-buffer Buffer up to this many pages (default: 50)
   --target     Target ES http://host:port/index (we handle types here) (default: "http://localhost:9200/wiki")

2017 年 03 月 30 日(木)

ランチ

よし本の日替わり。650 円でコスパ良し。

Ansible徹底入門 輪読会 #0

cirasu.connpass.com

久しぶりの勉強会でした。

Ansible は公私ともに使っていないけど、「Ansible徹底入門」を読んで改めて Ansible の魅力について気付きがあったと思う。(宣伝ではありません)

小ネタ道場一本勝負 〜 MacOS X でランダムな英数字を生成する 〜

たのもう

MacOS X でランダムな英数字を生成したいんです。

もちろん、pwgen というコマンドがあるのは知っていますが、今回は pwgen は使わない方法を模索します。

技あり!を取られた気分

こんな感じで

cat /dev/urandom | tr -dc A-Za-z0-9 | fold -w32 | head -n1

いけるはず…

ところが…

$ cat /dev/urandom | tr -dc A-Za-z0-9 | fold -w32 | head -n1
tr: Illegal byte sequence

お…技ありを取られた気分。

なぜ、こんな事が起こるかについては、以下の記事に書かれていました。

unix.stackexchange.com

Computers store data as sequences of bytes. A text is a sequence of characters. There are several ways to encode characters as bytes, called character encodings. The de facto standard character encoding in most of the world, especially on OSX, is UTF-8, which is an encoding for the Unicode character set. There are only 256 possible bytes, but over a million possible Unicode characters, so most characters are encoded as multiple bytes. UTF-8 is a variable-length encoding: depending on the character, it can take from one to four bytes to encode a character. Some sequences of bytes do not represent any character in UTF-8. Therefore, there are sequences of bytes which are not valid UTF-8 text files.

tr is complaining because it encountered such a byte sequence. It expects to see a text file encoded in UTF-8, but it sees binary data which is not valid UTF-8.

cat /dev/urandomtr が期待しない文字列(UTF-8 では無い文字列)が流れてきているからとのことです。

ということで、一本

こんな感じで

$ export LC_ALL=C; cat /dev/urandom | tr -dc A-Za-z0-9 | fold -w32 | head -n1
2u8uJ3zweCwHAvRjTsIYUKjXGG9KId5y

有難うございました

unix.stackexchange.com

やっぱり、pwgen が良い気がしました。

Markdown でスライドが書ける Deckset がなかなか良いのでメモ

さようなら Keynote さようなら PowerPoint

は言い過ぎかもしれないけど Markdown でスライドを書ける Deckset を使い始めました。

www.decksetapp.com

Deckset の何が良いとや?

  • 繰り返しになるが Markdown で書けるので、ざっと書いたメモがそのままスライドになる

これに限る。

凝ったアニメーションや装飾等は無く、テーマの数も少ないが、元々 KeynotePowerPoint でも使いこなせていなかったので、自分としてデメリットにならない。

また、プレゼンター用のノートも書けたりするので、カンペが必須な自分も心配 Nothing 。

書式等については、以下のドキュメントに纏められているので安心ですたい。

docs.decksetapp.com

要望的な何か

スライドのリモートコントロールKeynote Remote のように iOSバイスから手軽に出来ると嬉しいです。

github.com

github.com

一応、上記のようにオープンソースで公開されているものの、個人的には敷居が高いです…

ということで

初めての Deckset 作品

speakerdeck.com

今週、開催予定の Ansible徹底入門 輪読会 #0で発表する予定の資料です。

Deckset は

華美な装飾は不要、スライドの中身で勝負したい(笑)、Markdown が母国語になっているエンジニアの皆さんにお薦めだと思います。

「Ansible 徹底入門」を読んでる途中だけど Ansible Callback Plugin(CloudWatch Logs にイベントを送信するプラグイン) を作ってみたのメモ

Ansible 徹底入門

www.shoeisha.co.jp

自分自身はまだ読んでいる途中ですが、Ansible をこれから利用しようと考えている方から、各種クラウド環境と組み合わせた一歩踏み込んだ利用を考えて方まで幅広い層の皆さんにお薦め出来ると思います。

燃えてキタ━━━━(゚∀゚)━━━━!!

Ansible 徹底入門を読み進めているうちに、燃えてきたので Ansible Callback Plugin(CloudWatch Logs にイベントを送信するプラグイン) を作ってみました。

Ansible Plugin について

以下の資料にまとめました。

speakerdeck.com

作った Ansible Plugin

github.com

Playbook の実行結果等を CloudWatch Logs に転送するやつです。

プラグインを実装するにあたり、logstash プラグインの実装を参考にさせて頂きました。ありがとうございます。

デモ

デモ環境

$ python --version
Python 2.7.13

$ ansible --version
ansible 2.2.1.0
  config file = /path/to/.ansible.cfg
  configured module search path = Default w/o overrides

プラグインの置き場

ansible.cfg で指定します。

[default]
callback_plugins = ~/path/to/plugins
callback_whitelist = cloudwatch_logs

callback_plugins に指定したディレクトリにプラグインを設置。設置しただけではダメのようで、callback_whitelist に利用したいプラグイン名を記述する必要があるとのことです。(実施に試してところ、callback_whitelist への記載が不要な挙動だったので引き続き調べたい)

Playbook

引き続き、前の記事で紹介した Datadog のタグを管理するモジュールを利用する Playbook を利用します。

- hosts: localhost
  tasks:
    - name: Test datadog_tags(present)
      datadog_tags:
        state: present
        host: myhost
        tags: 'aa,bb,cc,dd'
        api_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        app_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    - name: Test datadog_tags(absent)
      datadog_tags:
        state: absent
        host: myhost
        api_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        app_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

プロビジョニング

$ ansible-playbook datadog.yml
 [WARNING]: Host file not found: /etc/ansible/hosts

 [WARNING]: provided hosts list is empty, only localhost is available


PLAY [localhost] ***************************************************************

TASK [setup] *******************************************************************
ok: [localhost]

TASK [Test datadog_tags(present)] **********************************************
changed: [localhost]

TASK [Test datadog_tags(absent)] ***********************************************
changed: [localhost]

PLAY RECAP *********************************************************************
localhost                  : ok=3    changed=2    unreachable=0    failed=0

CloudWatch Logs を確認

プラグインのデフォルトでは AWS_PROFILE は default が設定され、Log Group は ansible 及び Log Stream が provision が設定されます。

$ aws --profile ${AWS_PROFILE} --region ${AWS_REGION} \
  logs get-log-events \
    --log-group-name ansible --log-stream-name provision | jq '.events|sort_by(.timestamp)|.[-2,-1]'
{
  "ingestionTime": 1490456583837,
  "timestamp": 1490456583257,
  "message": "{'log_action': 'ansible ok', 'status': 'OK', 'host': 'myhost', 'session': 'b8891b28-1171-11e7-9524-3c15c2bdc014', 'ansible_type': 'task', 'ansible_playbook': 'datadog.yml', 'ansible_host': 'localhost', 'ansible_task': TASK: Test datadog_tags(present), 'ansible_result': '{\"changed\": false}'}"
}
{
  "ingestionTime": 1490456583837,
  "timestamp": 1490456583260,
  "message": "{'log_action': 'ansible stats', 'status': 'OK', 'host': 'myhost', 'session': 'b8891b28-1171-11e7-9524-3c15c2bdc014', 'ansible_type': 'finish', 'ansible_playbook': 'datadog.yml', 'ansible_result': '{\"localhost\": {\"ok\": 2, \"failures\": 0, \"unreachable\": 0, \"changed\": 0, \"skipped\": 0}}'}"
}

実装にあたり

Callback Plugin では以下のようなイベントをフックすることが出来ます。

$ grep v2_ plugins/cloudwatch_logs.py
    def v2_playbook_on_start(self, playbook):
    def v2_playbook_on_stats(self, stats):
    def v2_runner_on_ok(self, result, **kwargs):
    def v2_runner_on_skipped(self, result, **kwargs):
    def v2_playbook_on_import_for_host(self, result, imported_file):
    def v2_playbook_on_not_import_for_host(self, result, missing_file):
    def v2_runner_on_failed(self, result, **kwargs):
    def v2_runner_on_unreachable(self, result, **kwargs):
    def v2_runner_on_async_failed(self, result, **kwargs):

また、Python Logging のハンドラで CloudWatch Logs に転送出来る watchtower を利用することで、Logging のインターフェースを利用しています。(参考にした logstash プラグインも同様の実装になっています)

github.com

ということで

CloudWatch Logs に飛ばして(みた)レベルだが、Callback プラグインであれば意外に簡単に実装することが出来ました。

モジュールの実装に関する情報はチラホラ見かけることが出来ますが、プラグインそのものに関する情報があまり見かけない気がしていますが、モジュールやプラグインの側面から Ansible を見てみるのも面白いと考えています。

「Ansible 徹底入門」を読んでる途中だけど Ansible Module(Datadog のタグを管理するモジュール) を作ってみたのメモ

Ansible 徹底入門

www.shoeisha.co.jp

自分自身はまだ読んでいる途中ですが、Ansible をこれから利用しようと考えている方から、各種クラウド環境と組み合わせた一歩踏み込んだ利用を考えて方まで幅広い層の皆さんにお薦め出来ると思います。

ムラムラ

Ansible 徹底入門を読み進めているうちに、ムラムラっと Ansible Module を作ってみたくなったので作ってみました。

Ansible Module について

以下の資料にまとめました。

speakerdeck.com

作った Ansible Module

github.com

Datadog のホストに付与するタグを管理するモジュール。

デモ

デモ環境

$ python --version
Python 2.7.13

$ ansible --version
ansible 2.2.1.0
  config file = /path/to/.ansible.cfg
  configured module search path = Default w/o overrides

モジュールの置き場

以下の何れかのディレクトリに置きます。

Playbook

- hosts: localhost
  tasks:
    - name: Test datadog_tags(present)
      datadog_tags:
        state: present
        host: myhost
        tags: 'aa,bb,cc,dd'
        api_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        app_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    - name: Test datadog_tags(absent)
      datadog_tags:
        state: absent
        host: myhost
        api_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
        app_key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

プロビジョニング

$ ansible-playbook datadog.yml
 [WARNING]: Host file not found: /etc/ansible/hosts

 [WARNING]: provided hosts list is empty, only localhost is available


PLAY [localhost] ***************************************************************

TASK [setup] *******************************************************************
ok: [localhost]

TASK [Test datadog_tags(present)] **********************************************
changed: [localhost]

TASK [Test datadog_tags(absent)] ***********************************************
changed: [localhost]

PLAY RECAP *********************************************************************
localhost                  : ok=3    changed=2    unreachable=0    failed=0

実行例では present して absent しているので、実行結果としてはタグは付与されていないことになるので、一応、present のみ実行した状態は以下のように指定したホストにタグが付与されています。

f:id:inokara:20170326000854p:plain

実装にあたり

  • 引数は AnsibleModule クラスを継承したインスタンスの params に含まれているので適宜取り出して処理すれば良いのは楽
  • モジュール側で冪等性を頑張る必要がある(既に指定したタグが存在している場合には module.exit_json(changed=False) で終了するとか)

参考

以下のリンクを参考にさせて頂くことで、思いのほか簡単に Ansible Module を作ることが出来た。

ありがとうございました。