Friday 30 January 2015

Kafka Producer and Consumer Examples

1. Producer

Kafka producers automatically find out the lead broker for the topic as well as partition it by raising a request for the metadata before it sends any message to the the broker.
Below class determines the partitioning in the topic where the message needs to be sent. If the key is null, Kafka uses random partitioning for message assignment.

import kafka.producer.Partitioner;
public class SimplePartitioner implements Partitioner {
    public int partition(Integer key, int numPartitions) {
        int partition = 0;
        int iKey = key;
        if (iKey > 0) {
           partition = iKey % numPartitions;
        }
       return partition;
  }
}



import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MultiBrokerProducer {
    private static Producer producer;
    private final Properties props = new Properties();

    public MultiBrokerProducer()
    {
    props.put("metadata.broker.list","localhost:9092, localhost:9093");
      props.put("serializer.class","kafka.serializer.StringEncoder"); 

      props.put("partitioner.class", "test.kafka.SimplePartitioner");

      props.put("request.required.acks", "1");

      ProducerConfig config = new ProducerConfig(props); 
      producer = new Producer(config);
    }

  public static void main(String[] args) {
    MultiBrokerProducer sp = new MultiBrokerProducer();
        Random rnd = new Random();

        String topic = (String) args[0];
        for (long messCount = 0; messCount < 10; messCount++) { 
               Integer key = rnd.nextInt(255); 
               String msg = "This message is for key - " + key; 
               KeyedMessage data1 = new KeyedMessage(topic, key, msg);
               producer.send(data1);
        }
    producer.close();
  }

}



2. Consumer

A multithreaded high-level consumer is usually based on the number of partitions in the topic and follows a one-to-one mapping approach between the thread and the partitions within the topic.


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadHLConsumer {

  private ExecutorService executor;
  private final ConsumerConnector consumer;
  private final String topic;

  public MultiThreadHLConsumer(String zookeeper, String groupId, String topic) {

    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");

    consumer = Consumer.createJavaConsumerConnector(newConsumerConfig(props));
    this.topic = topic;
  }

  public void testConsumer(int threadCount) {

    Map topicCount = new HashMap();

    // Define thread count for each topic
    topicCount.put(topic, new Integer(threadCount));

    // Here we have used a single topic but we can also add 
    // multiple topics to topicCount MAP 
    Map>> consumerStreams = consumer.createMessageStreams(topicCount);

    List> streams = consumerStreams.get(topic);

    // Launching the thread pool
    executor = Executors.newFixedThreadPool(threadCount);

    //Creating an object messages consumption
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
      ConsumerIterator consumerIte = stream.iterator();
      threadNumber++;

      while (consumerIte.hasNext())
    System.out.println("Message from thread :: " + threadNumber + " -- " + new String(consumerIte.next().message()));
    }
    if (consumer != null)
      consumer.shutdown();
    if (executor != null)
      executor.shutdown();
  }

  public static void main(String[] args) {

    String topic = args[0];
    int threadCount = Integer.parseInt(args[1]);
    MultiThreadHLConsumer simpleHLConsumer = new MultiThreadHLConsumer("localhost:2181", "testgroup", topic);
    simpleHLConsumer.testConsumer(threadCount);
  }
}



Scala Version Example:
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/

No comments:

Post a Comment