2016/8/22

Lets Encrypt SSL Key 申請與更新

Let's Encrypt是由EFF、Mozilla基金會、Akamai和Cisco等等許多大公司及非營利組織於2014年共同創立的ISRG組織所成立的數位憑證認證機構,目標是讓提供免費申請並自動更新的憑證服務,推廣及加速全球網站採用HTTPS安全的加密傳輸協定。


Let's Encrypt已在2016年4月正式進入穩定階段,原本在舊版 windows xp 的相容性問題也解決了,需要注意的是,Let's Encrypt簽發的憑證有效期為3個月(90天),也就是說網站每3個月都需要重新更新一次憑證,但我們可以透過 renew script 來定期更新憑證。


ACME (Automated Certificate Management Environment) protocol


在 Let's Encrypt 的 How It Works 有說明實作的原理,是使用了 ACME (Automated Certificate Management Environment) Protocol,但是 Let's Encrypt 實作的是自己的 Automated Certificate Management Environment (ACME),實際上在 IETF 有另一個 ACME spec: IETF ACME,Let's Encrypt 規格文件中並不保證能夠影響 IETF ACME,Let's Encrypt 應該是為了在市場驗證才先提供這樣的服務,未來在 IETF ACME 標準化之後,或許可能會經歷一段相容性的過渡期。


Let's Encrypt 在去年 beta 時還是以 letsencrypt script 提供申請與更新的服務,今年就改以 EFF certbot release 正式版。


certbot


要安裝 certbot 之前,必須先將要申請的 domain name 的 DNS A record 指向到要申請的機器上,例如 named 的設定中,將 testdomain.com.tw 以及 c1.testdomain.com.tw 指向到 211.72.214.209。


# testdomain.zone

@   IN  A   211.72.214.209
c1  IN  A   211.72.214.209

要安裝 cerbot,就直接在 官網 的下面選擇要安裝的 web server 以及 OS,目前我們是選用 Apache on CentOS 6。


sudo yum install epel-release
wget https://dl.eff.org/certbot-auto
mv certbot-auto certbot

## 如果是 centos 7 則是直接用 yum 安裝 certbot
# yum install certbot

chmod a+x certbot

直接執行 cerbot-auto 可以將 certbot 所有需要的相依性套件都安裝好


./certbot

certbot 支援幾種 plugins:Apache, Webroot, Standalone, Manual, Nginx, 其他 plugins。Standalone 的部分適合機器上沒有 web server daemon 的設定方式,我們可以使用 Apache 或是 Webroot,Apache 或 Nginx 的方式比較簡單,如果是 Webroot 的方式,需要讓 web server 可以存取 .well-known 這個目錄的檔案。


如果只要申請 terdomain.com.tw 這個 domain 的 certificate,只需要執行以下的指令,並依照畫面的問題填寫內容。


./certbot --apache

如果需要一個簽章裡面有多個 sub domain,可以直接在 command line 填寫兩個 domain names。


./certbot --apache -d testdomain.com.tw -d c1.testdomain.com.tw -m test@testdomain.com.tw

可以用 ssltest 檢測 https://www.ssllabs.com/ssltest/analyze.html?d=testdomain.com.tw 憑證的狀態。


檢測結果是 C,ssltest 建議要 disable SSL3,disable RC4 cipher,修改 ssl.conf


SSLProtocol all -SSLv2 -SSLv3

SSLHonorCipherOrder on

SSLCipherSuite ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+3DES:!aNULL:!MD5:!DSS

然後重新啟動 httpd,再去 ssltest 重新檢測一次,可以達到 Rating A。


Debian 8 安裝程序


安裝 certbot for apache web server & debian 8 (jessie)


sudo apt-get install python-certbot-apache -t jessie-backports

certbot --apache

自動更新 ssl


certbot renew --dry-run

將 certbot 放到 cronjob/systemd


certbot renew --quiet

自動更新


以下指令可測試 cerbot 自動更新是否可以正常運作


./certbot renew --dry-run

將 certbot-auto 移動到 /etc 目錄中,在 crontab 增加一行,每週一凌晨 1:00AM 進行一次 renew。


* 1 * * 1 root /etc/certbot renew --quiet

Rate Limit


