ようへいの日々精進XP

よかろうもん

シシド・カフカさんが気になって調べていたら Apache Kafka 入門していたメモ

ども、かっぱです。

tl;dr

何気なくラジオから流れてきたシシド・カフカという名前のアーティストについて調べようと思ったら、Apache Kafka に入門していたのでメモ。

shishido-kavka.com

上記はシシド・カフカさんのオフィシャルサイト。

kafka.apache.org

上記が今回入門した Apache Kafka。


参考

以下の書籍、記事を参考にさせて頂いた。また、随所で引用させて頂いている。誠に有難うございます。


Apache Kafka について

背景

Apache Kafka とは...

  • 2011 年に LinkedIn より公開されたオープンソースのメッセージングミドルウェア
  • LinkedIn では Kafka を使いウェブサイト上の PV やユーザーのアクティビティ、監視データ等を処理している

特徴

以下のような特徴がある。

  • Pub / Sub 型
  • オンライン、オフラインに対応(オフライン = バッチ処理、オンライン = リアルタイム処理)
  • 一般的なメッセージングシステムと異なりメッセージを一定期間保持する(ディスク容量次第)
  • シーケンシャルなファイルアクセスを行うことで高速な処理を実現
  • 独自のメッセージフォーマットでメッセージをやり取りすることでスループット性能を引き上げている
  • JVM 上で動作する
  • AWS のサービスだと Kinesis とか SQS が競合になると思われる

コンポーネント

Apache Kafka を構成するコンポーネントは以下の通り。

  • Producer
  • Broker
  • Consumer
  • メッセージ(Kafka におけるデータのことで、実際にアプリケーションが利用するデータは Payload と呼ぶ)
  • Zookeeper

下図のように Producer が Broker に対してメッセージを送信、Consumer が Broker からメッセージを取得する。

f:id:inokara:20160501215543p:plain

Zookeeper については、後述する(つもり)だが、Kafka では Broker の情報を Producer や Consumer が参照する為に利用されている。(超ザックリ)また、Zookeeper は必須の構成では無く、Zookeeper 無しでも Kafka は利用出来る。

Producer から Broker へのメッセージ送信

  • Produce リクエスト、Multi Produce リクエスト
  • Producer から送信されたメッセージを Broker が受け取るとメッセージをディスクに保存する
  • メッセージを送信する際には topic 名(キュー名)を指定して送信する
  • Broker がディスクに保存する際には topic 名毎に保存される
  • Broker はメッセージが保存されたことを保証しない(?)

Broker から Consumer へのメッセージ受信(要求)

  • Fetch リクエスト、Multi Fetch リクエスト
  • Consumer からの要求に応じて Broker からメッセージを送信する(pull 型)
  • pull 型を採用している為 Consumer の処理能力を越えてもメッセージを取りこぼすことが無い

Broker

  • Kafka 本体
  • Producer と Consumer の間でメッセージのやり取りを仲介するメッセージキューとして動作する
  • Zookeeper 構成では Broker は自身のホスト名、ポート番号、トピック、パーティションの情報を Zookeeper に登録する
  • Producer と Consumer は Zookeeper を参照してリクエストを送信すべき Broker を判断する

メッセージの保存

  • メッセージはトピック → Broker → パーティション → ログ(セグメントファイル)という階層で保存される
  • トピックは Broker をまたいで存在出来る(メッセージ保存構成では最上位に位置する)
  • Broker はプロセスとして存在し、それらを個々にログとして保存する為のディレクトリが存在する
  • パーティションは Broker 毎のログデータを分割し管理する為の階層で、パーティション毎にメッセージの開始バイト位置をオフセット番号として管理している
  • 受信したメッセージをファイル(セグメントファイル)として保存、一定期間、上限のメッセージ数を超えると削除され、新しいファイルに切り替わる

Zookeeper について

  • https://zookeeper.apache.org/
  • ZooKeeperによる分散システム管理 という書籍が出版されている位なので数行で語れるものでは無いという認識
  • Zookeeper とは分散システムを構築する為に必要な情報の保持、同期等を行うミドルウェアシステム
  • ファイルシステムに似た構造を持つノードと呼ばれる領域に Key / Value 形式でデータを保存する
  • ノードには一時ノードと呼ばれるものがあり、各プロセスとの接続情報の管理を行っている(接続セッションが切れると自動的に削除される)
  • Consul や etcd とライバル

引き続き...

気になる部分をメモっていく予定。


Apache Kafka を Docker で試してみる

Kafka Docker

github.com

上記で紹介されている Docker イメージを Docker Compose を利用して Kafka クラスタを起動してチュートリアルしてみる。

% docker version
Client:
 Version:      1.8.0
 API version:  1.20
 Go version:   go1.4.2
 Git commit:   0d03096
 Built:        Tue Aug 11 17:17:40 UTC 2015
 OS/Arch:      darwin/amd64

Server:
 Version:      1.11.0
 API version:  1.23
 Go version:   go1.5.4
 Git commit:   4dc5990
 Built:        2016-04-13T19:36:04.249835152+00:00
 OS/Arch:      linux/amd64

% docker-compose version
docker-compose version 1.5.2, build 7240ff3
docker-py version: 1.5.0
CPython version: 2.7.9
OpenSSL version: OpenSSL 1.0.1j 15 Oct 2014

% git clone https://github.com/wurstmeister/kafka-docker.git
% cd kafka-docker

