Tuesday, 17 February 2015

Logstash forwards logs to Kafka, consumed by Spark Streaming


The scenario: Logstash collects newly generated logs on the server and ships them to Kafka, and Spark Streaming then consumes and processes them in near real-time.

1. Edit the ZooKeeper connection setting in config/server.properties, then start the Kafka server

$ ./bin/kafka-server-start.sh config/server.properties
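
For reference, the relevant part of config/server.properties simply points the broker at the local ZooKeeper instance. The lines below are a minimal single-broker sketch with default ports; the exact values are assumptions, so adjust them for your environment:

# config/server.properties (minimal single-broker sketch)
broker.id=0
port=9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181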

2. Create a new Topic

$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic proxy_log

 Check that the new topic exists

$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

3. Start Kafka Consumer

$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic proxy_log
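
Before Logstash is wired up, you can sanity-check the topic from another terminal by sending a test message with the console producer (the broker address below assumes the default localhost:9092):

$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic proxy_log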

4. Start the Logstash daemon

$ ./bin/logstash agent -f conf/kafka_logstash.conf
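
The contents of conf/kafka_logstash.conf are not shown in this post; a minimal sketch using the logstash-kafka output plugin could look like the following, where the input path and broker list are assumptions:

# conf/kafka_logstash.conf (sketch; adjust path and broker_list to your setup)
input {
  file {
    path => "/var/log/proxy/*.log"
    start_position => "beginning"
  }
}
output {
  kafka {
    broker_list => "localhost:9092"
    topic_id    => "proxy_log"
  }
}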

5. Start Spark Streaming in the Spark shell

$ spark-shell --driver-memory 2g --executor-memory 4g --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar

This assembly jar bundles the Kafka and Cassandra dependencies needed by the streaming job.
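
The post does not show how this assembly was built (the jar name suggests Maven's assembly plugin); as a rough sbt-assembly sketch, the dependencies could be declared as below, with the artifact versions being assumptions for the Spark 1.2 era:

// build.sbt (sketch; versions are assumptions)
name := "spark-cassandra-assembly"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-streaming"           % "1.2.0" % "provided", // provided by spark-shell
  "org.apache.spark"   %% "spark-streaming-kafka"     % "1.2.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
)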

import kafka.serializer._
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkContext._

// Stop the SparkContext that spark-shell created so we can build our own
sc.stop()
val sparkConf = new SparkConf().setAppName("Kafka_Stream").setMaster("local[4]")
// 30-second batch interval
val ssc = new StreamingContext(sparkConf, Seconds(30))
ssc.checkpoint("checkpoint")

val topic = "proxy_log"
// Window and slide durations in milliseconds; both are multiples of the 30s batch interval
val window_length = 60000
val sliding_interval = 30000

// Consume the topic from Kafka (via ZooKeeper) and keep only the message value (_._2)
val stream = KafkaUtils.createStream(ssc, "localhost:2181", "sparkjob_1", Map(topic -> 1), StorageLevel.MEMORY_ONLY).map(_._2)

// Window the stream before processing it
val windowed_dstream = stream.window(Milliseconds(window_length), Milliseconds(sliding_interval))

windowed_dstream.foreachRDD( rdd =>
  if (rdd.count > 0) {
    // Key each line by fields 1 and 2, count occurrences, and write one gzipped file per window
    rdd.coalesce(1, true)
      .map(line => (line.split(" ")(1) + ':' + line.split(" ")(2), 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("hdfs://localhost/stream_output/output_" + System.currentTimeMillis(),
        classOf[org.apache.hadoop.io.compress.GzipCodec])
  }
)

ssc.start()
ssc.awaitTermination()


A few things to note in the above code:

1. SparkConf's setMaster takes the Spark cluster URL (or “local[n]” for a local deployment, where n must be at least 2 so that one thread can run the Kafka receiver while another processes the data).

2. We need to set a checkpoint directory where the stream's state can be written, so the job can recover from failure.

3. The “.map(_._2)” call keeps only the second element of each (key, message) tuple that Kafka returns; this is usually all you need, since the log content arrives as the message value.

4. Both window_length and sliding_interval must be multiples of the streaming batch interval (30 seconds here).

5. coalesce(1, true) ensures each windowed RDD is written out as a single file.

