読者です 読者をやめる 読者になる 読者になる

ようへいの日々精進XP

よかろうもん

stream2es で大量データをサクッと Amazon Elasticsearch Service に放り込む 〜 もう bulk API を操作しなくても良いかもしれない 〜

検証の為に

大量データを Amazon Elasticsearch Service に放り込みたいです。

データは

github.com

こちらを利用させて頂きます。

また、

qiita.com

上記の記事を参考にさせて頂きました。ありがとうございます。

放り込む

放り込む前に csv から json データにする

上記の記事を参考にさせて頂いて Python で書いてみました。標準入力から csv を読み込もうと試行錯誤しましたが、とりあえず csv ファイルを読み込む体でいきます。

# -*- coding: utf-8 -*-

import csv
import json
import random
import sys

INDEX = "ldgourmet"
TYPE  = "restaurant"

#
# http://racchai.hatenablog.com/entry/2016/04/16/173000 を参考にさせて頂きました.
# ありがとうございます.
#
def random_string(length, seq='0123456789abcdefghijklmnopqrstuvwxyz'):
    sr = random.SystemRandom()
    return ''.join([sr.choice(seq) for i in xrange(length)])

with open('restaurants.csv', 'r') as f:
    reader = csv.reader(f)
    header = next(reader)

    for row in reader:
        # index = { 'index': { '_index': INDEX, '_type': TYPE, '_id': random_string(16) }} # bulk API だと Index を付けなきゃいけないけど...
        # print json.dumps(index)
        print json.dumps(dict(zip(header, row)))

レストランデータを展開して、上記のスクリプトを以下のように実行します。

$ tar zxvf ldgourmet.tar.gz
$ ls -l restaurants.csv
-rw-r--r-- 1 cloudpack cloudpack 59311909 Apr 22  2011 restaurants.csv
$ python csv2json.py > restaurants.json

インデックスの作成

レストランデータを放り込むインデックスを作成する。

ひとまず放り込むだけなので、マッピングは超シンプルに。

$ cat mapping.json
{
  "mappings": {
    "restaurant": {
      "properties": {
        "restaurant_id": {
          "type": "integer"
        },
        "name": {
          "type": "string",
        },
        "name_alphabet": {
          "type": "string",
        },
        "name_kana": {
          "type": "string",
        },
        "address": {
          "type": "string",
        },
        "description": {
          "type": "string",
        },
        "purpose": {
          "type": "string",
        },
        "category": {
          "type": "string",
          "analyzer": "whitespace"
        },
        "photo_count": {
          "type": "integer"
        },
        "menu_count": {
          "type": "integer"
        },
        "access_count": {
          "type": "integer"
        },
        "closed": {
          "type": "boolean"
        },
        "location": {
          "type": "geo_point",
          "store": "yes"
        }
      }
    }
  }
}

以下のように実行する。

#
# インデックスを作成
#
_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='ldgourmet'
_TARGET=${_ESS_ENDPOINT}/${_INDEX}
curl -XPUT ${_TARGET} -d @mapping.json

ここで真打ち stream2es の登場

stream2es については @johtani さんの以下の記事がバイブルです。

blog.johtani.info

stream2es を動かすには Java8 以上が必要ですので、事前に Java8 をインストールしておきましょう。

$ java -version
openjdk version "1.8.0_121"
OpenJDK Runtime Environment (build 1.8.0_121-b13)
OpenJDK 64-Bit Server VM (build 25.121-b13, mixed mode)

あとは、以下のようにダウンロードして実行権限を付与した後にデータの投入を行います。

#
# stream2es を取得
#
curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es

#
# データの投入
#
_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='ldgourmet'
_TYPE='restaurant'
_TARGET=${_ESS_ENDPOINT}/${_INDEX}/${_TYPE}
cat restaurants.json | ./stream2es stdin --target ${_TARGET}

実行すると以下のように出力されます。

$ cat restaurants.json | ./stream2es stdin --target ${_TARGET}
2017-03-31T08:56:18.797+0000 INFO  01:36.489 2220.3d/s 1848.3K/s (174.2mb) indexed 214236 streamed 214236 errors 0
2017-03-31T08:56:18.823+0000 INFO  done

ひとまず

ちゃんと放り込めたかを Count API を使って確認します。

_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint}
_INDEX='/ldgourmet/restaurant'
_TARGET=${_ESS_ENDPOINT}${_INDEX}
$ curl -s -XGET ${_TARGET}/_count | jq .
{
  "count": 214236,
  "_shards": {
    "total": 2,
    "successful": 2,
    "failed": 0
  }
}

問題なさそうです。

