Wednesday 4 March 2015

A Kafka Avro Producer Example

Below is a method of a Kafka producer, which sends tweets in avro format to Kafka.


public void store(Status status) throws IOException, InterruptedException{

 final String zkConnection = kafkaHostname + ":" + kafkaPort;
 final String topic = kafkaTopicName;

 TwitterStatusUpdate update = TwitterStatusUpdateConverter.convert(status);

 ByteArrayOutputStream out = new ByteArrayOutputStream();
 DatumWriter writer = 
 new SpecificDatumWriter(TwitterStatusUpdate.class);

 Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
 writer.write(update, encoder);
 encoder.flush();
 out.close();

 Message message = new Message(out.toByteArray());
 if (producer == null) {
  Properties props = new Properties();
  props.put("zk.connect", zkConnection);
  props.put("producer.type", "async");
  props.put("compression.codec", "1");
  producer = new kafka.javaapi.producer.Producer
  (new ProducerConfig(props));
 }
 producer.send(new ProducerData(topic, message);

}

No comments:

Post a Comment