Camus (https://github.com/linkedin/camus) is another piece of work from LinkedIn, which provides a pipeline from Kafka to HDFS. In this project, a single MapReduce job performs the following steps to load data into HDFS in a distributed manner:
- As a first step, it discovers the latest topics and partition offsets from ZooKeeper.
- Each task in the MapReduce job fetches events from the Kafka broker and commits the pulled data along with the audit count to the output folders.
- After the completion of the job, final offsets are written to HDFS, which can be further consumed by subsequent MapReduce jobs.
- Information about the consumed messages is also updated in the Kafka cluster.
$hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
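For reference, a minimal camus.properties for such a run might look like the following sketch (the property names come from the Camus example config; the broker address and HDFS paths are placeholders):

```
# job identification
camus.job.name=Camus Job
# where Camus writes final topic output and execution metadata (offsets) on HDFS
etl.destination.path=/user/camus/topics
etl.execution.base.path=/user/camus/exec
etl.execution.history.path=/user/camus/exec/history
# Kafka connection
kafka.brokers=localhost:9092
# decoder used to deserialize messages pulled from Kafka
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
```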
Issue 0:
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
Solution:
This is caused by an incompatibility between Hadoop 1 and Hadoop 2 (JobContext changed from a class to an interface). In pom.xml, set the Hadoop version to 2.2.0.
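In the Camus pom.xml, this amounts to building against the Hadoop 2 artifacts. A sketch (the exact artifact list in the Camus pom may differ):

```
<properties>
    <hadoop.version>2.2.0</hadoop.version>
</properties>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
```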
Issue 1:
CamusJob] - Unable to pull requests from Kafka brokers. Exiting the program
java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Integer.parseInt(Integer.java:470)
at java.lang.Integer.parseInt(Integer.java:499)
at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1060)
at com.linkedin.camus.etl.kafka.CamusJob.getKafkaBufferSize(CamusJob.java:744)
Solution:
Specify the Kafka-related properties explicitly, or comment them out (this way Camus will use default values):
# Fetch Request Parameters
# kafka.fetch.buffer.size=
# kafka.fetch.request.correlationid=
# kafka.fetch.request.max.wait=
# kafka.fetch.request.min.bytes=
# Connection parameters.
kafka.brokers=localhost:9092
# kafka.timeout.value=
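The stack trace shows why commenting the properties out matters: Camus reads these settings with Configuration.getInt, and a property that is present but has an empty value is still handed to Integer.parseInt rather than falling back to the default. A tiny demonstration of the underlying failure (a standalone sketch, not Camus code):

```java
public class EmptyPropertyDemo {
    // Mimics what Configuration.getInt does with "kafka.fetch.buffer.size="
    // (present but empty): the empty string is parsed, which throws.
    static String parseEmpty() {
        try {
            Integer.parseInt("");
            return "parsed";
        } catch (NumberFormatException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseEmpty());
    }
}
```

So `kafka.fetch.buffer.size=` with no value behaves very differently from the line being absent entirely.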
Issue 2:
[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.linkedin.batch.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder not found
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:408)
Solution:
Change to the following line in the camus.properties file:
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
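The root cause is simply a fully qualified class name that does not resolve on the job's classpath (note the `com.linkedin.batch.etl...` package in the error versus `com.linkedin.camus.etl...` in the fix). A quick standalone sketch of the check MessageDecoderFactory effectively performs (neither Camus class is on this demo's classpath, so both Camus names would fail here; the point is the mechanism):

```java
public class DecoderCheck {
    // Returns true if the configured decoder class can be loaded,
    // which is what MessageDecoderFactory needs at run time.
    static boolean resolves(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The package from the failing config:
        System.out.println(resolves(
            "com.linkedin.batch.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder"));
    }
}
```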
Issue 3:
[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.example.DummySchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)
Solution:
Change to the following line in the camus.properties file:
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
Issue 4: Register Avro Schema in Memory
If we have only a few Kafka topics and they change infrequently, we can use a memory-based Avro schema registry:
1. Obtain the topic's .avsc schema file
2. Automatically generate the topic's .java files from it
3. Register them in DummySchemaRegistry.java
[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.InstantiationException: com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)
Solution:
This issue usually means the topic name isn't registered in the schema registry.
1. Generate DummyLog.java from DummyLog.avsc with avro-tools.
2. Register the topic -> schema pair in DummySchemaRegistry.java.
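Step 1 can be done with the Avro tools jar's `compile schema` command (the jar version and output path here are placeholders):

```
java -jar avro-tools-1.7.7.jar compile schema DummyLog.avsc src/main/java/
```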
public class DummySchemaRegistry extends AvroMemorySchemaRegistry {
    public DummySchemaRegistry() {
        super();
        // register the topic name (DUMMY_LOG) -> schema (DummyLog) pair in the memory repo
        super.register("DUMMY_LOG", DummyLog.SCHEMA$);
    }
}
Then, set the Avro decoder and record writer provider in the camus.properties file:
camus.message.decoder.class=com.linkedin.batch.etl.kafka.coders.KafkaAvroMessageDecoder
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
Reference:
http://stackoverflow.com/questions/21508355/runing-camus-sample-with-kafka-0-8
https://groups.google.com/forum/#!topic/camus_etl/RzSHsDzOdow
https://groups.google.com/forum/#!topic/camus_etl/4f-Ru7Rhn8w
https://groups.google.com/forum/#!msg/camus_etl/EvJQsAC7wSA/ff9fkzLxrKYJ