Monday, 25 August 2014

Avro Serialization and Deserialization

Avro data can be serialized and deserialized either with the avro-tools command-line utility or with Java code.

1. Avro tools

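Serialize JSON to Avro with snappy compression (the `--codec` option selects the compression codec; without it the output file is not compressed)
$ java -jar avro-tools-1.7.5.jar fromjson --codec snappy --schema-file object.avsc object.json > object.snappy.avro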

Deserialize snappy-compressed Avro to JSON (the codec is recorded in the Avro file header, so no option is needed)
$ java -jar avro-tools-1.7.5.jar tojson object.snappy.avro > object.json

Retrieve the schema embedded in an Avro data file
$ java -jar avro-tools-1.7.5.jar getschema object.snappy.avro > object.avsc

2. Java code

We can either generate Java classes from the schema ($ java -jar avro-tools-1.7.5.jar compile schema object.avsc <output>) or use a GenericRecord.
The only difference is that values are assigned to an instance of the generated class instead of to a GenericRecord.

Example: convert a JSON file to an Avro data file based on the given schema, then read the Avro file back and print its records.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.json.simple.JSONObject;

28.public class AvroExampleWithoutCodeGeneration {
29. 
30.public void serialize() throws JsonParseException, JsonProcessingException, IOException {
31. 
32.InputStream in = new FileInputStream("resources/StudentActivity.json");
33. 
34.// create a schema
35.Schema schema = new Schema.Parser().parse(newFile("resources/StudentActivity.avsc"));
36.// create a record to hold json
37.GenericRecord AvroRec = new GenericData.Record(schema);
38.// create a record to hold course_details
39.GenericRecord CourseRec = newGenericData.Record(schema.getField("course_details").schema());
40.// this file will have AVro output data
41.File AvroFile = new File("resources/StudentActivity.avro");
42.// Create a writer to serialize the record
43.DatumWriter<GenericRecord> datumWriter = newGenericDatumWriter<GenericRecord>(schema);             
44.DataFileWriter<GenericRecord> dataFileWriter = newDataFileWriter<GenericRecord>(datumWriter);
45. 
46.dataFileWriter.create(schema, AvroFile);
47. 
48.// iterate over JSONs present in input file and write to Avro output file
49.for (Iterator it = new ObjectMapper().readValues(
50.new JsonFactory().createJsonParser(in), JSONObject.class); it.hasNext();) {
51. 
52.JSONObject JsonRec = (JSONObject) it.next();
53.AvroRec.put("id", JsonRec.get("id"));
54.AvroRec.put("student_id", JsonRec.get("student_id"));
55.AvroRec.put("university_id", JsonRec.get("university_id"));
56. 
57.LinkedHashMap CourseDetails = (LinkedHashMap) JsonRec.get("course_details");
58.CourseRec.put("course_id", CourseDetails.get("course_id"));
59.CourseRec.put("enroll_date", CourseDetails.get("enroll_date"));
60.CourseRec.put("verb", CourseDetails.get("verb"));
61.CourseRec.put("result_score", CourseDetails.get("result_score"));
62. 
63.AvroRec.put("course_details", CourseRec);
64. 
65.dataFileWriter.append(AvroRec);
66.}  // end of for loop
67. 
68.in.close();
69.dataFileWriter.close();
70. 
71.// end of serialize method
72. 
73.public void deserialize () throws IOException {
74.// create a schema
75.Schema schema = new Schema.Parser().parse(newFile("resources/StudentActivity.avsc"));
76.// create a record using schema
77.GenericRecord AvroRec = new GenericData.Record(schema);
78.File AvroFile = new File("resources/StudentActivity.avro");
79.DatumReader<GenericRecord> datumReader = newGenericDatumReader<GenericRecord>(schema);
80.DataFileReader<GenericRecord> dataFileReader = newDataFileReader<GenericRecord>(AvroFile, datumReader);
81.System.out.println("Deserialized data is :");
82.while (dataFileReader.hasNext()) {
83.AvroRec = dataFileReader.next(AvroRec);
84.System.out.println(AvroRec);
85.}
86.}
87. 
88.public static void main(String[] args) throws JsonParseException, JsonProcessingException, IOException {
89.AvroExampleWithoutCodeGeneration AvroEx = newAvroExampleWithoutCodeGeneration();
90.AvroEx.serialize();
91.AvroEx.deserialize();
92.}
93.}



Reference:
http://java.dzone.com/articles/getting-started-avro-part-2

No comments:

Post a Comment