# Example configuration for AggregateTF, written in HOCON syntax
# (presumably loaded via Typesafe Config into an AggregateConfig — confirm).
aggregate {
# Presumably becomes AggregateConfig.key: the column AggregateTF groups rows by.
key = "id"
# One entry per aggregate, mapping a column name to an aggregate function name.
# AggregateTF.transform only handles names that OpType resolves to Max or Min;
# any other resolved OpType raises UnsupportedOpsException.
# NOTE(review): how this list of single-key objects is converted into
# AggregateConfig.mappings (whose entries are read as (column, function) pairs)
# is not shown here — confirm against the config loader.
mappings = [
{"sales": "max"},
{"price": "min"}
]
}
/**
 * Groups a DataFrame by a configured key column and applies the configured
 * per-column aggregate functions (currently only those that `OpType`
 * resolves to `Max` or `Min`).
 *
 * Aggregate output columns are named `<func>_<column>` (e.g. `max_sales`)
 * rather than Spark's default `<func>(<column>)`, because parquet does not
 * support column names containing "(" or ")".
 *
 * @param aggregateConfig supplies the grouping `key` and the `mappings`,
 *                        whose entries are read as (column name, function name) pairs.
 */
class AggregateTF(aggregateConfig: AggregateConfig) {

  /**
   * Aggregates `dataset` per distinct value of the configured key.
   *
   * @param dataset input rows; expected to contain the key column and every mapped column
   * @return one row per key value: the key column plus one column per configured aggregate
   * @throws UnsupportedOpsException if a function name resolves to an OpType other than Max or Min
   * @throws IllegalArgumentException if no mappings are configured
   */
  def transform(dataset: DataFrame): DataFrame = {
    val key = aggregateConfig.key

    val aggColumns = aggregateConfig.mappings.map { entry =>
      // Locale.ROOT keeps identifier lowercasing stable regardless of the JVM's default locale.
      val evalCol = entry._1.toLowerCase(java.util.Locale.ROOT)
      val evalFunc = entry._2.toLowerCase(java.util.Locale.ROOT)
      // Alias at creation time instead of renaming afterwards: this does not depend on
      // Column.toString happening to match Spark's auto-generated output column name.
      OpType(evalFunc) match {
        case Max => max(evalCol).as(parquetSafeName(s"max($evalCol)"))
        case Min => min(evalCol).as(parquetSafeName(s"min($evalCol)"))
        case _ =>
          throw new UnsupportedOpsException(s"Unsupported function '$evalFunc'.")
      }
    }.toList

    // agg(head, tail: _*) needs at least one column; fail with a clear message instead of
    // the NoSuchElementException that `.head` on an empty list would raise.
    require(aggColumns.nonEmpty, "AggregateConfig.mappings must contain at least one entry")

    dataset.groupBy(key).agg(aggColumns.head, aggColumns.tail: _*)
  }

  /** Turns `func(col)` into `func_col` so the name is accepted by the parquet writer. */
  private def parquetSafeName(name: String): String =
    name.replace("(", "_").replace(")", "")
}
// Reference: (none was listed in the original post.)
// Blog footer residue from the original page: "No comments: Post a Comment"