# Example settings for the AggregateTF transformer defined later in this file.
# NOTE(review): the code that loads this block into AggregateConfig is not shown here.
aggregate {
# Column name the transformer passes to groupBy.
key = "id"
# One entry per aggregation, written as {"<column>": "<function>"}.
# The transformer in this file handles only "max" and "min"; any other
# function name makes it throw UnsupportedOpsException.
mappings = [
{"sales": "max"},
{"price": "min"}
]
}
/** Aggregates a DataFrame per key as described by an `AggregateConfig`.
  *
  * The dataset is grouped by `aggregateConfig.key`, and one aggregated column is
  * produced for every (column, function) entry in `aggregateConfig.mappings`.
  *
  * @param aggregateConfig supplies the grouping key and the (column, function) pairs
  */
class AggregateTF(aggregateConfig: AggregateConfig) {

  /** Groups `dataset` by the configured key and applies the configured aggregations.
    *
    * Each result column is named `<function>_<column>` (for example `max_sales`)
    * rather than the default `max(sales)`, because Parquet does not support
    * column names that contain "()".
    *
    * @param dataset the rows to aggregate
    * @return the grouped DataFrame with one aggregated column per mapping
    * @throws IllegalArgumentException if no mappings are configured
    * @throws UnsupportedOpsException  if a mapping names a function other than max or min
    */
  def transform(dataset: DataFrame): DataFrame = {
    val key = aggregateConfig.key

    val opColList = aggregateConfig.mappings.map { entry =>
      // Locale.ROOT keeps lower-casing stable under locales such as Turkish,
      // where "MIN".toLowerCase would not produce "min".
      val evalCol = entry._1.toLowerCase(java.util.Locale.ROOT)
      val evalFunc = entry._2.toLowerCase(java.util.Locale.ROOT)

      // Alias each aggregate up front so the output name never depends on how
      // Column.toString happens to render the expression.
      OpType(evalFunc) match {
        case Max => max(evalCol).as(parquetSafeName("max", evalCol))
        case Min => min(evalCol).as(parquetSafeName("min", evalCol))
        case _   => throw new UnsupportedOpsException(s"Unsupported function '$evalFunc'.")
      }
    }.toList

    // agg needs at least one column; fail with a clear message instead of
    // the NoSuchElementException that `.head` would raise on an empty list.
    require(opColList.nonEmpty, "AggregateTF requires at least one aggregation mapping.")

    dataset
      .groupBy(key)
      .agg(opColList.head, opColList.tail: _*)
  }

  /** Builds the output column name: `function(column)` with "(" replaced by "_" and ")" removed. */
  private def parquetSafeName(function: String, column: String): String =
    s"$function($column)"
      .replace("(", "_")
      .replace(")", "")
}
// Reference:
No comments:
Post a Comment