Thursday, 18 June 2015

Tuning and Debugging Spark

1. Components of Execution

The following phases occur during Spark execution:
  • User code defines a DAG (directed acyclic graph) of RDDs. Operations on RDDs create new RDDs that refer back to their parents, thereby creating a graph.
  • Actions force translation of the DAG to an execution plan. When you call an action on an RDD, it must be computed, which requires computing its parent RDDs as well. Spark’s scheduler submits a job to compute all needed RDDs. That job will have one or more stages, which are parallel waves of computation composed of tasks. Each stage will correspond to one or more RDDs in the DAG; a single stage can correspond to multiple RDDs due to pipelining.
  • Tasks are scheduled and executed on a cluster. Stages are processed in order, with individual tasks launching to compute segments of the RDD. Once the final stage in a job is finished, the action is complete.
In a given Spark application, this entire sequence of steps may occur many times in a continuous fashion as new RDDs are created.
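
To make pipelining concrete, here is a simplified Python model, not Spark's actual DAGScheduler. It walks a linear chain of transformations and starts a new stage at each step that needs a shuffle, because that step's input must first be redistributed across the cluster. The function name split_into_stages and the (name, needs_shuffle) tuples are hypothetical; on a real PySpark RDD you can inspect the lineage with rdd.toDebugString().

```python
from typing import List, Tuple


def split_into_stages(steps: List[Tuple[str, bool]]) -> List[List[str]]:
    """Group a linear chain of transformations into stages.

    Each step is (name, needs_shuffle). Steps without a shuffle are
    pipelined into the current stage; a step that needs a shuffle
    starts a new stage.
    """
    stages: List[List[str]] = [[]]
    for name, needs_shuffle in steps:
        if needs_shuffle and stages[-1]:
            stages.append([])  # shuffle boundary: begin a new stage
        stages[-1].append(name)
    return stages


chain = [
    ("textFile", False),
    ("map", False),
    ("filter", False),
    ("reduceByKey", True),
    ("mapValues", False),
]
print(split_into_stages(chain))
# [['textFile', 'map', 'filter'], ['reduceByKey', 'mapValues']]
```

Five RDDs collapse into two stages: textFile, map and filter run back to back inside the same tasks, and only reduceByKey forces a stage boundary.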

2. Parallelism

An RDD is divided into a set of partitions with each partition containing some subset of the total data. When Spark schedules and runs tasks, it creates a single task for data stored in one partition, and that task will require, by default, a single core in the cluster to execute.
Input RDDs typically choose parallelism based on the underlying storage systems. For example, HDFS input RDDs have one partition for each block of the underlying HDFS file. RDDs that are derived from shuffling other RDDs will have parallelism set based on the size of their parent RDDs.
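
As a back-of-the-envelope sketch of the one-partition-per-block rule, the Python below estimates how many partitions, and therefore how many tasks in the first stage, an HDFS input produces. The 128 MB block size is an assumption (a common HDFS default, but clusters configure it). The function name is hypothetical, and the model ignores extra rules in Hadoop's input formats (for example, folding a small tail into the previous split) and arguments such as minPartitions.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size: 128 MB


def estimate_input_partitions(file_sizes, block_size=BLOCK_SIZE):
    """Roughly one partition per HDFS block; every file yields at least one."""
    return sum(max(1, math.ceil(size / block_size)) for size in file_sizes)


# One 1 GB file plus two small files
sizes = [1024 ** 3, 10 * 1024 ** 2, 5 * 1024 ** 2]
print(estimate_input_partitions(sizes))  # 10 (8 blocks + 1 + 1)
```

Since Spark runs one task per partition, this input would be read by about ten tasks, each needing one core by default.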

The degree of parallelism can affect performance in two ways.
  • First, if there is too little parallelism, Spark might leave resources idle.
  • Second, if there is too much parallelism, small overheads associated with each partition can add up and become significant.
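
Both effects show up in a toy cost model. The sketch below assumes the work divides evenly across partitions, each partition becomes one task on one core, and every task pays a fixed overhead; the 0.5 s overhead and the function name are hypothetical values chosen for illustration, not measurements.

```python
import math


def estimated_stage_time(total_work_s, num_partitions, cores, per_task_overhead_s):
    """Toy estimate of how long one stage takes.

    Assumes work splits evenly across partitions, one task per partition,
    one core per task, and a fixed overhead paid by every task.
    """
    waves = math.ceil(num_partitions / cores)  # rounds of tasks on the cluster
    task_time = total_work_s / num_partitions + per_task_overhead_s
    return waves * task_time


# 80 seconds of total work on 8 cores, assumed 0.5 s overhead per task
for partitions in (2, 8, 16, 800):
    print(partitions, round(estimated_stage_time(80, partitions, 8, 0.5), 2))
```

With 2 partitions six of the eight cores sit idle (40.5 s); with 800 partitions the per-task overhead dominates (60 s); 8 partitions is best here (10.5 s). Real tasks vary in duration, which this model ignores, and Spark's tuning guide suggests 2-3 tasks per CPU core as a starting point.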
Spark offers two ways to tune the degree of parallelism for operations.
  • The first is that, during operations that shuffle data, you can always give a degree of parallelism for the produced RDD as a parameter. 
  • The second is that any existing RDD can be redistributed to have more or fewer partitions. 
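
In PySpark, the first option is the numPartitions argument of shuffle operations, as in rdd.reduceByKey(lambda a, b: a + b, numPartitions=10). The plain-Python sketch below models what that argument controls: each key is hashed into one of num_partitions output partitions, and values for the same key are combined there. reduce_by_key_model is a hypothetical name, not a Spark API. The example uses integer keys because Python randomizes str hashes per process, and integer keys keep the output deterministic.

```python
def reduce_by_key_model(pairs, func, num_partitions):
    """Toy model of a shuffle with an explicit degree of parallelism."""
    partitions = [{} for _ in range(num_partitions)]
    for key, value in pairs:
        bucket = partitions[hash(key) % num_partitions]  # hash partitioning
        bucket[key] = func(bucket[key], value) if key in bucket else value
    return [sorted(bucket.items()) for bucket in partitions]


pairs = [(n % 5, 1) for n in range(20)]  # keys 0..4, each seen four times
result = reduce_by_key_model(pairs, lambda a, b: a + b, num_partitions=2)
print(result)  # [[(0, 4), (2, 4), (4, 4)], [(1, 4), (3, 4)]]
```

Changing num_partitions changes only how many output partitions (and therefore tasks) the next stage has, not the reduced values.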
The repartition() operator will randomly shuffle an RDD into the desired number of partitions. If you know you are shrinking the RDD, you can use the coalesce() operator; this is more efficient than repartition() since it avoids a shuffle operation. If you think you have too much or too little parallelism, it can help to redistribute your data with these operators.

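# Assumes lines is an RDD that a filter has shrunk to a small fraction of
# its input, leaving many nearly empty partitions.
# We coalesce the lines RDD before caching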
>>> lines = lines.coalesce(5).cache()
>>> lines.getNumPartitions()
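
The difference between the two operators can be sketched in plain Python. In this simplified model (not Spark's implementation, which also weighs data locality), coalesce_model merges neighboring parent partitions as whole units, so no individual record has to be redistributed, while repartition_model deals every record out round-robin from a random starting position, which needs a full shuffle but yields roughly even partitions. Both function names are hypothetical.

```python
import random


def coalesce_model(partitions, n):
    """Merge consecutive parent partitions into at most n groups (no shuffle).

    Like coalesce without a shuffle, this can only reduce the number of
    partitions; asking for more leaves the count unchanged.
    """
    k = len(partitions)
    groups = [[] for _ in range(min(n, k))]
    for i, part in enumerate(partitions):
        groups[i * len(groups) // k].extend(part)  # whole partition moves as a unit
    return groups


def repartition_model(partitions, n, seed=None):
    """Deal every record round-robin into n partitions (requires a shuffle)."""
    rng = random.Random(seed)
    out = [[] for _ in range(n)]
    for part in partitions:
        target = rng.randrange(n)  # random starting partition per input partition
        for record in part:
            out[target].append(record)
            target = (target + 1) % n
    return out


# A filter left six partitions with very uneven sizes
parts = [[1, 2], [3], [], [4, 5, 6], [7], []]
print([len(p) for p in coalesce_model(parts, 3)])  # [3, 3, 1]
print([len(p) for p in repartition_model(parts, 3, seed=42)])
print(len(coalesce_model(parts, 10)))  # 6
```
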

3. Serialization Format

When Spark is transferring data over the network or spilling data to disk, it needs to serialize objects into a binary format. This comes into play during shuffle operations, where potentially large amounts of data are transferred. By default Spark will use Java’s built-in serializer. Spark also supports the use of Kryo, a third-party serialization library that improves on Java’s serialization by offering both faster serialization times and a more compact binary representation.

To use Kryo serialization, you can set the spark.serializer setting to org.apache.spark.serializer.KryoSerializer. For best performance, you’ll also want to register classes with Kryo that you plan to serialize.

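import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Be strict about class registration: Kryo throws an exception when it
// is asked to serialize a class that was not registered
conf.set("spark.kryo.registrationRequired", "true")
// MyClass and MyOtherClass stand for your application's own classes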
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))
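
Spark's tuning guide explains why registration matters: without it, Kryo still works, but it has to store the full class name with each object. The toy Python encoding below is not Kryo's wire format; it only illustrates that overhead by prefixing a record either with its class name or with a one-byte registration ID. The class name com.example.MyClass and the record layout are hypothetical.

```python
import struct


def encode_unregistered(class_name, payload):
    """Prefix the payload with a length-prefixed UTF-8 class name."""
    name = class_name.encode("utf-8")
    return struct.pack(">H", len(name)) + name + payload


def encode_registered(class_id, payload):
    """Prefix the payload with a small integer ID agreed on in advance."""
    return struct.pack(">B", class_id) + payload


# A hypothetical MyClass record: a 64-bit id and a double score
payload = struct.pack(">qd", 42, 3.5)
unregistered = encode_unregistered("com.example.MyClass", payload)
registered = encode_registered(1, payload)
print(len(payload), len(unregistered), len(registered))  # 16 37 17
```

In this model the name more than doubles the size of a 16-byte record, and that overhead repeats for every record in a large shuffle.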

4. Memory Management

Inside each executor, memory is used for a few purposes:
  • RDD storage: When you call persist() or cache() on an RDD, its partitions will be stored in memory buffers. Spark will limit the amount of memory used when caching to a configurable fraction of the JVM’s overall heap. If this limit is exceeded, older partitions will be dropped from memory.
  • Shuffle and aggregation buffers: When performing shuffle operations, Spark will create intermediate buffers for storing shuffle output data. These buffers are used to store intermediate results of aggregations in addition to buffering data that is going to be directly output as part of the shuffle. Spark will attempt to limit the total amount of memory used in shuffle-related buffers to the fraction of the heap set by spark.shuffle.memoryFraction.
  • User code: Spark executes arbitrary user code, so user functions can themselves require substantial memory. For instance, if a user application allocates large arrays or other objects, these will contend for overall memory usage. User code has access to everything left in the JVM heap after the space for RDD storage and shuffle storage is allocated.
By default Spark will leave 60% of space for RDD storage, 20% for shuffle memory, and the remaining 20% for user programs.

In addition to tweaking memory regions, you can improve certain elements of Spark’s default caching behavior for some workloads. Spark’s default cache() operation persists data using the MEMORY_ONLY storage level. This means that if there is not enough space to cache new RDD partitions, old ones will simply be deleted and, if they are needed again, they will be recomputed. It is sometimes better to call persist() with the MEMORY_AND_DISK storage level, which instead spills RDD partitions to disk and simply reads them back into memory from local storage if they are needed again. This can be much cheaper than recomputing blocks and can lead to more predictable performance. It is particularly useful if your RDD partitions are very expensive to recompute (for instance, if you are reading data from a database).
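
In PySpark the level is chosen with rdd.persist(StorageLevel.MEMORY_AND_DISK) after from pyspark import StorageLevel. To see why that helps, the toy simulation below counts how partition accesses are served when memory holds fewer partitions than a job keeps revisiting. It uses least-recently-used eviction as a stand-in for "older partitions are dropped"; the function name and the access pattern are hypothetical.

```python
from collections import OrderedDict


def simulate_cache(accesses, capacity, spill_to_disk):
    """Count how each partition access is served by an LRU partition cache.

    spill_to_disk=False models MEMORY_ONLY (evicted partitions are dropped);
    spill_to_disk=True models MEMORY_AND_DISK (evicted partitions go to disk).
    """
    memory = OrderedDict()  # partition id -> True, least recently used first
    disk = set()
    stats = {"memory_hits": 0, "disk_reads": 0, "computes": 0}
    for pid in accesses:
        if pid in memory:
            memory.move_to_end(pid)
            stats["memory_hits"] += 1
            continue
        if pid in disk:
            stats["disk_reads"] += 1  # cheap local read
        else:
            stats["computes"] += 1  # first computation or a recomputation
        memory[pid] = True
        if len(memory) > capacity:
            evicted, _ = memory.popitem(last=False)
            if spill_to_disk:
                disk.add(evicted)
    return stats


# Two passes over four partitions with room for only two in memory
accesses = [0, 1, 2, 3, 0, 1, 2, 3]
print(simulate_cache(accesses, capacity=2, spill_to_disk=False))
# {'memory_hits': 0, 'disk_reads': 0, 'computes': 8}
print(simulate_cache(accesses, capacity=2, spill_to_disk=True))
# {'memory_hits': 0, 'disk_reads': 4, 'computes': 4}
```

Under MEMORY_ONLY the second pass recomputes every partition; with disk spilling it reads all four back from local storage instead.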

A second improvement on the default caching policy is to cache serialized objects instead of raw Java objects, which you can accomplish using the MEMORY_ONLY_SER or MEMORY_AND_DISK_SER storage levels. Caching serialized objects will slightly slow down the cache operation due to the cost of serializing objects, but it can substantially reduce time spent on garbage collection in the JVM, since many individual records can be stored as a single serialized buffer. This is because the cost of garbage collection scales with the number of objects on the heap, not the number of bytes of data, and this caching method will take many objects and serialize them into a single giant buffer. Consider this option if you are caching large amounts of data (e.g., gigabytes) as objects and/or seeing long garbage collection pauses. Such pauses would be visible in the application UI under the GC Time column for each task.
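
The argument rests on object counts, which a short sketch makes explicit. Below, Python's pickle stands in for Java or Kryo serialization to show a whole partition becoming a single bytes buffer, and a rough count compares how many objects the garbage collector must track. The figures (10 million records, about 3 objects per record, 100 partitions) and the function names are hypothetical, and "one buffer per partition" is a simplification.

```python
import pickle


def cache_serialized(records):
    """Store a partition's records as one serialized buffer."""
    return pickle.dumps(records, protocol=pickle.HIGHEST_PROTOCOL)


def heap_objects(num_records, objects_per_record, num_partitions, serialized):
    """Rough count of objects the garbage collector must track for a cache."""
    if serialized:
        return num_partitions  # one buffer per cached partition
    return num_records * objects_per_record


partition = [(i, "user-%d" % i) for i in range(1000)]
buffer = cache_serialized(partition)
assert pickle.loads(buffer) == partition  # reading back costs a deserialization
print(type(buffer).__name__)  # bytes: a single object on the heap

print(heap_objects(10_000_000, 3, 100, serialized=False))  # 30000000
print(heap_objects(10_000_000, 3, 100, serialized=True))   # 100
```
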

More memory is usually better, with one caveat when sizing memory for executors: very large heaps can lead to long garbage collection pauses that hurt the throughput of a Spark job.
