When Spark runs a stage, it serializes all the information needed to run that stage's tasks into a binary representation called the closure of the function being executed. The closure includes every data structure on the driver that the function references. Spark ships it with the tasks to every executor on the cluster.
Broadcast variables are useful in situations where many tasks need access to the same (immutable) data structure. They extend normal handling of task closures to enable:
- Caching data as raw Java objects on each executor, so they need not be deserialized for each task
- Caching data across multiple jobs and stages
Broadcast Values:
A local Map on the driver will be copied automatically with every task. Since many tasks execute in one JVM, it's wasteful to send and store so many copies of the data.
Instead, a broadcast variable makes Spark send and hold in memory just one copy for each machine in the cluster. When there are hundreds of executors and many execute in parallel on each machine, this can save significant network traffic and memory.
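The difference between the two approaches can be sketched as follows. This is a minimal illustration, not a benchmark: the object name, the method names, the sample lookup table and the local[2] master are all assumptions made for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureVsBroadcast {
  // Closure capture: `names` is serialized into the closure of every task.
  def lookupViaClosure(sc: SparkContext, codes: Seq[String], names: Map[String, String]): Seq[String] =
    sc.parallelize(codes).map(c => names.getOrElse(c, "unknown")).collect().toSeq

  // Broadcast: `names` is shipped to each executor once and shared by its tasks.
  def lookupViaBroadcast(sc: SparkContext, codes: Seq[String], names: Map[String, String]): Seq[String] = {
    val bcNames = sc.broadcast(names)
    sc.parallelize(codes).map(c => bcNames.value.getOrElse(c, "unknown")).collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    // Local mode is assumed here only so the sketch runs on one machine.
    val conf = new SparkConf().setAppName("closure-vs-broadcast").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Hypothetical driver-side lookup table.
    val countryNames = Map("us" -> "United States", "de" -> "Germany")
    val codes = Seq("us", "de", "us", "fr")

    // Both calls print: United States, Germany, United States, unknown
    println(lookupViaClosure(sc, codes, countryNames).mkString(", "))
    println(lookupViaBroadcast(sc, codes, countryNames).mkString(", "))

    sc.stop()
  }
}
```

Both versions return the same result; the difference is only in how often the map travels over the network and how many copies sit in executor memory.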
The process of using broadcast variables is simple:
- Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
- Access its value with the value property (or value() method in Java).
- The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).
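The three steps above can be sketched on a plain RDD as follows. The object name, method name, sample logins and local[2] master are assumptions made for illustration; the final unpersist() call is optional and is shown only as one way to release the executor-side copies once the variable is no longer needed.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object BroadcastSteps {
  // Keeps only the logins that are NOT in the broadcast set.
  def allowedLogins(sc: SparkContext, logins: Seq[String], blockedNames: Set[String]): Seq[String] = {
    // Step 1: create a Broadcast[T]; an immutable Set[String] is Serializable.
    val blocked: Broadcast[Set[String]] = sc.broadcast(blockedNames)

    // Step 2: read the data through .value inside the tasks.
    val result = sc.parallelize(logins)
      .filter(name => !blocked.value.contains(name))
      .collect()
      .toSeq

    // Step 3: the value is never modified; once done, drop the cached copies on the executors.
    blocked.unpersist()
    result
  }

  def main(args: Array[String]): Unit = {
    // Local mode is assumed here only so the sketch runs on one machine.
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-steps").setMaster("local[2]"))
    val kept = allowedLogins(sc, Seq("login1", "login2", "login3"), Set("login3", "login4"))
    println(kept.mkString(", ")) // prints: login1, login2
    sc.stop()
  }
}
```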
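The example below broadcasts a list of names as a lookup table, then checks each name in xdf against it inside the filter function.
// Assumes a spark-shell session where sc and sqlContext are already defined.
import org.apache.spark.sql.functions.{col, udf}

// Broadcast the lookup list once to every executor.
val bc = sc.broadcast(Array[String]("login3", "login4"))

val xdf = sqlContext.createDataFrame(
  Array(("login1", 192), ("login2", 146), ("login3", 72))
).toDF("name", "cnt")

// The UDF reads the broadcast value on the executor side.
val func: (String => Boolean) = (arg: String) => bc.value.contains(arg)
val sqlfunc = udf(func)

// Keeps only the rows whose name appears in the broadcast list.
val filtered = xdf.filter(sqlfunc(col("name")))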
Broadcast Hash Join:
/**
 * Marks a DataFrame as small enough for use in broadcast joins.
 *
 * The following example marks the right DataFrame for broadcast hash join using `joinKey`.
 * {{{
 *   // left and right are DataFrames
 *   left.join(broadcast(right), "joinKey")
 * }}}
 */
def broadcast(df: DataFrame): DataFrame = {
  DataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
}
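A usage sketch for the hint follows. It assumes Spark 2.x or later, where SparkSession is available; the object name, the joinSmallRight helper, the sample rows and the local[2] master are hypothetical. With the hint in place, the physical plan printed by explain() would normally be expected to contain a BroadcastHashJoin operator, although the exact plan text depends on the Spark version.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch {
  // Hints that `right` is small enough to be copied to every executor.
  def joinSmallRight(left: DataFrame, right: DataFrame): DataFrame =
    left.join(broadcast(right), "joinKey")

  def main(args: Array[String]): Unit = {
    // Local mode is assumed here only so the sketch runs on one machine.
    val spark = SparkSession.builder()
      .appName("broadcast-join-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: a larger fact table and a small lookup table.
    val left = Seq(("login1", 192), ("login2", 146), ("login3", 72)).toDF("joinKey", "cnt")
    val right = Seq(("login1", "admin"), ("login3", "guest")).toDF("joinKey", "role")

    val joined = joinSmallRight(left, right)
    joined.explain() // inspect the physical plan for the join strategy
    joined.show()

    spark.stop()
  }
}
```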