Sunday 21 December 2014

Troubleshooting the Kafka-Camus Example

Camus (https://github.com/linkedin/camus) is another piece of work from LinkedIn; it provides a pipeline from Kafka to HDFS. In this project, a single MapReduce job performs the following steps to load data into HDFS in a distributed manner:

  1. As a first step, it discovers the latest topics and partition offsets from ZooKeeper.
  2. Each task in the MapReduce job fetches events from the Kafka broker and commits the pulled data along with the audit count to the output folders.
  3. After the completion of the job, final offsets are written to HDFS, which can be further consumed by subsequent MapReduce jobs.
  4. Information about the consumed messages is also updated in the Kafka cluster.
When you run the following example, you may encounter some issues.
$ hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
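For reference, the camus.properties used here only needs a handful of entries. A minimal sketch follows (the HDFS paths are placeholders, and the decoder/registry class values anticipate the fixes in Issues 2-4 below):

# Job name shown in the ResourceManager UI
camus.job.name=Camus Job
# Where final topic data and execution metadata land in HDFS (placeholder paths)
etl.destination.path=/user/me/camus/topics
etl.execution.base.path=/user/me/camus/exec
etl.execution.history.path=/user/me/camus/exec/history
# Kafka connection
kafka.brokers=localhost:9092
# Decoder and schema registry (see Issues 2-4 below)
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider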

Issue 0:
java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected

Solution:
This is caused by the incompatibility between Hadoop 1 and Hadoop 2: Camus was compiled against Hadoop 1, where JobContext is a class, while Hadoop 2 turned it into an interface.
In pom.xml, set the Hadoop version to 2.2.0 (or whatever your cluster runs) and rebuild.
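As a sketch, the dependency pin in pom.xml could look like this (hadoop-client is the usual umbrella artifact; match whatever Hadoop artifacts the Camus pom already declares):

<!-- Build against Hadoop 2 so JobContext resolves as an interface -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.2.0</version>
</dependency>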


Issue 1:
[CamusJob] - Unable to pull requests from Kafka brokers. Exiting the program
java.lang.NumberFormatException: For input string: ""
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Integer.parseInt(Integer.java:470)
at java.lang.Integer.parseInt(Integer.java:499)
at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1060)
at com.linkedin.camus.etl.kafka.CamusJob.getKafkaBufferSize(CamusJob.java:744)

Solution:
Give the Kafka-related properties explicit values, or comment them out entirely so that Camus falls back to its defaults. A key that is present but empty (e.g. kafka.fetch.buffer.size= with no value) is what triggers the NumberFormatException above:
# Fetch Request Parameters
# kafka.fetch.buffer.size=
# kafka.fetch.request.correlationid=
# kafka.fetch.request.max.wait=
# kafka.fetch.request.min.bytes=
# Connection parameters.
kafka.brokers=localhost:9092
# kafka.timeout.value=
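A minimal sketch of why the empty value fails, using Hadoop's Configuration directly (the key name is just for illustration):

import org.apache.hadoop.conf.Configuration;

public class EmptyIntDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Simulates a properties-file entry with no value: kafka.fetch.buffer.size=
        conf.set("kafka.fetch.buffer.size", "");
        // getInt() falls back to the default only when the key is ABSENT;
        // an empty string is present, so Integer.parseInt("") throws
        // java.lang.NumberFormatException: For input string: ""
        System.out.println(conf.getInt("kafka.fetch.buffer.size", 1024 * 1024));
    }
}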

Issue 2:
[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.linkedin.batch.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder not found
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)
at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:408)

Solution:
Change the decoder class to the following line in the camus.properties file (the default points at a com.linkedin.batch.etl... package that does not exist in the shaded jar):
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
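To confirm which package the class actually lives in before editing, list the shaded jar's contents (the same check works for the other ClassNotFoundException cases below):

$ jar tf camus-example-0.1.0-SNAPSHOT-shaded.jar | grep LatestSchemaKafkaAvroMessageDecoder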


Issue 3:
[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.example.DummySchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)

Solution:
Change the schema registry class to the following line in the camus.properties file (note: no space inside the package name):
kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry



Issue 4: Register Avro Schema in Memory
If we have only a few Kafka topics and their schemas rarely change, we can use a memory-based Avro schema registry. This takes three steps:
1. Know the topic's Avro schema (.avsc file).
2. Generate the topic's Java classes from the schema.
3. Register the topic -> schema pair in DummySchemaRegistry.java.

[CamusJob] - failed to create decoder
com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.InstantiationException: com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)

Solution:

This issue usually means the topic name isn't registered in the schema registry.
1. Generate DummyLog.java from DummyLog.avsc with the avro-tools jar (see the command after this list).
2. Register the topic -> schema pair in DummySchemaRegistry.java.
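Assuming Avro 1.7.x and a DummyLog.avsc in the current directory (file name and output directory are illustrative), step 1 looks like:

$ java -jar avro-tools-1.7.4.jar compile schema DummyLog.avsc .

This writes DummyLog.java under a package directory matching the namespace declared in the schema.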

// Assumes the generated DummyLog class lives in com.linkedin.camus.example.records;
// adjust the import to wherever your generated class actually sits.
import com.linkedin.camus.example.records.DummyLog;

public class DummySchemaRegistry extends AvroMemorySchemaRegistry {
    public DummySchemaRegistry() {
        super();
        // Register the topic name (DUMMY_LOG) -> schema (DummyLog) pair in the in-memory repo
        super.register("DUMMY_LOG", DummyLog.SCHEMA$);
    }
}
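If you pull more than one topic, add one register() call per topic name, since Camus looks the schema up by the topic string at decode time.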

Then set the Avro decoder and record writer in the camus.properties file (note the decoder package is com.linkedin.camus..., not com.linkedin.batch..., the same package mix-up as in Issue 2):
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider


References:

http://stackoverflow.com/questions/21508355/runing-camus-sample-with-kafka-0-8
https://groups.google.com/forum/#!topic/camus_etl/RzSHsDzOdow
https://groups.google.com/forum/#!topic/camus_etl/4f-Ru7Rhn8w
https://groups.google.com/forum/#!msg/camus_etl/EvJQsAC7wSA/ff9fkzLxrKYJ
