The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you have to do two steps:
- Define the state - The state can be an arbitrary data type.
- Define the state update function - Specify with a function how to update the state using the previous state and the new values from an input stream.
// Update function: combine the new values for a key with its previous running count
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  val newCount = newValues.sum + runningCount.getOrElse(0)  // add new counts to the existing total
  Some(newCount)
}
This is applied on a DStream containing words (say, the pairs DStream containing (word, 1) pairs in the earlier example).

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

The update function will be called for each word, with newValues having a sequence of 1's (from the (word, 1) pairs) and runningCount having the previous count.
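For context, here is a minimal sketch of how the pairs DStream from that earlier example might be built; the socket source, host, port, batch interval, and checkpoint path are illustrative assumptions, not taken from the original example.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("StatefulWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")   // required by updateStateByKey (see below)

val lines = ssc.socketTextStream("localhost", 9999)  // assumed text source
val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
runningCounts.print()

ssc.start()
ssc.awaitTermination()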
Checkpointing is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time.
Checkpointing must be enabled for applications with any of the following requirements:
- Usage of stateful transformations - If either updateStateByKey or reduceByKeyAndWindow (with inverse function) is used in the application, then the checkpoint directory must be provided to allow for periodic RDD checkpointing (see the sketch after this list).
- Recovering from failures of the driver running the application - Metadata checkpoints are used to recover with progress information.
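For the first requirement, here is a minimal sketch of reduceByKeyAndWindow with an inverse function, building on the pairs DStream from the word-count sketch above; the window and slide durations are illustrative assumptions.

// The inverse function subtracts data leaving the window, which is what
// makes periodic RDD checkpointing mandatory.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,  // add values entering the window
  (a: Int, b: Int) => a - b,  // subtract values leaving the window
  Seconds(30),                // window length
  Seconds(10)                 // slide interval
)
windowedCounts.print()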
Recreate a StreamingContext from the checkpoint data if it exists, or create a new one:
// conf, Interval, settings, pipeName, checkpoint and KafkaStreaming are defined elsewhere in the application
def createContext(): StreamingContext = {
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Milliseconds(Interval))
  KafkaStreaming(sc, ssc, settings, pipeName)  // application-specific Kafka stream wiring
  ssc.checkpoint(checkpoint)                   // enable checkpointing to the checkpoint directory
  ssc
}

val hadoopConf = new Configuration()
StreamingContext.getOrCreate(checkpoint, createContext _, hadoopConf)
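The context returned by getOrCreate is rebuilt from the checkpoint data when it exists and is only created via createContext otherwise; in either case it still has to be started. A minimal usage sketch:

val ssc = StreamingContext.getOrCreate(checkpoint, createContext _, hadoopConf)
ssc.start()
ssc.awaitTermination()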
Reference: Apache Spark Streaming Programming Guide