Kafka producers automatically discover the lead broker for each partition of a topic by issuing a metadata request before sending any message to the broker.
The class below determines the partition within the topic to which a message is sent. If the key is null, Kafka assigns the message to a partition at random.
package test.kafka;

import kafka.producer.Partitioner;

public class SimplePartitioner implements Partitioner<Integer> {

    // Positive keys go to (key % numPartitions); everything else falls back to partition 0.
    public int partition(Integer key, int numPartitions) {
        int partition = 0;
        int iKey = key;
        if (iKey > 0) {
            partition = iKey % numPartitions;
        }
        return partition;
    }
}

MultiBrokerProducer wires this partitioner in through the producer properties and sends ten keyed messages to the topic passed as the first command-line argument:

import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MultiBrokerProducer {

    private static Producer<Integer, String> producer;
    private final Properties props = new Properties();

    public MultiBrokerProducer() {
        // Brokers contacted for the initial metadata request
        props.put("metadata.broker.list", "localhost:9092,localhost:9093");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Custom partitioner defined above
        props.put("partitioner.class", "test.kafka.SimplePartitioner");
        // Wait for an acknowledgement from the lead broker
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<Integer, String>(config);
    }

    public static void main(String[] args) {
        MultiBrokerProducer sp = new MultiBrokerProducer();
        Random rnd = new Random();
        String topic = args[0];
        for (long messCount = 0; messCount < 10; messCount++) {
            Integer key = rnd.nextInt(255);
            String msg = "This message is for key - " + key;
            KeyedMessage<Integer, String> data1 =
                    new KeyedMessage<Integer, String>(topic, key, msg);
            producer.send(data1);
        }
        producer.close();
    }
}
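For illustration only (this demo is not part of the original example), the routing rule in SimplePartitioner can be exercised directly, outside of Kafka. The sketch below assumes a hypothetical topic with two partitions and prints where a few sample keys would land; keys that map to the same value always go to the same partition, which preserves per-key ordering.

import test.kafka.SimplePartitioner;

public class SimplePartitionerDemo {
    public static void main(String[] args) {
        SimplePartitioner partitioner = new SimplePartitioner();
        int numPartitions = 2; // hypothetical partition count for the topic
        for (int key : new int[] { 3, 10, 25, 256 }) {
            System.out.println("key " + key + " -> partition "
                    + partitioner.partition(key, numPartitions));
        }
    }
}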
2. Consumer
A multithreaded high-level consumer typically creates as many threads as there are partitions in the topic, so that each thread maps one-to-one onto a partition.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadHLConsumer {

    private ExecutorService executor;
    private final ConsumerConnector consumer;
    private final String topic;

    public MultiThreadHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer(int threadCount) {
        // Define the stream (thread) count for each topic
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, new Integer(threadCount));
        // Here we have used a single topic, but we can also add
        // multiple topics to the topicCount map
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        // Launch the thread pool: one thread per stream/partition
        executor = Executors.newFixedThreadPool(threadCount);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            final int threadId = ++threadNumber;
            executor.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                    while (consumerIte.hasNext())
                        System.out.println("Message from thread :: " + threadId
                                + " -- " + new String(consumerIte.next().message()));
                }
            });
        }

        // Let the threads consume for a while before shutting down
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public static void main(String[] args) {
        String topic = args[0];
        int threadCount = Integer.parseInt(args[1]);
        MultiThreadHLConsumer simpleHLConsumer =
                new MultiThreadHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer(threadCount);
    }
}
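As a rough usage sketch (the topic name and thread count below are hypothetical, not from the original post), the consumer can also be driven programmatically instead of through its main method. The thread count should normally match the number of partitions in the topic, since any extra threads simply sit idle.

public class ConsumerLauncher {
    public static void main(String[] args) {
        // Hypothetical values: local ZooKeeper, consumer group "testgroup",
        // topic "mytopic" with 4 partitions consumed by 4 threads
        MultiThreadHLConsumer consumer =
                new MultiThreadHLConsumer("localhost:2181", "testgroup", "mytopic");
        consumer.testConsumer(4);
    }
}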
Scala Version Example:
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/