Spark Streaming combines one or more stream consumers with a Spark transformation process. The Streaming Consumers fetch data, which is placed in blocks (implemented as arrays) and delivered to the Block Manager; these blocks become the RDD partitions that Spark will work on. Once the batch interval has elapsed, the collected data blocks are handed to Spark for processing.
At any point in time, Spark is processing the previous batch of data while the Streaming Consumers are collecting data for the current interval. Once a batch has been processed by Spark, it can be cleaned up.
Spark Streaming consists of two processes:
- Fetching data; done by the streaming consumer (in our case, the Kafka consumer)
- Processing the data; done by Spark
The time to process the data of a batch interval must be less than the batch interval time.
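As a rough way to check this in practice, a StreamingListener can report each batch's processing time against the batch interval. This is a minimal sketch only; the 10-second batch interval and the application name are assumed values, not taken from the text.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val conf = new SparkConf().setAppName("streaming-tuning-check") // illustrative name
val ssc = new StreamingContext(conf, Seconds(10))               // assumed 10-second batch interval

// Print how long each batch took to process and how long it waited to be scheduled.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"processing delay=${info.processingDelay.getOrElse(-1L)} ms, " +
      s"scheduling delay=${info.schedulingDelay.getOrElse(-1L)} ms")
  }
})

If the processing delay regularly approaches or exceeds the batch interval, batches start to queue up and the job falls behind.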
To meet that constraint, we can tune our Spark job to process that data within the time restriction. Recall that:
- A Spark job consists of transformations and actions. It is broken down into stages of operations that can be pipelined together.
- An RDD is a distributed collection of data, broken down into partitions spread over the nodes of the cluster.
- A task applies a stage to a data partition on an executor. Scheduling a task has a fixed cost.
#tasks = #stages x #partitions
From these statements, we can infer that to minimize the processing time of a batch we need to minimize the number of stages and partitions while maximizing parallelism. Note that the batch interval does not appear explicitly in this set of equations.
Scaling up consumers
In order to increase the number of messages consumed by the system, we can create multiple consumers that fetch data in parallel. Each consumer is assigned one core on an executor. This is a common pattern:
// Create kafkaParallelism Kafka receivers, each running on one executor core.
@transient val inKafkaList: List[DStream[(K, V)]] = List.fill(kafkaParallelism) {
  KafkaUtils.createStream[K, V, KDecoder, VDecoder](
    ssc, kafkaConfig, topics, StorageLevel.MEMORY_AND_DISK_SER)
}
// Union the streams so that downstream transformations form a single pipeline.
@transient val inKafka = inKafkaList.tail.foldLeft(inKafkaList.head) { _.union(_) }
The union of the created DStreams is important, as it reduces the number of transformation pipelines on the input DStream to one. Not doing this will multiply the number of stages by the number of consumers.
Notes:
- kafkaParallelism is the (configurable) number of consumers to create.
- Storage level MEMORY_AND_DISK_SER allows Spark Streaming to spill serialized data to disk in cases of overload, when the available memory is not sufficient to hold the incoming data.
- Declaring the DStream references as 'transient' is often necessary to avoid them being serialized with the job, which would result in a serialization exception, as DStreams are not supposed to be serialized.
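For illustration only (parseRecord and the output path below are hypothetical placeholders, not part of the original pattern), downstream transformations are then defined once on the unified inKafka stream, so they form a single pipeline regardless of how many consumers were created:

// One transformation pipeline over the data of all consumers.
val parsed = inKafka.map { case (_, value) => parseRecord(value) } // parseRecord is a placeholder
parsed.foreachRDD { rdd =>
  rdd.saveAsTextFile(s"/tmp/streaming-out/${System.currentTimeMillis}") // illustrative output
}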
Parallelism:
As we explained before, Spark Streaming is in fact two processes running concurrently: the data consumers and Spark. The parallelism level of the consumer side is defined by the number of consumers created (see the previous section on how to create consumers). The parallelism of the Spark processing cluster is determined by the total number of cores configured for the job minus the number of consumers.
Given the total number of cores, controlled by the configuration parameter spark.cores.max:
- Consumer parallelism = #consumers created ('kafkaParallelism' in the previous example)
- Spark parallelism = spark.cores.max - #consumers
To maximize the chances of data locality and even parallel execution, spark.cores.max should be a multiple of #consumers.
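A small sketch with assumed numbers (12 total cores and 4 consumers are illustrative, not values from the text):

import org.apache.spark.SparkConf

val totalCores = 12                            // spark.cores.max, a multiple of #consumers
val kafkaParallelism = 4                       // #consumers, one core each
val sparkCores = totalCores - kafkaParallelism // 8 cores left for Spark processing

val conf = new SparkConf()
  .setAppName("kafka-streaming-job")           // illustrative name
  .set("spark.cores.max", totalCores.toString)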
Partitions
As we discussed previously, reducing the number of partitions is important in order to reduce the overall processing time, as it leads to fewer tasks, and therefore bigger chunks of data to operate on at once and less scheduling overhead.
Each receiver fetches data. That data is handed by the Receiver to a ReceiverSupervisor on its executor, which takes care of managing the blocks. Each block becomes a partition of the RDD produced during each batch interval. The size of these blocks is time-bound and defined by the configuration parameter spark.streaming.blockInterval (default: 200 ms):
“Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced into blocks of data before storing them in Spark.”
#partitions = #consumers * batchIntervalMillis / blockInterval
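To see why the default block interval often yields far more partitions than needed, here is a quick calculation with assumed numbers (4 consumers, a 10-second batch interval, and the 8 Spark cores from the earlier sketch are illustrative):

val consumers = 4
val batchIntervalMillis = 10000
val defaultBlockIntervalMs = 200
val partitions = consumers * batchIntervalMillis / defaultBlockIntervalMs
// = 4 * 10000 / 200 = 200 partitions per batch: with 8 Spark cores this means
// 200 small tasks per stage, which the guideline below reduces by raising blockInterval.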
Tuning guide for spark.streaming.blockInterval:
Increasing spark.streaming.blockInterval will reduce the number of partitions in the RDD and, therefore, the number of tasks per batch. blockInterval must be an integer divisor of the batch interval. Following the Spark guideline of having the number of partitions roughly 2x-3x the number of available cores, we have been successfully implementing the following guideline:
Given:
- batchIntervalMillis = configured batch interval in milliseconds
- spark.cores.max = total cores
- #consumers = created streaming consumers
- sparkCores = spark.cores.max - #consumers
- partitionFactor = # of partitions per core (1, 2, 3, ... ideally in multiples of k, where sparkCores = k * #consumers)
Then:
spark.streaming.blockInterval = batchIntervalMillis * #consumers / (partitionFactor * sparkCores)
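A worked example with the same assumed numbers as before (10-second batch interval, 12 total cores, 4 consumers) and a partition factor of 2:

val batchIntervalMillis = 10000
val totalCores = 12                       // spark.cores.max
val consumers = 4                         // #consumers
val sparkCores = totalCores - consumers   // 8
val partitionFactor = 2                   // 2 partitions per Spark core
val blockIntervalMs = batchIntervalMillis * consumers / (partitionFactor * sparkCores)
// = 10000 * 4 / (2 * 8) = 2500 ms, an integer divisor of the 10000 ms batch interval
// Check: #partitions = 4 * 10000 / 2500 = 16 = 2 x sparkCores, within the 2x-3x guideline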
Data Locality:
Big blocks of data created with a large configured value for spark.streaming.blockInterval are great when they can be processed on the same node where they reside, using data locality level NODE_LOCAL. But they can be heavy to transport over the network if another node has idle processing capacity.
We try to improve our data locality odds by allocating 'k' Spark nodes per consumer task, so that collected data can be evenly processed. Nevertheless, depending on the complexity of the Spark job defined over the DStream, Spark might decide to launch some tasks with a lower locality level.
The time Spark will wait for locality is controlled by the configuration parameter spark.locality.wait, with a default value of 3000 ms.
Tuning guide for spark.locality.wait:
Setting this parameter to a value between 500 and 1000 ms helps lower the total processing time of a Spark Streaming job.
Notes:
- We need to find a balance between spark.streaming.blockInterval and spark.locality.wait. If we observe that tasks are taken by a non-local executor, setting a lower spark.streaming.blockInterval will improve the network transfer time, while increasing spark.locality.wait will increase the chance of that task executing with data locality NODE_LOCAL.
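As one possible configuration, carrying over the assumed values from the worked example above (neither number is prescribed here; they are a starting point to balance against each other):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.blockInterval", "2500") // milliseconds, from the worked example
  .set("spark.locality.wait", "1000")           // milliseconds, within the suggested 500-1000 ms range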
Caching:
dstream.foreachRDD { rdd =>
  rdd.cache() // cache the RDD before iterating!
  keys.foreach { key =>
    // keyOf(...) and saveAsFooBar(...) are placeholders for the key extractor and the output action
    rdd.filter(elem => keyOf(elem) == key).saveAsFooBar(...)
  }
  rdd.unpersist()
}
Using rdd.cache speeds up the process considerably: as in core Spark, the first cycle takes the same time as the uncached version, but each subsequent iteration takes only a fraction of the time. Use caching if the Streaming job involves iterating over the DStream's RDDs.
Note: In case of window operations, DStreams are implicitly cached, as the RDDs are preserved beyond the limits of a single batch interval.
Logging:
Avoid using logging calls within the DStream and RDD transformations in a Spark Streaming job. Spark is quite 'chatty' in the logs. Set the right log levels for your application.
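One minimal way to do that from the application itself, given the StreamingContext ssc used earlier (the WARN level is an assumed choice; log levels can also be set through the log4j configuration):

// Raise the log level for the whole application; Spark's INFO output is verbose.
ssc.sparkContext.setLogLevel("WARN")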
The contents above are based on the references below.
Reference:
http://www.virdata.com/tuning-spark/
http://www.slideshare.net/sawjd/spark-afterdark