Sunday 18 October 2015

Tuning Spark Jobs: Resource Allocation and Parallelism

How to allocate resources?

Here’s a worked example of configuring a Spark app to use as much of the cluster as possible: Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes. Cloudera Manager helps by accounting for these and configuring these YARN properties automatically.
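
For reference, those two capacities might look like this in yarn-site.xml (the values come straight from the numbers above; adjust for your own hardware):

<!-- Per-node capacities, leaving 1GB and 1 core for the OS and Hadoop daemons -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>64512</value>
</property>
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>15</value>
</property>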

The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:

  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.

A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?

This config results in three executors on all nodes except for the one with the AM, which will have two executors.
--executor-cores 5 lets three executors share each node’s 15 cores, and --num-executors follows as 6 nodes * 3 executors per node, minus 1 because the AM claims a core on one node, giving 17.
--executor-memory was derived as 63GB / 3 executors per node = 21GB, then trimmed to leave room for the executor memory overhead (roughly 7% by default): 21 * 0.07 = 1.47, and 21 – 1.47 ≈ 19GB.
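
Putting it together, the submission could look roughly like this (the jar name is a placeholder):

spark-submit \
  --master yarn \
  --num-executors 17 \
  --executor-cores 5 \
  --executor-memory 19G \
  my-app.jar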


How to improve parallelism?

Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.
The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple of exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.

To determine the number of partitions in an RDD, you can always call rdd.partitions().size().
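
Here is a quick illustration of those rules (assuming sc is your SparkContext; the RDDs are made up for the example):

// two small RDDs with explicit partition counts
val a = sc.parallelize(1 to 100, 4)    // 4 partitions
val b = sc.parallelize(1 to 100, 6)    // 6 partitions

a.coalesce(2).partitions.size     // 2: fewer than the parent's 4
a.union(b).partitions.size        // 10: the sum of the parents' counts
a.cartesian(b).partitions.size    // 24: the product of the parents' counts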

The primary concern is that the number of tasks will be too small. If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available.


When the records destined for aggregation operations (such as reduceByKey or groupByKey, which hold records in in-memory data structures in order to group or sort them) do not easily fit in memory, some mayhem can ensue.
First, holding many records in these data structures puts pressure on garbage collection, which can lead to pauses down the line.
Second, when the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting. This overhead during large shuffles is probably the number one cause of job stalls.

Choosing too few partitions can result in slowness when each task is forced to handle too much data. The amount of time it takes a task to complete often increases nonlinearly with the size of the data assigned to it, because aggregation operations must spill to disk when their data does not fit in memory.
On the other hand, a large number of partitions leads to increased overhead in tasks on the parent side when sorting records by their target partition, as well as more of the overhead associated with scheduling and launching each task on the child side.


So how do you increase the number of partitions? If the stage in question is reading from Hadoop, your options are:

  • Use the repartition transformation, which will trigger a shuffle (see the sketch after this list).
  • Configure your InputFormat to create more splits.
  • Write the input data out to HDFS with a smaller block size.
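
For the repartition route, here’s a minimal sketch (the HDFS path and target count are just illustrative):

// read from HDFS, then raise the partition count; repartition triggers a full shuffle
val input = sc.textFile("hdfs:///data/events")
val widened = input.repartition(500)   // pick the count experimentally (see below)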

If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument, such as:

val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)

The most straightforward way to tune the number of partitions is experimentation: Look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving. The main goal is to run enough tasks so that the data destined for each task fits in the memory available to that task.
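
As a rough sketch of that loop (using rdd1 from the earlier example; each candidate count means rerunning the job and timing it):

// grow the partition count by 1.5x per experiment
val parent = rdd1.partitions.size
val candidates = Iterator.iterate(parent)(n => (n * 1.5).toInt).take(5).toList
// e.g. a parent count of 100 yields List(100, 150, 225, 337, 505)
// try rdd1.reduceByKey(_ + _, n) for each candidate n and keep the fastest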

Reference:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
