@awsekfozc
2016-02-18T23:18:29.000000Z
字数 1998
阅读 1747
Kafka
### KafkaWordCountPush.scala（基于 Receiver 的 KafkaUtils.createStream）
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
// Receiver-based Kafka word count, meant to be run with `:load` inside spark-shell
// (the shell provides `sc`). Consumes through the ZooKeeper quorum below with the
// given consumer group and prints word counts for every 5-second batch.
// Keep every statement on a single line: `:load` feeds the file to the REPL line by line.
val ssc = new StreamingContext(sc, Seconds(5))
// topic -> number of consumer threads for that topic (see KafkaUtils.createStream API docs)
val topicMap = Map("test" -> 1)
// createStream yields (key, message) pairs; only the message payload is kept.
val lines = KafkaUtils.createStream(ssc, "hadoop.zc.com:2181", "testWordCountGroup", topicMap).map(_._2)
// Split on any run of whitespace and drop empty tokens, so repeated/leading spaces
// or tabs do not produce "" entries that would otherwise be counted as a word.
val words = lines.flatMap(_.split("\\s+").filter(_.nonEmpty))
val counts = words.map((_, 1L)).reduceByKey(_ + _)
counts.print()
ssc.start()
// Waits until the streaming context is stopped or fails; this blocks the shell.
ssc.awaitTermination()
# Start spark-shell with the Kafka 0.8 integration jars; with a receiver, local[n] needs n > 1 so batches still get processed.
$ bin/spark-shell \
--jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka_2.10-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka-clients-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/metrics-core-2.2.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/zkclient-0.3.jar \
--master "local[2]"
## 在 spark-shell 中加载 KafkaWordCountPush.scala
scala> :load /opt/cdh/KafkaWordCountPush.scala
import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
// Receiver-less ("direct") Kafka word count, meant to be run with `:load` inside
// spark-shell (the shell provides `sc`). Connects to the broker(s) listed in
// metadata.broker.list rather than a ZooKeeper quorum and prints word counts
// for every 5-second batch.
// Keep every statement on a single line: `:load` feeds the file to the REPL line by line.
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaMapParams = Map("metadata.broker.list" -> "hadoop.zc.com:9092")
val topicsSet = Set("test")
// Keys and messages are both decoded as Strings; only the message payload is kept.
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet).map(_._2)
// Split on any run of whitespace and drop empty tokens, so repeated/leading spaces
// or tabs do not produce "" entries that would otherwise be counted as a word.
val words = lines.flatMap(_.split("\\s+").filter(_.nonEmpty))
val counts = words.map((_, 1L)).reduceByKey(_ + _)
counts.print()
ssc.start()
// Waits until the streaming context is stopped or fails; this blocks the shell.
ssc.awaitTermination()
# Start spark-shell with the Kafka 0.8 integration jars needed by KafkaUtils.createDirectStream.
$ bin/spark-shell \
--jars /opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/spark-streaming-kafka_2.10-1.3.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka_2.10-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/kafka-clients-0.8.2.1.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/metrics-core-2.2.0.jar,\
/opt/cdh5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/otherjar/zkclient-0.3.jar \
--master "local[2]"
## 在 spark-shell 中加载 KafkaWordCountPull.scala
scala> :load /opt/cdh/KafkaWordCountPull.scala
在此输入正文