Sunday, 5 June 2016

Checkpoint and function-definition issues in Spark Streaming

Spark 1.6.1, Issue 1: When a checkpointDir from a previous run already exists, the next run of the Spark Streaming job can fail with the errors below.

16/06/02 16:40:15 WARN CheckpointReader: Error reading checkpoint from file file:/Users/IdeaProjects/checkpointDir/checkpoint-1464899960000
java.io.IOException: java.lang.ClassCastException: cannot assign instance of com.alvin.niagara.sparkstreaming.SparkStreamingConsumerApp$$anonfun$5 to field org.apache.spark.streaming.StateSpecImpl.function of type scala.Function4 in instance of org.apache.spark.streaming.StateSpecImpl
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
    at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:187)

16/06/07 14:45:53 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@433af236 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:323)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
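
The second error ("has not been initialized") typically appears when DStreams are defined outside the function passed to StreamingContext.getOrCreate, so they never become part of the graph recovered from the checkpoint. A minimal sketch of the problematic pattern (the socket source, host, and port are hypothetical; sparkConf and checkpointDir are as in the solution below):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Problematic pattern (sketch): on a restart the context is recovered
// from the checkpoint, but these DStreams are created outside the
// factory function, so they never join the restored graph.
val ssc = StreamingContext.getOrCreate(checkpointDir,
  () => new StreamingContext(sparkConf, Seconds(10)))
val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source
lines.filter(_.nonEmpty).print() // this FilteredDStream "has not been initialized"
ssc.start()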


Solution:
Put the DStream transformation and action logic inside def functionToCreateContext(), so it is re-applied when the context is recovered from the checkpoint:


import org.apache.spark.streaming.{Seconds, StreamingContext}

// Recover from the checkpoint if it exists, otherwise create a new context
val context = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)

def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  consumeMessagesFromKafka(ssc) // all DStream transformations/actions are registered here
  ssc
}

context.start()
context.awaitTermination()
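
consumeMessagesFromKafka is the application's own method; for completeness, here is a minimal sketch of what it might contain, using the Kafka direct API from Spark 1.6 (the broker address and topic name are hypothetical):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def consumeMessagesFromKafka(ssc: StreamingContext): Unit = {
  val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // hypothetical broker
  val topics = Set("sessions")                                      // hypothetical topic
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)
  // Both the transformations and the output action are registered inside
  // the context-creating path, so checkpoint recovery can rebuild them.
  messages.map(_._2).filter(_.nonEmpty).print()
}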


Issue 2: 

Implement a stateful mapping (aggregation) with "mapWithState".
The exception shown below is thrown when the update function is defined, as a val, after the line that uses it.

Solution: Put the definition of val updateState before the call to StateSpec.function, as in the corrected snippet:

import org.apache.spark.streaming.{State, StateSpec, Time}

// Define the update function first: a val must be initialized before use.
val updateState = (batchTime: Time, key: String, value: Option[Int],
                   state: State[Int]) => {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  Some((key, sum))
}

val spec = StateSpec.function(updateState)
val tagCounts = sessions.mapWithState(spec)

With the original ordering (the val defined after it is used), updateState is still null when StateSpec.function runs, and the job fails at startup:


Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:172)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.streaming.StateSpec$.function(StateSpec.scala:159)

The ClosureCleaner class tries to null out references to objects a closure does not use before sending it to the cluster, because Scala closure objects sometimes reference other variables in their enclosing scope. Here, though, the closure itself was null: in Scala, a val that is forward-referenced inside a class or object body still holds its default value (null) when read, so StateSpec.function(updateState) received null.
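
A standalone sketch of this forward-reference behavior (the names are hypothetical, not from the original code):

object ForwardRefDemo {
  // Object fields initialize top to bottom, so this forward reference
  // compiles but reads f before it is assigned: g becomes null.
  val g = f
  val f: String => Int = _.length

  def main(args: Array[String]): Unit = {
    println(g == null) // prints true
  }
}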

References:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-streaming-operators-stateful.html
http://stackoverflow.com/questions/31702456/error-in-starting-spark-streaming-context

