Start the Kafka server and create a topic named "json_test".
import kafka.serializer._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

// Stop the spark-shell's default SparkContext before creating a new one.
sc.stop

val conf = new SparkConf(true)
  .setAppName("KafkaWordCount")
  .setMaster("local[4]")
  .set("spark.cassandra.connection.host", "127.0.0.1")

val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "zookeeper.connection.timeout.ms" -> "10000",
  "group.id" -> "sparkGroup"
)

val topic = "json_test"

// The typed overload takes the kafkaParams map directly; the decoders
// come from the kafka.serializer import above.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

// Each element is a (key, message) pair; count words in the message values
// and write each batch's counts to the Cassandra table test.words.
stream.map(_._2)
  .flatMap(_.split("\\s+"))
  .map(x => (x, 1))
  .reduceByKey(_ + _)
  .saveToCassandra("test", "words", SomeColumns("word", "count"))

ssc.start()
ssc.awaitTermination()
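To see what the flatMap/map/reduceByKey chain computes on each micro-batch, here is a minimal sketch in plain Scala (no Spark, no Kafka) applying the same word-count logic to a local collection of message strings; the function name `wordCount` is just for illustration:

```scala
// Plain-Scala sketch of the per-batch word count above:
// split each message on whitespace, pair each word with 1,
// then sum the counts per word (the role reduceByKey plays on a DStream).
def wordCount(messages: Seq[String]): Map[String, Int] =
  messages
    .flatMap(_.split("\\s+"))                       // flatMap(_.split("\\s+"))
    .map(w => (w, 1))                               // map(x => (x, 1))
    .groupBy(_._1)                                  // group pairs by word
    .map { case (w, pairs) => (w, pairs.map(_._2).sum) } // reduceByKey(_ + _)
```

In the streaming job, the resulting (word, count) pairs of each batch are what `saveToCassandra` writes out.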
The scaladoc for KafkaUtils.createStream describes the parameters used above: create an input stream that pulls messages from Kafka brokers.
@param ssc StreamingContext object
@param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
@param groupId The group id for this consumer
@param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
@param storageLevel Storage level to use for storing the received objects (default: StorageLevel.MEMORY_AND_DISK_SER_2)
References:
https://github.com/AlvinCJin/spark-cassandra-connector/blob/master/doc/8_streaming.md
https://spark.apache.org/docs/1.1.1/streaming-programming-guide.html
http://stackoverflow.com/questions/25582099/how-to-apply-rdd-function-on-dstream-while-writing-code-in-scala
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html