[关闭]
@awsekfozc 2016-02-18T23:18:29.000000Z 字数 1998 阅读 1755

Kafka+Spark Streaming

Kafka

push

KafkaWordCountPush.scala

  1. // KafkaWordCountPush.scala
  2. import org.apache.spark._
  3. import org.apache.spark.streaming._
  4. import org.apache.spark.streaming.kafka._
  5. val ssc = new StreamingContext(sc, Seconds(5))
  6. val topicMap = Map("test" -> 1)
  7. val lines = KafkaUtils.createStream(ssc, "hadoop.zc.com:2181", "testWordCountGroup", topicMap).map(_._2)
  8. val words = lines.flatMap(_.split(" "))
  9. val counts = words.map((_, 1L)).reduceByKey(_ + _)
  10. counts.print()
  11. ssc.start()
  12. ssc.awaitTermination()

运行

  1. $ bin/spark-shell \
  2. --jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/spark-streaming-kafka_2.10-1.3.0.jar,\
  3. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka_2.10-0.8.2.1.jar,\
  4. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka-clients-0.8.2.1.jar,\
  5. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/metrics-core-2.2.0.jar,\
  6. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/zkclient-0.3.jar \
  7. --master local[2]
  8. ##加载Kafka
  9. scala> :load /opt/cdh/KafkaWordCountPush.scala

pull

KafkaWordCountPull.scala

  1. import kafka.serializer.StringDecoder
  2. import org.apache.spark._
  3. import org.apache.spark.streaming._
  4. import org.apache.spark.streaming.kafka._
  5. val ssc = new StreamingContext(sc, Seconds(5))
  6. val kafkaMapParams = Map("metadata.broker.list" -> "hadoop.zc.com:9092")
  7. val topicsSet = Set("test")
  8. val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet).map(_._2)
  9. val words = lines.flatMap(_.split(" "))
  10. val counts = words.map((_, 1L)).reduceByKey(_ + _)
  11. counts.print()
  12. ssc.start()
  13. ssc.awaitTermination()

运行

  1. $ bin/spark-shell \
  2. --jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/spark-streaming-kafka_2.10-1.3.0.jar,\
  3. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka_2.10-0.8.2.1.jar,\
  4. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka-clients-0.8.2.1.jar,\
  5. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/metrics-core-2.2.0.jar,\
  6. /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/zkclient-0.3.jar \
  7. --master local[2]
  8. ##加载Kafka
  9. scala> :load /opt/cdh/KafkaWordCountPull.scala

在此输入正文

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注