Let's Encrypt 在發送簽證有幾個限制,最後兩個是針對開發 ACME client 的限制,可以用 staging environment 替代,就可以避免該限制:
Rate Limits for Let’s Encrypt


  1. 在單一簽證上,最多只支援 100 個 names

  2. 每週每個 domain 限制只能申請 20 個 certificates

  3. 每週每個 FQDN,只能申請 5 個 certificates

  4. 每 3 hrs 接受 500 次註冊動作

  5. 每週每個 account 只能驗證 300 次


References


Let's Encrypt


Let's Encrypt Get Started


letsencrypt


申請​​ Let's Encrypt 免費​ SSL ​​憑證於​在 NGINX​ 伺服器上配置和​自動更新教學


免費SSL加密: Let's Encrypt 設定教學

2016/8/15

Apache Zeppelin 測試記錄

Zeppelin 是 Big Data 資料分析的筆記本,以網頁為介面,筆記本可以共用、分享、共筆,支援 scala, java, shell script, markdown, SparkSQL 等語法,可以直接在網頁筆記本上面,製作筆記、執行並即時取得結果,在資料分析後,可以將結果直接以圖表方式展現出來,甚至還可以自訂自己的語法。


目前 Zeppelin git 的版本是 0.6,還沒有正式 release,脫離 incubator 的階段,成為一個成熟的產品。


Zeppelin Installation


安裝一些必要的套件


rpm -ivh jdk-8u72-linux-x64.rpm
yum install git nodejs npm libfontconfig

# install maven
wget http://www.eu.apache.org/dist/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz
sudo tar -zxf apache-maven-3.3.3-bin.tar.gz -C /usr/local/
sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/local/bin/mvn

直接由 git 取得 zeppelin 的 source code,然後進行編譯。


git clone https://github.com/apache/incubator-zeppelin

mv incubator-zeppelin zeppelin
mv zeppelin /usr/local/

cd /usr/local/zeppelin

mvn install -DskipTests

zeppelin-web 在建構過程中會出錯


首先要修改 zeppelin-web/pom.xml,把以下這個區塊的 plugin 註解掉:


<plugin>
        <groupId>com.github.eirslett</groupId>
....
</plugin>

cd zeppelin-web
mvn clean

npm install
./bower --allow-root install
./grunt build

mvn install -DskipTests

cd ..

mvn install -DskipTests

mvn package -Pspark-1.5 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests

編譯後的結果會看到


