2016/8/1

Kafka in Scala and Java

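Kafka 的範例,可參考這個 project:A kafka producer and consumer example in scala and java。通常要先執行 Consumer,再執行 Producer,才會在 Consumer console 中看到 Producer 的訊息;這是因為 Consumer 設定了 auto.offset.reset=largest,當該 group 還沒有已提交的 offset 時,會從最新的位置開始讀取,Consumer 啟動之前送出的訊息不會被讀到。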


Kafka Consumer Examples


Scala 版本


import java.util.Properties
import java.util.concurrent._
import scala.collection.JavaConversions._
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.utils._
import kafka.utils.Logging
import kafka.consumer.KafkaStream

/**
 * High-level (ZooKeeper-based) Kafka consumer that reads one topic with a
 * fixed pool of worker threads, one [[ScalaConsumerTest]] per stream.
 *
 * @param zookeeper ZooKeeper connect string, e.g. "host:2181"
 * @param groupId   consumer group id
 * @param topic     topic to consume
 * @param delay     value forwarded to every [[ScalaConsumerTest]] worker
 */
class ScalaConsumerExample(val zookeeper: String,
                           val groupId: String,
                           val topic: String,
                           val delay: Long) extends Logging {

  val config = createConsumerConfig(zookeeper, groupId)
  val consumer = Consumer.create(config)
  // Assigned by run(); stays null until then, which is why shutdown() checks it.
  var executor: ExecutorService = null

  /** Shuts down the consumer connector first, then the worker thread pool (if run() created one). */
  def shutdown() = {
    if (consumer != null)
      consumer.shutdown()
    if (executor != null)
      executor.shutdown()
  }

  /**
   * Builds the high-level consumer configuration.
   *
   * @param zookeeper ZooKeeper connect string
   * @param groupId   consumer group id
   * @return a [[ConsumerConfig]] with the demo's fixed timeouts and offset policy
   */
  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper)
    props.put("group.id", groupId)
    // Without a committed offset, start from the newest message, so only
    // messages produced after this consumer starts are seen.
    props.put("auto.offset.reset", "largest")
    props.put("zookeeper.session.timeout.ms", "400")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("auto.commit.interval.ms", "1000")
    new ConsumerConfig(props)
  }

  /**
   * Requests `numThreads` streams for [[topic]] and starts one worker per stream.
   *
   * @param numThreads number of streams / worker threads; must be positive
   * @throws IllegalArgumentException if `numThreads <= 0`
   * @throws IllegalStateException    if the connector returns no streams for the topic
   */
  def run(numThreads: Int) = {
    // Fail fast, before any streams are requested from the connector.
    require(numThreads > 0, s"numThreads must be positive, got $numThreads")

    val topicCountMap = Map(topic -> numThreads)
    val consumerMap = consumer.createMessageStreams(topicCountMap)
    val streams = consumerMap.getOrElse(topic,
      throw new IllegalStateException(s"No streams returned for topic $topic"))

    executor = Executors.newFixedThreadPool(numThreads)
    for ((stream, threadNumber) <- streams.zipWithIndex)
      executor.submit(new ScalaConsumerTest(stream, threadNumber, delay))
  }
}

/**
 * Command-line entry point for [[ScalaConsumerExample]].
 *
 * With no arguments it uses the demo defaults: ZooKeeper "192.168.1.7:2181",
 * group "group1", topic "test", 10 threads and delay 0.
 * Otherwise it expects: `<zookeeper> <groupId> <topic> <numThreads> <delay>`.
 */
object ScalaConsumerExample {

  def main(args: Array[String]): Unit = {
    val (server, group, topic, numThreads, delay) =
      if (args.isEmpty) {
        ("192.168.1.7:2181", "group1", "test", 10, 0L)
      } else if (args.length < 5) {
        // Print usage instead of crashing with ArrayIndexOutOfBoundsException.
        System.err.println("Usage: ScalaConsumerExample <zookeeper> <groupId> <topic> <numThreads> <delay>")
        sys.exit(1)
      } else {
        (args(0), args(1), args(2), args(3).toInt, args(4).toLong)
      }

    val example = new ScalaConsumerExample(server, group, topic, delay)
    example.run(numThreads)
  }
}

/**
 * Worker that drains one [[KafkaStream]] and prints every message to stdout
 * as "<epochMillis>,Thread <n>: <message>", then a final shutdown line.
 *
 * @param stream       stream obtained from the consumer connector
 * @param threadNumber index used only to label output lines
 * @param delay        NOTE(review): accepted but not used by this worker
 */
class ScalaConsumerTest(val stream: KafkaStream[Array[Byte], Array[Byte]], val threadNumber: Int, val delay: Long) extends Logging with Runnable {
  override def run(): Unit = {
    val it = stream.iterator()

    // NOTE(review): the loop presumably ends only when the connector is shut down
    // (hasNext() waits for data with the default consumer timeout) — confirm.
    while (it.hasNext()) {
      // Decode as UTF-8, the default encoding of kafka.serializer.StringEncoder used by
      // the producer, rather than the platform default charset (e.g. Big5 on some Windows).
      val msg = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8)
      System.out.println(s"${System.currentTimeMillis()},Thread $threadNumber: $msg")
    }

    System.out.println(s"Shutting down Thread: $threadNumber")
  }
}

Java 版本


import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * High-level (ZooKeeper-based) Kafka consumer using the Java API: reads one topic
 * with a fixed pool of worker threads, one {@code ConsumerTest} per stream.
 */
public class ConsumerExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private final long delay;
    // Created by run(); null until then, which is why shutdown() checks it.
    private ExecutorService executor;

    /**
     * @param zookeeper ZooKeeper connect string, e.g. "host:2181"
     * @param groupId   consumer group id
     * @param topic     topic to consume
     * @param delay     value forwarded to every ConsumerTest worker
     */
    public ConsumerExample(String zookeeper, String groupId, String topic, long delay) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
        this.topic = topic;
        this.delay = delay;
    }

    /** Shuts down the consumer connector first, then the worker pool (if run() created one). */
    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    /**
     * Requests {@code numThreads} streams for the topic and starts one worker per stream.
     *
     * @param numThreads number of streams / worker threads; must be positive
     * @throws IllegalArgumentException if {@code numThreads <= 0}
     * @throws IllegalStateException    if the connector returns no streams for the topic
     */
    public void run(int numThreads) {
        // Fail fast, before any streams are requested from the connector.
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive, got " + numThreads);
        }

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Autoboxing instead of the deprecated new Integer(int).
        topicCountMap.put(topic, numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        if (streams == null) {
            throw new IllegalStateException("No streams returned for topic " + topic);
        }

        executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(consumer, stream, threadNumber, delay));
            threadNumber++;
        }
    }

    /** Builds the high-level consumer configuration with the demo's fixed timeouts and offset policy. */
    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // Without a committed offset, start from the newest message.
        props.put("auto.offset.reset", "largest");
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        //props.put("auto.commit.enable", "false");

        return new ConsumerConfig(props);
    }

    /**
     * Entry point. With no arguments it uses the demo defaults; otherwise it expects
     * {@code <zookeeper> <groupId> <topic> <numThreads> <delay>}.
     */
    public static void main(String[] args) throws InterruptedException {
        String zooKeeper;
        String groupId;
        String topic;
        int threads;
        long delay;

        if (args.length == 0) {
            zooKeeper = "192.168.1.7:2181";
            groupId = "group1";
            topic = "test";
            threads = 10;
            delay = 0;
        } else if (args.length < 5) {
            // Print usage instead of crashing with ArrayIndexOutOfBoundsException.
            System.err.println("Usage: ConsumerExample <zookeeper> <groupId> <topic> <numThreads> <delay>");
            System.exit(1);
            return;
        } else {
            zooKeeper = args[0];
            groupId = args[1];
            topic = args[2];
            threads = Integer.parseInt(args[3]);
            delay = Long.parseLong(args[4]);
        }

        ConsumerExample example = new ConsumerExample(zooKeeper, groupId, topic, delay);
        example.run(threads);

        // Keep consuming for 24 hours, then shut down.
        Thread.sleep(24 * 60 * 60 * 1000);

        example.shutdown();
    }
}

Kafka Producer Examples


Scala 版本


import kafka.producer.ProducerConfig
import java.util.Properties
import scala.util.Random
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import java.util.Date

/**
 * Sends `events` synthetic messages "<epochMillis>,<index>,www.example.com,<ip>"
 * to a topic, each keyed by a random 192.168.2.x IP, and prints the send rate.
 *
 * With no arguments it uses the demo defaults (500 events, topic "test", three local
 * brokers). Otherwise it expects: `<events> <topic> <brokers>`.
 */
object ScalaProducerExample {

  // java -cp kafka_example-0.1.0-SNAPSHOT.jar com.colobu.kafka.ScalaProducerExample 10000 colobu localhost:9092
  def main(args: Array[String]): Unit = {
    val (events, topic, brokers) =
      if (args.isEmpty) {
        (500, "test", "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094")
      } else if (args.length < 3) {
        // Print usage instead of crashing with ArrayIndexOutOfBoundsException.
        System.err.println("Usage: ScalaProducerExample <events> <topic> <brokers>")
        sys.exit(1)
      } else {
        (args(0).toInt, args(1), args(2))
      }
    val rnd = new Random()

    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner")
    props.put("producer.type", "async")
    //props.put("request.required.acks", "1")

    val producer = new Producer[String, String](new ProducerConfig(props))
    try {
      val t = System.currentTimeMillis()
      for (nEvents <- 0 until events) {
        val runtime = new Date().getTime()
        // Last octet is drawn from 0..254.
        val ip = "192.168.2." + rnd.nextInt(255)
        val msg = s"$runtime,$nEvents,www.example.com,$ip"
        producer.send(new KeyedMessage[String, String](topic, ip, msg))
      }

      // Clamp to 1 ms so a very fast run cannot divide by zero; 1000L keeps the
      // product in Long range (events * 1000 as Int overflows above ~2.1M events).
      // NOTE(review): with producer.type=async this presumably measures enqueue rate.
      val elapsedMs = math.max(1L, System.currentTimeMillis() - t)
      System.out.println("sent per second: " + events * 1000L / elapsedMs)
    } finally {
      // Always release the producer, even if a send fails.
      producer.close()
    }
  }
}

Java 版本


import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Sends {@code events} synthetic messages "<epochMillis>,<index>,www.example.com,<ip>"
 * to a topic, each keyed by a random 192.168.2.x IP, and prints the send rate.
 *
 * With no arguments it uses the demo defaults (500 events, topic "test", three local
 * brokers). Otherwise it expects {@code <events> <topic> <brokers>}.
 */
public class ProducerExample {
    public static void main(String[] args) {
        long events;
        String topic;
        String brokers;

        if (args.length == 0) {
            events = 500;
            topic = "test";
            brokers = "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094";
        } else if (args.length < 3) {
            // Print usage instead of crashing with ArrayIndexOutOfBoundsException.
            System.err.println("Usage: ProducerExample <events> <topic> <brokers>");
            System.exit(1);
            return;
        } else {
            events = Long.parseLong(args[0]);
            topic = args[1];
            brokers = args[2];
        }

        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", brokers);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner");
        props.put("producer.type", "async");
        //props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            long t = System.currentTimeMillis();
            for (long nEvents = 0; nEvents < events; nEvents++) {
                long runtime = new Date().getTime();
                // Last octet is drawn from 0..254.
                String ip = "192.168.2." + rnd.nextInt(255);
                String msg = runtime + "," + nEvents + ",www.example.com," + ip;
                KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, ip, msg);
                producer.send(data);
            }

            // Clamp to 1 ms so a very fast run cannot throw ArithmeticException (/ by zero).
            // NOTE(review): with producer.type=async this presumably measures enqueue rate.
            long elapsedMs = Math.max(1L, System.currentTimeMillis() - t);
            System.out.println("sent per second: " + events * 1000 / elapsedMs);
        } finally {
            // Always release the producer, even if a send fails.
            producer.close();
        }
    }
}

Reference


ProducerExample.scala


kafka-storm-starter


Getting Started with Kafka from Scala: Scala Clients


0.8.0 Producer Example


0.8.0 SimpleConsumer Example


社區電商系統架構之消息隊列篇:kafka的實驗