これで検証を進められそうです。

以上

stream2es を使えば

bulk API を駆使しなくても(bulk API を利用する為にデータを整形しなくても)大量データをサクッと放り込んでくれました。また、各ドキュメントの ID についても自動で採番してくれるのも嬉しい限りです。

stream2es ヘルプ

単に標準入力から Elasticsearch にバルクでデータを放り込んでくれるだけでは無さそうです。

$ ./stream2es --help
2017-03-31T12:22:27.534+0000 INFO  Copyright 2013 Elasticsearch

Usage: stream2es [CMD] [OPTS]

Available commands: wiki, twitter, stdin, es

Common opts:
   --authinfo   Stored stream credentials (default: "/home/cloudpack/.authinfo.stream2es")
--clobber       Use elasticsearch 'index' operation, clobbering existing documents, no-clobber uses 'create' which will skip/error existing documents (default: false)
-h --help       Display help (default: false)
--http-insecure Don't verify peer cert (default: false)
   --http-keystore /path/to/keystore (default: null)
   --http-keystore-pass Keystore password (default: null)
   --http-trust-store /path/to/keystore (default: null)
   --http-trust-store-pass Truststore password (default: null)
--indexing      Whether to actually send data to ES (default: true)
   --log        Log level (trace debug info warn error fatal report) (default: "info")
   --mappings   Index mappings (default: null)
-d --max-docs   Number of docs to index (default: -1)
--offset        Add __s2e_offset__ field TO EACH DOCUMENT with the sequence offset of the stream (default: false)
-q --queue-size Size of the internal bulk queue (default: 40)
--replace       Delete index before streaming (default: false)
   --settings   Index settings (default: null)
-s --skip       Skip this many docs before indexing (default: 0)
   --stream-buffer Buffer up to this many pages (default: 50)
   --stream-timeout Wait seconds for data on the stream (default: -1)
   --target     ES location (default: "http://localhost:9200")
   --tee        Save json request payloads as files in path (default: null)
   --tee-bulk   Save bulk request payloads as files in path (default: null)
--tee-errors    Create error-{id} files (default: true)
-v --version    Print version (default: false)
-w --workers    Number of indexing threads (default: 2)

ElasticsearchStream opts:
-b --bulk-bytes Bulk size in bytes (default: 1048576)
   --query      Query to _scan from source (default: "{"query":{"match_all":{}}}")
-q --queue-size Size of the internal bulk queue (default: 1000)
   --scroll-size Source scroll size (default: 500)
   --scroll-time Source scroll context TTL (default: "60s")
   --source     Source ES url (default: null)
--source-http-insecure Don't verify peer cert (default: false)
   --source-http-keystore /path/to/keystore (default: null)
   --source-http-keystore-pass Keystore password (default: null)
   --source-http-trust-store /path/to/keystore (default: null)
   --source-http-trust-store-pass Truststore password (default: null)
   --target     Target ES url (default: null)

GeneratorStream opts:
-b --bulk-bytes Bulk size in bytes (default: 102400)
   --dictionary Dictionary location (default: "/usr/share/dict/words")
   --fields     Field template (str, string, dbl, double, int, integer) (default: "f1:str:1")
-q --queue-size Size of the internal bulk queue (default: 40)
   --stream-buffer Buffer up to this many docs (default: 100000)
   --target     ES index (default: "http://localhost:9200/foo/t")

StdinStream opts:
-b --bulk-bytes Bulk size in bytes (default: 102400)
-q --queue-size Size of the internal bulk queue (default: 40)
   --stream-buffer Buffer up to this many docs (default: 100)
   --target     Target ES http://host:port/index/type (default: "http://localhost:9200/foo/t")

TwitterStream opts:
--authorize     Create oauth credentials (default: false)
-b --bulk-bytes Bulk size in bytes (default: 102400)
   --key        Twitter app consumer key, only for --authorize (default: null)
-q --queue-size Size of the internal bulk queue (default: 1000)
   --secret     Twitter app consumer secret, only for --authorize (default: null)
   --stream-buffer Buffer up to this many tweets (default: 1000)
   --target     Target ES http://host:port/index/type (default: "http://localhost:9200/twitter/status")
   --track      %%-separated list of strings to filter the stream (default: null)

WikiStream opts:
-b --bulk-bytes Bulk size in bytes (default: 3145728)
-q --queue-size Size of the internal bulk queue (default: 40)
   --source     Wiki dump location (default: "https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2")
   --stream-buffer Buffer up to this many pages (default: 50)
   --target     Target ES http://host:port/index (we handle types here) (default: "http://localhost:9200/wiki")