Saturday 20 December 2014

Broadcast Variables/Join in Spark

Broadcast variables, one of Spark’s two kinds of shared variables, allow a program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. They come in handy, for example, if your application needs to send a large, read-only lookup table to all the nodes, or even a large feature vector in a machine learning algorithm. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism.

When Spark runs a stage, it creates a binary representation of all the information needed to run tasks in that stage, called the closure of the function that needs to be executed. This closure includes all the data structures on the driver referenced in the function. Spark distributes it to every executor on the cluster.
Broadcast variables are useful in situations where many tasks need access to the same (immutable) data structure. They extend normal handling of task closures to enable:
  • Caching data as raw Java objects on each executor, so they need not be deserialized for each task
  • Caching data across multiple jobs and stages
Each executor contains a BlockManager that manages its broadcast data. Keeping the shared data in the BlockManager guarantees that all tasks running in that executor can use the same copy.

Broadcast Values:

A local Map on the driver will be copied automatically with every task. Since many tasks execute in one JVM, it's wasteful to send and store so many copies of the data.
Instead, a broadcast variable makes Spark send and hold in memory just one copy for each machine in the cluster. When there are hundreds of executors and many execute in parallel on each machine, this can save significant network traffic and memory.
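
The difference between the two approaches can be sketched as follows. This is a minimal sketch, not a benchmark: the object name BroadcastLookupSketch, the countryNames table, and the local[2] master are assumptions made for illustration, and a real lookup table would be far larger.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLookupSketch {
  // Runs both variants on a local SparkContext and returns the broadcast result.
  def run(): Array[String] = {
    // Local mode is an assumption for illustration only.
    val conf = new SparkConf().setAppName("broadcast-lookup-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical lookup table living on the driver.
      val countryNames = Map("de" -> "Germany", "fr" -> "France", "jp" -> "Japan")
      val codes = sc.parallelize(Seq("de", "jp", "xx", "fr"), numSlices = 4)

      // Closure capture: countryNames is serialized as part of every task.
      val viaClosure = codes.map(c => countryNames.getOrElse(c, "unknown")).collect()

      // Broadcast: the table is shipped once per executor and shared by its tasks.
      val bcNames = sc.broadcast(countryNames)
      val viaBroadcast = codes.map(c => bcNames.value.getOrElse(c, "unknown")).collect()

      // Both variants compute the same answer; only the data movement differs.
      assert(viaClosure.sameElements(viaBroadcast))
      viaBroadcast
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

With a three-entry Map the saving is negligible. The pattern starts to matter once the table is large and each executor runs many tasks.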


The process of using broadcast variables is simple:

  1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
  2. Access its value with the value property (or value() method in Java).
  3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).
The example below broadcasts a list to use as a lookup table, then consults it inside a UDF that filters the DataFrame xdf.


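import org.apache.spark.sql.functions.{col, udf}

// Broadcast the lookup list once; each executor keeps a single copy.
val bc = sc.broadcast(Array[String]("login3", "login4"))
val xdf = sqlContext.createDataFrame(
  Seq(("login1", 192), ("login2", 146), ("login3", 72))
).toDF("name", "cnt")

// The UDF reads the broadcast value on the executor side.
val func: (String => Boolean) =
  (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)

// Keeps only the rows whose name is in the broadcast list (here, login3).
val filtered = xdf.filter(sqlfunc(col("name")))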

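Broadcast Hash Join:

Spark SQL applies the same idea to joins. The broadcast function in org.apache.spark.sql.functions marks a DataFrame as small enough to broadcast, so the join can be planned as a broadcast hash join rather than a shuffle. Its definition and doc comment from the Spark 1.x source: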


/**
 * Marks a DataFrame as small enough for use in broadcast joins.
 *
 * The following example marks the right DataFrame for broadcast hash join using `joinKey`.
 * {{{
 *   // left and right are DataFrames
 *   left.join(broadcast(right), "joinKey")
 * }}}
 */
def broadcast(df: DataFrame): DataFrame = {
  DataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
}
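
A usage sketch of the hint might look like the following. It assumes Spark 2.0 or later, where SparkSession replaces the sqlContext used earlier in this post and broadcast accepts any Dataset. The object name BroadcastJoinSketch, the role column, and the sample rows are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  // Joins a larger table against a small, broadcast one and returns the rows.
  def run(): Seq[(String, Int, String)] = {
    // Local mode is an assumption for illustration only.
    val spark = SparkSession.builder()
      .appName("broadcast-join-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._
    try {
      val left = Seq(("login1", 192), ("login2", 146), ("login3", 72)).toDF("name", "cnt")
      // Hypothetical small dimension table; this is the side worth broadcasting.
      val right = Seq(("login1", "admin"), ("login3", "guest")).toDF("name", "role")

      // Hint that `right` should be shipped whole to every executor.
      val joined = left.join(broadcast(right), "name")

      // Prints the physical plan, which is where the chosen join strategy shows up.
      joined.explain()

      joined.orderBy("name").collect()
        .map(r => (r.getString(0), r.getInt(1), r.getString(2)))
        .toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The hint only pays off when the broadcast side comfortably fits in each executor's memory. For two large tables a shuffle-based join is usually the safer choice.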



