Sunday 21 December 2014

Camus Consumes Avro and JSON from Kafka


Issue 1:
Consume JSON messages from Kafka to HDFS

Solution:
Set the following two properties in camus.properties file:

etl.record.writer.provider.class=
com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider

camus.message.decoder.class=
com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

Issue 2:
Use avro-schema-repo service for schema sharing,

Error: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:28)

Solution:

1. Add below two properties in camus.properties file.
etl.schema.registry.url=http://localhost:2876/schema-repo
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry

2. Because 'camus-schema-registry-avro' is not in the default camus-master/pom.xml and camus-example/pom.xml, we need add it in these two places and rebuild the jar.

Issue 3:
[CamusJob] - java.io.IOException: java.lang.RuntimeException: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_STRING
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.getWrappedRecord(EtlRecordReader.java:135)
        at com.linkedin.camus.etl.kafka.mapred.EtlRecordReader.nextKeyValue(EtlRecordReader.java:261)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:483)


Solution:
This is because the producer uses JsonEncoder.
Should use KafkaAvroMessageEncoder instead of JsonEncoder.


Issue 4:

Caused by: org.codehaus.jackson.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: java.io.StringReader@32b8f675; line: 1, column: 2]
at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
at org.codehaus.jackson.impl.JsonParserMinimalBase._throwInvalidSpace(JsonParserMinimalBase.java:331)
at org.codehaus.jackson.impl.ReaderBasedParser._skipWSOrEnd(ReaderBasedParser.java:950)
at org.codehaus.jackson.impl.ReaderBasedParser.nextToken(ReaderBasedParser.java:247)
at org.apache.avro.io.JsonDecoder.configure(JsonDecoder.java:131)
at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:73)
at org.apache.avro.io.JsonDecoder.<init>(JsonDecoder.java:81)
at org.apache.avro.io.DecoderFactory.jsonDecoder(DecoderFactory.java:268)
at com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder.decode(LatestSchemaKafkaAvroMessageDecoder.java:26)
... 14 more

Solution: 
This is caused by using different kind of encoder and decoder on producer and consumer.
In camus.properties,
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder


Below is a working Camus.properties example for Avro and JSON.

# Needed Camus properties, more cleanup to come

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path= camus_kafka_etl
# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=camus_kafka_etl/base
# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=camus_kafka_etl/base/history
etl.execution.counts.path=camus_kafka_etl/base/counts
#new added
zookeeper.broker.topics=/brokers/topics

# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.camus.etl.kafka.coders.DummyKafkaMessageEncoder

# Concrete implementation of the Decoder class to use
#camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
#camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder

# This is new added
#etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider


# url for avro schema repo
etl.schema.registry.url=http://localhost:2876/schema-repo
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry

# Used by avro-based Decoders to use as their Schema Registry
#kafka.message.coder.schema.registry.class=com.linkedin.camus.example.DummySchemaRegistry
#kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topic that do not have a partitioner specified
#etl.partitioner.class=com.linkedin.camus.etl.kafka.coders.DefaultPartitioner

# Partitioners can also be set on a per-topic basis
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner

# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
#hdfs.default.classpath.dir=hdfs://localhost.localdomain:8020

# This is new added
fs.default.name=hdfs://localhost:8020/

# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=30
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topic are pulled.  nothing on the blacklist is pulled
kafka.blacklist.topics=test,avrotest,avrotopic, avrotest1, avrotest2, jsontest, DUMMY_LOG,DUMMY_LOG1,DUMMY_LOG2,DUMMY_LOG3, DUMMY_LOG4, DUMMY_LOG5,DUMMY_LOG_2
kafka.whitelist.topics=DUMMY_LOG_3
log4j.configuration=true

# Name of the client as seen by kafka
kafka.client.name=camus
# Fetch Request Parameters
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
# Connection parameters.
#kafka.brokers= ec2-54-197-143-117.compute-1.amazonaws.com:9092
kafka.brokers=localhost:9092
kafka.timeout.value= 60000


#Stops the mapper from getting inundated with Decoder exceptions for the same topic
#Default value is set to 10
max.decoder.exceptions.to.print=5

#Controls the submitting of counts to Kafka
#Default value set to true
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

mapred.output.compress=true
mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000

#zookeeper.session.timeout=
#zookeeper.connection.timeout=



Reference:
https://groups.google.com/forum/#!searchin/camus_etl/.JsonParseException$3A$20Unexpected$20character/camus_etl/O0HNjYKQiUo/vz9iQuxb_nUJ
https://medium.com/@thedude_rog/camus-gotchas-b8ecebc08645

1 comment: