Start the Kafka server and create a topic named "json_test".
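A minimal sketch of the setup commands, assuming a Kafka 0.8.x distribution with the stock config files (paths are relative to the Kafka install directory):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic json_test

To feed test messages once the stream is running:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic json_test

Then, in spark-shell (started with the spark-cassandra-connector and spark-streaming-kafka jars on the classpath):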
import kafka.serializer._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext._
import com.datastax.spark.connector._            // SomeColumns
import com.datastax.spark.connector.streaming._  // saveToCassandra on DStreams
// Stop the SparkContext created by spark-shell so a fresh one can be built for streaming.
sc.stop
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1").setMaster("local[4]")
// 10-second batch interval.
val ssc = new StreamingContext(conf, Seconds(10))
// Consumer configuration for the kafkaParams-based overload of createStream
// (see the sketch after the signature notes below); the simple overload used
// next only needs the ZooKeeper quorum and group id.
val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "zookeeper.connection.timeout.ms" -> "10000",
  "group.id" -> "sparkGroup"
)
val topic = "json_test"
// One receiver thread for the topic; this overload connects via the ZooKeeper quorum.
val stream = KafkaUtils.createStream(
  ssc, "localhost:2181", "sparkGroup", Map(topic -> 1), StorageLevel.MEMORY_ONLY)
// createStream yields (key, message) pairs; keep only the message payload.
val wc = stream.map(_._2)
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wc.saveToCassandra("test", "words", SomeColumns("word", "count"))
ssc.start()
ssc.awaitTermination()
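saveToCassandra expects the target keyspace and table to exist before the stream starts. A minimal CQL sketch for a local single-node cluster (the keyspace, table, and column names match the call above, but the replication settings are assumptions; adjust to your setup):

CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS test.words (word text PRIMARY KEY, count int);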
For reference, the signature notes for the KafkaUtils.createStream overload used above. It creates an input stream that pulls messages from Kafka brokers.
@param ssc StreamingContext object
@param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
@param groupId The group id for this consumer
@param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.
@param storageLevel Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
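The kafkaParams map defined earlier fits the typed overload of KafkaUtils.createStream, which takes the full consumer configuration plus explicit key/value decoders instead of the bare quorum and group id. A sketch reusing the ssc, kafkaParams, and topic values from above:

val typedStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

StringDecoder comes from the kafka.serializer._ import at the top; both overloads return a DStream of (key, message) pairs.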
References:
https://github.com/AlvinCJin/spark-cassandra-connector/blob/master/doc/8_streaming.md
https://spark.apache.org/docs/1.1.1/streaming-programming-guide.html
http://stackoverflow.com/questions/25582099/how-to-apply-rdd-function-on-dstream-while-writing-code-in-scala
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html