@gaoxiaoyunwei2017
2017-11-16T11:18:39.000000Z
字数 10157
阅读 564
Apache Flume是Cloudera公司开源的一款分布式、可靠、可用的服务,可用于从多种不同数据源收集、聚集、移动大量日志数据到集中数据存储中;它通过事务机制提供了可靠的消息传输支持,并自带负载均衡机制来支撑水平扩展。尤其近几年随着Flume的不断被完善以及升级版本的逐一推出,特别是flume-ng的推出,以及Flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为Apache顶级社区项目之一。
中国民生银行服务器的操作系统种类众多,除Linux外,部分生产系统仍采用AIX和HP-UNIX操作系统,由于在AIX和HP-UNIX无法使用Logstash作为日志采集端,在大数据基础平台产品团队经过一系列选型后,采用Flume作为AIX和HP-UNIX操作系统上日志采集端。
2016年我们在测试环境进行试验,使用的版本是Apache Flume 1.6,在使用Taildir Source组件和核心组件的过程中,发现其无法完全满足我们的需求,例如:
1. 若filegroup路径中包含正则表达式,则无法获取文件的完整路径,在日志入到Elasticsearch后无法定位日志的路径;
2. Taildir Source不支持将多行合并为一个event,只能一行一行读取文件;
3. filegroup配置中不支持目录包含正则表达式,不便配置包含多个日期并且日期自动增长的目录,例如/app/logs/yyyymmdd/appLog.log;
4. 在使用Host Interceptor时,发现只能保留主机名或者是IP,二者无法同时保留。
在研究Flume源码之后,我们在源码上扩展开发。截至目前,我们为开源社区贡献了4个Patch,其中FLUME-2955已被社区Merge并在1.7版本中发布,另外我们在Github上开放了一个版本,将FLUME-2960/2961/3187三个Patch合并到Flume 1.7上,欢迎大家下载使用,Github地址:https://github.com/tinawenqiao/flume,分支名trunk-cmbc。
接下来本文将对每个Issue进行详细介绍:
IssueID | 相关组件 | 简要介绍 | JIRA地址 |
---|---|---|---|
FLUME-2955 | Taildir Source | 根据参数fileHeader的值可将文件的完整路径放在event的头部 | https://issues.apache.org/jira/browse/FLUME-2955 |
FLUME-2960 | Taildir Source | filegroup的目录中支持正则表达式 | https://issues.apache.org/jira/browse/FLUME-2960 |
FLUME-2961 | Taildir Source | 实现多行合并处理日志 | https://issues.apache.org/jira/browse/FLUME-2961 |
FLUME-3187 | Host Interceptor | 支持同时保留主机名和IP | https://issues.apache.org/jira/browse/FLUME-3187 |
为了采集后缀为log的日志文件,filegroups设置如下:
agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1 = /app/logs/.*.log
若/app/logs目录中存在a.log、b.log、c.log三个文件,在Flume 1.6版本中,虽然可以通过headers.\.\在event的header里放入自定义的key和value,但是由于正则表达式匹配上了目录中多个文件,所以无法通过该方法设置,这样导致日志数据入到Elasticsearch后,用户从Kibana从查询时无法定位到数据所在的日志文件路径。
增加fileHeader和fileHeaderKey两个参数,两个参数含义分别是:
参数 | 默认值 | 含义 |
---|---|---|
fileHeader | false | 是否在event的header里包含文件的绝对路径 |
fileHeaderKey | file | 指定header里fileHeader的key值 |
修改类ReliableTaildirEventReader中readEvents()方法,根据配置文件的值,选择是否在event的header里加入文件的路径,主要代码如下:
Map<String, String> headers = currentFile.getHeaders();
if (annotateFileName || (headers != null && !headers.isEmpty())) {
for (Event event : events) {
if (headers != null && !headers.isEmpty()) {
event.getHeaders().putAll(headers);
}
if (annotateFileName) {
event.getHeaders().put(fileNameHeader, currentFile.getPath());
}
}
}
agent.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1 = /app/logs/.*.log
agent.sources.s1.fileHeader = true
agent.sources.s1.fileHeaderKey = path
在实际应用写日志时,很多系统是根据日期生成日期目录,每个日期目录中包含一个或多个日志文件,因此存在/app/logs/20170101/、/app/logs/20170102/、/app/logs/20170103/等多个目录,且/app/logs/目录下每天会自动生成新的日期目录,但是根据Taildir Source中filegroups.\的描述,只支持文件名带正则,因此1.6版本的Taildir Source无法满足该需求。
增加filegroups.\.parentDir和filegroups.\.filePattern两个参数,两个参数含义分别是:
参数 | 含义 |
---|---|
filegroups.\.parentDir | filegroup的目录,不能包含通配符或正则表达式 |
filegroups.\.filePattern | 相对于parentDir的路径,filePattern可包含目录,可使用正则 |
修改类TaildirMatcher中匹配文件的方法,相关代码如下:
private List<File> getMatchingFilesNoCache() {
final List<File> result = Lists.newArrayList();
try {
Set options = EnumSet.of(FOLLOW_LINKS);
Files.walkFileTree(Paths.get(parentDir.toString()), options, Integer.MAX_VALUE,
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (fileMatcher.matches(file.toAbsolutePath())) {
result.add(file.toFile());
}
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) {
return FileVisitResult.CONTINUE;
}
});
}
...
}
另外进行了配置参数的兼容性处理,用户仍可保留以前的filegroups配置,不需单独配置parentDir和filePattern,程序会将filegroups中的文件的目录赋值给parentDir,文件名赋值给filePattern。
需要注意的是:在Taildir Source中有个参数cachePatternMatching,默认值是true,其作用是缓存正则匹配的文件列表和消费文件的顺序,若目录中文件较多时,使用正则匹配比较耗时,设置该参数可提高性能,当发现文件的目录修改后会刷新缓存列表。由于filePattern中可包含目录,若cachePatternMatching设为true,在filePattern的子目录中新增文件,parentDir的修改时间不变,此时新增的日志文件不能被跟踪到,因此,建议在filePattern包含目录的情况下,将cachePatternMatching设置为false。
agent.sources.s2.type = org.apache.flume.source.taildir.TaildirSource
agent.sources.s2.filegroups = f1 f2
agent.sources.s2.filegroups.f1.parentDir = /app/log/
agent.sources.s2.filegroups.f1.filePattern = /APP.log.\\d{8}
agent.sources.s2.filegroups.f2.parentDir = /app/log/
agent.sources.s2.filegroups.f2.filePattern = /\\w/.*log
agent.sources.s2.cachePatternMatching = false
Taildir Source按行读取日志,把每一行作为内容放入flume event的body中,对于以下这种每行就可以结束的日志处理没有问题:
13 七月 2016 23:37:30,580 INFO [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:62) - Configuration provider starting
13 七月 2016 23:37:30,585 INFO [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:134) - Reloading configuration file:conf/taildir.conf
13 七月 2016 23:37:30,592 INFO [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1013) - Processing:s1
但是对于类似Java Stacktrace的日志,如果按上述处理,以下日志被截断成9个flume event(一共9行)输出,而我们希望这样的日志记录,要作为1个flume event,而不是9个输出:
13 七月 2016 23:37:41,942 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.kafka.KafkaSink.process:229) - Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:200)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 2000067 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
设计一个buffer event缓存多行内容,仿照Logstash的codec/mulitline插件配置,增加了如下参数:
参数 | 默认值 | 含义 |
---|---|---|
multiline | false | 是否使用多行合并 |
multilinePattern | \n | 指定正则表达式,匹配指定的正则表达式来确定是前一个event的内容还是下一个event的内容 |
multilinePatternBelong | next | 值可选"next"或"previous"。若multilineMatched=true,"previous" 是指匹配正则表达式的行是前一event的内容,"next"是指匹配正则表达式的行是下一event的内容;若multilineMatched=false,"previous"是指不匹配正则表达式的行是前一event的内容,"next"是指不匹配正则表达式的行是后一event的内容 |
multilineMatched | true | 是否匹配正则表达式 |
multilineEventTimeoutSeconds | 0 | 缓冲区buffer event的超时时间,0表示永远不超时 |
multilineMaxBytes | 10485760 | 单位字节数,默认值是10MB,若缓冲的内容长度超过该参数,则缓冲区中的数据作为一个event被flush输出 |
multilineMaxLines | 500 | 行数,默认值是500行,若缓冲的内容行数超过该参数,则缓冲区中的数据作为一个event被flush输出 |
主要修改了类TailFile里的readEvents()方法,相关代码如下:
if (this.multiline) {
if (raf != null) { // when file has not closed yet
boolean match = this.multilinePatternMatched;
while (events.size() < numEvents) {
LineResult line = readLine();
if (line == null) {
break;
}
Event event = null;
logger.debug("TailFile.readEvents: Current line = " + new String(line.line) +
". Current time : " + new Timestamp(System.currentTimeMillis()) +
". Pos:" + pos +
". LineReadPos:" + lineReadPos + ",raf.getPointer:" + raf.getFilePointer());
switch (this.multilinePatternBelong) {
case "next":
event = readMultilineEventNext(line, match);
break;
case "previous":
event = readMultilineEventPre(line, match);
break;
default:
break;
}
if (event != null) {
events.add(event);
}
if (bufferEvent != null) {
if (bufferEvent.getBody().length >= multilineMaxBytes
|| Integer.parseInt(bufferEvent.getHeaders().get("lineCount")) == multilineMaxLines) {
flushBufferEvent(events);
}
}
}
}
if (needFlushTimeoutEvent()) {
flushBufferEvent(events);
}
}
合并多行处理的方法代码如下:
private Event readMultilineEventPre(LineResult line, boolean match)
throws IOException {
Event event = null;
Matcher m = multilinePattern.matcher(new String(line.line));
boolean find = m.find();
match = (find && match) || (!find && !match);
byte[] lineBytes = toOriginBytes(line);
if (match) {
/** If matched, merge it to the buffer event. */
mergeEvent(line);
} else {
/**
* If not matched, this line is not part of previous event when the buffer event is not null.
* Then create a new event with buffer event's message and put the current line into the
* cleared buffer event.
*/
if (bufferEvent != null) {
event = EventBuilder.withBody(bufferEvent.getBody());
}
bufferEvent = null;
bufferEvent = EventBuilder.withBody(lineBytes);
if (line.lineSepInclude) {
bufferEvent.getHeaders().put("lineCount", "1");
} else {
bufferEvent.getHeaders().put("lineCount", "0");
}
long now = System.currentTimeMillis();
bufferEvent.getHeaders().put("time", Long.toString(now));
}
return event;
}
private Event readMultilineEventNext(LineResult line, boolean match)
throws IOException {
Event event = null;
Matcher m = multilinePattern.matcher(new String(line.line));
boolean find = m.find();
match = (find && match) || (!find && !match);
if (match) {
/** If matched, merge it to the buffer event. */
mergeEvent(line);
} else {
/**
* If not matched, this line is not part of next event. Then merge the current line into the
* buffer event and create a new event with the merged message.
*/
mergeEvent(line);
event = EventBuilder.withBody(bufferEvent.getBody());
bufferEvent = null;
}
return event;
}
agent.sources.s3.multiline = true
agent.sources.s3.multilinePattern = ^AGENT_IP:
agent.sources.s3.multilinePatternBelong = previous
agent.sources.s3.multilineMatched = false
agent.sources.s3.multilineEventTimeoutSeconds = 120
agent.sources.s3.multilineMaxBytes = 3145728
agent.sources.s3.multilineMaxLines = 3000
为了获取Flume agent所在机器的主机名或IP,我们使用了主机名拦截器(Host Interceptor),但是根据主机名拦截器的定义,只能保留主机名和IP中的一种,无法同时保留主机名和IP。
Host Interceptor
This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.
将原来的useIP参数扩展,增加一个参数useHostname,若同时设置为true,可同时保留主机名和IP;另外支持自定义主机名和IP地址在event header里的key,参数如下:
参数 | 默认值 | 含义 |
---|---|---|
useIP | true | 是否保留IP地址 |
useHostname | true | 是否保留主机名 |
ip | ip | 指定event header里IP地址的key值 |
hostname | hostname | 指定event header里主机名的key值 |
修改了类HostInterceptor中的构造方法和拦截方法,相关代码如下:
addr = InetAddress.getLocalHost();
if (useIP) {
ip = addr.getHostAddress();
}
if (useHostname) {
hostname = addr.getCanonicalHostName();
}
agent.sources.s4.interceptors = i1
agent.sources.s4.interceptors.i1.type = host
agent.sources.s4.interceptors.i1.useIP = true
agent.sources.s4.interceptors.i1.useHostname = true
agent.sources.s4.interceptors.i1.ip = ip
agent.sources.s4.interceptors.i1.hostname = hostname
目前上述4个Patch在我行A类和B类生产系统已实际运行使用,“拥抱开源,回馈开源”,我们用的是开源软件,我们希望也能对开源软件做出贡献。后续我们将分享我行ELK日志平台架构演进的详细细节,敬请大家关注!
作者介绍:
文乔,工作于中国民生银行总行信息技术部大数据基础产品平台组,负责行内大数据管控平台的开发,天眼日志平台主要参与人。微信tinawenqiao,邮箱wenqiao@cmbc.com.cn。