Monday 14 March 2016

Recursively Joining DataFrames

Below is sample code that joins two DataFrames on different key columns, and joins multiple DataFrames on the same key column. The key columns and the join type are read from a configuration block such as:

join {
  join-keys = ["id", "accountid"]
  join-type = "inner"
}

/**
 * Joins Spark DataFrames according to a join configuration.
 *
 * Two-frame joins (`joinTwoDFs`) match the first configured key column on the
 * left frame against the second configured key column on the right frame.
 * Multi-frame joins (`joinMultiDFs`) match every frame on one shared column.
 *
 * @param joinConfig supplies the join type name and exactly two key column names
 * @throws IllegalArgumentException at construction time if `joinConfig.keys`
 *                                  does not contain exactly two entries
 */
class Joiner(joinConfig: JoinConfig) {

  /** Join type name exactly as configured (e.g. "inner"). */
  val joinType = joinConfig.joinType

  /** (left key column, right key column) used by `joinTwoDFs`. */
  val keyTuple = joinConfig.keys match {
    // Seq(...) matches List, Vector, etc., so any two-element sequence is accepted.
    case Seq(left, right) => (left, right)
    case _ =>
      throw new IllegalArgumentException("Should be two key columns")
  }

  // Rejects join types other than inner, left outer, right outer and left semi.
  private def requireSupportedJoinType(): Unit =
    JoinType(joinType) match {
      case RightOuter | LeftOuter | Inner | LeftSemi => ()
      case _ =>
        throw new IllegalArgumentException(s"Unsupported join type '$joinType'. ")
    }

  /**
   * Joins two frames on `leftDf(keyTuple._1) === rightDf(keyTuple._2)` and drops
   * the right-hand key column so the result has a single key column.
   *
   * NOTE(review): for a right outer join, unmatched right rows end up with a null
   * left key because the right key is the one dropped; confirm this is intended.
   *
   * @param leftDf  frame whose key column is `keyTuple._1`
   * @param rightDf frame whose key column is `keyTuple._2`
   * @return the joined frame without the right-hand key column
   * @throws IllegalArgumentException if the configured join type is unsupported
   */
  def joinTwoDFs(leftDf: DataFrame, rightDf: DataFrame): DataFrame = {
    requireSupportedJoinType()
    leftDf
      .join(rightDf, leftDf(keyTuple._1) === rightDf(keyTuple._2), joinType)
      // Drop by Column reference (not by name) so this also works when both keys share a name.
      .drop(rightDf(keyTuple._2))
  }

  /**
   * Joins multiple frames on the same column key.
   *
   * The nested sequences are flattened and joined left to right. Each step is a
   * USING join on `commonKey`, so the result keeps a single `commonKey` column.
   *
   * @param dfs       frames to join, grouped arbitrarily; grouping is flattened
   * @param commonKey name of the key column present in every frame
   * @return the combined frame
   * @throws IllegalArgumentException if `dfs` contains no frames or the configured
   *                                  join type is unsupported
   */
  def joinMultiDFs(dfs: Seq[Seq[DataFrame]], commonKey: String): DataFrame = {
    requireSupportedJoinType()
    val frames = dfs.flatten
    require(frames.nonEmpty, "joinMultiDFs needs at least one DataFrame")
    frames.reduceLeft((joined, next) => joined.join(next, Seq(commonKey), joinType))
  }
}

No comments:

Post a Comment