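Wednesday 4 March 2015
A Kafka Avro Producer Example
Below is a method of a Kafka producer that serializes tweets in Avro format and sends them to Kafka.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

public void store(Status status) throws IOException, InterruptedException {
    // This address is passed to "zk.connect", so it must point at ZooKeeper (host:port).
    final String zkConnection = kafkaHostname + ":" + kafkaPort;
    final String topic = kafkaTopicName;

    // Convert the tweet to its Avro-generated class and serialize it to bytes.
    TwitterStatusUpdate update = TwitterStatusUpdateConverter.convert(status);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<TwitterStatusUpdate> writer =
            new SpecificDatumWriter<TwitterStatusUpdate>(TwitterStatusUpdate.class);
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(update, encoder);
    encoder.flush();
    out.close();

    Message message = new Message(out.toByteArray());

    // Create the producer lazily on first use.
    if (producer == null) {
        Properties props = new Properties();
        props.put("zk.connect", zkConnection);
        props.put("producer.type", "async");
        props.put("compression.codec", "1"); // 1 = GZIP
        // The <String, Message> type parameters are assumed; the original listing lost its generics.
        producer = new kafka.javaapi.producer.Producer<String, Message>(new ProducerConfig(props));
    }

    producer.send(new ProducerData<String, Message>(topic, message));
}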
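To give a rough idea of what ends up in the byte array that the store method wraps in a Message, here is a small sketch that hand-encodes a record using Avro's binary encoding rules: a long is written as a zigzag variable-length integer, and a string as its UTF-8 length followed by the bytes. The class AvroBinarySketch and the two fields (id and text) are hypothetical; the real TwitterStatusUpdate schema is not shown in this post, and in practice SpecificDatumWriter does this work for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroBinarySketch {

    // Avro writes int/long values as zigzag-encoded variable-length integers.
    public static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zigzag: small magnitudes map to small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro writes a string as its UTF-8 byte length (a long) followed by the bytes.
    public static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record {id: long, text: string}.
    // Fields are written in schema order, with no field names or tags.
    public static byte[] encodeUpdate(long id, String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeString(out, text);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = encodeUpdate(1L, "hi");
        StringBuilder hex = new StringBuilder();
        for (byte b : payload) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "02 04 68 69"
    }
}
```

Because the payload carries no schema information, a consumer needs the same (or a compatible) schema to decode the message.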