1. Select Columns in DataFrame
/**
 * Selects the named columns from a DataFrame.
 *
 * The function is curried so that the column specification can be fixed first
 * and the resulting `DataFrame => DataFrame` applied later.
 *
 * @param sqlContext not used by this function; kept so existing call sites keep compiling
 * @param cols       comma-separated column names, e.g. `"id, name"`; every space
 *                   character is removed before splitting, and empty entries are ignored
 * @param df         the DataFrame to select from
 * @return a DataFrame holding only the requested columns
 * @throws IllegalArgumentException if `cols` contains no column name
 */
def selectFromDF(sqlContext: SQLContext, cols: String)(df: DataFrame): DataFrame = {
  // Spaces are stripped everywhere (not only around commas), as the original did.
  // Empty tokens (from "" or "a,,b") are dropped so they never reach df.select.
  val columns = cols.replace(" ", "").split(',').filter(_.nonEmpty)
  require(columns.nonEmpty, s"selectFromDF: no column names found in '$cols'")
  df.select(columns.head, columns.tail: _*)
}
2. Aggregation in Dataframe
import org.apache.spark.sql.functions._
/**
 * Groups a DataFrame by the given key columns and applies one aggregation.
 *
 * `evalColumns` has the form `"<op>:<col1>,<col2>,..."`, where `<op>` is one of
 * (case-insensitive):
 *  - `sum`       – sums the listed columns; with no columns listed the call is
 *                  `groupDF.sum()` with an empty argument list
 *  - `count`     – row count per group; the column list is optional and ignored
 *  - `distcount` – distinct count over the listed columns (at least one required)
 *
 * All space characters are removed from both `keyColumns` and `evalColumns`
 * before they are split.
 *
 * @param sqlContext  not used by this function; kept so existing call sites keep compiling
 * @param keyColumns  comma-separated names of the grouping columns
 * @param evalColumns aggregation specification as described above
 * @param df          the DataFrame to aggregate
 * @return the aggregated DataFrame
 * @throws IllegalArgumentException if no key column is given, if the operation is
 *                                  not supported, or if `distcount` has no columns
 */
def aggFromDF(sqlContext: SQLContext, keyColumns: String, evalColumns: String)(df: DataFrame): DataFrame = {
  val keys = keyColumns.replace(" ", "").split(',').filter(_.nonEmpty)
  require(keys.nonEmpty, s"aggFromDF: no key columns found in '$keyColumns'")
  val groupDF = df.groupBy(keys.head, keys.tail: _*)

  val conds = evalColumns.replace(" ", "").split(':')
  val operation = conds.headOption.getOrElse("").toLowerCase
  // The column part is optional: "count" needs none, so a missing ":" section
  // must not fail with an index-out-of-bounds error.
  val aggList = conds.lift(1)
    .map(_.split(',').filter(_.nonEmpty))
    .getOrElse(Array.empty[String])

  operation match {
    case "sum" => groupDF.sum(aggList: _*)
    case "count" => groupDF.count()
    case "distcount" =>
      require(aggList.nonEmpty, s"aggFromDF: 'distcount' needs at least one column in '$evalColumns'")
      groupDF.agg(countDistinct(aggList.head, aggList.tail: _*))
    case other =>
      // Explicit failure instead of a bare scala.MatchError.
      throw new IllegalArgumentException(
        s"aggFromDF: unsupported aggregation '$other' (expected sum, count or distcount)")
  }
}
Reference:
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/functions.html
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/GroupedData.html
No comments:
Post a Comment