1. Select Columns in a DataFrame
/**
 * Projects a DataFrame onto the columns named in a comma-separated string.
 *
 * All space characters are stripped from `cols` before it is split on commas,
 * so `"a, b ,c"` selects the columns `a`, `b` and `c`.
 *
 * @param sqlContext not referenced by this function
 * @param cols       comma-separated list of column names to keep
 * @param df         the DataFrame to project
 * @return the result of `df.select` applied to the parsed column names
 */
def selectFromDF(sqlContext: SQLContext, cols: String)(df: DataFrame): DataFrame = {
  val names = cols.filterNot(_ == ' ').split(',')
  // `select` takes the first name separately from the remaining varargs.
  val first = names.head
  val rest = names.drop(1)
  df.select(first, rest: _*)
}
2. Aggregation in Dataframe
import org.apache.spark.sql.functions._
/**
 * Groups a DataFrame by the given key columns and applies one aggregation.
 *
 * `evalColumns` has the form `"<op>"` or `"<op>:<col1>,<col2>,..."`, where `<op>`
 * is matched case-insensitively and is one of:
 *
 *  - `sum`       – `GroupedData.sum` over the listed columns
 *  - `count`     – `GroupedData.count()`; any column list is ignored
 *  - `distcount` – `countDistinct` over the listed columns (at least one required)
 *
 * All space characters are stripped from both `keyColumns` and `evalColumns`
 * before they are parsed.
 *
 * @param sqlContext  not referenced by this function
 * @param keyColumns  comma-separated list of columns to group by
 * @param evalColumns aggregation spec as described above
 * @param df          the DataFrame to aggregate
 * @return the aggregated DataFrame
 * @throws IllegalArgumentException if the operation is not recognised, or if
 *                                  `distcount` is requested without any column
 */
def aggFromDF(sqlContext: SQLContext, keyColumns: String, evalColumns: String)
             (df: DataFrame): DataFrame = {
  val keys = keyColumns.replace(" ", "").split(',')
  val groupDF = df.groupBy(keys.head, keys.tail: _*)

  val spec = evalColumns.replace(" ", "").split(':')
  val op = spec.headOption.getOrElse("").toLowerCase
  // The column list is optional: "count" needs none, so a spec without ':' must
  // not fail while parsing.
  val aggList = spec.lift(1).map(_.split(',')).getOrElse(Array.empty[String])

  op match {
    case "sum" =>
      groupDF.sum(aggList: _*)
    case "count" =>
      groupDF.count()
    case "distcount" =>
      require(aggList.nonEmpty,
        s"'distcount' needs at least one column, e.g. 'distcount:col1,col2' (got '$evalColumns')")
      groupDF.agg(countDistinct(aggList.head, aggList.tail: _*))
    case other =>
      throw new IllegalArgumentException(
        s"Unsupported aggregation '$other' in '$evalColumns'; expected sum, count or distcount")
  }
}
Reference:
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/functions.html
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/GroupedData.html
No comments:
Post a Comment