[INFO] Reactor Summary:
[INFO]
[INFO] Zeppelin .......................................... SUCCESS [23.566s]
[INFO] Zeppelin: Interpreter ............................. SUCCESS [21.797s]
[INFO] Zeppelin: Zengine ................................. SUCCESS [9.997s]
[INFO] Zeppelin: Display system apis ..................... SUCCESS [5.388s]
[INFO] Zeppelin: Spark dependencies ...................... SUCCESS [33.753s]
[INFO] Zeppelin: Spark ................................... SUCCESS [10.863s]
[INFO] Zeppelin: Markdown interpreter .................... SUCCESS [4.452s]
[INFO] Zeppelin: Angular interpreter ..................... SUCCESS [3.246s]
[INFO] Zeppelin: Shell interpreter ....................... SUCCESS [3.275s]
[INFO] Zeppelin: Hive interpreter ........................ SUCCESS [6.779s]
[INFO] Zeppelin: HBase interpreter ....................... SUCCESS [6.773s]
[INFO] Zeppelin: Apache Phoenix Interpreter .............. SUCCESS [8.796s]
[INFO] Zeppelin: PostgreSQL interpreter .................. SUCCESS [4.135s]
[INFO] Zeppelin: JDBC interpreter ........................ SUCCESS [4.550s]
[INFO] Zeppelin: Tajo interpreter ........................ SUCCESS [4.240s]
[INFO] Zeppelin: Flink ................................... SUCCESS [7.043s]
[INFO] Zeppelin: Apache Ignite interpreter ............... SUCCESS [4.051s]
[INFO] Zeppelin: Kylin interpreter ....................... SUCCESS [3.608s]
[INFO] Zeppelin: Lens interpreter ........................ SUCCESS [5.966s]
[INFO] Zeppelin: Cassandra ............................... SUCCESS [19.541s]
[INFO] Zeppelin: Elasticsearch interpreter ............... SUCCESS [4.712s]
[INFO] Zeppelin: Alluxio interpreter ..................... SUCCESS [4.832s]
[INFO] Zeppelin: web Application ......................... SUCCESS [2.911s]
[INFO] Zeppelin: Server .................................. SUCCESS [47.256s]
[INFO] Zeppelin: Packaging distribution .................. SUCCESS [2.663s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4:15.422s
[INFO] Finished at: Tue Mar 08 16:25:02 CST 2016
[INFO] Final Memory: 101M/289M
[INFO] ------------------------------------------------------------------------

調整 zeppelin 的設定


cp /usr/local/zeppelin/conf/zeppelin-env.sh.template /usr/local/zeppelin/conf/zeppelin-env.sh
cp /usr/local/zeppelin/conf/zeppelin-site.xml.template /usr/local/zeppelin/conf/zeppelin-site.xml

chmod 755 /usr/local/zeppelin/conf/zeppelin-site.xml

為了避免跟後面的 ambari 網頁介面的 port 衝突,我們修改 zeppelin 的 port 設定


vi /usr/local/zeppelin/conf/zeppelin-site.xml
# 將 zeppelin.server.port 改為 8000

<property>
  <name>zeppelin.server.port</name>
  <value>8000</value>
  <description>Server port.</description>
</property>

為了讓 java 可以直接使用 TCP IPv4,必須 disable CentOS 的 ipv6。


vi /etc/sysctl.conf
增加一行
net.ipv6.conf.all.disable_ipv6 = 1

vi /etc/sysconfig/network
增加一行
NETWORKING_IPV6=no

vi /etc/sysconfig/network-scripts/ifcfg-eth0
vi /etc/sysconfig/network-scripts/ifcfg-eth1
檢查要有此行
IPV6INIT="no"

echo 'options ipv6 disable=1' > /etc/modprobe.d/disable-ipv6.conf
service ip6tables stop; chkconfig ip6tables off

reboot

停掉 CentOS 7 的預設 firewall


systemctl stop firewalld
systemctl disable firewalld

啟動 zeppelin


/usr/local/zeppelin/bin/zeppelin-daemon.sh start

Log dir doesn't exist, create /usr/local/zeppelin/logs
Pid dir doesn't exist, create /usr/local/zeppelin/run
Zeppelin start                                             [  OK  ]

停止 zeppelin


/usr/local/zeppelin/bin/zeppelin-daemon.sh stop

Zeppelin 的主畫面如下,如果在右上角,看到的不是 "Connected",這就表示還需要安裝 Spark, Hadoop 等套件,要繼續往下安裝 Ambari,才能正常使用 Zeppelin。



Ambari for CentOS 7


如果直接連結 Zeppelin 網頁 http://192.168.1.24:8000/ ,會看到中間的內容是空白的,而且右上角的連線資訊是 "Disconnected",只有安裝 Zeppelin 還不夠。


我們還需要安裝 HDFS,Spark 等等後端的工具,但一個一個安裝非常地辛苦,有個簡便的安裝套件 ambari 可以使用,Apache Ambari 是用來管理並維護 Hadoop cluster 環境,他也提供了一個簡易的網頁介面,可以直接安裝 Hadoop Cluster 環境。


參考 如何使用Ambari部署HDP 以及 Ambari Installation in RHEL/CentOS/Oracle Linux 7 的說明,我們就能將 ambari 在 CentOS 7 設定好。因為只是一開始的測試用途,我並沒有安裝很多台機器,只安裝了一台,跟 Zeppelin 放在一起。


wget -nv http://public-repo-1.hortonworks.com/ambari/centos7/2.x/updates/2.1.0/ambari.repo -O /etc/yum.repos.d/ambari.repo

yum repolist

yum install ambari-server

# server 設定, 需要檢查 /etc/hosts 裡面有沒有 domain name 的對應設定,要將 hostname: zeppelin 對應到實際的 IP
vi /etc/hosts
192.168.1.24    zeppelin

ambari-server setup

# 啟動 ambari
ambari-server start

安裝並啟動 ambari-server 還不夠,我們還需要安裝 ambari-agent。


yum install ambari-agent

# 測試機器的 hostname 是 zeppelin
ambari-agent reset zeppelin

ambari-agent start

再來就是利用網頁 http://192.168.1.24:8080/ 以帳號:admin 密碼: admin,點擊主頁面上的“Launch Install Wizard”按鈕後,進入安裝程序。


我只填寫了一個節點: zeppelin,選擇服務是 HDFS,Zookeeper,Spark,Kafka,YARN,基本上都沒有調整什麼預設的設定,就可以安裝完成。





用Ambari跟Zeppelin來玩Apache Spark 提供了一個安裝 Zeppelin 的簡便方法,就是透過 Ambari service for Apache Zeppelin notebook 的協助,直接在 Ambari 安裝 Zeppelin,相信這個方法遇到的問題會少一點。以下紀錄一下他的做法,但我們還沒有實際測試過。


#下載zeppelin到Ambari
[sudouser@server1]$ VERSION=`hdp-select status hadoop-client | sed 's/hadoop-client - \([0-9]\.[0-9]\).*/\1/'`
[sudouser@server1]$ sudo git clone https://github.com/hortonworks-gallery/ambari-zeppelin-service.git   /var/lib/ambari-server/resources/stacks/HDP/$VERSION/services/ZEPPELIN

#重開ambari
[sudouser@server1]$ sudo su -
[root@server1]$ ambari-server restart

# 從開好之後從左邊最下面"Actions"選單"Add Service"
# 多了zeppelin的選項可以選,選擇之後一直Next就行了

Zeppelin json 異常


在測試 Zeppelin Tutorial 裡面的 Spark 功能的時候,一直遇到錯誤訊息。後來發現是這個原因:Apache Zeppelin & Spark 解析Json異常


處理方法,就是將 Zeppelin 使用的 Jackson jar 檔的版本都由 2.5 換成 2.4,然後重新啟動 Zeppelin 就可以了。


ls -al /usr/local/zeppelin/zeppelin-server/target/lib/jackson*
-rw-r--r-- 1 root root   39815  3月  8 16:24 jackson-annotations-2.5.0.jar
-rw-r--r-- 1 root root  229998  3月  8 16:24 jackson-core-2.5.3.jar
-rw-r--r-- 1 root root 1143162  3月  8 16:24 jackson-databind-2.5.3.jar

# 找到這些 2.4 版的 jackson jar,然後替換掉這些檔案
-rw-r--r-- 1 root root   38597 11月 25  2014 jackson-annotations-2.4.4.jar
-rw-r--r-- 1 root root  225302  3月  9 14:17 jackson-core-2.4.4.jar
-rw-r--r-- 1 root root 1076926  3月  9 14:17 jackson-databind-2.4.4.jar

Zeppelin Tutorial


參考 Zeppelin Tutorial 的說明,我們先建立一個新的 Notebook: test。



%sh 是直接執行 shell command


%sh
echo $PATH

接下來,先取得測試需要用的資料 bank-full.csv


%sh
rm /root/bank.zip
rm -rf /root/data
cd ~
wget http://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip
mkdir data
unzip bank.zip -d data
#rm bank.zip

因為 Zeppelin 預設是執行 scala 程式,所以我們可以直接呼叫 Spark 的 程式,下面的程式會將 csv 讀取進來,然後以 map 的方式對應到 Bank 這個物件上,再轉換成 DataFrame,產生 Temp Table。


val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val bankText = sc.textFile(s"/root/data/bank-full.csv")

case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)

