Sunday 27 December 2015

Recovering Spark Streaming Failure with Checkpoint and WAL

Recovering driver failure with checkpoint

If an executor fails, its receiver and the blocks stored on it are lost. Spark restarts the tasks and receivers on other executors
automatically; no configuration is needed.

When the driver fails, all the executors fail with it, and all computation and all received blocks are lost.
To recover, periodically save the DAG of DStreams to fault-tolerant storage (HDFS, S3, etc.); a failed driver can then be restarted from this checkpoint information.

1. Configure automatic driver restart

  • Spark Standalone: Use spark-submit in “cluster” mode with the “--supervise” flag (see the example below)
  • YARN: Use spark-submit in “cluster” mode
  • Mesos: Marathon can restart applications, or use the “--supervise” flag
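
For example, on a standalone cluster a submission that enables supervised driver restart might look like the command below; the master URL, class, and jar names are placeholders.

spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --supervise \
  --class com.example.StreamingApp \
  streaming-app.jar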

2. Set up checkpointing of the DStream DAG

def creatingFunc(): StreamingContext = {
  val context = new StreamingContext(...)
  val lines = KafkaUtils.createStream(...)
  val words = lines.flatMap(...)
  ...
  context.checkpoint(hdfsDir)   // periodically save the DStream DAG to HDFS
  context                       // return the newly created context
}

val context = StreamingContext
  .getOrCreate(hdfsDir, creatingFunc)
context.start()

Put all the setup code into a function that returns a new StreamingContext. StreamingContext.getOrCreate either restores the context from the checkpoint data in the HDFS directory or, if none exists, creates a new one by calling that function.
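
A more complete, self-contained sketch of the same pattern is below; the application name, batch interval, checkpoint directory, and Kafka settings (ZooKeeper quorum, consumer group, topic) are hypothetical placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical checkpoint directory on fault-tolerant storage
val hdfsDir = "hdfs:///user/spark/streaming-checkpoint"

def creatingFunc(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("CheckpointedStreamingApp")
  val context = new StreamingContext(sparkConf, Seconds(10))

  // Hypothetical Kafka receiver: ZooKeeper quorum, consumer group, topic -> receiver threads
  val lines = KafkaUtils.createStream(context, "zk-host:2181", "my-group", Map("my-topic" -> 1))
  val words = lines.flatMap { case (_, msg) => msg.split(" ") }
  words.count().print()

  context.checkpoint(hdfsDir)   // save the DStream DAG periodically to HDFS
  context                       // return the newly created context
}

// Restore from checkpoint data if it exists, otherwise build a fresh context
val context = StreamingContext.getOrCreate(hdfsDir, creatingFunc _)
context.start()
context.awaitTermination()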

Recovering lost block data with Write Ahead Logs

In-memory blocks of buffered data can still be lost when the driver restarts. To avoid this, synchronously save the received data to fault-tolerant storage with a Write Ahead Log (WAL).


  • Enable checkpointing; the write ahead logs are written into the checkpoint directory
  • Enable the WAL in the SparkConf configuration
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable","true")
  • The receiver should also be reliable
    Acknowledge the source only after data has been saved to the WAL
    Unacknowledged data will be replayed from the source by the restarted receiver
  • Disable in-memory replication (the data is already replicated by HDFS)
    Use StorageLevel.MEMORY_AND_DISK_SER for input DStreams (see the sketch below)
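
A minimal sketch that combines these settings, again with hypothetical Kafka and checkpoint parameters, could look like this:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf()
  .setAppName("WALStreamingApp")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // enable the WAL

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("hdfs:///user/spark/streaming-checkpoint")   // WAL files are written under the checkpoint directory

// Keep a single in-memory serialized copy; durability comes from the WAL on HDFS
val lines = KafkaUtils.createStream(
  ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK_SER)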


