@rickyChen
2017-08-14T12:51:21.000000Z
字数 815
阅读 1460
WaterDrop
2017.08.14
events 遍历问题：每个插件都要单独遍历一次 events？
// WaterDropMain.scala
// Runs the filter plugins in sequence: each filter receives the whole current
// `events` collection and its output replaces `events` for the next filter.
// As a result, every filter plugin makes its own full pass over the events.
for (f <- filters) {
// `isSuccess` is bound here but never used in this excerpt.
val (processedEvents, isSuccess) = f.filter(events)
events = processedEvents
}
// Split.scala
// Index-based pass over every event inside this single filter, i.e. one more
// full traversal of `events` on top of the per-filter loop in the main driver.
for (i <- 0 until events.length) {
// Iterates over the value(s) returned by getField(srcField); presumably an
// Option or collection, so a missing field yields no iterations (TODO confirm).
for (v <- events(i).getField(srcField)) {
// (loop body elided in this excerpt)
}
}
增加配置项，支持删除 source_field
// WaterDropMain.scala
// NOTE(review): this excerpt reads `v` and `events(i)`, which are bound by the
// loops in the Split.scala excerpt, so it presumably comes from Split.scala — confirm.
// Split the source value on the configured delimiter and trim each piece.
// `asInstanceOf[String]` throws ClassCastException if the field is not a String.
// NOTE(review): String.split interprets `delimiter` as a regular expression, so
// delimiters such as "|" or "." must be escaped (e.g. java.util.regex.Pattern.quote).
val parts = v.asInstanceOf[String].split(conf.getString("delimiter")).map(_.trim)
// Pair the pieces positionally with `keys`; `zip` stops at the shorter side,
// so surplus parts or surplus keys are silently dropped.
val kvs = (keys zip parts).toMap
// Store the key -> value map under the configured target field.
// (Fixed: the two arguments were missing their separating comma.)
events(i).setField(conf.getString("target_field"), kvs)
为什么 beforeOutput 和 afterOutput 放在 inputs 模块里？
// WaterDropMain.scala
// The output lifecycle hooks are defined on the *input* plugin, and only the
// first input (`inputs.head`) is consulted. They are called outside the
// foreachPartition closure, presumably on the driver (TODO confirm).
inputs.head.beforeOutput
// Each partition's records are handed to the first output plugin only;
// any additional configured outputs are ignored in this excerpt.
eventRDD.foreachPartition { partitionOfRecords =>
outputs.head.process(partitionOfRecords)
}
inputs.head.afterOutput
多日志源支持：目前只读取第一个输入源（inputs.head）
// WaterDropMain.scala
// Only the first input plugin's DStream is used, so additional configured
// inputs are not consumed. (Remainder of the expression is elided in this excerpt.)
val dstream = inputs.head.getDstream.mapPartitions{ iter => ...
Interval 配置问题：批处理间隔被硬编码为 15 秒
// WaterDropMain.scala
// The batch interval is hard-coded to 15 seconds rather than read from configuration.
val ssc = new StreamingContext(sparkConf, Seconds(15))