// split each line, filter out header (starts with "age"), and map it into Bank case class

val bank = bankText.map(s=>s.split(";")).filter(s=>s(0)!="\"age\"").map(
    s=>Bank(s(0).toInt, 
            s(1).replaceAll("\"", ""),
            s(2).replaceAll("\"", ""),
            s(3).replaceAll("\"", ""),
            s(5).replaceAll("\"", "").toInt
        )
)

// convert to DataFrame and create temporal table
bank.toDF().registerTempTable("bank")

接下來可以用 %sql 產生圖表


%sql
select age, count(1) value
from bank
where age<30
group by age
order by age


也可以讓 sql 讀取外部的參數,這個功能在 Zeppelin 稱為 Dynamic Form


%sql
select age, count(1) value
from bank
where age<${maxAge=30}
group by age
order by age


Dynamic Form 也可以做成 select 下拉選單的方式,調整參數。


%sql
select age, count(1)
from bank
where marital="${marital=single,single|divorced|married}"
group by age order by age

References


Spark交互式分析平台Apache Zeppelin的安裝


incubator-zeppelin/README.md


Apache Zeppelin安裝及介紹


How-to: Install Apache Zeppelin on CDH


【數據可視化】Zeppelin JDBC 數據可視化(WEB方式)

2016/8/8

