Sunday 15 February 2015

Spark Streaming from Files to Files in HDFS

The problem to solve is to stream files from an input folder in HDFS into an output folder in HDFS.
The input folder contains many small files; the output should combine those small files into a few larger, compressed files.

$ spark-shell --driver-memory 1g --executor-memory 2g

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.io.compress.GzipCodec

// local[4]: Spark Streaming needs more than one local thread (see issue 1 below)
val sparkConf = new SparkConf().setAppName("fileCopy").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(120)) // 120-second batches
val stream = ssc.textFileStream("hdfs://input_data/")

stream.foreachRDD { rdd =>
  rdd.map(line => (line.split(" ")(1) + ':' + line.split(" ")(2), 1))
    .reduceByKey(_ + _)
    // do the computation first, then merge the result into one partition (see issue 2 below)
    .coalesce(1, shuffle = true)
    .saveAsTextFile("hdfs://output_data/output_" + System.currentTimeMillis(), classOf[GzipCodec])
}
ssc.start()
ssc.awaitTermination()


There are a couple of issues in this process.

1. The process stops when a new file is added to the input folder.
Everything appears to work, except that saveAsTextFile() saves nothing.
The output folder contains only a _temporary directory, which is never committed.

Solution:
If the master is set to plain local, Spark cannot assign a slot to execute the task, so you have to specify the master as local[x] with x > 1. Spark Streaming requires at least 2 threads to run, and the documentation examples are misleading as they use plain local as well. The Spark Streaming programming guide does warn about this (see the sketch after the quoted points below):

  • When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run.
  • Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.
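
For instance, a minimal sketch of the fix (the values here are illustrative; the job above uses local[4] for extra headroom):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// plain "local" or "local[1]" leaves no thread for processing;
// local[2] is the minimum: one thread to receive data, one to process it
val conf = new SparkConf().setAppName("fileCopy").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(120))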

2. Each input batch is split into multiple part files in the output folder.
Solution:
The output is saved as multiple files because the computation is distributed across partitions. To merge them into larger files, use coalesce(1,true) before saveAsTextFile(): do the computation first, then coalesce the result to 1 partition. You can also use repartition(1), which is just a wrapper for coalesce with the shuffle argument set to true.
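
As a minimal sketch (counts stands for the reduced (key, count) RDD from the job above, and the output path is illustrative):

import org.apache.hadoop.io.compress.GzipCodec

// compute first, then shrink the result to a single partition;
// this writes one gzip-compressed part file
counts.coalesce(1, shuffle = true)
      .saveAsTextFile("hdfs://output_data/output_merged", classOf[GzipCodec])
// repartition(1) is equivalent: it internally calls coalesce(1, shuffle = true)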

Reference:
http://stackoverflow.com/questions/24992079/spark-streaming-output-not-saved-to-hdfs-file
http://stackoverflow.com/questions/24371259/how-to-make-saveastextfile-not-split-output-into-multiple-file
https://spark.apache.org/docs/latest/streaming-programming-guide.html
