ども、かっぱです。
tl;dr
何気なくラジオから流れてきたシシド・カフカという名前のアーティストについて調べようと思ったら、Apache Kafka に入門していたのでメモ。
上記はシシド・カフカさんのオフィシャルサイト。
上記が今回入門した Apache Kafka。
参考
以下の書籍、記事を参考にさせて頂いた。また、随所で引用させて頂いている。誠に有難うございます。
- Apache Kafka
- Apache Kafka 入門
- Apache Kafkaに入門した
- Apache Kafka ―入門からTrifectaを用いた可視化まで―
- fluent-plugin-kafka: fluentd と Kafka の連携
Apache Kafka について
背景
Apache Kafka とは...
- 2011 年に LinkedIn より公開されたオープンソースのメッセージングミドルウェア
- LinkedIn では Kafka を使いウェブサイト上の PV やユーザーのアクティビティ、監視データ等を処理している
特徴
以下のような特徴がある。
- Pub / Sub 型
- オンライン、オフラインに対応(オフライン = バッチ処理、オンライン = リアルタイム処理)
- 一般的なメッセージングシステムと異なりメッセージを一定期間保持する(ディスク容量次第)
- シーケンシャルなファイルアクセスを行うことで高速な処理を実現
- 独自のメッセージフォーマットでメッセージをやり取りすることでスループット性能を引き上げている
- JVM 上で動作する
- AWS のサービスだと Kinesis とか SQS が競合になると思われる
コンポーネント
Apache Kafka を構成するコンポーネントは以下の通り。
- Producer
- Broker
- Consumer
- メッセージ(Kafka におけるデータのことで、実際にアプリケーションが利用するデータは Payload と呼ぶ)
- Zookeeper
下図のように Producer が Broker に対してメッセージを送信、Consumer が Broker からメッセージを取得する。
Zookeeper については、後述する(つもり)だが、Kafka では Broker の情報を Producer や Consumer が参照する為に利用されている。(超ザックリ)また、Zookeeper は必須の構成では無く、Zookeeper 無しでも Kafka は利用出来る。
Producer から Broker へのメッセージ送信
- Produce リクエスト、Multi Produce リクエスト
- Producer から送信されたメッセージを Broker が受け取るとメッセージをディスクに保存する
- メッセージを送信する際には topic 名(キュー名)を指定して送信する
- Broker がディスクに保存する際には topic 名毎に保存される
- Broker はメッセージが保存されたことを保証しない(?)
Broker から Consumer へのメッセージ受信(要求)
- Fetch リクエスト、Multi Fetch リクエスト
- Consumer からの要求に応じて Broker からメッセージを送信する(pull 型)
- pull 型を採用している為 Consumer の処理能力を越えてもメッセージを取りこぼすことが無い
Broker
- Kafka 本体
- Producer と Consumer の間でメッセージのやり取りを仲介するメッセージキューとして動作する
- Zookeeper 構成では Broker は自身のホスト名、ポート番号、トピック、パーティションの情報を Zookeeper に登録する
- Producer と Consumer は Zookeeper を参照してリクエストを送信すべき Broker を判断する
メッセージの保存
- メッセージはトピック → Broker → パーティション → ログ(セグメントファイル)という階層で保存される
- トピックは Broker をまたいで存在出来る(メッセージ保存構成では最上位に位置する)
- Broker はプロセスとして存在し、それらを個々にログとして保存する為のディレクトリが存在する
- パーティションは Broker 毎のログデータを分割し管理する為の階層で、パーティション毎にメッセージの開始バイト位置をオフセット番号として管理している
- 受信したメッセージをファイル(セグメントファイル)として保存、一定期間、上限のメッセージ数を超えると削除され、新しいファイルに切り替わる
Zookeeper について
- https://zookeeper.apache.org/
- ZooKeeperによる分散システム管理 という書籍が出版されている位なので数行で語れるものでは無いという認識
- Zookeeper とは分散システムを構築する為に必要な情報の保持、同期等を行うミドルウェアシステム
- ファイルシステムに似た構造を持つノードと呼ばれる領域に Key / Value 形式でデータを保存する
- ノードには一時ノードと呼ばれるものがあり、各プロセスとの接続情報の管理を行っている(接続セッションが切れると自動的に削除される)
- Consul や etcd とライバル
引き続き...
気になる部分をメモっていく予定。
Apache Kafka を Docker で試してみる
Kafka Docker
上記で紹介されている Docker イメージを Docker Compose を利用して Kafka クラスタを起動してチュートリアルしてみる。
% docker version Client: Version: 1.8.0 API version: 1.20 Go version: go1.4.2 Git commit: 0d03096 Built: Tue Aug 11 17:17:40 UTC 2015 OS/Arch: darwin/amd64 Server: Version: 1.11.0 API version: 1.23 Go version: go1.5.4 Git commit: 4dc5990 Built: 2016-04-13T19:36:04.249835152+00:00 OS/Arch: linux/amd64 % docker-compose version docker-compose version 1.5.2, build 7240ff3 docker-py version: 1.5.0 CPython version: 2.7.9 OpenSSL version: OpenSSL 1.0.1j 15 Oct 2014 % git clone https://github.com/wurstmeister/kafka-docker.git % cd kafka-docker # # KAFKA_ADVERTISED_HOST_NAME を環境に合わせて修正する # % git diff docker-compose.yml diff --git a/docker-compose.yml b/docker-compose.yml index 73dd854..2022aa1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,6 @@ kafka: links: - zookeeper:zk environment: - KAFKA_ADVERTISED_HOST_NAME: 192.168.59.103 + KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100 volumes: - /var/run/docker.sock:/var/run/docker.sock % docker-compose up -d Starting kafkadocker_zookeeper_1 Starting kafkadocker_kafka_1 # # コンテナの起動を確認 # % docker-compose ps Name Command State Ports ---------------------------------------------------------------------------------------------------------------------- kafkadocker_kafka_1 start-kafka.sh Up 0.0.0.0:32773->9092/tcp kafkadocker_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 0.0.0.0:32772->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
上記で Kafka と Zookeeper コンテナ起動する。
Producer と Consumer の起動
それぞれのコンテナが起動したところで、Producer と Consumer コンテナを起動してみる。
まずは Producer コンテナの起動。
# # Producer コンテナを起動 # - topic の作成 # - topic の情報を確認 # - Producer プロセスを起動 # % ./start-kafka-shell.sh 192.168.99.100 192.168.99.100:32772 # topic の作成 bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic \ > --partitions 4 --zookeeper $ZK --replication-factor 2 # 以下のようなエラー(Broker の数 < Replication Factor は NG) Error while executing topic command : replication factor: 2 larger than available brokers: 1 [2016-05-01 01:06:01,466] ERROR kafka.admin.AdminOperationException: replication factor: 2 larger than available brokers: 1 at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236) at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105) at kafka.admin.TopicCommand$.main(TopicCommand.scala:60) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand$) ... # topic を一旦削除 bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --delete --topic topic --zookeeper $ZK Topic topic is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. # topic を別名で再作成 bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic demo-topic --partitions 4 --zookeeper $ZK --replication-factor 1 Created topic "demo-topic". # topic の情報を確認 bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --describe --topic demo-topic --zookeeper $ZK Topic:demo-topic PartitionCount:4 ReplicationFactor:1 Configs: Topic: demo-topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: demo-topic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: demo-topic Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001 Topic: demo-topic Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001 # Producer プロセスを起動 bash-4.3# $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic \ > --broker-list=`broker-list.sh` hoge
Consumer コンテナを以下のように起動する。
# # Consumer コンテナを起動 # - Consumer プロセスを起動 # % ./start-kafka-shell.sh 192.168.99.100 192.168.99.100:32772 bash-4.3# bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK
以下、デモ。
Producer コンテナに適当に文字を入力する(上)と...(Broker に保存されて) Consumer コンテナ(下)のコンソールに入力した文字が出力される。(Consumer プロセスが Broker に Fetch リクエストを投げて Broker からメッセージを受信している図)
Trifecta で Kafka を流れるデータを可視化
Trifecta という Kafka メッセージや Zookeeper のデータを可視化出来るツールをりようして Kafka を流れるデータを可視化してみる。
これも Docker コンテナを利用してみる。
# # Zookeeper の IP アドレス、プロセスを環境変数に指定して起動する # % docker run -it --rm -p 8888:8888 -e ZOOKEEPERS="192.168.99.100:32772" chatu/trifecta
起動すると...
Broker や Consumer や Zookeeper の情報を確認することが出来る。
Observe タブではリアルタイムにメッセージを確認することも可能。
fluentd + fluent-plugin-kafka デモ
せっかくなので fluentd との連携
- Gemfile
source "https://rubygems.org" gem 'ruby-kafka' # 必須では無い gem 'fluentd' gem 'fluent-plugin-kafka' gem 'apache-loggen'
fluentd
- fluentd.conf
<source> @type tail path apache.log tag dummy.apache format apache </source> <match dummy.apache> @type copy <store> @type stdout </store> <store> @type kafka brokers 192.168.99.100:32777 zookeeper 192.168.99.100:32776 default_topic apache-topic </store> </match>
- 起動
% bundle exec fluentd -c fluentd.conf -l debug.log
topic を新規作成
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic apache-topic \ > --partitions 4 --zookeeper $ZK --replication-factor 1
apache-loggen でダミーログを出力
% bundle exec apache-loggen --rate=10 --limit=10000 --progress apache.log
デモ
以上
シシド・カフカさんが気になって調べていたら Apache Kafka に入門してみた。
個人的に(ギョーム的にも)Apache Kafka の使いドコロは今のところは無いけど、Amazon SQS や RabbitMQ のようなメッセージングシステムが個人的に好きなので(実運用の経験は無いけど)引き続き勉強していけたらなと思うが、Amazon SQS や RabbitMQ についても勉強せねば...。
ちなみに、シシド・カフカさんのカフカは kavka と書くということで...
以上。