Lambda Architecture


Lambda Architecture 這個名詞是 Storm 開發者 Nathan Marz 提出的,用來說明一個 generic, scalable and fault-tolerant data processing architecture 的 high level design 系統架構,這也是他在 Backtype 及 Twitter 多年進行分散式資料處理系統的經驗,而得到的一個一般化的架構結論。


基本概念


Lambda Architecture 的基本概念就是以下這個等式。


query = function(all data)

根據一個查詢的 function,對一個龐大的資料來源,進行分析與查詢,得到查詢後的結果,以往在大數據的資料分析中,都是先收集資料,將資料以 batch 的方式,存放在一個大數據資料庫中,然後資料分析人員再進入這個資料庫,進行數據分析。


然而因為資料收集的來源太多,資料量太大,往往取得資料時,已經過時了,一瞬間就成為了歷史資料。但在資料分析中,還有一些需求,希望能得到更即時的查詢結果,例如最近這 5 mins 的交易量,上線人數。


這時候,同樣的原始資料,必須要用另一種更快的方式處理,也就是以更快速的資料收集方式,只收集最近的資料,查詢也是針對最近的資料進行查詢。


雖然跟原本的歷史資料的資料來源都一樣,但是快速處理即時資料的部分,並不在意太久以前的歷史,所以他會直接將過時的資料丟掉,只保留最新的資料持續進行分析。


架構圖


以下五種架構圖分別來自不同的網頁(參考 references),都是在描述一個 Lambda Architecture 的架構組成。



  1. 進入系統的 data 分別傳入 batch 及speed layer,用以繼續後續的資料處理
  2. batch layer 有兩個功能:(1) 管理 master dataset,這是一個不能被修改而且只能一直增加的 raw dataset (2) 預先產生 batch view,讓 query 能做簡易的查詢
  3. serving layer 會整理 batch views,讓 query 能以 low-latency, ad-hoc 的方式查詢資料
  4. speed layer 彌補了 serving layer 更新速度太慢的問題,專注在處理最近的資料
  5. query 能夠整合 batch view 以及 real-time view 資訊的查詢結果




這個架構圖跟 Lambda Architecture 原始網站的架構一樣,但更精確地把上面五點文字說明的部分填寫到圖形上。





這是個簡化的架構圖。





跟第二個架構圖一樣,網頁上同時提供了每一個 layer 可能的系統,不過這個網頁已經是三年前的資訊了,現在應該有更好的解決方案


  1. batch layer: Apache Hadoop
  2. serving layer: Cloudera Impala
  3. speed layer: Storm, Apache HBase




這個架構圖比較明確地說明 spark 的 Lambda Architecture。


前端的原始資料以 Flume 或 Kafka 進行資料分流,分別導入 Spark Streaming data source 以及 HDFS 進行儲存,Spark Application 同時使用 HDFS 以及 Flume 或 Kafka 的資料進行分析,最後將分析後的資料存放到 HBase,提供並展示 Real-Time 分析結果。


Reference


Lambda Architecture


The Lambda architecture: principles for architecting realtime Big Data systems


Lambda Architecture: Achieving Velocity and Volume with Big Data


Lambda Architecture with Apache Spark


Apache Spark: Usage and Roadmap in Hadoop

2016/8/1

Kafka in Scala and Java

Kafka 的範例,可參考這個 project:A kafka producer and consumer example in scala and java,通常是先執行 Consumer,再執行 Producer,才會在 Consumer console 中看到 Producer 的訊息。


Kafka Comsumer Examples


Scala 版本


import java.util.Properties
import java.util.concurrent._
import scala.collection.JavaConversions._
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.utils._
import kafka.utils.Logging
import kafka.consumer.KafkaStream

class ScalaConsumerExample(val zookeeper: String,
                           val groupId: String,
                           val topic: String,
                           val delay: Long) extends Logging {

  val config = createConsumerConfig(zookeeper, groupId)
  val consumer = Consumer.create(config)
  var executor: ExecutorService = null

  def shutdown() = {
    if (consumer != null)
      consumer.shutdown();
    if (executor != null)
      executor.shutdown();
  }

  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId);
    props.put("auto.offset.reset", "largest");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    val config = new ConsumerConfig(props)
    config
  }

  def run(numThreads: Int) = {
    val topicCountMap = Map(topic -> numThreads)
    val consumerMap = consumer.createMessageStreams(topicCountMap);
    val streams = consumerMap.get(topic).get;

    executor = Executors.newFixedThreadPool(numThreads);
    var threadNumber = 0;
    for (stream <- streams) {
      executor.submit(new ScalaConsumerTest(stream, threadNumber, delay))
      threadNumber += 1
    }
  }
}

