2016/7/25

Kafka Quick Start


因為 Kafka 依賴 ZooKeeper 來處理分散式系統的備援機制,以下由 Kafka 的安裝與測試,進一步討論如何用 Scala 來撰寫 Producer 與 Consumer。


ZooKeeper Server


Zookeeper 提供目錄和節點的服務,當有兩台伺服器啟動時,會在zookeeper的指定目錄下創建對應自己的臨時節點(這個過程稱為“註冊”),所謂臨時節點,是靠 heartbeat(定時向zookeeper伺服器發送訊息)維持,當伺服器出現故障,zookeeper 就會刪除臨時節點。當伺服器向zookeeper註冊時,zookeeper會分配序列號,我們認為序列號小的那個,就是“主”,序列號大的那個,就是“備援機”(slave)。


當有客戶端需要使用“寫”服務時,需要連接zookeeper,獲得指定目錄下的臨時節點列表,也就是已經註冊的伺服器信息,取得序列號小的那台“主”伺服器的地址,進行後續的訪問操作。以達到“總是訪問主伺服器”的目的。


接下來處理 ZooKeeper 的設定,首先下載 kafka


wget http://apache.stu.edu.tw/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

tar zxvf kafka_2.11-0.9.0.1.tgz

因為 kafka 必須要連結到 ZooKeeper,所以在啟動 kafka server 之前,必須要先啟動 ZooKeeper,通常 ZooKeeper cluster 以三個節點互相備援,且必須要分散配置在三台機器上,目前只是前期測試,就先只啟動一個 ZooKeeper server。


ZooKeeper 的設定檔在 config/zookeeper.properties 裡面,看一下內容,用到 /tmp/zookeeper 這個資料夾,並使用到 TCP Port 2181 作為服務的 port:


# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper

# the port at which the clients will connect
clientPort=2181

# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

如果為了未來調整設定方便,可以設定環境變數 KAFKA_HOME


KAFKA_HOME=/root/download/kafka/kafka_2.11-0.9.0.1

啟動 ZooKeeper Server


nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties &

在啟動指令,加上 -daemon 也可以,這應該是比剛剛的方法還正確的方式


$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties

關閉 ZooKeeper Server


$KAFKA_HOME/bin/zookeeper-server-stop.sh $KAFKA_HOME/config/zookeeper.properties &

啟動完成後,會在 netstat 裡面看到 TCP 2181 Port


# netstat -tnlp|grep 2181
tcp        0      0 0.0.0.0:2181                0.0.0.0:*                   LISTEN      30841/java



我們也可以使用 Apache ZooKeeper 的套件包 zookeeper-3.4.8.tar.gz,不要用 Kafka 內建的 zookeeper。


ZOOKEEPER_HOME=/root/download/kafka/zookeeper-3.4.8
cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

啟動 zookeeper server


$ZOOKEEPER_HOME/bin/zkServer.sh start

查看 zookeeper server status


$ZOOKEEPER_HOME/bin/zkServer.sh status

Kafka Broker


因為 Kafka 可以設定 replication-factor,讓資料複製到多個 Broker。


Kafka Broker server 的設定檔在 config/server.properties
裡面,首先要在設定檔中加上這一行,讓我們可以刪除 topic。


delete.topic.enable=true

然後看設定檔的最前面,broker.id 以及 listeners的設定,這是在多個 broker 時,有需要調整的設定項目。


broker.id=0

listeners=PLAINTEXT://:9092
# The port the socket server listens on
port=9092

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# Zookeeper connection string (see zookeeper docs for details).
zookeeper.connect=localhost:2181

如果只要執行一個 Kafka Broker 測試,直接這樣執行就好了,因為接下來要安裝 kafka-manager,所以啟動時順道加上 JMX_PORT 的資訊。


env JMX_PORT=8092 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

如果要再執行兩個 Broker,要先把設定檔調整好


cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

調整 config/server-1.properties


broker.id=1

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka-logs-1

delete.topic.enable=true

調整 config/server-2.properties


