Checkpoint:
Checkpointing is the main
mechanism that needs to be set up for fault tolerance in Spark Streaming. It allows Spark Streaming to periodically save data about the application to a reliable storage system, such as HDFS or Amazon S3, for use in recovering. Specifically, checkpointing serves two purposes:
- Limiting the state that must be recomputed on failure. Spark Streaming can recompute state using the lineage graph of transformations, but checkpointing controls how far back it must go.
- Providing fault tolerance for the driver. If the driver program in a streaming application crashes, you can launch it again and tell it to recover from a checkpoint, in which case Spark Streaming will read how far the previous run of the program got in processing the data and take over from there.
For these reasons, checkpointing is important to set up in any production streaming application.
ssc.checkpoint("hdfs://...")
Driver Fault Tolerance
Tolerating failures of the driver node requires a special way of creating our StreamingContext, which takes in the checkpoint directory.
def createStreamingContext() = {
  ...
  val sc = new SparkContext(conf)
  // Create a StreamingContext with a 1 second batch size
  val ssc = new StreamingContext(sc, Seconds(1))
  ssc.checkpoint(checkpointDir)
  ssc  // return the context so that getOrCreate() can use it
}
...
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
After the driver fails, if you restart it and run this code again, getOrCreate()
will reinitialize a StreamingContext from the checkpoint directory and resume processing.
On most cluster managers, Spark does not automatically relaunch the driver if it crashes, so you need to monitor it using a tool like monit
and restart it.
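If you run on the Standalone cluster manager in cluster deploy mode, the --supervise flag of spark-submit asks the cluster to restart the driver for you after a failure; the master URL and application JAR below are placeholders:

spark-submit --deploy-mode cluster --supervise --master spark://master:7077 App.jar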
Finally, note that when the driver crashes, Spark's executors will also restart, as they cannot continue processing data without a driver. Your relaunched driver will start new executors to pick up where it left off.
Performance:
Batch and Window Sizes
The most common question is what minimum batch
size Spark Streaming can use. In general, 500 milliseconds has proven to be a good minimum size for many applications. The best approach is to start with a larger batch size (around 10 seconds) and work your way down to a smaller batch size. If the processing times reported in the
Streaming UI remain consistent, then you can continue to decrease the batch size, but if they are increasing you may have reached the limit for your application.
In a similar way, for windowed operations, the interval at which you compute a result (i.e., the slide interval) has a big impact on performance. Consider increasing this interval for expensive computations if it is a bottleneck.
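As a rough sketch of where these two knobs live (sc and the accessLogs DStream below are assumed to exist already), the batch interval is fixed when you create the StreamingContext, while the window and slide durations are arguments to the windowed operation:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Start with a larger batch interval (here 10 seconds) and shrink it while
// the processing times reported in the Streaming UI remain stable.
val ssc = new StreamingContext(sc, Seconds(10))

// Count over a 30-second window, recomputed every 10 seconds; increasing the
// slide interval (the second argument) makes an expensive computation run less often.
val windowedCounts = accessLogs.countByWindow(Seconds(30), Seconds(10))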
Level of Parallelism
A common way to reduce the processing time of batches is to increase the parallelism.
There are three ways to increase the parallelism (a combined sketch follows this list):
- Increasing the number of receivers
Receivers can sometimes act as a bottleneck if there are too many records for a single machine to read in and distribute. You can add more receivers by creating multiple input DStreams (which creates multiple receivers), and then applying union() to merge them into a single stream.
- Explicitly repartitioning received data
If the number of receivers cannot be increased any further, you can further redistribute the received data by explicitly repartitioning the input stream (or the union of multiple streams) using DStream.repartition().
- Increasing parallelism in aggregation
For operations like reduceByKey(), you can specify the parallelism as a second parameter, as already discussed for RDDs.
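Putting the three techniques together, a sketch along these lines is possible; the socket source, host and port, and partition counts are illustrative placeholders, not recommendations:

// 1. Several receivers: each socketTextStream() call creates its own receiver;
//    union() then merges the resulting DStreams into a single stream.
val numReceivers = 4  // placeholder
val inputStreams = (1 to numReceivers).map { _ =>
  ssc.socketTextStream("loghost", 9999)  // placeholder host/port
}
val unified = ssc.union(inputStreams)

// 2. Explicitly repartition the received data across more partitions.
val repartitioned = unified.repartition(20)  // placeholder partition count

// 3. Pass the parallelism of the aggregation as a second parameter.
val counts = repartitioned
  .map(line => (line.split(" ")(0), 1))
  .reduceByKey((a: Int, b: Int) => a + b, 10)  // 10 reduce tasks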
Garbage Collection and Memory Usage
Another aspect that can cause problems is Java’s garbage collection.
You can minimize unpredictably large pauses due to GC by
enabling Java’s Concurrent Mark-Sweep garbage collector. The Concurrent Mark-Sweep garbage collector does consume more resources overall, but introduces fewer pauses.
You can enable this collector by adding -XX:+UseConcMarkSweepGC to the spark.executor.extraJavaOptions configuration parameter:
spark-submit --conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC App.jar
In addition to using a garbage collector less likely to introduce pauses, you can make a big difference by reducing GC pressure. Caching RDDs in
serialized form (instead of as native objects) also reduces GC pressure, which is why, by default, RDDs generated by Spark Streaming are stored in serialized form. Using Kryo serialization further reduces the memory required for the in-memory representation of cached data.
Spark also allows us to control how cached/persisted RDDs are evicted from the cache. By default Spark uses an LRU cache. Spark will also explicitly evict RDDs older than a certain time period if you set spark.cleaner.ttl. By preemptively evicting RDDs that we are unlikely to need from the cache, we may be able to reduce the GC pressure.
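As a hedged sketch of these memory-related settings (MyRecord, the dstream variable, and the one-hour TTL are placeholder choices):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

case class MyRecord(key: String, count: Int)  // placeholder record type

val conf = new SparkConf()
  // Kryo makes the serialized, cached data even more compact than Java serialization.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyRecord]))
  // Proactively evict RDDs (and other metadata) older than 3600 seconds.
  .set("spark.cleaner.ttl", "3600")

// Cache a DStream's RDDs in serialized form to reduce GC pressure
// (serialized storage is already the default for DStreams).
dstream.persist(StorageLevel.MEMORY_ONLY_SER)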