object ScalaConsumerExample extends App {
  // 程式的進入點

  if (args.length <= 0) {
    val server = "192.168.1.7:2181";
    val group = "group1";
    val topic = "test";
    val delay = 0
    val numThreads = 10

    val example = new ScalaConsumerExample(server, group, topic, delay)
    example.run(numThreads)

  } else {
    val server = args(0)
    val group = args(1)
    val topic = args(2)
    val numThreads = args(3).toInt
    val delay = args(4).toLong

    val example = new ScalaConsumerExample(server, group, topic, delay)
    example.run(numThreads)
  }

}

class ScalaConsumerTest(val stream: KafkaStream[Array[Byte], Array[Byte]], val threadNumber: Int, val delay: Long) extends Logging with Runnable {
  def run {
    val it = stream.iterator()

    while (it.hasNext()) {
      val msg = new String(it.next().message());
      System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg);
    }

    System.out.println("Shutting down Thread: " + threadNumber);
  }
}

Java 版本


import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private long delay;

    public ConsumerExample(String zookeeper, String groupId, String topic, long delay) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
        this.topic = topic;
        this.delay = delay;
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int numThreads) {

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(consumer, stream, threadNumber, delay));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("auto.offset.reset", "largest");
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        //props.put("auto.commit.enable", "false");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        String args0 = "";
        String args1 = "";
        String args2 = "";
        int args3 = 0;
        long args4 = 0;

        if (args.length <= 0) {
            args0 = "192.168.1.7:2181";
            args1 = "group1";
            args2 = "test";
            args3 = 10;
            args4 = 0;
        } else {
            args0 = args[0];
            args1 = args[1];
            args2 = args[2];
            args3 = Integer.parseInt(args[3]);
            args4 = Long.parseLong(args[4]);
        }

        String zooKeeper = args0;
        String groupId = args1;
        String topic = args2;
        int threads = args3;
        long delay = args4;

        ConsumerExample example = new ConsumerExample(zooKeeper, groupId, topic, delay);
        example.run(threads);

        Thread.sleep(24 * 60 * 60 * 1000);

        example.shutdown();
    }
}

Kafka Producer Examples


Scala 版本


import kafka.producer.ProducerConfig
import java.util.Properties
import scala.util.Random
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import java.util.Date

object ScalaProducerExample extends App {

  // java -cp kafka_example-0.1.0-SNAPSHOT.jar com.colobu.kafka.ScalaProducerExample 10000 colobu localhost:9092
  var args0 = 0;
  var args1 = "";
  var args2 = "";

  if (args.length <= 0) {
    args0 = 500
    args1 = "test"
    args2 = "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094"

  } else {
    args0 = args(0).toInt
    args1 = args(1)
    args2 = args(2)
  }


  val events = args0
  val topic = args1
  val brokers = args2
  val rnd = new Random()

  val props = new Properties()
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner")
  props.put("producer.type", "async")
  //props.put("request.required.acks", "1")


  val config = new ProducerConfig(props)
  val producer = new Producer[String, String](config)
  val t = System.currentTimeMillis()

  for (nEvents <- Range(0, events)) {
    val runtime = new Date().getTime()
    val ip = "192.168.2." + rnd.nextInt(255)
    val msg = runtime + "," + nEvents + ",www.example.com," + ip
    val data = new KeyedMessage[String, String](topic, ip, msg)
    producer.send(data)
  }

  System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t));
  producer.close();
}

Java 版本