broker.id=2

listeners=PLAINTEXT://:9094

log.dirs=/tmp/kafka-logs-2

zookeeper.connect=localhost:2181

接下來再啟動兩個 kafka servers


env JMX_PORT=8093 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties

env JMX_PORT=8094 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties

netstat 可看到這三個 Brokers 的 Port


# netstat -tnlp|grep 909
tcp        0      0 0.0.0.0:9092                0.0.0.0:*                   LISTEN      2694/java
tcp        0      0 0.0.0.0:9093                0.0.0.0:*                   LISTEN      3593/java
tcp        0      0 0.0.0.0:9094                0.0.0.0:*                   LISTEN      3657/java

Topic


啟動 Kafka Broker 之後,必須先建立 topic。


因為我們只有三個 Broker,所以 replication-factor 只能指定為 3,如果指定為 4,就會發生 kafka.admin.AdminOperationException。


$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 4 --partitions 1 --topic test
Error while executing topic command : replication factor: 4 larger than available brokers: 3
[2016-02-26 16:17:15,436] ERROR kafka.admin.AdminOperationException: replication factor: 4 larger than available brokers: 3
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

所以我們將 replication-factor 改為 3


$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
Created topic "test".

可以用指令查看 kafka-topic 的資訊,ReplicationFactor是3份,
Replicate 在 0, 1, 2 這三台broker上面,
topic的leader是 broker id 0,leader負責 partition 的read and write。Isr的意思是有哪些 broker 正在同步當中,簡單的說可以知道哪些broker是活著的。


$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

可以用以下指令刪除 topic


$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

刪除了 test 這個 topic 之後,就查不到 topic 的資訊了


Message Producer Consumer 測試


啟動 Console Producer,啟動後會進入互動模式,可以一直往 test 這個 topic 發送文字訊息。


$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test

啟動 Console Consumer,這裡要注意的是,Consumer 是連結到 ZooKeeper 而不是直接連到 Broker,--from-beginning 這個參數的意思是要從頭開始,而不是從連結的時候開始。


bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

如果在啟動 consumer 之前,已經在 producer 對 topic: test 發送了幾個訊息,那啟動 consumer 如果有加上 --from-beginning 這個參數,就會把先前那幾個訊息也列印出來。


如果沒有加上 --from-beginning 這個參數,就會從啟動 consumer 開始接收後面的訊息。


列印所有 consumer group 的資訊


bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

刪除 consumer group: test


bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete -group test

yahoo kafka-manager


Yahoo 對 Kafka 開發了一個 kafka-manager 網頁管理介面


我們在這個連結下載 kafka-manager 1.3.0.4.tar.gz


解壓縮後,利用 sbt 就可以建置 kadka-manager 的 production deployment package


sbt clean dist

可以在這個目錄中,找到編譯後的 zip file


kafka-manager-1.3.0.4/target/universal/kafka-manager-1.3.0.4.zip

KAFKA_MANAGER_HOME=/root/download/kafka/kafka-manager-1.3.0.4

解壓縮 kafka-manager-1.3.0.4.zip 可以看到 bin, conf, lib, share 這些資料夾。


首先修改 conf/application.conf 裡面的 zkhosts


kafka-manager.zkhosts="192.168.1.7:2181"

然後用這個指令,啟動 kafka-manager


nohup $KAFKA_MANAGER_HOME/bin/kafka-manager -Dconfig.file=$KAFKA_MANAGER_HOME/conf/application.conf -Dhttp.port=9000 > /dev/null 2>&1

依照這個畫面的資訊,把 zookeeper 加入 kafka cluster list



接下來就可以用網頁查看 Kafka cluster 的資訊



References


kafka 文件


kafka-example-in-scala


Apache kafka原理與特性(0.8V)


Deploy Apache Kafka Cluster on AWS


Apache Kafka: Distributed messaging system


架構師一席談(二) zookeeper在分佈式應用中的作用


ZooKeeper 基礎知識、部署和應用程序


kafka集群操作指南