XParser(xml, "/School/students", false)
import org.apache.pig.FuncSpec; import org.apache.pig.PigWarning; import org.apache.pig.data.*; import org.apache.pig.impl.*; import org.dom4j.*; public class XParser extends org.apache.pig.EvalFunc{ private String xml = null; private Document document; private static boolean cache = true; BagFactory mBagFactory = BagFactory.getInstance(); TupleFactory mTupleFactory = TupleFactory.getInstance(); public DataBag exec(final Tuple input) throws IOException { if (input == null || input.size() <= 1) { warn("Error processing input, not enough parameters or null input" + input, PigWarning.UDF_WARNING_1); return null; } if (input.size() > 3) { warn("Error processing input, too many parameters" + input, PigWarning.UDF_WARNING_1); return null; } try { final String xml = (String) input.get(0); if (input.size() > 2) cache = (Boolean) input.get(2); if (!cache || !xml.equals(this.xml)) { this.xml = xml; // track the xml for subsequent calls to this udf SAXReader reader = new SAXReader(); this.document = reader.read(new StringReader(this.xml) } final String xpathString = (String) input.get(1); @SuppressWarnings("unchecked") List list = document.selectNodes(xpathString); return retrieveNodes(list); } catch (Exception e) { warn("Error processing input " + input.getType(0), PigWarning.UDF_WARNING_1); return null; } } @Override public Schema outputSchema(Schema input) { try { return new Schema( new FieldSchema(null, new Schema( new FieldSchema(null, new Schema( new FieldSchema("passivedataxml", DataType.CHARARRAY)), DataType.TUPLE)), DataType.BAG)); } catch (Exception e) { return null; } } @Override public List getArgToFuncMapping() throws FrontendException { final List funcList = new ArrayList (); //two arguments List fields = new ArrayList (); fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY)); fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY)); Schema twoArgInSchema = new Schema(fields); funcList.add(new FuncSpec(this.getClass().getName(), twoArgInSchema)); // three arguments fields = new ArrayList (); fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY)); fields.add(new Schema.FieldSchema(null, DataType.CHARARRAY)); fields.add(new Schema.FieldSchema(null, DataType.BOOLEAN)); Schema threeArgInSchema = new Schema(fields); funcList.add(new FuncSpec(this.getClass().getName(), threeArgInSchema)); return funcList; } public DataBag retrieveNodes(List list){ DataBag xmlbag = mBagFactory.newDefaultBag(); for (Element element : list){ Tuple tuple = mTupleFactory.newTuple(); //list of attributes in a node Iterator itr = element.attributeIterator(); //add all attribute values to a tuple while (itr.hasNext() { Attribute att = (Attribute)itr.next(); String value = att.getValue(); tuple.append(value); } List subList = element.elements(); if(!subList.isEmpty()) tuple.append(retrieveNodes(subList)); xmlbag.add(tuple); //a dbag keeps all tuples from a xml string } return xmlbag; } }
No comments:
Post a Comment