Thursday 16 July 2015

Tuning Spark Job Performance: The Execution Plan

A Spark application consists of a driver process, which, in spark-shell's case, is the process the user interacts with, and a set of executor processes scattered across nodes on the cluster. The driver is in charge of the high-level control flow of the work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. Both the driver and the executors typically stick around for the entire time the application is running. A single executor has a number of slots for running tasks and will run many concurrently throughout its lifetime.
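
To make the slot count concrete: an executor can run as many tasks in parallel as it has cores assigned. Below is a minimal sketch of sizing an application through SparkConf; the application name and the numbers are made up, and spark.executor.instances applies when running on YARN.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical sizing: 6 executors x 4 cores = up to 24 tasks running at once.
    val conf = new SparkConf()
      .setAppName("execution-plan-demo")       // made-up name
      .set("spark.executor.instances", "6")    // number of executor processes (YARN)
      .set("spark.executor.cores", "4")        // task slots per executor
      .set("spark.executor.memory", "8g")      // heap for execution and cached data
    val sc = new SparkContext(conf)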

At the top of the execution model are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfill it. To decide what this job looks like, Spark examines the graph of RDDs that the action depends on and formulates an execution plan that starts with computing the farthest back RDDs and culminates in the RDDs required to produce the action’s results.
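
As a small illustration (the input path is hypothetical), nothing below runs until the action at the end is invoked; the reduce call is what triggers the job:

    // In spark-shell, `sc` already exists. Transformations are lazy.
    val lengths = sc.textFile("hdfs:///tmp/input.txt")   // hypothetical path
      .flatMap(_.split(" "))
      .map(_.length)

    // The action launches a job that computes every RDD back to textFile.
    val totalChars = lengths.reduce(_ + _)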


Stages:

The execution plan assembles the job's transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different partition of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.
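
For example, a chain of narrow transformations such as map and filter is pipelined into a single stage, with one task per partition of the input (the path below is hypothetical):

    // All narrow transformations: each output partition depends on exactly one
    // input partition, so Spark runs the whole chain as a single stage.
    val firstFields = sc.textFile("hdfs:///tmp/events.log")   // hypothetical path
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(line => line.split("\t")(0))

    firstFields.count()   // one job, one stage, one task per input partition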

However, Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.
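
Appending a *ByKey transformation to the pipeline above makes this visible; toDebugString prints the RDD lineage, and the indentation change marks the shuffle boundary between the two stages:

    val counts = sc.textFile("hdfs:///tmp/events.log")    // hypothetical path
      .map(line => (line.split("\t")(0), 1))              // narrow: same stage
      .reduceByKey(_ + _)                                 // wide: shuffle, new stage

    println(counts.toDebugString)   // ShuffledRDD appears above the mapped lineage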

At each stage boundary, data is written to disk by tasks in the parent stage and then fetched over the network by tasks in the child stage. Stage boundaries can therefore be expensive and should be avoided when possible. The number of data partitions in the parent stage may also differ from the number of partitions in the child stage.
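
The child stage's partition count is usually under the developer's control, since most wide transformations accept an explicit numPartitions argument. A small sketch with made-up numbers:

    val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 8)
    val reduced = pairs.reduceByKey(_ + _, 48)   // 48 is an arbitrary choice

    println(pairs.partitions.length)     // 8  -- parent stage
    println(reduced.partitions.length)   // 48 -- child stage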


Shuffle:

What determines whether data needs to be shuffled? For the RDDs returned by so-called narrow transformations like map and filter, the data required to compute a single partition resides in a single partition of the parent RDD; each object depends on only one object in the parent, so these transformations stay within the current stage. Wide transformations like groupByKey and reduceByKey, as described above, need data from many partitions of the parent RDD and therefore force a shuffle and a new stage.
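
One way to see the difference from the shell is to inspect RDD.dependencies, which reports whether a transformation produced a narrow or a shuffle dependency:

    val keyed = sc.parallelize(1 to 100).map(n => (n % 10, n))

    println(keyed.dependencies)                // a OneToOneDependency -- narrow
    println(keyed.groupByKey().dependencies)   // a ShuffleDependency  -- wide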

The primary goal when choosing an arrangement of operators is to reduce the number of shuffles and the amount of data shuffled. This is because shuffles are fairly expensive operations; all shuffle data must be written to disk and then transferred over the network. repartition, join, cogroup, and any of the *By or *ByKey transformations can result in shuffles. Not all these operations are equal, however, and a few of the most common performance pitfalls for novice Spark developers arise from picking the wrong one:

Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _). However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling (see the sketch after this list).
Avoid reduceByKey when the input and output value types are different. For example, consider writing a transformation that finds all the unique strings corresponding to each key. One way would be to use map to transform each element into a Set and then combine the Sets with reduceByKey; that allocates a new Set for every record. It's better to use aggregateByKey, which performs the map-side aggregation more efficiently.
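
Both pitfalls are sketched below on toy data; the RDD contents are invented for illustration:

    // Summing per key: reduceByKey combines values on the map side, so only one
    // partial sum per key per partition crosses the network.
    val nums = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val slowSums = nums.groupByKey().mapValues(_.sum)   // ships every value
    val fastSums = nums.reduceByKey(_ + _)              // ships partial sums only

    // Unique strings per key: the output type (Set[String]) differs from the
    // input type (String), so aggregateByKey folds values straight into one set
    // per key per partition instead of allocating a Set for every record.
    val pairs = sc.parallelize(Seq((1, "x"), (1, "y"), (1, "x"), (2, "z")))
    val uniques = pairs.aggregateByKey(Set.empty[String])(
      (set, value) => set + value,   // fold a value into the partition-local set
      (s1, s2)     => s1 ++ s2       // merge sets produced in different partitions
    )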

Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups.
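
A minimal sketch with invented data:

    val clicks = sc.parallelize(Seq(("user1", "pageA"), ("user1", "pageB")))
    val buys   = sc.parallelize(Seq(("user1", "itemX"), ("user2", "itemY")))

    // One record per key: RDD[(String, (Iterable[String], Iterable[String]))]
    val grouped = clicks.cogroup(buys)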

One way to avoid shuffles when joining two datasets is to take advantage of broadcast variables. When one of the datasets is small enough to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every executor. A map transformation can then reference the hash table to do lookups.
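
A sketch of such a map-side (broadcast) join, with a made-up lookup table; the flatMap drops keys that have no match, which corresponds to an inner join:

    // The small table fits comfortably in memory, so ship it once per executor.
    val countryNames = Map("US" -> "United States", "DE" -> "Germany")   // made up
    val lookup = sc.broadcast(countryNames)

    val events = sc.parallelize(Seq(("US", 42), ("DE", 7), ("FR", 1)))
    val joined = events.flatMap { case (code, value) =>
      lookup.value.get(code).map(name => (name, value))   // no shuffle involved
    }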


Reference:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/