#
# KAFKA_ADVERTISED_HOST_NAME を環境に合わせて修正する
#
% git diff docker-compose.yml
diff --git a/docker-compose.yml b/docker-compose.yml
index 73dd854..2022aa1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,6 +9,6 @@ kafka:
   links: 
     - zookeeper:zk
   environment:
-    KAFKA_ADVERTISED_HOST_NAME: 192.168.59.103
+    KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
   volumes:
     - /var/run/docker.sock:/var/run/docker.sock

% docker-compose up -d
Starting kafkadocker_zookeeper_1
Starting kafkadocker_kafka_1

#
# コンテナの起動を確認
#
% docker-compose ps
         Name                        Command               State                          Ports                        
----------------------------------------------------------------------------------------------------------------------
kafkadocker_kafka_1       start-kafka.sh                   Up      0.0.0.0:32773->9092/tcp                             
kafkadocker_zookeeper_1   /bin/sh -c /usr/sbin/sshd  ...   Up      0.0.0.0:32772->2181/tcp, 22/tcp, 2888/tcp, 3888/tcp 

上記で Kafka と Zookeeper コンテナ起動する。

Producer と Consumer の起動

それぞれのコンテナが起動したところで、Producer と Consumer コンテナを起動してみる。

まずは Producer コンテナの起動。

#
# Producer コンテナを起動
#   - topic の作成
#   - topic の情報を確認
#   - Producer プロセスを起動
#
% ./start-kafka-shell.sh 192.168.99.100 192.168.99.100:32772

# topic の作成
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic \
> --partitions 4 --zookeeper $ZK --replication-factor 2

# 以下のようなエラー(Broker の数 < Replication Factor は NG)
Error while executing topic command : replication factor: 2 larger than available brokers: 1
[2016-05-01 01:06:01,466] ERROR kafka.admin.AdminOperationException: replication factor: 2 larger than available brokers: 1
        at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
...

# topic を一旦削除
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --delete --topic topic --zookeeper $ZK
Topic topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# topic を別名で再作成
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic demo-topic --partitions 4 --zookeeper $ZK --replication-factor 1
Created topic "demo-topic".

# topic の情報を確認
bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --describe --topic demo-topic --zookeeper $ZK                                                                                                          
Topic:demo-topic        PartitionCount:4        ReplicationFactor:1     Configs:
        Topic: demo-topic       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: demo-topic       Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: demo-topic       Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
        Topic: demo-topic       Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001

# Producer プロセスを起動
bash-4.3# $KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic \
> --broker-list=`broker-list.sh`

hoge

Consumer コンテナを以下のように起動する。

#
# Consumer コンテナを起動
#   - Consumer プロセスを起動
#
% ./start-kafka-shell.sh 192.168.99.100 192.168.99.100:32772
bash-4.3# 
bash-4.3# $KAFKA_HOME/bin/kafka-console-consumer.sh --topic=topic --zookeeper=$ZK

以下、デモ。

www.youtube.com

Producer コンテナに適当に文字を入力する(上)と...(Broker に保存されて) Consumer コンテナ(下)のコンソールに入力した文字が出力される。(Consumer プロセスが Broker に Fetch リクエストを投げて Broker からメッセージを受信している図)

Trifecta で Kafka を流れるデータを可視化

Trifecta という Kafka メッセージや Zookeeper のデータを可視化出来るツールをりようして Kafka を流れるデータを可視化してみる。

github.com

これも Docker コンテナを利用してみる。

#
# Zookeeper の IP アドレス、プロセスを環境変数に指定して起動する
#
% docker run -it --rm -p 8888:8888 -e ZOOKEEPERS="192.168.99.100:32772" chatu/trifecta

起動すると...

f:id:inokara:20160501110430p:plain

Broker や Consumer や Zookeeper の情報を確認することが出来る。

f:id:inokara:20160501110703p:plain

Observe タブではリアルタイムにメッセージを確認することも可能。


fluentd + fluent-plugin-kafka デモ

せっかくなので fluentd との連携

  • Gemfile
source "https://rubygems.org"

gem 'ruby-kafka'                # 必須では無い
gem 'fluentd'
gem 'fluent-plugin-kafka'
gem 'apache-loggen'

fluentd

  • fluentd.conf
<source>
  @type tail  path apache.log
  tag dummy.apache
  format apache
</source>

<match dummy.apache>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type kafka
    brokers 192.168.99.100:32777
    zookeeper 192.168.99.100:32776
    default_topic apache-topic
  </store>
</match>
  • 起動
% bundle exec fluentd -c fluentd.conf -l debug.log

topic を新規作成

bash-4.3# $KAFKA_HOME/bin/kafka-topics.sh --create --topic apache-topic \
> --partitions 4 --zookeeper $ZK --replication-factor 1

apache-loggen でダミーログを出力

% bundle exec apache-loggen --rate=10 --limit=10000 --progress apache.log

デモ

youtu.be


以上

シシド・カフカさんが気になって調べていたら Apache Kafka に入門してみた。

個人的に(ギョーム的にも)Apache Kafka の使いドコロは今のところは無いけど、Amazon SQS や RabbitMQ のようなメッセージングシステムが個人的に好きなので(実運用の経験は無いけど)引き続き勉強していけたらなと思うが、Amazon SQS や RabbitMQ についても勉強せねば...。

ちなみに、シシド・カフカさんのカフカは kavka と書くということで...

以上。