Tuesday, 15 December 2015

Spark Dataframe Aggregation Operation

Below is sample code that groups a DataFrame by a single key column and applies a different aggregation function to each value column, driven by a config file such as:

 # Aggregation settings for the AggregateTF sample below
 # (presumably parsed into AggregateConfig; the loader is not shown here).
 aggregate {
      # Column the data frame is grouped by (read as aggregateConfig.key).
      key = "id"
      # One entry per aggregated column: column name -> aggregation function name.
      # The sample code handles functions that resolve to Max or Min; anything
      # else raises UnsupportedOpsException.
      mappings = [
        {"sales": "max"},
        {"price": "min"}
      ]
  }

/**
 * Groups a DataFrame by the configured key column and applies one aggregation
 * function to each configured value column.
 *
 * @param aggregateConfig supplies `key` (the grouping column) and `mappings`
 *                        (pairs of column name -> aggregation function name)
 */
class AggregateTF(aggregateConfig: AggregateConfig) {

  /**
   * Aggregates `dataset` as described by the configuration.
   *
   * Column and function names from the configuration are lower-cased before use.
   * Each aggregated column is named `<function>_<column>` (for example
   * `max_sales`) instead of Spark's default `max(sales)`, since parquet doesn't
   * support column names containing "()".
   *
   * @param dataset the data frame to aggregate
   * @return one row per distinct key, with one column per configured mapping
   * @throws UnsupportedOpsException  if a mapping names a function other than max or min
   * @throws IllegalArgumentException if no mappings are configured
   */
  def transform(dataset: DataFrame): DataFrame = {
    val key = aggregateConfig.key

    val opColList = aggregateConfig.mappings.map { case (column, function) =>
      val evalCol = column.toLowerCase
      val evalFunc = function.toLowerCase

      // Alias each aggregate up front rather than renaming afterwards: a rename
      // keyed on Column.toString silently does nothing if that string ever
      // differs from the generated column name.
      OpType(evalFunc) match {
        case Max => max(evalCol).as(parquetSafeName("max", evalCol))
        case Min => min(evalCol).as(parquetSafeName("min", evalCol))
        case _ =>
          throw new UnsupportedOpsException(s"Unsupported function '$evalFunc'.")
      }
    }.toList

    opColList match {
      case first :: rest =>
        dataset.groupBy(key).agg(first, rest: _*)
      case Nil =>
        // agg needs at least one expression; fail with a message that names the cause.
        throw new IllegalArgumentException(
          s"No aggregation mappings configured for key '$key'.")
    }
  }

  /** Builds the default `function(column)` name with "(" turned into "_" and ")" dropped. */
  private def parquetSafeName(function: String, column: String): String =
    s"$function($column)"
      .replace("(", "_")
      .replace(")", "")
}


Reference:

No comments:

Post a Comment