import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerExample {
    public static void main(String[] args) {

        long args0 = 0;
        String args1 = "";
        String args2 = "";

        if (args.length <= 0) {
            args0 = 500;
            args1 = "test";
            args2 = "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094";

        } else {
            args0 = Long.parseLong(args[0]);
            args1 = args[1];
            args2 = args[2];
        }


        long events = args0;
        String topic = args1;
        String brokers = args2;
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", brokers);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner");
        props.put("producer.type", "async");
        //props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        long t = System.currentTimeMillis();
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = "192.168.2." + rnd.nextInt(255); 
               String msg = runtime + "," + nEvents + ",www.example.com," + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, ip, msg);
               producer.send(data);
        }
        
        System.out.println("sent per second: " + events * 1000/ (System.currentTimeMillis() - t));
        producer.close();
    }
}

Reference


ProducerExample.scala


kafka-storm-starter


Getting Started with Kafka from Scala: Scala Clients


0.8.0 Producer Example


0.8.0 SimpleConsumer Example


社區電商系統架構之消息隊列篇:kafka的實驗

2016/7/25

Kafka Quick Start


因為 Kafka 依賴 ZooKeeper 來處理分散式系統的備援機制,以下由 Kafka 的安裝與測試,進一步討論如何用 Scala 來撰寫 Producer 與 Consumer。


ZooKeeper Server


Zookeeper 提供目錄和節點的服務,當有兩台伺服器啟動時,會在zookeeper的指定目錄下創建對應自己的臨時節點(這個過程稱為“註冊”),所謂臨時節點,是靠 heartbeat(定時向zookeeper伺服器發送訊息)維持,當伺服器出現故障,zookeeper 就會刪除臨時節點。當伺服器向zookeeper註冊時,zookeeper會分配序列號,我們認為序列號小的那個,就是“主”,序列號大的那個,就是“備援機”(slave)。


當有客戶端需要使用“寫”服務時,需要連接zookeeper,獲得指定目錄下的臨時節點列表,也就是已經註冊的伺服器信息,取得序列號小的那台“主”伺服器的地址,進行後續的訪問操作。以達到“總是訪問主伺服器”的目的。


接下來處理 ZooKeeper 的設定,首先下載 kafka


wget http://apache.stu.edu.tw/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

tar zxvf kafka_2.11-0.9.0.1.tgz

因為 kafka 必須要連結到 ZooKeeper,所以在啟動 kafka server 之前,必須要先啟動 ZooKeeper,通常 ZooKeeper cluster 以三個節點互相備援,且必須要分散配置在三台機器上,目前只是前期測試,就先只啟動一個 ZooKeeper server。


ZooKeeper 的設定檔在 config/zookeeper.properties 裡面,看一下內容,用到 /tmp/zookeeper 這個資料夾,並使用到 TCP Port 2181 作為服務的 port:


# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper

# the port at which the clients will connect
clientPort=2181

# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

如果為了未來調整設定方便,可以設定環境變數 KAFKA_HOME


KAFKA_HOME=/root/download/kafka/kafka_2.11-0.9.0.1

啟動 ZooKeeper Server


nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties &

在啟動指令,加上 -daemon 也可以,這應該是比剛剛的方法還正確的方式


$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties

關閉 ZooKeeper Server


$KAFKA_HOME/bin/zookeeper-server-stop.sh $KAFKA_HOME/config/zookeeper.properties &

啟動完成後,會在 netstat 裡面看到 TCP 2181 Port


# netstat -tnlp|grep 2181
tcp        0      0 0.0.0.0:2181                0.0.0.0:*                   LISTEN      30841/java



我們也可以使用 Apache ZooKeeper 的套件包 zookeeper-3.4.8.tar.gz,不要用 Kafka 內建的 zookeeper。


ZOOKEEPER_HOME=/root/download/kafka/zookeeper-3.4.8
cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

啟動 zookeeper server


$ZOOKEEPER_HOME/bin/zkServer.sh start

查看 zookeeper server status


$ZOOKEEPER_HOME/bin/zkServer.sh status

Kafka Broker


因為 Kafka 可以設定 replication-factor,讓資料複製到多個 Broker。


Kafka Broker server 的設定檔在 config/server.properties
裡面,首先要在設定檔中加上這一行,讓我們可以刪除 topic。


delete.topic.enable=true

然後看設定檔的最前面,broker.id 以及 listeners的設定,這是在多個 broker 時,有需要調整的設定項目。


broker.id=0

listeners=PLAINTEXT://:9092
# The port the socket server listens on
port=9092

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# Zookeeper connection string (see zookeeper docs for details).
zookeeper.connect=localhost:2181

