Friday 13 November 2015

Using Tachyon with Spark on HDFS

Tachyon solves several challenges with Spark RDD management:
  • An RDD only exists for the duration of the Spark application.
  • The same process performs both the computation and the RDD in-memory storage; if that process crashes, the in-memory storage is lost with it.
  • Different jobs cannot share an RDD even when it is derived from the same underlying data (for example, the same HDFS block), which leads to:
    • Slow writes to disk
    • Duplication of data in memory, and a higher memory footprint
  • Sharing the output of one application with another application is slow, because the data must be replicated to disk.
Tachyon provides an off-heap memory layer to solve these problems. This layer, being off-heap, is immune to process crashes and is not subject to garbage collection. It also lets RDDs be shared across applications and outlive a specific job or session; in essence, a single copy of the data resides in memory.
Use Spark with Tachyon on HDFS
Edit the tachyon-env.sh file to set the under storage address to the HDFS NameNode address (e.g., hdfs://localhost:9000):
export TACHYON_UNDERFS_ADDRESS=hdfs://NAMENODE:PORT
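After changing the under storage address, Tachyon needs to be formatted and restarted for the setting to take effect; a minimal sketch, assuming a local single-node deployment:
$ bin/tachyon format
$ bin/tachyon-start.sh local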
Put a sample file into HDFS:
$ hadoop fs -put -f foo hdfs://localhost:9000/foo
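The Spark shell also needs the Tachyon client on its classpath; one way is spark-shell's --jars flag (the jar path below is an assumption; use the client jar that matches your Tachyon version):
$ spark-shell --jars /path/to/tachyon-client-jar-with-dependencies.jar
Then, in the Spark shell, register the Tachyon filesystem implementation and read and write a file through Tachyon: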
> sc.hadoopConfiguration.set("fs.tachyon.impl", "tachyon.hadoop.TFS")
> val s = sc.textFile("tachyon://localhost:19998/foo")
> val double = s.map(line => line + line)
> double.saveAsTextFile("tachyon://localhost:19998/bar")

Persist RDD in Tachyon
Storing an RDD at the OFF_HEAP storage level in Tachyon has several advantages:
  • It allows multiple executors to share the same pool of memory in Tachyon.
  • It significantly reduces garbage collection costs.
  • Cached data is not lost if individual executors crash.
To persist RDDs in Tachyon, a Spark program needs to set two parameters:

spark.externalBlockStore.url: the URL of the Tachyon filesystem used as the external block store (for example, tachyon://localhost:19998).

spark.externalBlockStore.baseDir: the base directory in the Tachyon filesystem under which the RDDs are stored.
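For example, both parameters can be passed on the command line when launching the shell; a minimal sketch (the URL matches the examples above, while the base directory value is an assumption, so adjust it to your setup):
$ spark-shell --conf spark.externalBlockStore.url=tachyon://localhost:19998 \
              --conf spark.externalBlockStore.baseDir=/tmp_spark_tachyon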

To persist an RDD into Tachyon, pass StorageLevel.OFF_HEAP to persist():
> import org.apache.spark.storage.StorageLevel
> val rdd = sc.textFile(inputPath)
> rdd.persist(StorageLevel.OFF_HEAP)
The persisted RDD block files are cleaned up when the Spark application finishes.

Spark uses Tachyon to write the RDD data into Tachyon's memory space as files. This moves the data off the Java heap, giving Spark more heap memory to work with.
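Since the blocks are stored as ordinary files in Tachyon, they can be inspected with the Tachyon shell while the application is running; a sketch, assuming the base directory used in the example above:
$ bin/tachyon tfs ls /tmp_spark_tachyon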

Currently, Spark does not write lineage information to Tachyon, so if the data is too large to fit into the configured Tachyon cluster's memory, portions of the RDD will be lost and Spark jobs can fail.




