Monday 20 July 2015

Incremental Updated State in Spark Streaming

The updateStateByKey operation lets you maintain arbitrary state and continuously update it with new information. Using it takes two steps.
  1. Define the state - The state can be an arbitrary data type.
  2. Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = newValues.sum + runningCount.getOrElse(0)
    Some(newCount)
}

This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

The update function is called for each word, with newValues holding a sequence of 1’s (from the (word, 1) pairs) and runningCount holding the previous count.
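To make the per-key behaviour concrete, here is a minimal sketch in plain Scala (no Spark required) that mimics how the update function is applied batch after batch. The object name RunningCountDemo and the helper applyBatch are hypothetical and exist only for this illustration; they are not part of the Spark API. The sketch assumes, as updateStateByKey does, that the function is applied to every key that either has existing state or has new values in the batch.

```scala
object RunningCountDemo {
  // Same update function as in the post.
  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = newValues.sum + runningCount.getOrElse(0)
    Some(newCount)
  }

  // Hypothetical helper: applies updateFunction to each key that has
  // existing state or new values in this batch. Returning None from the
  // update function would drop the key from the state.
  def applyBatch(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { word =>
      updateFunction(grouped.getOrElse(word, Seq.empty), state.get(word))
        .map(count => word -> count)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq("spark" -> 1, "kafka" -> 1, "spark" -> 1)
    val batch2 = Seq("spark" -> 1)
    val afterBatch1 = applyBatch(Map.empty, batch1)
    val afterBatch2 = applyBatch(afterBatch1, batch2)
    println(afterBatch1)
    println(afterBatch2)
  }
}
```

In this toy run "kafka" has no new values in the second batch, so its update function receives an empty newValues and the count carries over unchanged.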

Checkpointing is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, so the dependency chain keeps growing over time. Periodically checkpointing the intermediate RDDs to reliable storage cuts off that chain and keeps recovery time bounded.


Checkpointing must be enabled for applications with any of the following requirements:
  • Usage of stateful transformations - If either updateStateByKey or reduceByKeyAndWindow (with inverse function) is used in the application, then the checkpoint directory must be provided to allow for periodic RDD checkpointing.
  • Recovering from failures of the driver running the application - Metadata checkpoints are used to recover with progress information.
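Why a windowed reduce with an inverse function counts as stateful can be seen in a small plain-Scala model. This is only a toy sketch of the idea, not Spark's implementation, and the names InverseWindowDemo, fullSum and incrementalSums are made up for the illustration. Each new window result is derived from the previous one by adding the batch that enters the window and subtracting the batch that leaves it, so every result depends on an earlier result.

```scala
object InverseWindowDemo {
  // Baseline: recompute the sum of the window ending at index `end` from scratch.
  def fullSum(batches: Seq[Int], windowSize: Int, end: Int): Int =
    batches.slice(math.max(0, end - windowSize + 1), end + 1).sum

  // Incremental: previous window sum + entering batch - leaving batch.
  // The subtraction plays the role of the inverse function.
  def incrementalSums(batches: Seq[Int], windowSize: Int): Seq[Int] =
    batches.indices.scanLeft(0) { (previousSum, i) =>
      val entering = batches(i)
      val leaving = if (i - windowSize >= 0) batches(i - windowSize) else 0
      previousSum + entering - leaving
    }.tail

  def main(args: Array[String]): Unit = {
    val batches = Seq(3, 1, 4, 1, 5)
    println(incrementalSums(batches, windowSize = 3))
  }
}
```

Both approaches give the same sums, but the incremental one only works if the previous result is still available, which is the dependency that checkpointing keeps from growing without bound.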

Recreate a StreamingContext from checkpoint or create a new one

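  // Called only when no usable checkpoint exists in the checkpoint directory.
  def createContext(): StreamingContext = {
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Milliseconds(Interval))

    // Application-specific helper that builds the Kafka input stream and its transformations.
    KafkaStreaming(sc, ssc, settings, pipeName)
    // Enable checkpointing to the given directory.
    ssc.checkpoint(checkpoint)
    ssc
  }

  val hadoopConf = new Configuration()
  // Restore the context from the checkpoint if one exists, otherwise call createContext.
  StreamingContext.getOrCreate(checkpoint, createContext _, hadoopConf)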
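For a fuller picture, the sketch below puts the pieces together in one small program. It is an illustration under assumptions, not the application from this post: the socket source on localhost:9999, the checkpoint path /tmp/streaming-checkpoint, the 10 second batch interval and the object name CheckpointedWordCount are all placeholders chosen for the example, and a socket stream stands in for the Kafka stream. It assumes Spark 1.3 or later, where the pair DStream operations are available without an extra import.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedWordCount {
  // Placeholder path; in a real deployment this should be on reliable storage such as HDFS.
  val checkpointDir = "/tmp/streaming-checkpoint"

  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
    Some(newValues.sum + runningCount.getOrElse(0))

  // All DStream setup lives inside this function so that it is skipped
  // when the context is restored from checkpoint data.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))
    val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
    runningCounts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restore from the checkpoint if present, otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that the value returned by getOrCreate is the context that must be started; the stream definitions are not repeated outside createContext.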


