Sunday 25 October 2015

Optimize Shuffle in Spark

Shuffling data in a many-to-many fashion across the network is non-trivial. Compression of Map output files before they are shuffled across the network is popular in most MapReduce frameworks. Combiners begin reducing on the Map side as soon as the Map output is ready.

Shuffle in Hadoop:

A Map task first writes its output to a buffer in memory rather than directly to disk. Only after the buffer fills past some threshold does it spill to disk. Spilling is therefore controlled by two parameters: io.sort.mb, the size of the in-memory buffer, which defaults to 100MB, and io.sort.spill.percent, the fraction of the buffer that must fill before its content is spilled to disk, which defaults to 0.80 (80%).
Further, the Map output can also be compressed by setting mapred.compress.map.output to true.
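
As a rough illustration of how the two parameters interact, the sketch below computes the buffer fill level at which a spill would start. The function name spill_threshold_mb is hypothetical, and the arithmetic is a simplification that ignores any bookkeeping overhead inside the buffer.

```python
def spill_threshold_mb(io_sort_mb: float = 100, io_sort_spill_percent: float = 0.80) -> float:
    """Return the buffer fill level (in MB) at which a spill to disk begins.

    Defaults mirror the values quoted in the text; the helper itself is
    illustrative and not part of Hadoop.
    """
    return io_sort_mb * io_sort_spill_percent


default_threshold = spill_threshold_mb()
print(f"spill starts at about {default_threshold:.0f} MB")
```

With the defaults, the buffer starts spilling once roughly 80MB of the 100MB is in use.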

Although the Reduce phase is distinct from the Map phase in terms of functionality, the two stages overlap in time. During the copy phase of the Reduce task, each Map task informs the task tracker as soon as it finishes, and then pushes its output to the appropriate Reduce task. Task trackers do not delete Map output as soon as the transfer is complete, but instead keep it persisted on disk in case the reducer fails. As in the Map phase, the Reduce phase also maintains an in-memory buffer for shuffle files.

Shuffle in Spark:

Instead of maintaining a common in-memory buffer, Spark Map tasks write their output directly to disk on completion, relying on the operating system's disk buffer cache to avoid an excess amount of disk writes. Each Map task writes one shuffle file per Reduce task, which corresponds to the logical Block in Spark.

Each map task writes R shuffle files, where R is the number of Reduce tasks. Unlike Hadoop, these are not intermediary files, as Spark doesn't merge them into a single partitioned one. In general, both M and R are often larger in Spark than in Hadoop due to Spark's lower scheduling overhead per task.
The number of shuffle files is M*R. The sheer number of shuffle files written is a major source of performance degradation.
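
To get a feel for how quickly M*R grows, here is a minimal sketch. The function name and the task counts are made up for illustration and do not come from any measurement.

```python
def shuffle_file_count(num_map_tasks: int, num_reduce_tasks: int) -> int:
    """Each of the M map tasks writes one file per reduce task, so M * R files."""
    return num_map_tasks * num_reduce_tasks


# Hypothetical job sizes, chosen only to show the growth.
for m, r in [(100, 100), (1000, 1000), (5000, 2000)]:
    print(f"M={m}, R={r}: {shuffle_file_count(m, r)} shuffle files")
```

Even a moderate job with a thousand tasks on each side ends up with a million small files.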

Spark provides the option to compress Map output files, specified by spark.shuffle.compress. The default compression codec is Snappy. Snappy uses only 33KB of buffer for each open file, which reduces the risk of running out of memory.
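
The per-file buffer matters because many shuffle files are open at once. The sketch below estimates the total buffer memory, assuming (my assumption, not a statement about Spark internals) that every concurrently running Map task keeps one file open per Reduce task. The function name and the task counts are hypothetical.

```python
def compression_buffer_mb(open_files: int, buffer_kb: float = 33) -> float:
    """Estimate total compression buffer memory in MB for a number of open files.

    buffer_kb defaults to the 33KB per open file quoted in the text for Snappy.
    """
    return open_files * buffer_kb / 1024


# Hypothetical machine: 8 concurrent map tasks, each writing to 1000 reduce files.
concurrent_map_tasks = 8
reduce_tasks = 1000
total_mb = compression_buffer_mb(concurrent_map_tasks * reduce_tasks)
print(f"approximate buffer memory: {total_mb:.1f} MB")
```

A codec with a larger per-file buffer would scale this figure proportionally.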

One difference between Spark and Hadoop in the Reduce phase is that Spark requires all shuffled data for a Reduce task to fit into memory when the Reduce task demands it. This can happen if the Reduce task involves a groupByKey, or a reduceByKey that concatenates values. When the memory required by a Reduce task exceeds what it has been allocated, an out-of-memory exception is thrown.
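
The difference in memory behaviour can be sketched in plain Python. This is not the Spark API: group_by_key and reduce_by_key below are hypothetical stand-ins that only mimic the semantics on a small list of pairs.

```python
from collections import defaultdict
from operator import add


def group_by_key(pairs):
    """Collect every value for a key into a list (all values held in memory)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def reduce_by_key(pairs, func):
    """Fold values for a key into a single running result."""
    reduced = {}
    for key, value in pairs:
        reduced[key] = func(reduced[key], value) if key in reduced else value
    return reduced


pairs = [("a", 1), ("b", 2), ("a", 3), ("a", 4)]

grouped = group_by_key(pairs)        # keeps every value per key
summed = reduce_by_key(pairs, add)   # keeps one number per key
# Reducing with list concatenation rebuilds the full list per key,
# so it needs as much memory as grouping does.
concatenated = reduce_by_key([(k, [v]) for k, v in pairs], add)

print(grouped)
print(summed)
print(concatenated)
```

A sum keeps one value per key no matter how many records arrive, while grouping or concatenating grows with the number of records per key.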

Another difference is that the Spark Reduce phase does not overlap with the Map phase: the Reduce phase begins only after the Map phase has finished. In other words, shuffling in Spark is a pull operation, rather than a push operation as in Hadoop.


Bottlenecks and Solutions

Spark shuffle performance suffers mainly from heavy random I/O. A large number of shuffle files imposes a heavy load on the operating system.

The solution is to write fewer, larger files. This reduces the number of random writes during the Map phase as well as the number of inodes needed to index the shuffle blocks.

Shuffle file consolidation means maintaining one shuffle file per Reduce partition for each core C, rather than for each Map task M. Map tasks that run on the same core write to the same set of R files, so every machine needs to handle only C*R shuffle files rather than M*R.
However, because of a performance regression with ext3 on EC2, the feature is not enabled by default.
The workaround is to mount the disks as ext4.
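
The effect of consolidation on the file count can be sketched as follows. Identifying a file by a (core, partition) pair and assigning Map tasks to cores round-robin are simplifying assumptions for the example, and the sizes are hypothetical.

```python
from itertools import product


def unconsolidated_files(num_map_tasks, num_reduce_tasks):
    """One file per (map task, reduce partition) pair."""
    return {(m, r) for m, r in product(range(num_map_tasks), range(num_reduce_tasks))}


def consolidated_files(num_map_tasks, num_reduce_tasks, num_cores):
    """One file per (core, reduce partition) pair.

    Map tasks that land on the same core share the same set of files.
    Round-robin task placement (m % num_cores) is an assumption.
    """
    return {
        (m % num_cores, r)
        for m, r in product(range(num_map_tasks), range(num_reduce_tasks))
    }


M, R, C = 100, 50, 4  # hypothetical map tasks, reduce tasks, cores on one machine

print(len(unconsolidated_files(M, R)))   # M * R files
print(len(consolidated_files(M, R, C)))  # C * R files
```

With these numbers the machine goes from 5000 files to 200, a reduction by a factor of M/C.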

Recommended Settings:

1. Consolidate a shuffle file for each partition
--conf "spark.shuffle.consolidateFiles=true"
2. Set a more efficient serializer
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
3. Configure the number of partitions (tasks) to use when shuffling data for joins or aggregations
--conf "spark.sql.shuffle.partitions=xx"
4. Compress the shuffle files with the default codec, Snappy
--conf "spark.shuffle.compress=true"
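
If these settings are applied from a script, the flags can be generated from a single mapping. The sketch below only builds the argument list; conf_flags is a hypothetical helper, and the "xx" value is the placeholder from the list above, to be replaced with a number suited to the job.

```python
import shlex


def conf_flags(settings):
    """Turn a mapping of Spark properties into spark-submit --conf arguments."""
    flags = []
    for key, value in settings.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


settings = {
    "spark.shuffle.consolidateFiles": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.shuffle.partitions": "xx",  # placeholder, not a real value
    "spark.shuffle.compress": "true",
}

# Print the flags as they would appear on a spark-submit command line.
print(" ".join(shlex.quote(arg) for arg in conf_flags(settings)))
```

Keeping the properties in one mapping makes it easier to review or change them in one place.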

