@awsekfozc
2016-02-17T22:04:19.000000Z
字数 2153
阅读 1887
Spark
许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用、训练机器学
习模型的应用,还有自动检测异常的应用。Spark Streaming 是 Spark 为这些应用而设计的
模型。和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作
为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个
时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此
得名“离散化” ) 。DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创
建出来的 DStream 支持两种操作,一种是转化操作(transformation) ,会生成一个新的
DStream,另一种是输出操作(output operation) ,可以把数据写入外部系统中。DStream
提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间相关的新操作,比
如滑动窗口。
Spark Streaming 使用“微批次”的架构,把流式计算当作一系列连续的小规模批处理来对
待。Spark Streaming 从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均
匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区
间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间
的大小是由批次间隔这个参数决定的。批次间隔一般设在 500 毫秒到几秒之间,由应用开
发者配置。每个输入批次都形成一个 RDD,以 Spark 作业的方式处理并生成其他的 RDD。
处理的结果可以以批处理的方式传给外部系统。
DStream是Spark Streaming 的编程抽象是离散化流。你可以从外部输入源创建 DStream,也可以对其他 DStream 应用进行转化操作得到新的
DStream。DStream 支持许多第 3 章中所讲到的 RDD 支持的转化操作。另外,DStream 还有
“有状态”的转化操作,可以用来聚合不同时间片内的数据。DStream 是一个持续的 RDD 序列
import org.apache.spark._
import org.apache.spark.streaming._
// 从SparkConf创建StreamingContext并指定1秒钟的批处理大小
val ssc = new StreamingContext(conf, Seconds(1))
// 连接到本地机器9999端口上后, 使用收到的数据创建DStream
val lines = ssc.socketTextStream("localhost", 9999)
// 从DStream中筛选出包含字符串"error"的行
val errorLines = lines.filter(_.contains("error"))
// 打印出有"error"的行
errorLines.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
import org.apache.spark._
import org.apache.spark.streaming._
// 从SparkContext建StreamingContext并指定5秒钟的批处理大小
val ssc = new StreamingContext(sc, Seconds(5))
// 监听HDFS目录
val lines = ssc.textFileStream("/user/zc/spark/streaming/inputhdfs")
val words = lines.flatMap(_.split("\t"))
val wordCounts = words.map((_,1)).reduceByKey(_ + _)
wordCounts.print
ssc.start()
ssc.awaitTermination()
ssc.stop()
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
val ssc = new StreamingContext(sc, Seconds(5))
// Create a flume stream
val stream = FlumeUtils.createStream(ssc, "hadoop.zc.com", 9999)
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
NOTE:需要加入Flume依赖包,spark-streaming-flume_2.10-1.3.0.jar,flume-ng-sdk-1.5.0-cdh5.3.6.jar,flume-avro-source-1.5.0-cdh5.3.6.jar
在此输入正文