Friday 16 January 2015

Integration of Spark Streaming + Kafka + Cassandra

Start the Cassandra server, then create a keyspace "test" and a table "words" with the columns "word" and "count".
Start the Kafka server, then create a topic named "json_test".
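
The keyspace and table can also be created from the spark-shell through the connector instead of cqlsh. This is only a sketch: the column types (word as a text primary key, count as an int), the replication settings, and the names schemaStatements and setupConf are assumptions, not something stated above.

```scala
import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

// Assumed schema: "word" is the primary key, "count" is an int.
val schemaStatements = Seq(
  "CREATE KEYSPACE IF NOT EXISTS test WITH replication = " +
    "{'class': 'SimpleStrategy', 'replication_factor': 1}",
  "CREATE TABLE IF NOT EXISTS test.words (word text PRIMARY KEY, count int)"
)

// Same Cassandra host as the streaming job uses.
val setupConf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "127.0.0.1")

// Open a session on the cluster and run each statement once.
CassandraConnector(setupConf).withSessionDo { session =>
  schemaStatements.foreach(stmt => session.execute(stmt))
}
```

With word as the primary key, every write for the same word replaces the previous row rather than adding a new one.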



import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
// Needed for saveToCassandra and SomeColumns
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

// In spark-shell, stop the default context before creating one with the Cassandra settings
sc.stop()

val conf = new SparkConf(true)
  .setAppName("KafkaCassandraWordCount")
  .setMaster("local[4]")
  .set("spark.cassandra.connection.host", "127.0.0.1")
val ssc = new StreamingContext(conf, Seconds(10))   // 10-second batches

val topic = "json_test"
// Receiver-based stream; each element is a (key, message) pair
val stream = KafkaUtils.createStream(
     ssc, "localhost:2181", "sparkGroup", Map(topic -> 1), StorageLevel.MEMORY_ONLY)

// Word count per batch, written to test.words.
// Writes are upserts, so a row keeps the count from the latest batch containing that word.
stream.map(_._2)                  // keep only the message text
  .flatMap(_.split("\\s+"))
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("test", "words", SomeColumns("word", "count"))

ssc.start()
ssc.awaitTermination()
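
The per-batch logic can be tried on plain Scala collections before starting any servers. The sketch below is not Spark code: a Seq of (key, message) pairs stands in for one batch from Kafka, groupBy plus sum stands in for reduceByKey, and the names WordCountSketch and countWords are made up for illustration. createStream delivers (key, message) pairs, so the message text is the second element of each pair.

```scala
object WordCountSketch {
  // One batch of Kafka records modelled as (key, message) pairs.
  def countWords(batch: Seq[(String, String)]): Map[String, Int] =
    batch
      .map(_._2)                   // keep only the message text
      .flatMap(_.split("\\s+"))    // split on whitespace
      .filter(_.nonEmpty)          // drop the empty token left by leading whitespace
      .map(word => (word, 1))
      .groupBy(_._1)               // collection stand-in for reduceByKey
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq(("k1", "hello spark"), ("k2", "hello kafka"))
    // Print one line per word with its count in this batch.
    countWords(batch).toSeq.sortBy(_._1).foreach { case (word, n) =>
      println(s"$word $n")
    }
  }
}
```

Each (word, count) pair produced this way corresponds to one row that the streaming job writes to test.words for that batch.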



Notes on KafkaUtils.createStream, from its API documentation:

 Create an input stream that pulls messages from Kafka Brokers.
 @param ssc           StreamingContext object
 @param zkQuorum      Zookeeper quorum (hostname:port,hostname:port,..)
 @param groupId       The group id for this consumer
 @param topics        Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
 @param storageLevel  Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
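
To make the zkQuorum and topics formats concrete, here is a small sketch that builds both values in plain Scala. The host names zk1, zk2 and zk3, the partition count of 2, and the object name CreateStreamArgs are hypothetical; the post itself uses a single "localhost:2181" and Map(topic -> 1).

```scala
object CreateStreamArgs {
  // Hypothetical ZooKeeper ensemble, one host:port entry per node.
  val zkHosts: Seq[String] = Seq("zk1:2181", "zk2:2181", "zk3:2181")

  // zkQuorum is the same list joined with commas.
  val zkQuorum: String = zkHosts.mkString(",")

  // topics maps each topic name to the number of partitions to consume,
  // each of which is read in its own thread.
  val topics: Map[String, Int] = Map("json_test" -> 2)

  def main(args: Array[String]): Unit = {
    println(zkQuorum)
    println(topics)
  }
}
```

These values would then be passed as KafkaUtils.createStream(ssc, zkQuorum, "sparkGroup", topics). Leaving out the storageLevel argument falls back to the documented default, StorageLevel.MEMORY_AND_DISK_SER_2.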


References:
https://github.com/AlvinCJin/spark-cassandra-connector/blob/master/doc/8_streaming.md
https://spark.apache.org/docs/1.1.1/streaming-programming-guide.html
http://stackoverflow.com/questions/25582099/how-to-apply-rdd-function-on-dstream-while-writing-code-in-scala
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html
