Saturday 25 October 2014

Deserialize and Serialize Avro in Flume




Deserializer:
The Avro deserializer parses Avro container files and generates one Flume event per record in the file, storing each binary Avro-encoded record in the Flume event body. It is configured by passing parameters prefixed with "deserializer.". Text-based deserializers, by contrast, can call the readChar method to read one character at a time. Because the byte representation of a character depends on the character set, tell the source which one to use by setting inputCharset to the name of the character set; the default is UTF-8.

a1.sources.src1.deserializer.schemaType = LITERAL 

To interpret the data, a consumer needs to know the schema it was written with. The schemaType parameter controls how the schema is attached to each event:

  • HASH (the default): the 64-bit Rabin fingerprint of the schema is inserted in the headers with the key "flume.avro.schema.hash".
  • LITERAL: the entire JSON-encoded schema is inserted into the headers with the key "flume.avro.schema.literal".
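
The 64-bit Rabin fingerprint mentioned above is the CRC-64-AVRO function defined in the Avro specification, which Avro computes over the schema's Parsing Canonical Form. The following is a minimal Python sketch of that function, ported from the Java reference code in the specification. It is not Flume's own code: the name fingerprint64 follows the specification, the input is assumed to already be the canonical schema text as bytes, and how Flume encodes the result into the header string is not covered here.

```python
# Sketch of CRC-64-AVRO (the "64-bit Rabin fingerprint" from the Avro spec).
# Assumption: `data` is already the schema's Parsing Canonical Form in UTF-8.

EMPTY = 0xC15D213AA4D7A795  # seed constant given in the Avro specification


def _build_table():
    """Precompute the 256-entry lookup table used by the byte-wise loop."""
    table = []
    for i in range(256):
        fp = i
        for _ in range(8):
            # Shift right one bit; fold in EMPTY when the dropped bit was 1.
            fp = (fp >> 1) ^ (EMPTY if fp & 1 else 0)
        table.append(fp)
    return table


_TABLE = _build_table()


def fingerprint64(data: bytes) -> int:
    """Return the unsigned 64-bit fingerprint of `data`."""
    fp = EMPTY
    for byte in data:
        fp = (fp >> 8) ^ _TABLE[(fp ^ byte) & 0xFF]
    return fp


if __name__ == "__main__":
    # The canonical form of the primitive schema "string" is the JSON string itself.
    print(format(fingerprint64(b'"string"'), "016x"))
```

Because the fingerprint is only 8 bytes, HASH mode adds far less per-event overhead than LITERAL mode, but the reader then needs some other way to map the hash back to the full schema.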

Serializer:
The HDFS sink lets users write data to HDFS in a format that suits them. Users plug in a serializer that converts Flume events into a format the downstream processing systems understand and writes them to a stream that is eventually flushed to HDFS. Remember that Flume's HDFS sink writes SequenceFiles by default; to write text or Avro, you must set hdfs.fileType to DataStream.
To turn Avro events back into an Avro container file, AvroEventSerializer looks up the "flume.avro.schema.literal" and "flume.avro.schema.hash" keys in the event headers to determine the schema of each Avro event.

tier1.sinks.sink-1.hdfs.fileType = DataStream
tier1.sinks.sink-1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
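
The header lookup described above can be pictured with a small Python sketch. It is not Flume's Java implementation: resolve_schema and the schemas_by_hash registry are hypothetical names, and the way a hash value is mapped back to a schema is an assumption made only for illustration. Only the two header keys come from the text.

```python
import json

# Header keys named in the article.
LITERAL_HEADER = "flume.avro.schema.literal"
HASH_HEADER = "flume.avro.schema.hash"


def resolve_schema(headers, schemas_by_hash):
    """Return the schema for one event, preferring the literal header.

    `headers` is the event's header map (str -> str).
    `schemas_by_hash` is a hypothetical registry mapping hash strings to
    already-parsed schemas; it stands in for whatever lookup a real
    serializer would use.
    """
    literal = headers.get(LITERAL_HEADER)
    if literal is not None:
        # The literal header carries the whole schema as JSON text.
        return json.loads(literal)

    schema_hash = headers.get(HASH_HEADER)
    if schema_hash is not None and schema_hash in schemas_by_hash:
        return schemas_by_hash[schema_hash]

    raise LookupError("no usable Avro schema header on event")


if __name__ == "__main__":
    event_headers = {LITERAL_HEADER: '{"type": "string"}'}
    print(resolve_schema(event_headers, {}))
```

An event that carries neither header cannot be written to an Avro container file, which is why the sketch raises an error rather than guessing a schema.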

Reference:
JSON messages need a serializer to output the proper format:
https://github.com/wdavidw/flume/blob/master/src/main/java/com/adaltas/flume/serialization/JSONEventSerializer.java
https://github.com/cloudera/cdk-examples
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.flume/flume-ng-core/1.4.0/org/apache/flume/serialization/AvroEventDeserializer.java

2 comments:

  1. I tried to install your code, but the output is empty files, so I guess the problem is either that I did something wrong while installing your code or something in my configuration file. Here is my configuration file:

    TwitterAgent.sources = Twitter
    TwitterAgent.channels = MemChannel
    TwitterAgent.sinks = HDFS

    TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
    TwitterAgent.sources.Twitter.deserializer.schemaType = LITERAL
    TwitterAgent.sources.Twitter.channels = MemChannel
    TwitterAgent.sources.Twitter.consumerKey = [personal]
    TwitterAgent.sources.Twitter.consumerSecret = [personal]
    TwitterAgent.sources.Twitter.accessToken = [personal]
    TwitterAgent.sources.Twitter.accessTokenSecret = [personal]
    TwitterAgent.sources.Twitter.keywords = twitter, egypt

    TwitterAgent.sinks.HDFS.channel = MemChannel
    TwitterAgent.sinks.HDFS.type = hdfs
    TwitterAgent.sinks.HDFS.hdfs.path = /tweets_2/
    TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
    TwitterAgent.sinks.HDFS.hdfs.serializer = com.adaltas.flume.serialization.HeaderAndBodyTextEventSerializer$Builder
    TwitterAgent.sinks.HDFS.hdfs.serializer.columns = timestamp hostname Facility Severity
    TwitterAgent.sinks.HDFS.hdfs.serializer.format = CSV
    TwitterAgent.sinks.HDFS.hdfs.serializer.appendNewline = true
    TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
    TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
    TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
    TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

    TwitterAgent.channels.MemChannel.type = memory
    TwitterAgent.channels.MemChannel.capacity = 10000
    TwitterAgent.channels.MemChannel.transactionCapacity = 100

    I am using Hortonworks Sandbox v2.2.

    Replies
    1. Hello,
      In my article, the input message is in Avro format, and the output is also Avro.
      Are you working with Avro or JSON?
      If your message is JSON, you need a serializer to output the proper format:
      https://github.com/wdavidw/flume/blob/master/src/main/java/com/adaltas/flume/serialization/JSONEventSerializer.java