// Join settings. Presumably loaded into the JoinConfig that Joiner consumes
// (join-keys -> keys, join-type -> joinType) -- TODO confirm against the config loader.
join {
// Joiner requires exactly two entries here: the first is used as the key column
// of the left DataFrame, the second as the key column of the right DataFrame.
join-keys = ["id", "accountid"]
// Join type string handed to the DataFrame join. Joiner accepts only the
// inner, left outer, right outer and left semi join types.
join-type = "inner"
}
/**
 * Joins DataFrames using the join type and key columns supplied by a `JoinConfig`.
 *
 * The config must provide exactly two key column names: the first is looked up
 * on the left DataFrame, the second on the right DataFrame.
 *
 * @param joinConfig source of the join type and the pair of key column names
 * @throws IllegalArgumentException at construction time if `joinConfig.keys`
 *                                  does not contain exactly two entries
 */
class Joiner(joinConfig: JoinConfig) {

  /** Join type string, passed through unchanged to the DataFrame join. */
  val joinType = joinConfig.joinType

  /** `(leftKey, rightKey)` column names extracted from the configuration. */
  val keyTuple = joinConfig.keys match {
    case List(left, right) => (left, right)
    case _ =>
      throw new IllegalArgumentException("Should be two key columns")
  }

  /**
   * Joins two DataFrames on the configured key pair
   * (`leftDf(keyTuple._1) === rightDf(keyTuple._2)`) and drops the right-hand
   * key column from the result.
   *
   * @param leftDf  left side of the join; must contain column `keyTuple._1`
   * @param rightDf right side of the join; must contain column `keyTuple._2`
   * @return the joined DataFrame without the right-hand key column
   * @throws IllegalArgumentException if the configured join type is not one of
   *                                  inner, left outer, right outer or left semi
   */
  def joinTwoDFs(leftDf: DataFrame, rightDf: DataFrame): DataFrame =
    joinOnKeys(leftDf, rightDf, keyTuple._1, keyTuple._2)

  /**
   * Joins every DataFrame in `dfs` (flattened, left to right) on the column
   * named `commonKey`, which each DataFrame is expected to contain. After each
   * step the right-hand copy of the key column is dropped, so the accumulated
   * result keeps a single `commonKey` column for the next join.
   *
   * A single input DataFrame is returned unchanged.
   *
   * @param dfs       groups of DataFrames to join; the grouping is ignored
   * @param commonKey name of the key column shared by all DataFrames
   * @return the result of joining all DataFrames on `commonKey`
   * @throws IllegalArgumentException if `dfs` contains no DataFrame, or if the
   *                                  configured join type is not supported
   */
  def joinMultiDFs(dfs: Seq[Seq[DataFrame]], commonKey: String): DataFrame =
    dfs.flatten
      .reduceLeftOption(joinOnKeys(_, _, commonKey, commonKey))
      .getOrElse(
        throw new IllegalArgumentException("At least one DataFrame is required to join")
      )

  /** Joins `leftDf` and `rightDf` on `leftKey === rightKey`, dropping the right key column. */
  private def joinOnKeys(
      leftDf: DataFrame,
      rightDf: DataFrame,
      leftKey: String,
      rightKey: String
  ): DataFrame =
    JoinType(joinType) match {
      case RightOuter | LeftOuter | Inner | LeftSemi =>
        // NOTE(review): for a right outer join, rows that exist only on the right
        // keep a null left key once the right key column is dropped -- confirm
        // this is the intended output shape.
        leftDf
          .join(rightDf, leftDf(leftKey) === rightDf(rightKey), joinType)
          .drop(rightDf(rightKey))
      case _ =>
        throw new IllegalArgumentException(s"Unsupported join type '$joinType'. ")
    }
}
No comments:
Post a Comment