Tuesday, 24 February 2015

Tuning Spark Streaming


Spark Streaming combines one or more stream consumers with a Spark transformation process.
Data received by the consumers is placed in blocks, implemented as arrays, and delivered to the Block Manager. These blocks become the RDD partitions that Spark will work on.
Once the batch interval is completed, the collected data blocks are given to Spark for processing.
At any point in time, Spark is processing the previous batch of data, while the Streaming Consumers are collecting data for the current interval. Once a batch has been processed by Spark, it can be cleaned up. 


Spark Streaming consists of two processes:
  • Fetching data; done by the streaming consumer (in our case, the Kafka consumer)
  • Processing the data; done by Spark
These two processes are connected by the timely delivery of collected data blocks from Spark Streaming to Spark.
The time to process the data of a batch interval must be less than the batch interval time.
We can tune our Spark job to process that data within the time interval restriction.


  • A Spark job consists of transformations and actions. It is broken down into stages of operations that can be inlined.
  • An RDD is a distributed collection of data, broken down into partitions spread over nodes.
  • A task is the application of a stage to a data partition on an executor. Scheduling a task has some fixed cost.
processing time ~= #tasks * scheduling cost + #tasks * time-complexity per task / parallelism level
#tasks = #stages * #partitions

From these two statements, we can infer that to minimize the processing time of a batch we need to minimize the stages and partitions and maximize parallelism. Note how interval time is not explicit in this set of equations.
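
To make the two equations concrete, here is a minimal sketch in plain Scala. The object name BatchCostModel, its method names, and all numbers used with it are illustrative assumptions, not measurements or Spark APIs; the code only transcribes the formulas above.

```scala
// Toy cost model transcribing the two equations above.
// All names here are hypothetical helpers, not part of Spark.
object BatchCostModel {

  /** #tasks = #stages * #partitions */
  def tasks(stages: Int, partitions: Int): Int = stages * partitions

  /** processing time ~= #tasks * scheduling cost + #tasks * time per task / parallelism */
  def processingTimeMs(stages: Int,
                       partitions: Int,
                       schedulingCostMs: Double,
                       taskTimeMs: Double,
                       parallelism: Int): Double = {
    require(parallelism > 0, "parallelism must be positive")
    val n = tasks(stages, partitions)
    n * schedulingCostMs + n * taskTimeMs / parallelism
  }

  /** A batch keeps up only if it is processed in less than the batch interval. */
  def isSustainable(processingMs: Double, batchIntervalMs: Long): Boolean =
    processingMs < batchIntervalMs
}
```

For example, with 3 stages, 50 partitions, an assumed 5 ms scheduling cost, 40 ms per task and 10 cores, the model gives 150 tasks and an estimated 1350 ms, which fits a 2000 ms batch interval but not a 1000 ms one.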

Scaling up consumers

In order to increase the amount of messages consumed by the system we can create multiple consumers that will fetch data in parallel. Each consumer is assigned one core on an executor. This is a common pattern:
@transient val inKafkaList: List[DStream[(K, V)]] = List.fill(kafkaParallelism) {
  KafkaUtils.createStream[K, V, KDecoder, VDecoder](ssc, kafkaConfig, topics, StorageLevel.MEMORY_AND_DISK_SER)
}

@transient val inKafka = inKafkaList.tail.foldLeft(inKafkaList.head){_.union(_)}

The union of the created DStreams is important as this reduces the number of transformation pipelines on the input DStream to one. Not doing this will multiply the number of stages by the number of consumers.
Notes:
  • kafkaParallelism is the (configurable) number of consumers to create
  • Storage level MEMORY_AND_DISK_SER will allow Spark Streaming to spill serialized data to disk in cases of overload, when the available memory is not sufficient to hold the incoming data
  • Declaring the DStream references as @transient is often necessary to avoid them being serialized with the job. Serializing them would result in a serialization exception, as DStreams are not supposed to be serialized.

Parallelism:


As we explained before, Spark Streaming is in fact two processes running concurrently: the data consumers and Spark. The parallelism level of the consumer side is defined by the number of consumers created (see the previous section on how to create consumers). The parallelism of the Spark processing cluster is determined by the total number of cores configured for the job minus the number of consumers.

Given the total number of cores, controlled by the configuration parameter spark.cores.max:

  • Consumer parallelism = #consumers created (‘kafkaParallelism’ in the previous example)
  • Spark parallelism = spark.cores.max - #consumers
To maximize the chances of data locality and even parallel execution, spark.cores.max should be a multiple of #consumers.
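
The core split described above can be sketched in a few lines of plain Scala. CoreBudget, Budget, split and isMultiple are hypothetical names introduced for illustration; the values passed in are whatever you configured for spark.cores.max and kafkaParallelism.

```scala
// Hypothetical helper that splits the configured cores between
// consumers and Spark processing, following the rules above.
object CoreBudget {

  final case class Budget(consumerCores: Int, sparkCores: Int)

  /** Each consumer pins one core; the remaining cores are left for processing. */
  def split(coresMax: Int, consumers: Int): Budget = {
    require(consumers > 0, "at least one consumer is needed")
    require(coresMax > consumers, "no cores would be left for Spark processing")
    Budget(consumerCores = consumers, sparkCores = coresMax - consumers)
  }

  /** Guideline from the text: spark.cores.max should be a multiple of #consumers. */
  def isMultiple(coresMax: Int, consumers: Int): Boolean =
    coresMax % consumers == 0
}
```

With spark.cores.max = 12 and 3 consumers, this leaves 9 cores for Spark processing, and 12 is a multiple of 3, so the guideline holds.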

Partitions

As we discussed previously, reducing the number of partitions is important in order to reduce the overall processing time, as it leads to fewer tasks and therefore bigger chunks of data to operate on at once and less scheduling overhead.
Each receiver fetches data. That data is provided by the Receiver to its executor, a ReceiverSupervisor that takes care of managing the blocks. Each block becomes a partition of the RDD produced during each batch interval. The size of these blocks is time-bound and defined by the configuration parameter:
spark.streaming.blockInterval = 200
“Interval (milliseconds) at which data received by Spark Streaming receivers is coalesced into blocks of data before storing them in Spark.” 
#partitions = #consumers * batchIntervalMillis / blockInterval
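
As a quick check of the formula, the following sketch computes the number of partitions per batch in plain Scala. PartitionCount and partitionsPerBatch are hypothetical names, and the example values are assumptions chosen for illustration.

```scala
// Hypothetical helper: #partitions = #consumers * batchIntervalMillis / blockInterval
object PartitionCount {

  def partitionsPerBatch(consumers: Int,
                         batchIntervalMs: Long,
                         blockIntervalMs: Long): Long = {
    require(consumers > 0, "at least one consumer is needed")
    require(blockIntervalMs > 0, "blockInterval must be positive")
    // The formula assumes the block interval divides the batch interval evenly.
    require(batchIntervalMs % blockIntervalMs == 0,
      "blockInterval should be an integer divisor of the batch interval")
    consumers * batchIntervalMs / blockIntervalMs
  }
}
```

With 3 consumers, a 2000 ms batch interval and the 200 ms block interval shown above, each batch RDD would have 30 partitions.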

Tuning guide for spark.streaming.blockInterval:
Increasing spark.streaming.blockInterval will reduce the number of partitions in the RDD and therefore the number of tasks per batch. blockInterval must be an integer divisor of the batch interval. Following the Spark guideline of having the number of partitions roughly 2x-3x the number of available cores, we have been successfully applying the following guideline:
Given:
  • batchIntervalMillis = configured batch interval in milliseconds
  • spark.cores.max = total cores
  • #consumers = created streaming consumers
  • sparkCores = spark.cores.max - #consumers
  • partitionFactor = # of partitions / core (1, 2, 3,... ideally in multiples of k where sparkCores = k * #consumers)
Then:
    spark.streaming.blockInterval = batchIntervalMillis * #consumers / (partitionFactor * sparkCores)
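
The guideline can be worked through with a small sketch in plain Scala. BlockIntervalGuide and its methods are hypothetical names, and the input values are assumptions; the result is only a starting point that still has to be checked against the requirement that blockInterval divides the batch interval.

```scala
// Hypothetical helper applying the guideline above.
object BlockIntervalGuide {

  /** blockInterval = batchIntervalMillis * #consumers / (partitionFactor * sparkCores) */
  def blockIntervalMs(batchIntervalMs: Long,
                      coresMax: Int,
                      consumers: Int,
                      partitionFactor: Int): Long = {
    val sparkCores = coresMax - consumers
    require(consumers > 0 && sparkCores > 0, "need cores for both consumers and processing")
    require(partitionFactor > 0, "partitionFactor must be positive")
    // Integer division truncates; verify the result with dividesBatch below.
    batchIntervalMs * consumers / (partitionFactor.toLong * sparkCores)
  }

  /** True when the candidate block interval is an integer divisor of the batch interval. */
  def dividesBatch(batchIntervalMs: Long, blockMs: Long): Boolean =
    blockMs > 0 && batchIntervalMs % blockMs == 0
}
```

For a 3000 ms batch interval, spark.cores.max = 8, 2 consumers (so sparkCores = 6, k = 3) and partitionFactor = 2, the guideline yields a 500 ms block interval. That gives 2 * 3000 / 500 = 12 partitions per batch, which is 2 partitions per processing core.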

Data Locality:

Big blocks of data created with a large configured value for spark.streaming.blockInterval are great when they can be processed on the same node where they reside, using data locality level: NODE_LOCAL. But they can be heavy to transport over the network if another node has idle processing capacity.
We try to improve our data locality odds by allocating ‘k’ Spark nodes per consumer task, so that collected data can be evenly processed. Nevertheless, depending on the complexity of the Spark job defined over the DStream, some executors might decide to launch a task with a lesser locality level.
The time Spark will wait for locality is controlled by the configuration parameter: spark.locality.wait, with a default value of 3000ms.

Tuning guide for spark.locality.wait:
Setting this parameter to a value between 500 and 1000 ms helps lower the total processing time of a Spark Streaming job.

Notes:

  • We need to find a balance between spark.streaming.blockInterval and spark.locality.wait. If we observe that tasks are taken by a non-local executor, setting a lower spark.streaming.blockInterval will improve the network transfer time, while increasing spark.locality.wait will increase the chance of that task executing with data locality NODE_LOCAL.
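
One way to keep these parameters together is to build them as plain key/value pairs before handing them to Spark. The sketch below is an assumption about how you might organize this; StreamingTuning and settings are hypothetical names, and only the three configuration keys come from the text. Values are plain millisecond numbers, as in the examples above.

```scala
// Hypothetical helper collecting the tuning parameters discussed in this post.
object StreamingTuning {

  def settings(coresMax: Int,
               blockIntervalMs: Long,
               localityWaitMs: Long): Map[String, String] = {
    require(coresMax > 0, "spark.cores.max must be positive")
    require(blockIntervalMs > 0, "blockInterval must be positive")
    require(localityWaitMs >= 0, "locality wait cannot be negative")
    Map(
      "spark.cores.max"               -> coresMax.toString,
      "spark.streaming.blockInterval" -> blockIntervalMs.toString,
      "spark.locality.wait"           -> localityWaitMs.toString
    )
  }
}
```

The resulting pairs can then be applied to a SparkConf with setAll, or passed to spark-submit as --conf key=value arguments.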

Caching:

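dstream.foreachRDD { rdd =>
  rdd.cache() // cache the RDD before iterating!
  keys.foreach { key =>
    // keyOf is a placeholder for a function that extracts the key of an element;
    // saveAsFooBar stands in for the actual output action
    rdd.filter(elem => keyOf(elem) == key).saveAsFooBar(...)
  }
  rdd.unpersist() // release the cached data once all keys have been written
}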

Using rdd.cache() speeds up the process considerably. As in Spark, the first cycle takes the same time as the uncached version, but each subsequent iteration takes only a fraction of the time.
Use caching if the Streaming job involves iterating over the DStream or its RDDs.
Note: In case of window operations, DStreams are implicitly cached, as the RDDs are preserved beyond the limits of a single batch interval.

Tuning guide for logging:
Avoid using logging calls within the DStream and RDD transformations in a Spark Streaming job. Spark is already quite ‘chatty’ in the logs. Set the right log levels for your application.

The above content is taken from the references below.


Reference:
http://www.virdata.com/tuning-spark/
http://www.slideshare.net/sawjd/spark-afterdark