如果只要執行一個 Kafka Broker 測試,直接這樣執行就好了,因為接下來要安裝 kafka-manager,所以啟動時順道加上 JMX_PORT 的資訊。


env JMX_PORT=8092 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

如果要再執行兩個 Broker,要先把設定檔調整好


cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

調整 config/server-1.properties


broker.id=1

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka-logs-1

delete.topic.enable=true

調整 config/server-2.properties


broker.id=2

listeners=PLAINTEXT://:9094

log.dirs=/tmp/kafka-logs-2

zookeeper.connect=localhost:2181

接下來再啟動兩個 kafka servers


env JMX_PORT=8093 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties

env JMX_PORT=8094 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties

netstat 可看到這三個 Brokers 的 Port


# netstat -tnlp|grep 909
tcp        0      0 0.0.0.0:9092                0.0.0.0:*                   LISTEN      2694/java
tcp        0      0 0.0.0.0:9093                0.0.0.0:*                   LISTEN      3593/java
tcp        0      0 0.0.0.0:9094                0.0.0.0:*                   LISTEN      3657/java

Topic


啟動 Kafka Broker 之後,必須先建立 topic。


因為我們只有三個 Broker,所以 replication-factor 只能指定為 3,如果指定為 4,就會發生 kafka.admin.AdminOperationException。


$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 4 --partitions 1 --topic test
Error while executing topic command : replication factor: 4 larger than available brokers: 3
[2016-02-26 16:17:15,436] ERROR kafka.admin.AdminOperationException: replication factor: 4 larger than available brokers: 3
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

所以我們將 replication-factor 改為 3


$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
Created topic "test".

可以用指令查看 kafka-topic 的資訊,ReplicationFactor是3份,
Replicate 在 0, 1, 2 這三台broker上面,
topic的leader是 broker id 0,leader負責 partition 的read and write。Isr的意思是有哪些 broker 正在同步當中,簡單的說可以知道哪些broker是活著的。


$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

可以用以下指令刪除 topic


$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

刪除了 test 這個 topic 之後,就查不到 topic 的資訊了


Message Producer Consumer 測試


啟動 Console Producer,啟動後會進入互動模式,可以一直往 test 這個 topic 發送文字訊息。


$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test

啟動 Console Consumer,這裡要注意的是,Consumer 是連結到 ZooKeeper 而不是直接連到 Broker,--from-beginning 這個參數的意思是要從頭開始,而不是從連結的時候開始。


bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

如果在啟動 consumer 之前,已經在 producer 對 topic: test 發送了幾個訊息,那啟動 consumer 如果有加上 --from-beginning 這個參數,就會把先前那幾個訊息也列印出來。


如果沒有加上 --from-beginning 這個參數,就會從啟動 consumer 開始接收後面的訊息。


列印所有 consumer group 的資訊


bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

刪除 consumer group: test


bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete -group test

yahoo kafka-manager


Yahoo 對 Kafka 開發了一個 kafka-manager 網頁管理介面


我們在這個連結下載 kafka-manager 1.3.0.4.tar.gz


解壓縮後,利用 sbt 就可以建置 kadka-manager 的 production deployment package


sbt clean dist

可以在這個目錄中,找到編譯後的 zip file


kafka-manager-1.3.0.4/target/universal/kafka-manager-1.3.0.4.zip

KAFKA_MANAGER_HOME=/root/download/kafka/kafka-manager-1.3.0.4

解壓縮 kafka-manager-1.3.0.4.zip 可以看到 bin, conf, lib, share 這些資料夾。


首先修改 conf/application.conf 裡面的 zkhosts


kafka-manager.zkhosts="192.168.1.7:2181"

然後用這個指令,啟動 kafka-manager


nohup $KAFKA_MANAGER_HOME/bin/kafka-manager -Dconfig.file=$KAFKA_MANAGER_HOME/conf/application.conf -Dhttp.port=9000 > /dev/null 2>&1

依照這個畫面的資訊,把 zookeeper 加入 kafka cluster list



接下來就可以用網頁查看 Kafka cluster 的資訊



References


kafka 文件


kafka-example-in-scala


Apache kafka原理與特性(0.8V)


Deploy Apache Kafka Cluster on AWS


Apache Kafka: Distributed messaging system


架構師一席談(二) zookeeper在分佈式應用中的作用


ZooKeeper 基礎知識、部署和應用程序


kafka集群操作指南