We can build a custom one using a Spark UDAF (user-defined aggregate function).
/** A `collect_list`-style UDAF: gathers the non-null values of one input
  * column into an array, producing one array per group.
  *
  * Null inputs are skipped. Element order follows the order in which rows
  * reach `update` and partial buffers are concatenated in `merge`; nothing
  * here sorts the result.
  *
  * NOTE(review): `:+` on the buffer's Seq presumably copies it on every row,
  * so very large groups may cost O(n^2); consider Spark's typed `Aggregator`
  * API if that matters.
  *
  * @param colType Spark SQL type of the input column; the result type is
  *                `ArrayType(colType)`.
  * @tparam T Scala type used to read values of `colType` from rows
  *           (presumably e.g. `String` for `StringType` — not checked at
  *           runtime because of erasure).
  */
class CollectListFunction[T](val colType: DataType)
  extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    new StructType().add("inputCol", colType)

  override def bufferSchema: StructType =
    new StructType().add("outputCol", ArrayType(colType))

  override def dataType: DataType = ArrayType(colType)

  override def deterministic: Boolean = true

  // Each group starts empty. `update` always stores a brand-new Seq,
  // so an immutable empty Seq is sufficient; no mutable collection is needed.
  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer.update(0, Seq.empty[T])

  // Append the current row's value, ignoring nulls.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val value = input.getAs[T](0)
      buffer.update(0, buffer.getSeq[T](0) :+ value)
    }

  // Combine two partial results by concatenation.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1.update(0, buffer1.getSeq[T](0) ++ buffer2.getSeq[T](0))

  override def evaluate(buffer: Row): Any = buffer.getSeq[T](0)
}
Below is an example of a UDAF that takes multiple input columns in an aggregation.
/** UDAF that sums `col1` over the rows whose `col2` is at least `value`.
  *
  * Inputs: `col1: IntegerType` (the summed value) and `col2: LongType`
  * (compared against the threshold). The running sum is kept as a Double,
  * so a group with no qualifying rows yields 0.0.
  *
  * Rows where either input is null are skipped, because `Row.getInt` and
  * `Row.getLong` throw a NullPointerException on null values.
  *
  * @param value inclusive lower bound applied to `col2`.
  */
class SumOnCondition(val value: Int) extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(Array(
      StructField("col1", IntegerType),
      StructField("col2", LongType)
    ))

  override def bufferSchema: StructType =
    StructType(Array(
      StructField("output", DoubleType)
    ))

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Guard against nulls before calling the primitive getters.
    val hasBothInputs = !input.isNullAt(0) && !input.isNullAt(1)
    if (hasBothInputs && input.getLong(1) >= value) {
      buffer(0) = buffer.getDouble(0) + input.getInt(0)
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
  }

  override def evaluate(buffer: Row): Any = buffer.getDouble(0)
}

// Two instances with different thresholds, applied side by side per key.
val fun1 = new SumOnCondition(1)
val fun2 = new SumOnCondition(3)

// NOTE(review): `df` is not defined in this snippet; it is assumed to be a
// DataFrame with columns `key`, `col1` (Int) and `col2` (Long).
df.groupBy(col("key")).agg(
  fun1(col("col1"), col("col2")).as("output1"),
  // Distinct alias so the two result columns can be told apart.
  fun2(col("col1"), col("col2")).as("output2")
)
Reference:
https://mail-archives.apache.org/mod_mbox/spark-user/201509.mbox/%3cCADONuiSp8QFjoteoOOeMveHTuVYHZJC=+sptU5BKtaLVdWG0mw@mail.gmail.com%3e
Is there an implementation for collect_set?
Reply Delete
Hi Alvin Jin, I work in the same area and would like to learn from you or get some guidance. Please let me know if that's fine with you. I have 12 years of DB/ETL experience.
ReplyDelete