Sunday, 21 June 2015

Stateful Transformations in Spark Streaming

Stateful transformations are operations on DStreams that track data across time; that is, some data from previous batches is used to generate the results for a new batch. The two main types are 


  • windowed operations, which act over a sliding window of time periods, 
  • and updateStateByKey(), which is used to track state across events for each key.
Stateful transformations require checkpointing to be enabled in your StreamingContext for fault tolerance.

ssc.checkpoint("hdfs://...")
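To make the snippets below self-contained, here is a minimal setup sketch; the socket source, the two-second processingInterval, and the popularityData stream of (page, score) pairs are illustrative assumptions, not part of the original program:

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("PopularityStream")
val processingInterval = 2 // batch interval in seconds (assumed)
val ssc = new StreamingContext(conf, Seconds(processingInterval))
ssc.checkpoint("hdfs://...") // checkpoint directory, as shown above

// lines such as "page1 1.0" arrive on a socket and are parsed into (page, score) pairs
val popularityData = ssc.socketTextStream("localhost", 9999)
  .map(_.split(" "))
  .map(fields => (fields(0), fields(1).toDouble))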

For example:
// sum the previous popularity value and the values from the current batch
val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
  iterator.flatMap(t => {
    val newValue: Double = t._2.sum               // values that arrived in this batch
    val stateValue: Double = t._3.getOrElse(0.0)  // previously accumulated total
    Some(newValue + stateValue)
  }.map(summedValue => (t._1, summedValue)))
}
val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
val stateDstream = popularityData.updateStateByKey[Double](
  updatePopularityValue,
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  true,
  initialRDD)
// set the checkpoint interval to avoid checkpointing data too frequently,
// which may significantly reduce throughput
stateDstream.checkpoint(Duration(8 * processingInterval.toInt * 1000))
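The resulting stateDstream holds the running popularity total for each page; during development it can be inspected with an output operation such as stateDstream.print().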



Windowed Transformations

All windowed operations need two parameters, window duration and sliding duration, both of which must be a multiple of the StreamingContext’s batch interval. The window duration controls how many previous batches of data are considered, namely the last windowDuration/batchInterval.
The sliding duration, which defaults to the batch interval, controls how frequently the new DStream computes results.
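As a simple illustration (accessLogDStream and the 10-second batch interval are assumed for this sketch), window() by itself returns a DStream built from all the data in each window:

// count the events received in the last 30 seconds of data, computed every 10 seconds;
// both durations must be multiples of the batch interval (assumed to be 10 seconds)
val windowedLogs = accessLogDStream.window(Seconds(30), Seconds(10))
val windowCounts = windowedLogs.count()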
reduceByWindow() and reduceByKeyAndWindow() allow us to perform reductions on each window more efficiently. They take a single reduce function to run on the whole window, such as +. In addition, they have a special form that lets Spark compute the reduction incrementally, by considering only the data entering the window and the data leaving it. This special form requires an inverse of the reduce function, such as - for +, and is much more efficient for large windows when such an inverse exists.

val ipCountDStream = ipDStream.reduceByKeyAndWindow(
  {(x, y) => x + y}, // Adding elements in the new batches entering the window
  {(x, y) => x - y}, // Removing elements from the oldest batches exiting the window
  Seconds(30),       // Window duration 
  Seconds(10))       // Slide duration



Much like lazy evaluation in RDDs, if no output operation is applied on a DStream or any of its descendants, then those DStreams will not be evaluated. And if there are no output operations set in a StreamingContext, the context will not start.
foreachRDD() is a generic output operation that lets us run arbitrary computations on the RDDs in the DStream. It is similar to transform() in that it gives us access to each RDD. Within foreachRDD(), we can reuse all the actions we have in Spark.
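For instance, a common pattern (sketched here with a println placeholder rather than a real external system) is to go through each RDD's partitions and write the records out:

stateDstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // open one connection per partition (not per record) to the external system,
    // write the records, then close the connection; println is only a stand-in
    records.foreach { case (page, score) => println(s"$page -> $score") }
  }
}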





Cluster Size:
Each receiver runs as a long-running task within Spark’s executors, and hence occupies CPU cores allocated to the application. In addition, there need to be available cores for processing the data. This means that in order to run multiple receivers, you should have at least as many cores as the number of receivers, plus however many are needed to run your computation.
Do not run Spark Streaming programs locally with the master configured as "local" or "local[1]". This allocates only a single CPU core for tasks, so if a receiver is running on it, no resources are left to process the received data. Use at least "local[2]" so there are enough cores.
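A minimal configuration sketch (the application name is illustrative):

import org.apache.spark.SparkConf

// with one receiver, "local[2]" leaves one core free for processing;
// in general, N receivers need at least N + 1 cores
val conf = new SparkConf()
  .setAppName("StreamingApp")
  .setMaster("local[2]")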
 
Reference:
https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/



