Tuesday 1 December 2015

Caching and Broadcast in Spark

CACHING
Although the contents of RDDs are transient by default, Spark provides a mechanism for persisting the data in an RDD. After the first time an action requires computing such an RDD’s contents, they are stored in memory or on disk across the cluster. The next time an action depends on the RDD, it need not be recomputed from its dependencies. Its data is returned from the cached partitions directly:
cached.cache()
cached.count()
cached.take(10)
The call to cache indicates that the RDD should be stored the next time it’s computed. The call to count computes it initially. The take action returns the first 10 elements of the RDD as a local Array. When take is called, it accesses the cached elements of cached instead of recomputing them from their dependencies.
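One way to see this behaviour is to count how often the mapping function actually runs. The following is only a sketch under a few assumptions: Spark 2.x or later (for sc.longAccumulator), a local master, and a hypothetical object name CacheSketch. Accumulator updates made inside transformations are not guaranteed to be exactly-once if tasks are retried, so treat the numbers as indicative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, not part of Spark.
object CacheSketch {
  // Returns how many times the map function had run after count(),
  // and again after the subsequent take(10).
  def evaluations(sc: SparkContext): (Long, Long) = {
    val evals = sc.longAccumulator("map evaluations")
    val cached = sc.parallelize(1 to 1000, 4).map { x => evals.add(1L); x * 2 }

    cached.cache()               // only marks the RDD for storage
    cached.count()               // first action: computes and stores the partitions
    val afterCount = evals.sum

    cached.take(10)              // second action: reads the stored partitions
    val afterTake = evals.sum
    (afterCount, afterTake)
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen only so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("cache-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(evaluations(sc)) finally sc.stop()
  }
}
```

If the partitions fit in memory and no task is retried, the two numbers should be equal, because take reads the stored partitions rather than re-running the map.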
Spark defines a few different mechanisms, or StorageLevel values, for persisting RDDs. rdd.cache() is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY), which stores the RDD as deserialized Java objects. When Spark estimates that a partition will not fit in memory, it simply will not store it, and it will be recomputed the next time it’s needed. This level makes the most sense when the objects will be referenced frequently and/or require low-latency access, because it avoids any serialization overhead. Its drawback is that it takes up larger amounts of memory than its alternatives. Also, holding on to many small objects puts pressure on Java’s garbage collection, which can result in stalls and general slowness.
Spark also exposes a MEMORY_ONLY_SER storage level, which allocates large byte buffers in memory and serializes the RDD contents into them. With an efficient serialization format, serialized data usually takes up two to five times less space than its raw equivalent.
Spark can use disk for caching RDDs as well. The MEMORY_AND_DISK and MEMORY_AND_DISK_SER levels are similar to the MEMORY_ONLY and MEMORY_ONLY_SER storage levels, respectively. With the memory-only levels, if a partition will not fit in memory, it is simply not stored, meaning that it must be recomputed from its dependencies the next time an action uses it. With the disk-backed levels, Spark spills partitions that will not fit in memory to disk.
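To make the choice of level concrete, here is a minimal sketch that persists the same RDD at each of the four levels in turn. The object name StorageLevelSketch, the sample data, and the local master are assumptions made for illustration. The StorageLevel constants are the ones defined in org.apache.spark.storage.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical helper object, not part of Spark.
object StorageLevelSketch {
  // Persist an RDD at the given level, run two actions, and return both counts.
  def countTwice(sc: SparkContext, level: StorageLevel): (Long, Long) = {
    val squares = sc.parallelize(1 to 100000).map(x => x.toLong * x)
    squares.persist(level)
    val first = squares.count()   // computes the partitions and stores them
    val second = squares.count()  // reuses whatever partitions were stored
    squares.unpersist()           // release the storage before trying the next level
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    // Local master chosen only so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("storage-level-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val levels = Seq(
        StorageLevel.MEMORY_ONLY,         // deserialized objects, memory only
        StorageLevel.MEMORY_ONLY_SER,     // serialized bytes, memory only
        StorageLevel.MEMORY_AND_DISK,     // deserialized, spills to disk
        StorageLevel.MEMORY_AND_DISK_SER) // serialized, spills to disk
      levels.foreach(level => println(s"$level -> ${countTwice(sc, level)}"))
    } finally sc.stop()
  }
}
```

The results are identical at every level. What changes is the memory footprint, the serialization cost, and whether partitions that do not fit are recomputed or read back from disk.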
Deciding when to cache data can be an art. The decision typically involves trade-offs between space and speed, with the specter of garbage collection looming overhead to occasionally confound things further. In general, RDDs should be cached when they are likely to be referenced by multiple actions and are expensive to regenerate.

BROADCAST VARIABLES
When Spark runs a stage, it creates a binary representation of all the information needed to run tasks in that stage, called the closure of the function that needs to be executed. This closure includes all the data structures on the driver referenced in the function. Spark distributes it to every executor on the cluster.
Broadcast variables are useful in situations where many tasks need access to the same (immutable) data structure. They extend normal handling of task closures to enable:
  • Caching data as raw Java objects on each executor, so they need not be deserialized for each task
  • Caching data across multiple jobs and stages
For example, consider a natural language processing application that relies on a large dictionary of English words. Broadcasting the dictionary allows transferring it to every executor only once:
val dict = ...
val bDict = sc.broadcast(dict)
...
def query(path: String) = {
  sc.textFile(path).map(l => score(l, bDict.value))
  ...
}
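The fragment above leaves dict and score elided. As a self-contained sketch of the same pattern, the version below assumes a tiny hypothetical dictionary held in a Set[String] and a hypothetical score function that returns the fraction of a line's words found in it. Neither is from the original application, and the in-memory lines stand in for sc.textFile(path).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Hypothetical helper object, not part of Spark.
object BroadcastSketch {
  // Hypothetical scoring function: fraction of a line's words present in the dictionary.
  def score(line: String, dict: Set[String]): Double = {
    val words = line.toLowerCase.split("\\s+").filter(_.nonEmpty)
    if (words.isEmpty) 0.0 else words.count(dict.contains).toDouble / words.length
  }

  // Tasks read the dictionary through bDict.value instead of capturing it in the closure.
  def scoreLines(sc: SparkContext, lines: Seq[String],
                 bDict: Broadcast[Set[String]]): Array[Double] =
    sc.parallelize(lines).map(l => score(l, bDict.value)).collect()

  def main(args: Array[String]): Unit = {
    // Local master chosen only so the sketch runs without a cluster.
    val conf = new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val dict = Set("the", "cat", "sat")   // stand-in for a large dictionary
      val bDict = sc.broadcast(dict)        // broadcast once, reused by later jobs
      scoreLines(sc, Seq("The cat sat", "xyzzy plugh"), bDict).foreach(println)
      bDict.destroy()                       // release the broadcast data when finished
    } finally sc.stop()
  }
}
```

Because bDict is created once on the driver, any number of later jobs can reuse it without shipping the dictionary again.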
