検証の為に
大量データを Amazon Elasticsearch Service に放り込みたいです。
データは
こちらを利用させて頂きます。
また、
上記の記事を参考にさせて頂きました。ありがとうございます。
放り込む
放り込む前に csv から json データにする
上記の記事を参考にさせて頂いて Python で書いてみました。標準入力から csv を読み込もうと試行錯誤しましたが、とりあえず csv ファイルを読み込む体でいきます。
# -*- coding: utf-8 -*- import csv import json import random import sys INDEX = "ldgourmet" TYPE = "restaurant" # # http://racchai.hatenablog.com/entry/2016/04/16/173000 を参考にさせて頂きました. # ありがとうございます. # def random_string(length, seq='0123456789abcdefghijklmnopqrstuvwxyz'): sr = random.SystemRandom() return ''.join([sr.choice(seq) for i in xrange(length)]) with open('restaurants.csv', 'r') as f: reader = csv.reader(f) header = next(reader) for row in reader: # index = { 'index': { '_index': INDEX, '_type': TYPE, '_id': random_string(16) }} # bulk API だと Index を付けなきゃいけないけど... # print json.dumps(index) print json.dumps(dict(zip(header, row)))
レストランデータを展開して、上記のスクリプトを以下のように実行します。
$ tar zxvf ldgourmet.tar.gz $ ls -l restaurants.csv -rw-r--r-- 1 cloudpack cloudpack 59311909 Apr 22 2011 restaurants.csv $ python csv2json.py > restaurants.json
インデックスの作成
レストランデータを放り込むインデックスを作成する。
ひとまず放り込むだけなので、マッピングは超シンプルに。
$ cat mapping.json { "mappings": { "restaurant": { "properties": { "restaurant_id": { "type": "integer" }, "name": { "type": "string", }, "name_alphabet": { "type": "string", }, "name_kana": { "type": "string", }, "address": { "type": "string", }, "description": { "type": "string", }, "purpose": { "type": "string", }, "category": { "type": "string", "analyzer": "whitespace" }, "photo_count": { "type": "integer" }, "menu_count": { "type": "integer" }, "access_count": { "type": "integer" }, "closed": { "type": "boolean" }, "location": { "type": "geo_point", "store": "yes" } } } } }
以下のように実行する。
# # インデックスを作成 # _ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint} _INDEX='ldgourmet' _TARGET=${_ESS_ENDPOINT}/${_INDEX} curl -XPUT ${_TARGET} -d @mapping.json
ここで真打ち stream2es の登場
stream2es については @johtani さんの以下の記事がバイブルです。
stream2es を動かすには Java8 以上が必要ですので、事前に Java8 をインストールしておきましょう。
$ java -version openjdk version "1.8.0_121" OpenJDK Runtime Environment (build 1.8.0_121-b13) OpenJDK 64-Bit Server VM (build 25.121-b13, mixed mode)
あとは、以下のようにダウンロードして実行権限を付与した後にデータの投入を行います。
# # stream2es を取得 # curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es # # データの投入 # _ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint} _INDEX='ldgourmet' _TYPE='restaurant' _TARGET=${_ESS_ENDPOINT}/${_INDEX}/${_TYPE} cat restaurants.json | ./stream2es stdin --target ${_TARGET}
実行すると以下のように出力されます。
$ cat restaurants.json | ./stream2es stdin --target ${_TARGET} 2017-03-31T08:56:18.797+0000 INFO 01:36.489 2220.3d/s 1848.3K/s (174.2mb) indexed 214236 streamed 214236 errors 0 2017-03-31T08:56:18.823+0000 INFO done
ひとまず
ちゃんと放り込めたかを Count API を使って確認します。
_ESS_ENDPOINT=${Amazon Elasticsearch Service Endpoint} _INDEX='/ldgourmet/restaurant' _TARGET=${_ESS_ENDPOINT}${_INDEX} $ curl -s -XGET ${_TARGET}/_count | jq . { "count": 214236, "_shards": { "total": 2, "successful": 2, "failed": 0 } }
問題なさそうです。
これで検証を進められそうです。
以上
stream2es を使えば
bulk API を駆使しなくても(bulk API を利用する為にデータを整形しなくても)大量データをサクッと放り込んでくれました。また、各ドキュメントの ID についても自動で採番してくれるのも嬉しい限りです。
stream2es ヘルプ
単に標準入力から Elasticsearch にバルクでデータを放り込んでくれるだけでは無さそうです。
$ ./stream2es --help 2017-03-31T12:22:27.534+0000 INFO Copyright 2013 Elasticsearch Usage: stream2es [CMD] [OPTS] Available commands: wiki, twitter, stdin, es Common opts: --authinfo Stored stream credentials (default: "/home/cloudpack/.authinfo.stream2es") --clobber Use elasticsearch 'index' operation, clobbering existing documents, no-clobber uses 'create' which will skip/error existing documents (default: false) -h --help Display help (default: false) --http-insecure Don't verify peer cert (default: false) --http-keystore /path/to/keystore (default: null) --http-keystore-pass Keystore password (default: null) --http-trust-store /path/to/keystore (default: null) --http-trust-store-pass Truststore password (default: null) --indexing Whether to actually send data to ES (default: true) --log Log level (trace debug info warn error fatal report) (default: "info") --mappings Index mappings (default: null) -d --max-docs Number of docs to index (default: -1) --offset Add __s2e_offset__ field TO EACH DOCUMENT with the sequence offset of the stream (default: false) -q --queue-size Size of the internal bulk queue (default: 40) --replace Delete index before streaming (default: false) --settings Index settings (default: null) -s --skip Skip this many docs before indexing (default: 0) --stream-buffer Buffer up to this many pages (default: 50) --stream-timeout Wait seconds for data on the stream (default: -1) --target ES location (default: "http://localhost:9200") --tee Save json request payloads as files in path (default: null) --tee-bulk Save bulk request payloads as files in path (default: null) --tee-errors Create error-{id} files (default: true) -v --version Print version (default: false) -w --workers Number of indexing threads (default: 2) ElasticsearchStream opts: -b --bulk-bytes Bulk size in bytes (default: 1048576) --query Query to _scan from source (default: "{"query":{"match_all":{}}}") -q --queue-size Size of the internal bulk queue (default: 1000) --scroll-size Source scroll size (default: 500) --scroll-time Source scroll context TTL (default: "60s") --source Source ES url (default: null) --source-http-insecure Don't verify peer cert (default: false) --source-http-keystore /path/to/keystore (default: null) --source-http-keystore-pass Keystore password (default: null) --source-http-trust-store /path/to/keystore (default: null) --source-http-trust-store-pass Truststore password (default: null) --target Target ES url (default: null) GeneratorStream opts: -b --bulk-bytes Bulk size in bytes (default: 102400) --dictionary Dictionary location (default: "/usr/share/dict/words") --fields Field template (str, string, dbl, double, int, integer) (default: "f1:str:1") -q --queue-size Size of the internal bulk queue (default: 40) --stream-buffer Buffer up to this many docs (default: 100000) --target ES index (default: "http://localhost:9200/foo/t") StdinStream opts: -b --bulk-bytes Bulk size in bytes (default: 102400) -q --queue-size Size of the internal bulk queue (default: 40) --stream-buffer Buffer up to this many docs (default: 100) --target Target ES http://host:port/index/type (default: "http://localhost:9200/foo/t") TwitterStream opts: --authorize Create oauth credentials (default: false) -b --bulk-bytes Bulk size in bytes (default: 102400) --key Twitter app consumer key, only for --authorize (default: null) -q --queue-size Size of the internal bulk queue (default: 1000) --secret Twitter app consumer secret, only for --authorize (default: null) --stream-buffer Buffer up to this many tweets (default: 1000) --target Target ES http://host:port/index/type (default: "http://localhost:9200/twitter/status") --track %%-separated list of strings to filter the stream (default: null) WikiStream opts: -b --bulk-bytes Bulk size in bytes (default: 3145728) -q --queue-size Size of the internal bulk queue (default: 40) --source Wiki dump location (default: "https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2") --stream-buffer Buffer up to this many pages (default: 50) --target Target ES http://host:port/index (we handle types here) (default: "http://localhost:9200/wiki")