1. Components of Execution
- User code defines a DAG (directed acyclic graph) of RDDs
- Operations on RDDs create new RDDs that refer back to their parents, thereby creating a graph.
- Actions force translation of the DAG to an execution plan
- When you call an action on an RDD, it must be computed, which requires computing its parent RDDs as well. Spark’s scheduler submits a job to compute all needed RDDs. That job will have one or more stages, which are parallel waves of computation composed of tasks. Each stage corresponds to one or more RDDs in the DAG; a single stage can correspond to multiple RDDs due to pipelining.
- Tasks are scheduled and executed on a cluster
- Stages are processed in order, with individual tasks launching to compute segments of the RDD. Once the final stage is finished in a job, the action is complete.
In a given Spark application, this entire sequence of steps may occur many times in a continuous fashion as new RDDs are created.
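As a rough illustration of this flow, here is a minimal PySpark sketch (it assumes an existing SparkContext sc and a text file at "input.txt"; all names are illustrative):

# Transformations only build up the RDD lineage (the DAG); nothing runs yet.
lines = sc.textFile("input.txt")
words = lines.flatMap(lambda line: line.split())    # narrow dependency: pipelined into one stage
pairs = words.map(lambda word: (word, 1))           # narrow dependency: pipelined
counts = pairs.reduceByKey(lambda a, b: a + b)      # wide dependency: a stage boundary

print(counts.toDebugString().decode())  # prints the lineage; indentation marks stage boundaries
counts.count()                          # the action: the scheduler submits a job with two stages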
2. Parallelism
An RDD is divided into a set of partitions with each partition containing some subset of the total data. When Spark schedules and runs tasks, it creates a single task for data stored in one partition, and that task will require, by default, a single core in the cluster to execute.
Input RDDs typically choose parallelism based on the underlying storage systems. For example, HDFS input RDDs have one partition for each block of the underlying HDFS file. RDDs that are derived from shuffling other RDDs will have parallelism set based on the size of their parent RDDs.
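For instance, in PySpark (a sketch assuming a SparkContext sc; the paths and numbers are made up):

# Parallelizing a local collection: ask for 8 partitions explicitly.
nums = sc.parallelize(range(10000), 8)
print(nums.getNumPartitions())    # 8

# Reading from a distributed filesystem: roughly one partition per input block,
# or more if you pass a minPartitions hint.
logs = sc.textFile("hdfs:///logs/2014/*.log", minPartitions=100)

# A shuffle-producing transformation takes its parallelism from its parent
# unless you override it (see the tuning discussion below).
grouped = nums.map(lambda x: (x % 10, x)).groupByKey()
print(grouped.getNumPartitions())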
The degree of parallelism can affect performance in two ways.
- First, if there is too little parallelism, Spark might leave resources idle.
- Second, if there is too much parallelism, small overheads associated with each partition can add up and become significant.
Spark offers two ways to tune the degree of parallelism for operations:
- The first is that, during operations that shuffle data, you can always give a degree of parallelism for the produced RDD as a parameter.
- The second is that any existing RDD can be redistributed to have more or fewer partitions.
The repartition() operator will randomly shuffle an RDD into the desired number of partitions. If you know you are shrinking the RDD, you can use the coalesce() operator; this is more efficient than repartition() since it avoids a shuffle operation. If you think you have too much or too little parallelism, it can help to redistribute your data with these operators.

# We coalesce the lines RDD before caching
>>> lines = lines.coalesce(5).cache()
>>> lines.getNumPartitions()
5
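The first option, passing a degree of parallelism to a shuffle operation, looks roughly like this (a sketch; pairs is a hypothetical key/value RDD):

# Most shuffle operations accept a numPartitions argument for the resulting RDD.
>>> counts = pairs.reduceByKey(lambda a, b: a + b, numPartitions=100)
>>> counts.getNumPartitions()
100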
3. Serialization Format
When Spark is transferring data over the network or spilling data to disk, it needs to serialize objects into a binary format. This comes into play during shuffle operations, where potentially large amounts of data are transferred. By default Spark will use Java’s built-in serializer. Spark also supports the use of Kryo, a third-party serialization library that improves on Java’s serialization by offering both faster serialization times and a more compact binary representation.
To use Kryo serialization, you can set the spark.serializer setting to org.apache.spark.serializer.KryoSerializer. For best performance, you’ll also want to register the classes you plan to serialize with Kryo.

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Be strict about class registration
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))
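The same settings can be applied from PySpark; a rough equivalent is sketched below (the class names are placeholders, and registration goes through the spark.kryo.classesToRegister property because registerKryoClasses() is a Scala/Java API):

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
# Be strict about class registration
conf.set("spark.kryo.registrationRequired", "true")
# Comma-separated list of classes to register with Kryo
conf.set("spark.kryo.classesToRegister", "com.example.MyClass,com.example.MyOtherClass")
sc = SparkContext(conf=conf)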
4. Memory Management
Inside of each executor, memory is used for a few purposes:
- RDD storage: When you call persist() or cache() on an RDD, its partitions will be stored in memory buffers. Spark will limit the amount of memory used when caching to a certain fraction of the JVM’s overall heap, set by spark.storage.memoryFraction. If this limit is exceeded, older partitions will be dropped from memory.
- Shuffle and aggregation buffers: When performing shuffle operations, Spark will create intermediate buffers for storing shuffle output data. These buffers are used to store intermediate results of aggregations in addition to buffering data that is going to be directly output as part of the shuffle. Spark will attempt to limit the total amount of memory used in shuffle-related buffers to spark.shuffle.memoryFraction.
- User code: Spark executes arbitrary user code, so user functions can themselves require substantial memory. For instance, if a user application allocates large arrays or other objects, these will contend for overall memory usage. User code has access to everything left in the JVM heap after the space for RDD storage and shuffle storage are allocated.
In addition to tweaking memory regions, you can improve certain elements of Spark’s default caching behavior for some workloads. Spark’s default cache() operation persists data in memory using the MEMORY_ONLY storage level. This means that if there is not enough space to cache new RDD partitions, old ones will simply be deleted and, if they are needed again, they will be recomputed. It is sometimes better to call persist() with the MEMORY_AND_DISK storage level, which instead drops RDD partitions to disk and simply reads them back to memory from a local store if they are needed again. This can be much cheaper than recomputing blocks and can lead to more predictable performance. This is particularly useful if your RDD partitions are very expensive to recompute (for instance, if you are reading data from a database).
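A sketch of both knobs discussed above is shown below (the two fraction settings apply to the legacy, pre-1.6 memory manager named in this section; the input path is a placeholder):

from pyspark import SparkConf, SparkContext, StorageLevel

conf = (SparkConf()
        .set("spark.storage.memoryFraction", "0.5")    # cap RDD caching at ~50% of the heap
        .set("spark.shuffle.memoryFraction", "0.3"))   # cap shuffle buffers at ~30%
sc = SparkContext(conf=conf)

# Expensive-to-recompute data: spill evicted partitions to local disk
# instead of recomputing them from scratch.
records = sc.textFile("hdfs:///exports/db-dump/*.csv").map(lambda line: line.split(","))
records.persist(StorageLevel.MEMORY_AND_DISK)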