[关闭]
@gaoxiaoyunwei2017 2017-11-16T11:18:39.000000Z 字数 10157 阅读 564

一. Flume简介

Apache Flume是Cloudera公司开源的一款分布式、可靠、可用的服务,可用于从多种不同数据源收集、聚集、移动大量日志数据到集中数据存储中;它通过事务机制提供了可靠的消息传输支持,并自带负载均衡机制来支撑水平扩展。尤其近几年随着Flume的不断被完善以及升级版本的逐一推出,特别是flume-ng的推出,以及Flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为Apache顶级社区项目之一。

二. 中国民生银行Flume实践

中国民生银行服务器的操作系统种类众多,除Linux外,部分生产系统仍采用AIX和HP-UNIX操作系统,由于在AIX和HP-UNIX无法使用Logstash作为日志采集端,在大数据基础平台产品团队经过一系列选型后,采用Flume作为AIX和HP-UNIX操作系统上日志采集端。

2016年我们在测试环境进行试验,使用的版本是Apache Flume 1.6,在使用Taildir Source组件和核心组件的过程中,发现其无法完全满足我们的需求,例如:
1. 若filegroup路径中包含正则表达式,则无法获取文件的完整路径,在日志入到Elasticsearch后无法定位日志的路径;
2. Taildir Source不支持将多行合并为一个event,只能一行一行读取文件;
3. filegroup配置中不支持目录包含正则表达式,不便配置包含多个日期并且日期自动增长的目录,例如/app/logs/yyyymmdd/appLog.log;
4. 在使用Host Interceptor时,发现只能保留主机名或者是IP,二者无法同时保留。

在研究Flume源码之后,我们在源码上扩展开发。截至目前,我们为开源社区贡献了4个Patch,其中FLUME-2955已被社区Merge并在1.7版本中发布,另外我们在Github上开放了一个版本,将FLUME-2960/2961/3187三个Patch合并到Flume 1.7上,欢迎大家下载使用,Github地址:https://github.com/tinawenqiao/flume,分支名trunk-cmbc。

接下来本文将对每个Issue进行详细介绍:

IssueID 相关组件 简要介绍 JIRA地址
FLUME-2955 Taildir Source 根据参数fileHeader的值可将文件的完整路径放在event的头部 https://issues.apache.org/jira/browse/FLUME-2955
FLUME-2960 Taildir Source filegroup的目录中支持正则表达式 https://issues.apache.org/jira/browse/FLUME-2960
FLUME-2961 Taildir Source 实现多行合并处理日志 https://issues.apache.org/jira/browse/FLUME-2961
FLUME-3187 Host Interceptor 支持同时保留主机名和IP https://issues.apache.org/jira/browse/FLUME-3187

三. FLUME-2955

3.1 问题和需求

为了采集后缀为log的日志文件,filegroups设置如下:

  1. agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
  2. agent.sources.s1.filegroups = f1
  3. agent.sources.s1.filegroups.f1 = /app/logs/.*.log

若/app/logs目录中存在a.log、b.log、c.log三个文件,在Flume 1.6版本中,虽然可以通过headers.\.\在event的header里放入自定义的key和value,但是由于正则表达式匹配上了目录中多个文件,所以无法通过该方法设置,这样导致日志数据入到Elasticsearch后,用户从Kibana从查询时无法定位到数据所在的日志文件路径。

3.2 解决办法

增加fileHeader和fileHeaderKey两个参数,两个参数含义分别是:

参数 默认值 含义
fileHeader false 是否在event的header里包含文件的绝对路径
fileHeaderKey file 指定header里fileHeader的key值

修改类ReliableTaildirEventReader中readEvents()方法,根据配置文件的值,选择是否在event的header里加入文件的路径,主要代码如下:

  1. Map<String, String> headers = currentFile.getHeaders();
  2. if (annotateFileName || (headers != null && !headers.isEmpty())) {
  3. for (Event event : events) {
  4. if (headers != null && !headers.isEmpty()) {
  5. event.getHeaders().putAll(headers);
  6. }
  7. if (annotateFileName) {
  8. event.getHeaders().put(fileNameHeader, currentFile.getPath());
  9. }
  10. }
  11. }

3.3 相关配置示例

  1. agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
  2. agent.sources.s1.filegroups = f1
  3. agent.sources.s1.filegroups.f1 = /app/logs/.*.log
  4. agent.sources.s1.fileHeader = true
  5. agent.sources.s1.fileHeaderKey = path

四. FLUME-2960

4.1 问题和需求

在实际应用写日志时,很多系统是根据日期生成日期目录,每个日期目录中包含一个或多个日志文件,因此存在/app/logs/20170101/、/app/logs/20170102/、/app/logs/20170103/等多个目录,且/app/logs/目录下每天会自动生成新的日期目录,但是根据Taildir Source中filegroups.\的描述,只支持文件名带正则,因此1.6版本的Taildir Source无法满足该需求。

4.2 解决办法

增加filegroups.\.parentDir和filegroups.\.filePattern两个参数,两个参数含义分别是:

