The scenario: Logstash collects newly generated logs from the server and ships them to Kafka, where Spark Streaming processes them in near real-time.
1. Edit the ZooKeeper connection in config/server.properties, then start the Kafka server
$ ./bin/kafka-server-start.sh config/server.properties
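The broker finds ZooKeeper through the zookeeper.connect property; for a single-node setup like this one, the value below assumes ZooKeeper is running locally on its default port:
zookeeper.connect=localhost:2181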
2. Create a new Topic
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic proxy_log
Check the new topic
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
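Before wiring up Logstash, you can sanity-check the topic with the console producer that ships with Kafka; the broker address below assumes the default listener on localhost:9092. Anything typed here should show up in the consumer started in the next step.
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic proxy_log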
3. Start Kafka Consumer
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic proxy_log
4. Start the Logstash daemon
$ ./bin/logstash agent -f conf/kafka_logstash.conf
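For reference, conf/kafka_logstash.conf could look roughly like the sketch below. This is only an illustration: the input path is hypothetical, and the kafka output options (broker_list, topic_id) assume the logstash-kafka plugin of the Logstash 1.4/1.5 era, so check the option names against your plugin version.
input {
  file {
    path => "/var/log/proxy/access.log"   # hypothetical path to the proxy logs
    start_position => "beginning"
  }
}
output {
  kafka {
    broker_list => "localhost:9092"       # assumes the default Kafka listener
    topic_id => "proxy_log"               # topic created in step 2
  }
}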
5. Start Spark Streaming
$ spark-shell --driver-memory 2g --executor-memory 4g --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar
The jar contains Kafka and Cassandra dependencies.
import kafka.serializer._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkContext._

// Stop the shell's default SparkContext so we can create our own streaming context
sc.stop()
val sparkConf = new SparkConf().setAppName("Kafka_Stream").setMaster("local[4]")
val ssc = new StreamingContext(sparkConf, Seconds(30))   // 30-second batch interval
ssc.checkpoint("checkpoint")

val topic = "proxy_log"
val window_length = 60000      // window length in milliseconds
val sliding_interval = 30000   // sliding interval in milliseconds

// Receive from Kafka and keep only the message payload (the second element of the tuple)
val stream = KafkaUtils.createStream(ssc, "localhost:2181", "sparkjob_1", Map(topic -> 1), StorageLevel.MEMORY_ONLY).map(_._2)

// Apply a sliding window over the stream
val windowed_dstream = stream.window(Milliseconds(window_length), Milliseconds(sliding_interval))

windowed_dstream.foreachRDD(rdd => if (rdd.count > 0) {
  // Count occurrences of the "field1:field2" key and write each window as one gzipped file
  rdd.coalesce(1, true)
    .map(line => (line.split(" ")(1) + ':' + line.split(" ")(2), 1))
    .reduceByKey(_ + _)
    .saveAsTextFile("hdfs://localhost/stream_output/output_" + System.currentTimeMillis(),
      classOf[org.apache.hadoop.io.compress.GzipCodec])
})

ssc.start()
ssc.awaitTermination()
A few things to note in the above code:
1. setMaster on SparkConf takes a string pointing to the Spark cluster URL (or “local[n]” when deploying locally, where n >= 2 so that one thread runs the receiver and at least one processes the data).
2. We need to set a checkpoint directory where streaming state can be written, so the job can recover in case of failure.
3. The “.map(_._2)” call tells Spark to keep only the second element of the (key, message) tuple that Kafka returns; this is most likely all you need, since Kafka delivers the message content in that second element.
4. Both window_length and sliding_interval must be multiples of the streaming batch interval (30 seconds here); see the sketch after this list.
5. coalesce(1, true) guarantees that each RDD is written out as a single file.
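As a small illustration of note 4, the same durations can be expressed with Spark's Duration helpers instead of raw milliseconds; this sketch assumes the spark-shell session above, with its 30-second batch interval and the stream already defined:
// Batch interval of the StreamingContext above
val batch_interval = Seconds(30)
// Window and slide must be whole multiples of the batch interval
val window_length = Seconds(60)      // 2 x batch interval
val sliding_interval = Seconds(30)   // 1 x batch interval
val windowed_dstream = stream.window(window_length, sliding_interval)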