Tuesday 15 December 2015

Spark Dataframe Aggregation Operation

Below is sample code that aggregates a DataFrame grouped on a single key column, applying a different aggregation function to each mapped column, driven by the config file:

 aggregate {
      key = "id"
      mappings = [
        {"sales": "max"},
        {"price": "min"}
      ]
  }

/**
 * Transformer that applies config-driven aggregations to a DataFrame,
 * grouping on the configured key column.
 *
 * Example config:
 *   aggregate { key = "id", mappings = [{"sales": "max"}, {"price": "min"}] }
 */
class AggregateTF(aggregateConfig: AggregateConfig) {

  /**
   * Groups `dataset` by the configured key, applies the configured
   * aggregation function to each mapped column, then renames the
   * aggregated columns to strip parentheses.
   *
   * @param dataset input DataFrame; must contain the key column and
   *                every column named in the mappings
   * @return aggregated DataFrame with sanitized column names
   * @throws UnsupportedOpsException if a configured function is not supported
   */
  def transform(dataset: DataFrame): DataFrame = {

    val key = aggregateConfig.key

    // Fail fast: `.head` below would otherwise throw an unhelpful
    // NoSuchElementException when the config has no mappings.
    require(aggregateConfig.mappings.nonEmpty, "aggregate.mappings must not be empty")

    // Build one aggregation Column per configured (column -> function) pair.
    // FIX: the original lambdas were missing the `=>` arrow and did not compile.
    val opColList = aggregateConfig
      .mappings.map { entry =>
        val evalCol  = entry._1.toLowerCase
        val evalFunc = entry._2.toLowerCase

        OpType(evalFunc) match {
          case Max => max(evalCol)
          case Min => min(evalCol)
          case _ =>
            throw new UnsupportedOpsException(s"Unsupported function '$evalFunc'.")
        }
      }.toList

    val aggreDF = dataset
      .groupBy(key)
      .agg(opColList.head, opColList.tail: _*)

    /** Rename each aggregated column, e.g. "max(sales)" -> "max_sales",
     *  since Parquet doesn't support column names containing "()". */
    opColList
      .foldLeft(aggreDF) { (currentDf, opCol) =>
        val newColName = opCol.toString()
          .replace("(", "_")
          .replace(")", "")

        currentDf.withColumnRenamed(opCol.toString(), newColName)
      }
  }
}


Reference:

No comments:

Post a Comment