Sunday 31 August 2014

Pig UDF: Recursively parse XML for all elements and attributes

Usage:
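XParser(xml, '/School/students', false)

Arguments: the XML document as a chararray, an XPath expression selecting the elements to extract, and an optional boolean that controls whether the parsed document is cached between calls (defaults to true).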

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.FuncSpec;
import org.apache.pig.PigWarning;
import org.apache.pig.data.*;
import org.apache.pig.impl.*;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.dom4j.*;
import org.dom4j.io.SAXReader;


public class XParser extends org.apache.pig.EvalFunc {



   private String xml = null;
   private Document document;
   private static boolean cache = true;

   BagFactory mBagFactory = BagFactory.getInstance();
   TupleFactory mTupleFactory = TupleFactory.getInstance();
   
   public DataBag exec(final Tuple input) throws IOException {

       if (input == null || input.size() <= 1) {

           warn("Error processing input, not enough parameters or null input"
           + input, PigWarning.UDF_WARNING_1);

           return null;
       }

       if (input.size() > 3) {
           warn("Error processing input, too many parameters" + input,
           PigWarning.UDF_WARNING_1);
           return null;
       }

       try {

         final String xml = (String) input.get(0);
         if (input.size() > 2)
            cache = (Boolean) input.get(2);

         if (!cache || !xml.equals(this.xml)) {
           this.xml = xml; // track the xml for subsequent calls to this udf                                
           SAXReader reader = new SAXReader();
           this.document = reader.read(new StringReader(this.xml)
         }
 
         final String xpathString = (String) input.get(1); 
         @SuppressWarnings("unchecked")
         List list = document.selectNodes(xpathString);
         return retrieveNodes(list);

      } catch (Exception e) {

       warn("Error processing input " + input.getType(0),

       PigWarning.UDF_WARNING_1);

       return null;

      }


 }


    @Override

    public Schema outputSchema(Schema input) {

        try {

         return new Schema(
          new FieldSchema(null,
           new Schema(
            new FieldSchema(null,
             new Schema(
             new FieldSchema("passivedataxml", DataType.CHARARRAY)), DataType.TUPLE)),
            DataType.BAG));

        } catch (Exception e) {
            return null;
        }
    }



 @Override

 public List getArgToFuncMapping() throws FrontendException {

       final List funcList = new ArrayList();
       //two  arguments
       List fields = new ArrayList();
       fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
       fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY));

        Schema twoArgInSchema = new Schema(fields);

        funcList.add(new FuncSpec(this.getClass().getName(), twoArgInSchema));
        // three arguments
        fields = new ArrayList();
        fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        fields.add(new Schema.FieldSchema(null, DataType.BOOLEAN));

        Schema threeArgInSchema = new Schema(fields);
        funcList.add(new FuncSpec(this.getClass().getName(), threeArgInSchema));
        return funcList;

 }

    public DataBag retrieveNodes(List list){
      
       DataBag xmlbag = mBagFactory.newDefaultBag();
       for (Element element : list){

        Tuple tuple = mTupleFactory.newTuple();  //list of attributes in a node              

        Iterator itr = element.attributeIterator();
        //add all attribute values to a tuple
        while (itr.hasNext()
        {
          Attribute att = (Attribute)itr.next();
          String value = att.getValue();
          tuple.append(value);
        }

        List subList = element.elements();
        if(!subList.isEmpty())
           tuple.append(retrieveNodes(subList));

        xmlbag.add(tuple); //a dbag keeps all tuples from a xml string

        }

        return xmlbag;

    }


}

No comments:

Post a Comment