Tuesday 9 August 2016

Avoid NotSerializable Error in Spark Job


Sometimes you may encounter one of the following errors in your Spark application:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable.

Or

Object of org.apache.spark.streaming.dstream.MappedDStream is being serialized possibly as a part of closure of an RDD operation...

This happens because part of your application code is evaluated on the Spark driver and the rest on the Spark executors, so any object referenced from executor-side code has to be serialized and shipped to the executors.


dstream.foreachRDD { rdd =>
  // This block runs on the driver.
  val where1 = "on the driver"
  rdd.foreach { record =>
    // This block runs on the executors, once per record.
    val where2 = "on different executors"
  }
}

The outer function passed to foreachRDD is executed locally on the driver, which is the only place where the rdd reference itself is available. The inner function passed to rdd.foreach is evaluated in a distributed manner: the RDD is partitioned, and each Spark executor iterates over its own subset of the RDD's elements.

Spark uses Java (or Kryo) serialization to send application objects from the driver to the executors. A tempting first fix is to add the scala.Serializable marker trait to all of your application classes until the exceptions go away. But this blind approach has at least two disadvantages:
  • There may be a performance penalty when a complex object graph is serialized and sent to a dozen remote cluster nodes. This can be mitigated with Spark broadcast variables, though.
  • Not everything is serializable, e.g. a TCP socket cannot be serialized and sent between nodes.
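
To make the second point concrete, here is a minimal sketch that uses plain Java serialization, without Spark, to check whether a closure can be serialized. SocketLike, ClosureSerializationCheck and its methods are hypothetical names made up for this illustration, and the sketch assumes Scala 2.12 or later, where function literals are serializable as long as everything they capture is.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a resource such as a TCP socket.
// It deliberately does not extend Serializable.
class SocketLike(val host: String)

object ClosureSerializationCheck {
  // Returns true if Java serialization accepts the object, false if it
  // meets a non-serializable reference somewhere in the object graph.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // Captures only a String, which is serializable.
  def closureOverString(): String => String = {
    val prefix = "record: "
    (record: String) => prefix + record
  }

  // Captures a SocketLike, which is not serializable.
  def closureOverSocket(): String => String = {
    val socket = new SocketLike("localhost")
    (record: String) => socket.host + ": " + record
  }
}
```

Under these assumptions the check should pass for closureOverString() and fail for closureOverSocket(). Spark runs a similar check on the closures you pass to RDD operations, which is where "Task not serializable" comes from.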

Wrong No 1.

dstream.foreachRDD { rdd =>
  val producer = createKafkaProducer()
  rdd.foreach { message =>
    producer.send(message)
  }
  producer.close()
}

The producer is created (and closed) once on the driver, but the closure that calls producer.send runs on the executors, so Spark has to serialize the producer in order to ship it there. The producer keeps open sockets to the Kafka brokers, so it cannot be serialized and sent over the network.

Wrong No 2.

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    val producer = createKafkaProducer()
    producer.send(message)
    producer.close()
  }
}

Here the Kafka producer is created and closed on an executor, so it does not need to be serialized. But it is created and closed for every single message. Establishing a connection to the cluster takes time, because the Kafka producer needs to discover the leaders for all partitions.

Wrong No 3.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // One producer per partition, created on the executor.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

A partition of records is always processed by one Spark task on a single executor, in a single JVM, so a thread-safe Kafka producer instance can safely be shared across the whole partition. But a producer is still created and closed once per partition in every batch, so this approach does not scale well as the number of partitions increases.
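
The difference between "Wrong No 2" and "Wrong No 3" can be sketched by counting how often a producer gets created. This is only an illustration: plain Scala collections stand in for the partitions of an RDD, and FakeProducer and ProducerCreationCount are hypothetical names, not part of Spark or Kafka.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a Kafka producer; it only counts how often it is built.
final class FakeProducer(created: AtomicInteger) {
  created.incrementAndGet()
  def send(message: String): Unit = ()
  def close(): Unit = ()
}

object ProducerCreationCount {
  // Pattern of "Wrong No 2": a new producer for every record.
  def perRecord(partitions: Seq[Seq[String]]): Int = {
    val created = new AtomicInteger(0)
    partitions.foreach { partition =>
      partition.foreach { message =>
        val producer = new FakeProducer(created)
        producer.send(message)
        producer.close()
      }
    }
    created.get()
  }

  // Pattern of "Wrong No 3": a new producer for every partition.
  def perPartition(partitions: Seq[Seq[String]]): Int = {
    val created = new AtomicInteger(0)
    partitions.foreach { partition =>
      val producer = new FakeProducer(created)
      partition.foreach(message => producer.send(message))
      producer.close()
    }
    created.get()
  }
}
```

For two partitions holding five records in total, perRecord creates five producers and perPartition creates two. The per-partition count is much smaller, but it still grows with the number of partitions and repeats for every batch.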


Besides making the class serializable, we can do the following:

1. Declare the instance only within the lambda function passed to map.
2. Make the non-serializable object static and create it once per JVM (that is, once per executor).
3. Call rdd.foreachPartition and create the non-serializable object in there.
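
Option 2 can be sketched in plain Scala with a singleton object and a lazy val. Connection, ConnectionHolder and SingletonHolderSketch are hypothetical names for illustration. In a real job the holder would wrap something like a Kafka producer. The sketch again assumes Scala 2.12 or later.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical non-serializable resource, standing in for a Kafka producer.
class Connection(val host: String) {
  def send(message: String): String = s"$host <- $message"
}

// A Scala object is a per-JVM singleton; the lazy val is initialized on first
// access in whichever JVM touches it, so the Connection is never serialized.
object ConnectionHolder {
  lazy val connection: Connection = new Connection("broker-1")
}

object SingletonHolderSketch {
  // The closure refers to the holder statically, so it captures no Connection.
  def sendViaHolder(): String => String =
    (message: String) => ConnectionHolder.connection.send(message)

  // Same Java serialization check Spark would trip over for a bad closure.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Because the closure returned by sendViaHolder() carries no reference to the Connection, it passes the serialization check even though Connection itself is not serializable. One trade-off of this pattern is that nothing closes the resource automatically; it lives as long as the JVM unless you add your own shutdown handling.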

Reference:
http://allegro.tech/2015/08/spark-kafka-integration.html
http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