参数 含义
filegroups.\.parentDir filegroup的目录,不能包含通配符或正则表达式
filegroups.\.filePattern 相对于parentDir的路径,filePattern可包含目录,可使用正则

修改类TaildirMatcher中匹配文件的方法,相关代码如下:

  1. private List<File> getMatchingFilesNoCache() {
  2. final List<File> result = Lists.newArrayList();
  3. try {
  4. Set options = EnumSet.of(FOLLOW_LINKS);
  5. Files.walkFileTree(Paths.get(parentDir.toString()), options, Integer.MAX_VALUE,
  6. new SimpleFileVisitor<Path>() {
  7. @Override
  8. public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
  9. if (fileMatcher.matches(file.toAbsolutePath())) {
  10. result.add(file.toFile());
  11. }
  12. return FileVisitResult.CONTINUE;
  13. }
  14. @Override
  15. public FileVisitResult visitFileFailed(Path file, IOException exc) {
  16. return FileVisitResult.CONTINUE;
  17. }
  18. });
  19. }
  20. ...
  21. }

另外进行了配置参数的兼容性处理,用户仍可保留以前的filegroups配置,不需单独配置parentDir和filePattern,程序会将filegroups中的文件的目录赋值给parentDir,文件名赋值给filePattern。
需要注意的是:在Taildir Source中有个参数cachePatternMatching,默认值是true,其作用是缓存正则匹配的文件列表和消费文件的顺序,若目录中文件较多时,使用正则匹配比较耗时,设置该参数可提高性能,当发现文件的目录修改后会刷新缓存列表。由于filePattern中可包含目录,若cachePatternMatching设为true,在filePattern的子目录中新增文件,parentDir的修改时间不变,此时新增的日志文件不能被跟踪到,因此,建议在filePattern包含目录的情况下,将cachePatternMatching设置为false

4.3 相关配置示例

  1. agent.sources.s2.type = org.apache.flume.source.taildir.TaildirSource
  2. agent.sources.s2.filegroups = f1 f2
  3. agent.sources.s2.filegroups.f1.parentDir = /app/log/
  4. agent.sources.s2.filegroups.f1.filePattern = /APP.log.\\d{8}
  5. agent.sources.s2.filegroups.f2.parentDir = /app/log/
  6. agent.sources.s2.filegroups.f2.filePattern = /\\w/.*log
  7. agent.sources.s2.cachePatternMatching = false

五. FLUME-2961

5.1 问题和需求

Taildir Source按行读取日志,把每一行作为内容放入flume event的body中,对于以下这种每行就可以结束的日志处理没有问题:

  1. 13 七月 2016 23:37:30,580 INFO [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:62) - Configuration provider starting
  2. 13 七月 2016 23:37:30,585 INFO [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:134) - Reloading configuration file:conf/taildir.conf
  3. 13 七月 2016 23:37:30,592 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1013) - Processing:s1

但是对于类似Java Stacktrace的日志,如果按上述处理,以下日志被截断成9个flume event(一共9行)输出,而我们希望这样的日志记录,要作为1个flume event,而不是9个输出:

  1. 13 七月 2016 23:37:41,942 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:229) - Failed to publish events
  2. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
  3. at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
  4. at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
  5. at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:200)
  6. at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
  7. at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
  8. at java.lang.Thread.run(Thread.java:745)
  9. Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

5.2 解决办法

设计一个buffer event缓存多行内容,仿照Logstash的codec/mulitline插件配置,增加了如下参数:

参数 默认值 含义
multiline false 是否使用多行合并
multilinePattern \n 指定正则表达式,匹配指定的正则表达式来确定是前一个event的内容还是下一个event的内容
multilinePatternBelong next 值可选"next"或"previous"。若multilineMatched=true,"previous" 是指匹配正则表达式的行是前一event的内容,"next"是指匹配正则表达式的行是下一event的内容;若multilineMatched=false,"previous"是指不匹配正则表达式的行是前一event的内容,"next"是指不匹配正则表达式的行是后一event的内容
multilineMatched true 是否匹配正则表达式
multilineEventTimeoutSeconds 0 缓冲区buffer event的超时时间,0表示永远不超时
multilineMaxBytes 10485760 单位字节数,默认值是10MB,若缓冲的内容长度超过该参数,则缓冲区中的数据作为一个event被flush输出
multilineMaxLines 500 行数,默认值是500行,若缓冲的内容行数超过该参数,则缓冲区中的数据作为一个event被flush输出

主要修改了类TailFile里的readEvents()方法,相关代码如下:

  1. if (this.multiline) {
  2. if (raf != null) { // when file has not closed yet
  3. boolean match = this.multilinePatternMatched;
  4. while (events.size() < numEvents) {
  5. LineResult line = readLine();
  6. if (line == null) {
  7. break;
  8. }
  9. Event event = null;
  10. logger.debug("TailFile.readEvents: Current line = " + new String(line.line) +
  11. ". Current time : " + new Timestamp(System.currentTimeMillis()) +
  12. ". Pos:" + pos +
  13. ". LineReadPos:" + lineReadPos + ",raf.getPointer:" + raf.getFilePointer());
  14. switch (this.multilinePatternBelong) {
  15. case "next":
  16. event = readMultilineEventNext(line, match);
  17. break;
  18. case "previous":
  19. event = readMultilineEventPre(line, match);
  20. break;
  21. default:
  22. break;
  23. }
  24. if (event != null) {
  25. events.add(event);
  26. }
  27. if (bufferEvent != null) {
  28. if (bufferEvent.getBody().length >= multilineMaxBytes
  29. || Integer.parseInt(bufferEvent.getHeaders().get("lineCount")) == multilineMaxLines) {
  30. flushBufferEvent(events);
  31. }
  32. }
  33. }
  34. }
  35. if (needFlushTimeoutEvent()) {
  36. flushBufferEvent(events);
  37. }
  38. }

合并多行处理的方法代码如下:

  1. private Event readMultilineEventPre(LineResult line, boolean match)
  2. throws IOException {
  3. Event event = null;
  4. Matcher m = multilinePattern.matcher(new String(line.line));
  5. boolean find = m.find();
  6. match = (find && match) || (!find && !match);
  7. byte[] lineBytes = toOriginBytes(line);
  8. if (match) {
  9. /** If matched, merge it to the buffer event. */
  10. mergeEvent(line);
  11. } else {
  12. /**
  13. * If not matched, this line is not part of previous event when the buffer event is not null.
  14. * Then create a new event with buffer event's message and put the current line into the
  15. * cleared buffer event.
  16. */
  17. if (bufferEvent != null) {
  18. event = EventBuilder.withBody(bufferEvent.getBody());
  19. }
  20. bufferEvent = null;
  21. bufferEvent = EventBuilder.withBody(lineBytes);
  22. if (line.lineSepInclude) {
  23. bufferEvent.getHeaders().put("lineCount", "1");
  24. } else {
  25. bufferEvent.getHeaders().put("lineCount", "0");
  26. }
  27. long now = System.currentTimeMillis();
  28. bufferEvent.getHeaders().put("time", Long.toString(now));
  29. }
  30. return event;
  31. }
  32. private Event readMultilineEventNext(LineResult line, boolean match)
  33. throws IOException {
  34. Event event = null;
  35. Matcher m = multilinePattern.matcher(new String(line.line));
  36. boolean find = m.find();
  37. match = (find && match) || (!find && !match);
  38. if (match) {
  39. /** If matched, merge it to the buffer event. */
  40. mergeEvent(line);
  41. } else {
  42. /**
  43. * If not matched, this line is not part of next event. Then merge the current line into the
  44. * buffer event and create a new event with the merged message.
  45. */
  46. mergeEvent(line);
  47. event = EventBuilder.withBody(bufferEvent.getBody());
  48. bufferEvent = null;
  49. }
  50. return event;
  51. }

3.3 相关配置示例

  1. agent.sources.s3.multiline = true
  2. agent.sources.s3.multilinePattern = ^AGENT_IP:
  3. agent.sources.s3.multilinePatternBelong = previous
  4. agent.sources.s3.multilineMatched = false
  5. agent.sources.s3.multilineEventTimeoutSeconds = 120
  6. agent.sources.s3.multilineMaxBytes = 3145728
  7. agent.sources.s3.multilineMaxLines = 3000

六. FLUME-3187

6.1 问题和需求

为了获取Flume agent所在机器的主机名或IP,我们使用了主机名拦截器(Host Interceptor),但是根据主机名拦截器的定义,只能保留主机名和IP中的一种,无法同时保留主机名和IP。

Host Interceptor
This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.

6.2 解决办法

将原来的useIP参数扩展,增加一个参数useHostname,若同时设置为true,可同时保留主机名和IP;另外支持自定义主机名和IP地址在event header里的key,参数如下:

参数 默认值 含义
useIP true 是否保留IP地址
useHostname true 是否保留主机名
ip ip 指定event header里IP地址的key值
hostname hostname 指定event header里主机名的key值

修改了类HostInterceptor中的构造方法和拦截方法,相关代码如下:

  1. addr = InetAddress.getLocalHost();
  2. if (useIP) {
  3. ip = addr.getHostAddress();
  4. }
  5. if (useHostname) {
  6. hostname = addr.getCanonicalHostName();
  7. }

6.3 相关配置示例

  1. agent.sources.s4.interceptors = i1
  2. agent.sources.s4.interceptors.i1.type = host
  3. agent.sources.s4.interceptors.i1.useIP = true
  4. agent.sources.s4.interceptors.i1.useHostname = true
  5. agent.sources.s4.interceptors.i1.ip = ip
  6. agent.sources.s4.interceptors.i1.hostname = hostname

总结

目前上述4个Patch在我行A类和B类生产系统已实际运行使用,“拥抱开源,回馈开源”,我们用的是开源软件,我们希望也能对开源软件做出贡献。后续我们将分享我行ELK日志平台架构演进的详细细节,敬请大家关注!

作者介绍:
文乔,工作于中国民生银行总行信息技术部大数据基础产品平台组,负责行内大数据管控平台的开发,天眼日志平台主要参与人。微信tinawenqiao,邮箱wenqiao@cmbc.com.cn。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注