Wednesday 26 November 2014

Avro HttpSource Handler in Flume


An HttpSource accepts Flume events by HTTP POST and GET. HTTP requests are converted into Flume events by a pluggable "handler" which must implement the HTTPSourceHandler interface. This handler takes an HttpServletRequest and returns a list of Flume events. All events deserialized from one HTTP request are committed to the channel in one transaction, which improves efficiency on channels like the file channel.

An HttpSource with an Avro handler receives Avro messages from clients through HTTP POST requests, converts each one to a Flume event, and puts it into the channel.

Both the Avro clients and the Avro handler have to know the schema of the message. You cannot read the data without the schema used to write it.
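
To make this concrete, here is a minimal sketch, using only the JDK rather than the Avro library, of how a record with an int field "id" and a string field "message" is laid out under the Avro binary encoding rules (zig-zag varints for ints and longs, a length prefix plus UTF-8 bytes for strings). The two-field schema is an assumption, since event.avsc is not shown in this post, and the class and method names are made up for illustration. The output holds only values, with no field names or types, which is why the reader needs the writer's schema.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration of the Avro binary layout; real code should use
// the Avro library's BinaryEncoder instead.
final class AvroBinarySketch {

    // Avro int/long: zig-zag encode, then write 7 bits per byte, low bits first.
    static void writeLong(long value, ByteArrayOutputStream out) {
        long z = (value << 1) ^ (value >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro string: byte length as a long, followed by the UTF-8 bytes.
    static void writeString(String s, ByteArrayOutputStream out) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(utf8.length, out);
        out.write(utf8, 0, utf8.length);
    }

    // Assumed record schema: {"id": int, "message": string}, fields in that order.
    static byte[] encodeEvent(int id, String message) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(id, out);
        writeString(message, out);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = encodeEvent(123, "this is a test");
        StringBuilder hex = new StringBuilder();
        for (byte b : body) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(body.length + " bytes: " + hex.toString().trim());
    }
}
```

The id 123 becomes the two bytes f6 01 and the string becomes a one-byte length prefix followed by its 14 characters. Nothing in those bytes says where one field ends and the next begins unless the reader applies the same schema.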

 
An Avro message contains HTTP headers and an Avro binary body.
A Flume event contains a header map (Map<String, String>) and a body (byte[]).
newEvents.add(EventBuilder.withBody(e.getBody(), e.getHeaders()));


1. A client sends an Avro message through HTTP to the Flume HTTP source
The way to build "Avro binary encoded-data" as an HTTP message can be found here.

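import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;

import android.net.http.AndroidHttpClient;
import android.util.Log;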

try{

 //read schema
 File schema_file = new File("./src/avro/event.avsc");
 Schema schema = new Schema.Parser().parse(schema_file);
 DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
 // encode Avro binary
 ByteArrayOutputStream output = new ByteArrayOutputStream();
 BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);

 GenericRecord datum = new GenericData.Record(schema);
 datum.put("id", 123);
 datum.put("message", "this is a test");
 writer.write(datum, encoder);//no schema is written
 encoder.flush();
 output.close();

  // do HTTP POST with the encoded bytes as the request body
  AndroidHttpClient http = AndroidHttpClient.newInstance("AvroClient");
  HttpPost post = new HttpPost(serverUrl);
  post.setHeader("Accept", "application/json");
  post.setHeader("Content-Type", "avro/binary");
  post.setEntity(new ByteArrayEntity(output.toByteArray()));

  HttpResponse res = http.execute(post);
  // read the response body into a separate buffer
  ByteArrayOutputStream responseBody = new ByteArrayOutputStream(1024);
  res.getEntity().writeTo(responseBody);
  http.close();

}
catch (IOException ioe)
{
  Log.e(kLogTag, ioe.toString());
  return null;
}

Please note that the above code writes only the "Avro binary encoded-data" into the output stream, without the Avro schema. The schema can be represented as a hash code, ID, URL, etc., and passed in an HTTP header.
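
One way to pass such an identifier is sketched below: hash the schema JSON text and send the hex digest in an HTTP header. This is only an illustration using the JDK. The header name "X-Avro-Schema-Hash", the class name, and the choice of SHA-256 are assumptions, not something Flume or Avro defines.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;

final class SchemaFingerprintSketch {

    // Hypothetical header name; client and handler only need to agree on it.
    static final String SCHEMA_HASH_HEADER = "X-Avro-Schema-Hash";

    // Hex-encoded SHA-256 of the schema JSON text.
    static String fingerprint(String schemaJson) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(schemaJson.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform
            throw new IllegalStateException(e);
        }
    }

    // Headers the client would add to the POST, e.g. via post.setHeader(name, value).
    static Map<String, String> schemaHeaders(String schemaJson) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put(SCHEMA_HASH_HEADER, fingerprint(schemaJson));
        return headers;
    }
}
```

Hashing the raw text means that a whitespace change in the schema file produces a different hash. Avro's SchemaNormalization class computes fingerprints over a canonical form of the schema, which may be a better fit if the Avro library is available on both sides.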


2. An AvroHTTPHandler (deserializer) that implements HTTPSourceHandler converts the binary request to a Flume event


We can refer to the BlobHandler and JSONHandler to implement an AvroHandler:
  1. The AvroHandler extracts the Avro schema or its identifier from the HTTP request header.
  2. It inserts the Avro JSON schema (or schema identifier) into the Flume event header, which is a string map.
  3. It inserts the byte[] message body into the event body.
  4. In the sink, an AvroEventSerializer is needed to extract the schema (or its identifier) from the event header and parse the Avro data in the event body.
Code example:

 
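public List<Event> getEvents(HttpServletRequest request) throws Exception {

    Map<String, String> headers = getHeaders(request);
    InputStream in = request.getInputStream();

    try {
        // InputStream has no toByteArray(); copy the request body into a buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        // one request becomes one event: Avro bytes as body, headers as metadata
        Event event = EventBuilder.withBody(buffer.toByteArray(), headers);
        return Collections.singletonList(event);
    } finally {
        in.close();
    }
}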

private Map<String, String> getHeaders(HttpServletRequest request) {
    // log the raw HTTP request headers when debugging
    if (LOGGER.isDebugEnabled()) {
        Map<String, String> requestHeaders = new HashMap<String, String>();
        Enumeration iter = request.getHeaderNames();
        while (iter.hasMoreElements()) {
            String name = (String) iter.nextElement();
            requestHeaders.put(name, request.getHeader(name));
        }
        LOGGER.debug("requestHeaders: {}", requestHeaders);
    }

    Map<String, String> headers = new HashMap<String, String>();
    if (request.getContentType() != null) {
        headers.put(Metadata.CONTENT_TYPE, request.getContentType());
    }

    // copy the request parameters (not the HTTP headers) into the event headers
    Enumeration iter = request.getParameterNames();
    while (iter.hasMoreElements()) {
        String name = (String) iter.nextElement();
        headers.put(name, request.getParameter(name));
    }

    return headers;
}
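
Step 4 is not covered by the handler code above. As a rough sketch of that lookup, using plain JDK types in place of Flume's Event and a real serializer: the sink reads a schema identifier from the event headers and resolves it against schemas it already knows. The header name "avro.schema.id", the class name, and the in-memory registry are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class SchemaLookupSketch {

    // Hypothetical event header carrying the schema identifier set by the handler.
    static final String SCHEMA_ID_HEADER = "avro.schema.id";

    // Schema identifier -> schema JSON text, loaded ahead of time by the sink.
    private final Map<String, String> knownSchemas = new HashMap<String, String>();

    void register(String schemaId, String schemaJson) {
        knownSchemas.put(schemaId, schemaJson);
    }

    // Returns the schema JSON for an event, or fails because the body cannot be decoded without it.
    String resolve(Map<String, String> eventHeaders) {
        String schemaId = eventHeaders.get(SCHEMA_ID_HEADER);
        if (schemaId == null) {
            throw new IllegalArgumentException("event has no " + SCHEMA_ID_HEADER + " header");
        }
        String schemaJson = knownSchemas.get(schemaId);
        if (schemaJson == null) {
            throw new IllegalStateException("unknown schema id: " + schemaId);
        }
        return schemaJson;
    }
}
```

In a real sink the resolved JSON would then be parsed with new Schema.Parser().parse(schemaJson) and handed to a GenericDatumReader to decode the event body.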

Reference:

Flurry Avro client:
https://github.com/flurry/avro-mobile/blob/master/avro-android-client/src/com/flurry/avroclient/AvroClientActivity.java
BlobHandler:
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.flume.flume-ng-sinks/flume-ng-morphline-solr-sink/1.4.0/org/apache/flume/sink/solr/morphline/BlobHandler.java
XMLHandler:
https://github.com/harishreedharan/usingflumecode/blob/master/ch03/src/main/java/usingflume/ch03/HTTPSourceXMLHandler.java
JSONHandler:
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/source/http/JSONHandler.java
How to use Apache Avro to encode the data in Avro binary encoded form:
http://qnalist.com/questions/4546378/how-to-use-apache-avro-to-encode-the-data-in-avro-binary-encoded-form
