public void store(Status status) throws IOException, InterruptedException{
final String zkConnection = kafkaHostname + ":" + kafkaPort;
final String topic = kafkaTopicName;
TwitterStatusUpdate update = TwitterStatusUpdateConverter.convert(status);
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter writer =
new SpecificDatumWriter(TwitterStatusUpdate.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(update, encoder);
encoder.flush();
out.close();
Message message = new Message(out.toByteArray());
if (producer == null) {
Properties props = new Properties();
props.put("zk.connect", zkConnection);
props.put("producer.type", "async");
props.put("compression.codec", "1");
producer = new kafka.javaapi.producer.Producer
(new ProducerConfig(props));
}
producer.send(new ProducerData(topic, message);
}
Wednesday, 4 March 2015
A Kafka Avro Producer Example
Below is a method of a Kafka producer that sends tweets in Avro format to Kafka.
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment