@rickyChen
2017-08-14T04:51:21.000000Z
WaterDrop
2017.08.14
Event traversal: does every plugin iterate over the entire events collection separately?
```scala
// WaterDropMain.scala
for (f <- filters) {
  val (processedEvents, isSuccess) = f.filter(events)
  events = processedEvents
}

// Split.scala
for (i <- 0 until events.length) {
  for (v <- events(i).getField(srcField)) {
  }
}
```
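A minimal sketch of the alternative, in plain Scala with no Spark dependency: if each filter exposed a per-event function, the main loop could apply every filter to one event before moving on to the next, so the events collection is traversed once instead of once per plugin. `Event`, `EventFilter`, `RequireField`, `SplitFilter` and `runFused` are hypothetical names for illustration, not WaterDrop's actual classes.

```scala
// Hypothetical event representation: an immutable field map (not WaterDrop's Event class).
type Event = Map[String, Any]

// Hypothetical per-event filter: returns None to drop the event.
trait EventFilter {
  def process(event: Event): Option[Event]
}

// Drops events that lack the given field.
final case class RequireField(field: String) extends EventFilter {
  def process(event: Event): Option[Event] =
    if (event.contains(field)) Some(event) else None
}

// Splits a delimited string field into the named keys (stored flat, for brevity).
final case class SplitFilter(srcField: String, keys: Seq[String], delimiter: String)
    extends EventFilter {
  def process(event: Event): Option[Event] = event.get(srcField) match {
    case Some(v: String) =>
      val parts = v.split(java.util.regex.Pattern.quote(delimiter)).map(_.trim)
      Some(keys.zip(parts).foldLeft(event) { case (acc, (k, p)) => acc.updated(k, p) })
    case _ => Some(event)
  }
}

// Runs all filters on one event before moving to the next,
// so the event collection is traversed exactly once.
def runFused(events: Seq[Event], filters: Seq[EventFilter]): Seq[Event] =
  events.flatMap { e =>
    filters.foldLeft(Option(e))((acc, f) => acc.flatMap(f.process))
  }
```

The trade-off is that filters needing the whole batch at once (deduplication or sorting, for example) cannot be written as a per-event function and would still need their own pass.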
Add a configuration option that allows deleting source_field after it has been split
```scala
// WaterDropMain.scala
val parts = v.asInstanceOf[String].split(conf.getString("delimiter")).map(_.trim)
val kvs = (keys zip parts).toMap
events(i).setField(conf.getString("target_field"), kvs)
```
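A sketch of what the option could look like, assuming immutable field maps rather than WaterDrop's mutable `setField` events. `SplitConf`, its `deleteSourceField` flag and `applySplit` are hypothetical; the flag models the proposed option and is not an existing WaterDrop config key.

```scala
import java.util.regex.Pattern

// Hypothetical Split settings; `deleteSourceField` models the proposed option
// and is NOT an existing WaterDrop configuration key.
final case class SplitConf(
    sourceField: String,
    targetField: String,
    delimiter: String,
    keys: Seq[String],
    deleteSourceField: Boolean = false)

// Splits event(sourceField) and stores the key -> part pairs as a map under targetField,
// like the snippet above, then optionally removes the source field.
def applySplit(event: Map[String, Any], conf: SplitConf): Map[String, Any] =
  event.get(conf.sourceField) match {
    case Some(v: String) =>
      // Pattern.quote treats the delimiter literally instead of as a regex.
      val parts = v.split(Pattern.quote(conf.delimiter)).map(_.trim)
      val kvs: Map[String, String] = conf.keys.zip(parts).toMap
      val withTarget: Map[String, Any] = event.updated(conf.targetField, kvs)
      // Never delete the field we just wrote when source and target coincide.
      if (conf.deleteSourceField && conf.sourceField != conf.targetField)
        withTarget - conf.sourceField
      else withTarget
    case _ => event // missing or non-string source field: leave the event unchanged
  }
```

`Pattern.quote` is a deliberate choice here: `String.split` interprets its argument as a regular expression, so a delimiter such as `|` passed straight through, as in the snippet above, would split between every character.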
Why are beforeOutput and afterOutput placed in the inputs module?
```scala
// WaterDropMain.scala
inputs.head.beforeOutput
eventRDD.foreachPartition { partitionOfRecords =>
  outputs.head.process(partitionOfRecords)
}
inputs.head.afterOutput
```
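One possible answer is to move the hooks onto the output interface, next to `process`. The sketch below is hypothetical (`OutputPlugin`, `RecordingOutput` and `writeBatch` are illustrative names) and runs locally: plain `Seq`s stand in for RDD partitions.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical output plugin interface: lifecycle hooks live next to process(),
// instead of on the input plugin.
trait OutputPlugin[T] {
  def beforeOutput(): Unit = ()
  def process(partition: Iterator[T]): Unit
  def afterOutput(): Unit = ()
}

// Toy output that records every call so the ordering can be checked.
final class RecordingOutput extends OutputPlugin[String] {
  val log = ArrayBuffer.empty[String]
  override def beforeOutput(): Unit = log += "before"
  def process(partition: Iterator[String]): Unit = log += s"process(${partition.mkString(",")})"
  override def afterOutput(): Unit = log += "after"
}

// Stand-in for writing one batch: each inner Seq plays the role of an RDD partition.
def writeBatch[T](partitions: Seq[Seq[T]], output: OutputPlugin[T]): Unit = {
  output.beforeOutput()
  partitions.foreach(p => output.process(p.iterator))
  output.afterOutput()
}
```

In the real Spark job, `beforeOutput` and `afterOutput` would run on the driver while `process` runs on executors inside `foreachPartition`. Executors work on a deserialized copy of the output object, so a non-serializable resource such as a database connection opened in `beforeOutput` cannot be reused inside `process`.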
Support for multiple log sources
```scala
// WaterDropMain.scala
val dstream = inputs.head.getDstream.mapPartitions { iter =>
  ...
```
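A minimal sketch of reading from every configured input instead of only `inputs.head`, written with plain collections so it runs without Spark. `InputPlugin`, `FixedInput` and `unionBatches` are hypothetical; tagging each record with its source name is one possible design choice, not something the original code does.

```scala
// Hypothetical input interface; a real WaterDrop input would return a DStream instead.
trait InputPlugin {
  def name: String
  def nextBatch(): Seq[String]
}

// Toy input that always yields the same lines.
final case class FixedInput(name: String, lines: Seq[String]) extends InputPlugin {
  def nextBatch(): Seq[String] = lines
}

// Merges one batch from every input and tags each record with its source,
// so downstream filters can branch on where a line came from.
def unionBatches(inputs: Seq[InputPlugin]): Seq[(String, String)] =
  inputs.flatMap(in => in.nextBatch().map(line => in.name -> line))
```

With Spark Streaming, the analogous step would be `ssc.union(inputs.map(_.getDstream))`, assuming all input DStreams share the same element type; `StreamingContext.union` accepts a `Seq` of DStreams.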
Interval configuration: the streaming batch interval is hardcoded
```scala
// WaterDropMain.scala
val ssc = new StreamingContext(sparkConf, Seconds(15))
```
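A sketch of reading the interval from configuration instead of hardcoding `Seconds(15)`. The key name `batch_duration` is hypothetical, and a plain `Map[String, String]` stands in for the real config object; 15 seconds is kept as the default so behaviour does not change when the key is absent.

```scala
import scala.util.Try

// Hypothetical key name; WaterDrop's real configuration layout may differ.
val BatchDurationKey = "batch_duration"
// Matches the value currently hardcoded as Seconds(15).
val DefaultBatchSeconds = 15L

// Resolves the streaming batch interval in seconds, falling back to the default
// when the key is absent and rejecting non-numeric or non-positive values.
def batchSeconds(conf: Map[String, String]): Long =
  conf.get(BatchDurationKey) match {
    case None => DefaultBatchSeconds
    case Some(raw) =>
      val secs = Try(raw.trim.toLong).getOrElse(
        throw new IllegalArgumentException(s"$BatchDurationKey must be an integer, got '$raw'"))
      require(secs > 0, s"$BatchDurationKey must be positive, got $secs")
      secs
  }
```

The result would then feed the existing call, e.g. `new StreamingContext(sparkConf, Seconds(batchSeconds(conf)))`. Failing fast on a bad value at startup is usually preferable to silently falling back to the default.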
