Sunday 27 September 2015

Implementing collect_list with aggregateByKey in Spark 1.4

Spark 1.4 does not support user-defined aggregate functions (UDAFs), and there is no built-in equivalent of HiveQL's collect_list in the DataFrame API. Below, a collect_list-style aggregation is implemented with aggregateByKey().

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
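As a quick illustration of the "different result type" point, here is a minimal sketch (the sample data and sc are assumptions, not from the original post) that aggregates values of type Int into accumulators of type (Int, Int), i.e. a running sum and count, to compute a per-key average:

import org.apache.spark.rdd.RDD

// V = Int (the input values), U = (Int, Int) (running sum and count).
val scores: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5)))

val avgByKey = scores
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),                  // seqOp: merge a V into a U within a partition
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // combOp: merge two U's across partitions
  )
  .mapValues { case (sum, count) => sum.toDouble / count }

// avgByKey.collect() => Array(("a", 2.0), ("b", 5.0))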


import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Turn the first two string columns of the DataFrame into key/value pairs.
val rdd: RDD[(String, String)] = testDf.map(row => (row.getAs[String](0), row.getAs[String](1)))

rdd.aggregateByKey(new mutable.MutableList[String])(
  (queue, item) => {
    queue += item       // seqOp: append a value to the per-partition buffer
    queue
  },
  (queue1, queue2) => {
    queue1 ++= queue2   // combOp: concatenate buffers from different partitions
    queue1
  }
).mapValues(_.toList)   // RDD[(String, List[String])]
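For reference, on a hypothetical two-column DataFrame (the sample data and sqlContext below are assumptions, not part of the original post) the pipeline behaves like this:

val testDf = sqlContext.createDataFrame(Seq(
  ("a", "x"), ("a", "y"), ("b", "z")
)).toDF("key", "value")

// The aggregateByKey pipeline above then yields:
//   ("a", List("x", "y")), ("b", List("z"))
// As with Hive's collect_list, the element order within each list is not guaranteed;
// it depends on how the rows happen to be partitioned.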


Another "out of the box" approach to the reduction is to use "aggregateByKey", which guarantees that all of the partitions can be reduced separately AND IN PARALLEL, and then the partial results can be combined combined -- essentially this relaxes the strict condition imposed on "reduceByKey" that the supplied function must be associative.

// Per-key minimum with aggregateByKey: Int.MaxValue is the neutral zero value;
// Math.min merges values within a partition and partial results across partitions.
val reducedRDD2 = pairsRDD.aggregateByKey(Int.MaxValue)(Math.min(_, _), Math.min(_, _))
analyze(reducedRDD2)
// The same per-key minimum computed with reduceByKey.
val reducedRDD = pairsRDD.reduceByKey(Math.min(_, _))



Reference:
http://stackoverflow.com/questions/32100973/how-can-i-define-and-use-a-user-defined-aggregate-function-in-spark-sql

