16/06/02 16:40:15 WARN CheckpointReader: Error reading checkpoint from file file:/Users/IdeaProjects/checkpointDir/checkpoint-1464899960000
java.io.IOException: java.lang.ClassCastException: cannot assign instance of com.alvin.niagara.sparkstreaming.SparkStreamingConsumerApp$$anonfun$5 to field org.apache.spark.streaming.StateSpecImpl.function of type scala.Function4 in instance of org.apache.spark.streaming.StateSpecImpl
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:187)
16/06/07 14:45:53 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@433af236 has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:323)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
Solution:
Put the tranformations and action logics on
DStream
in def functionToCreateContext()val context = StreamingContext.getOrCreate(checkpointDir,
functionToCreateContext _)
def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(sparkConf, Seconds(10)) ssc.checkpoint(checkpointDir) consumeMessagesFromKafka(ssc) ssc }context.start() context.awaitTermination()
Issue 2:
Implement a stateful mapping(aggregation) with "mapWithState"
Below error happens when the function is defined after the method is called.
Solution: Put the function definition val updateState before the method call.
val spec = StateSpec.function(updateState)
val tagCounts = sessions.mapWithState(spec)
val tagCounts = sessions.mapWithState(spec)
val updateState = (batchTime: Time, key: String, value: Option[Int],
state: State[Int]) => { val sum = value.getOrElse(0) + state.getOption.getOrElse(0) state.update(sum) Some((key, sum)) }
Exception in thread "main" java.lang.NullPointerException at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:172)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.streaming.StateSpec$.function(StateSpec.scala:159)
ClosureCleaner class tries to null out references to objects not used in a closure before sending it to the cluster, because Scala's closure objects sometimes reference other variables in their scope.
Reference:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-streaming-operators-stateful.html
http://stackoverflow.com/questions/31702456/error-in-starting-spark-streaming-context
http://stackoverflow.com/questions/31702456/error-in-starting-spark-streaming-context
No comments:
Post a Comment