# Join settings. Presumably loaded into the JoinConfig consumed by Joiner below
# (the loader is not shown here -- confirm the key-to-field mapping).
join {
# Key columns for the join. Assuming this maps to JoinConfig.keys, Joiner requires
# exactly two entries: the first is the left DataFrame's key, the second the right's.
join-keys = ["id", "accountid"]
# Join type name. Assuming this maps to JoinConfig.joinType, Joiner only accepts
# inner, left outer, right outer and left semi joins and rejects anything else.
join-type = "inner"
}
/**
  * Joins Spark DataFrames using the join type and key columns taken from a `JoinConfig`.
  *
  * Only inner, left outer, right outer and left semi joins are supported; the join type
  * is checked each time a join is performed, not at construction.
  *
  * @param joinConfig supplies `joinType` (the join type name handed to Spark) and `keys`
  *                   (exactly two column names: the left key followed by the right key)
  * @throws IllegalArgumentException on construction if `joinConfig.keys` does not contain
  *                                  exactly two entries
  */
class Joiner(joinConfig: JoinConfig) {

  /** Join type name, passed through to Spark's `join`. */
  val joinType = joinConfig.joinType

  /** `(leftKey, rightKey)` column names used by [[joinTwoDFs]]. */
  val keyTuple = joinConfig.keys match {
    case List(left, right) => (left, right)
    case _ => throw new IllegalArgumentException("Should be two key columns")
  }

  /**
    * Joins two DataFrames on the configured key pair (`keyTuple._1` on the left,
    * `keyTuple._2` on the right) and drops the right-hand key column from the result.
    *
    * @param leftDf  left side of the join; must contain column `keyTuple._1`
    * @param rightDf right side of the join; must contain column `keyTuple._2`
    * @return the joined DataFrame without the right-hand key column
    * @throws IllegalArgumentException if the configured join type is not supported
    */
  def joinTwoDFs(leftDf: DataFrame, rightDf: DataFrame): DataFrame =
    joinOnKeys(leftDf, rightDf, keyTuple._1, keyTuple._2)

  /**
    * Joins all given DataFrames, left to right, on a column they all share.
    *
    * The nested sequences are flattened first, so only the overall order matters.
    * A single DataFrame is returned unchanged (no join is performed).
    *
    * @param dfs       groups of DataFrames to join; each must contain `commonKey`
    * @param commonKey name of the key column present in every DataFrame
    * @return the result of successively joining every DataFrame on `commonKey`
    * @throws IllegalArgumentException if `dfs` contains no DataFrame at all, or if the
    *                                  configured join type is not supported
    */
  def joinMultiDFs(dfs: Seq[Seq[DataFrame]], commonKey: String): DataFrame =
    dfs.flatten
      // Join on the caller-supplied common key rather than the configured key pair:
      // after the first join the right-hand key column is gone, so the configured
      // pair cannot be applied consistently across more than two DataFrames.
      .reduceOption(joinOnKeys(_, _, commonKey, commonKey))
      .getOrElse(throw new IllegalArgumentException("Should be at least one DataFrame to join"))

  /**
    * Joins `leftDf` and `rightDf` where `leftDf(leftKey) === rightDf(rightKey)` using the
    * configured join type, then drops the right-hand key column.
    */
  private def joinOnKeys(
      leftDf: DataFrame,
      rightDf: DataFrame,
      leftKey: String,
      rightKey: String): DataFrame =
    JoinType(joinType) match {
      case RightOuter | LeftOuter | Inner | LeftSemi =>
        // NOTE(review): for right outer joins, right rows without a match keep a null
        // left key once the right key column is dropped -- confirm this is intended.
        leftDf
          .join(rightDf, leftDf(leftKey) === rightDf(rightKey), joinType)
          .drop(rightDf(rightKey))
      case _ =>
        throw new IllegalArgumentException(s"Unsupported join type '$joinType'. ")
    }
}
No comments:
Post a Comment