Monday 14 September 2015

Implementing SQL Aggregation Clauses with DataFrames in Spark

We can use DataFrame operations to implement the logic of SQL queries. Users pass the SQL clauses (the select list, the grouping keys, and the aggregation) as strings in a config file, and we then assemble the corresponding DataFrame operations to produce the same result. A sketch of such a config appears below.
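For example, the clause strings could come from a plain properties file. The file name and keys below are made up for this sketch; the two helper functions defined next consume the resulting strings.

// Hypothetical config file "query.conf":
//   select.cols = name, dept, salary
//   agg.keys    = dept
//   agg.eval    = sum:salary
val props = new java.util.Properties()
props.load(new java.io.FileInputStream("query.conf"))
val selectCols = props.getProperty("select.cols")   // "name, dept, salary"
val aggKeys    = props.getProperty("agg.keys")      // "dept"
val aggEval    = props.getProperty("agg.eval")      // "sum:salary"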

1. Selecting Columns in a DataFrame

import org.apache.spark.sql.{DataFrame, SQLContext}

// Keep only the columns named in the config string, e.g. "name, dept, salary".
def selectFromDF(sqlContext: SQLContext, cols: String)(df: DataFrame): DataFrame = {
  val columns = cols.replace(" ", "").split(',')
  // select takes the first column name plus varargs for the rest
  df.select(columns.head, columns.tail: _*)
}
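A quick usage sketch (the sample data and column names are invented for illustration):

import sqlContext.implicits._

val df = Seq(("alice", "eng", 100), ("bob", "eng", 80), ("carol", "hr", 90))
  .toDF("name", "dept", "salary")

// Keeps only the "name" and "dept" columns
val projected = selectFromDF(sqlContext, "name, dept")(df)
projected.show()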


2. Aggregation in a DataFrame

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._

// Group by the key columns, then apply the aggregation described by evalColumns,
// which has the form "<function>:<col1>,<col2>,...", e.g. "sum:salary".
def aggFromDF(sqlContext: SQLContext, keyColumns: String, evalColumns: String)
             (df: DataFrame): DataFrame = {
  val keys = keyColumns.replace(" ", "").split(',')
  val groupDF = df.groupBy(keys.head, keys.tail: _*)

  val conds = evalColumns.replace(" ", "").split(':')
  // "count" needs no column list, so guard against a missing part after ':'
  val aggList = if (conds.length > 1) conds(1).split(',') else Array.empty[String]
  conds(0).toLowerCase match {
    case "sum"       => groupDF.sum(aggList: _*)
    case "count"     => groupDF.count()
    case "distcount" => groupDF.agg(countDistinct(aggList.head, aggList.tail: _*))
    case other       => throw new IllegalArgumentException("Unsupported aggregation: " + other)
  }
}
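Putting the two helpers together reproduces a simple GROUP BY query. The data and clause strings below are invented for this sketch; because both functions are curried on the DataFrame, they also chain with andThen:

import sqlContext.implicits._

val sales = Seq(("eng", "alice", 100), ("eng", "bob", 80), ("hr", "carol", 90))
  .toDF("dept", "name", "salary")

// Equivalent to: SELECT dept, SUM(salary) FROM sales GROUP BY dept
aggFromDF(sqlContext, "dept", "sum:salary")(sales).show()

// The curried shape lets the steps compose like a pipeline:
val pipeline = selectFromDF(sqlContext, "dept, salary") _ andThen
  aggFromDF(sqlContext, "dept", "sum:salary")
pipeline(sales).show()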



Reference:
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/functions.html
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/GroupedData.html
