@sasaki 2016-03-23T17:15:39.000000Z

Flume: A Distributed Log Collection Tool

BigData


Version Control

  1. @Title Flume: A Distributed Log Collection Tool
  2. @Version v1.0
  3. @Timestamp 2016-01-28 16:03
  4. @Author Nicholas
  5. @Mail redskirt@outlook.com

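Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data.
Flume NG is a distributed, reliable, and available system provided by Cloudera. It efficiently collects, aggregates, and moves massive amounts of log data from many different sources, and finally stores it in a centralized data store. The move from the original Flume OG to Flume NG was an architectural rewrite, and NG is completely incompatible with OG. After the rewrite, Flume NG behaves more like a small, lightweight tool: it is simple, adapts easily to many kinds of log collection, and supports failover and load balancing.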
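Flume NG sets up failover and load balancing through sink groups, using processor.type = failover or processor.type = load_balance. The sketch below gives a rough idea of how each processor picks a sink. The sink names k1 and k2 are hypothetical, and the classes are simplified stand-ins, not Flume's SinkProcessor implementations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of Flume's two sink-group processors (not Flume's own classes).
// Sink names k1/k2 are hypothetical.
public class SinkProcessorSketch {

    // failover: try sinks from the highest priority value to the lowest and use the
    // first healthy one; null means every sink failed, so the event stays in the channel
    public static String failover(Map<String, Integer> priorities, Set<String> healthy) {
        List<String> order = new ArrayList<>(priorities.keySet());
        order.sort((a, b) -> Integer.compare(priorities.get(b), priorities.get(a)));
        for (String sink : order) {
            if (healthy.contains(sink)) {
                return sink;
            }
        }
        return null;
    }

    // load_balance with selector = round_robin: each call starts from the next sink in turn
    // (simplified: a real processor also moves on to another sink when delivery fails)
    public static final class RoundRobin {
        private final List<String> sinks;
        private int next = 0;

        public RoundRobin(List<String> sinks) {
            this.sinks = new ArrayList<>(sinks);
        }

        public String pick() {
            String sink = sinks.get(next);
            next = (next + 1) % sinks.size();
            return sink;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> priorities = new HashMap<>();
        priorities.put("k1", 5);
        priorities.put("k2", 10);
        Set<String> healthy = new HashSet<>(Arrays.asList("k1", "k2"));
        System.out.println("failover uses " + failover(priorities, healthy));   // k2
        healthy.remove("k2");
        System.out.println("after k2 fails: " + failover(priorities, healthy)); // k1

        RoundRobin rr = new RoundRobin(Arrays.asList("k1", "k2"));
        System.out.println(rr.pick() + " " + rr.pick() + " " + rr.pick());      // k1 k2 k1
    }
}
```

Failover sends all traffic to one sink and keeps the others as standbys. Load balancing spreads batches across all sinks.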

[Screenshot]

Example: using Flume to load Nginx logs, Scribe data, and Kafka data into HDFS

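Flume Basic Components

Architecture

[Figure: Flume architecture diagram]
The diagram shows a layered architecture with three tiers: agent, collector, and storage. Agents and collectors both consist of two parts, a source (where data comes from) and a sink (where data goes). This agent/collector/storage layout comes from the original Flume OG. Flume NG has no separate collector role; an aggregation tier is simply another agent.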

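Event: a unit of data, with optional headers
Flow: an abstraction of an Event's movement from its point of origin to its destination
Client: operates on Events at the point of origin and sends them to a Flume Agent
Agent: an independent Flume process containing the components Source, Channel, and Sink
Source: consumes the Events delivered to it
Channel: temporary storage for Events in transit, holding the Events passed in by the Source
Sink: reads and removes Events from the Channel and passes them to the next Agent in the flow pipeline (if there is one) or to the final destination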
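The following toy, single-process Java model of one agent makes the roles above concrete. A source wraps each line in an event, the channel buffers it, and the sink takes it out. The class names are made up for illustration and are not Flume's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy, single-process model of one agent: Source -> Channel -> Sink.
// Illustrative only; these are not the org.apache.flume interfaces.
public class AgentSketch {

    // Event: a byte[] body plus optional headers (left empty in this sketch)
    static final class Event {
        final Map<String, String> headers = new HashMap<>();
        final byte[] body;

        Event(byte[] body) {
            this.body = body;
        }
    }

    // Channel: temporary FIFO store between source and sink
    static final class Channel {
        private final Deque<Event> queue = new ArrayDeque<>();

        void put(Event e) {
            queue.addLast(e);
        }

        Event take() {
            return queue.pollFirst(); // null when empty
        }
    }

    // Source: wraps each incoming line in an Event and puts it into its channel
    static final class LineSource {
        private final Channel channel;

        LineSource(Channel channel) {
            this.channel = channel;
        }

        void receive(String line) {
            channel.put(new Event(line.getBytes(StandardCharsets.UTF_8)));
        }
    }

    // Sink: takes events out of the channel (removing them) and delivers them
    static final class CollectingSink {
        private final Channel channel;
        final List<String> delivered = new ArrayList<>();

        CollectingSink(Channel channel) {
            this.channel = channel;
        }

        void drain() {
            Event e;
            while ((e = channel.take()) != null) {
                delivered.add(new String(e.body, StandardCharsets.UTF_8));
            }
        }
    }

    // Wires one agent together and pushes the given lines through it
    public static List<String> runAgent(List<String> lines) {
        Channel channel = new Channel();
        LineSource source = new LineSource(channel);
        CollectingSink sink = new CollectingSink(channel);
        for (String line : lines) {
            source.receive(line);
        }
        sink.drain();
        return sink.delivered;
    }

    public static void main(String[] args) {
        System.out.println(runAgent(Arrays.asList("hello flume", "this is a log")));
    }
}
```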

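Flume Data Flow

[Figure: Flume data flow]

At its core, Flume collects data from a source and delivers it to a destination. To make sure delivery succeeds, it buffers the data before sending it on. It deletes its buffered copy only after the data has actually reached the destination.

The basic unit of data that Flume transports is the Event. For a text file this is usually one line (one record), and it is also the basic unit of a transaction. An Event flows from the Source to the Channel and then to the Sink. It is a byte array (the body) and can carry headers. An Event is the smallest complete unit of a data flow: it comes from an external data source and goes to an external destination.

The heart of a running Flume is the Agent. It is a complete data collection tool with three core components: source, channel, and sink. Through these components, Events flow from one place to another.

A source receives data sent from an external origin, and different sources accept different data formats. For example, the spooling directory source watches a given directory and reads the content of each new file as soon as it appears.
A channel is a store that receives the source's output and holds it until a sink consumes it. Data is removed from a channel only after it has entered the next channel or reached its final destination. If a sink fails to write, its transaction is rolled back and the events stay in the channel to be retried, so a failed write does not lose data. Durability across a crash of the agent process depends on the channel type: a file channel keeps events on disk, while a memory channel loses them.
A sink consumes the data in the channel and sends it to an external destination or to the source of another agent, for example by writing it to HDFS or HBase.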
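The delivery guarantee described above rests on channel transactions. Below is a simplified model that assumes Flume-like semantics; it is not Flume's MemoryChannel code. The model is a bounded channel whose take transactions either commit, which removes the events for good, or roll back, which returns them to the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model of a bounded channel with take transactions (not Flume's MemoryChannel).
// Assumed semantics: events taken by a sink are gone for good only after commit();
// rollback() puts them back at the head of the queue so the sink can retry later.
public class TxChannelSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;

    public TxChannelSketch(int capacity) {
        this.capacity = capacity;
    }

    // Returns false when the channel is full (a real Flume channel signals this with a ChannelException)
    public synchronized boolean put(String event) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(event);
        return true;
    }

    public synchronized int size() {
        return queue.size();
    }

    public TakeTx begin() {
        return new TakeTx();
    }

    // One take transaction: remembers what it took until commit or rollback
    public final class TakeTx {
        private final List<String> taken = new ArrayList<>();

        public String take() {
            synchronized (TxChannelSketch.this) {
                String e = queue.pollFirst();
                if (e != null) {
                    taken.add(e);
                }
                return e;
            }
        }

        // Delivery succeeded: the taken events are now permanently removed
        public void commit() {
            taken.clear();
        }

        // Delivery failed: restore the events in their original order
        public void rollback() {
            synchronized (TxChannelSketch.this) {
                for (int i = taken.size() - 1; i >= 0; i--) {
                    queue.addFirst(taken.get(i));
                }
            }
            taken.clear();
        }
    }

    // Drains the channel with a sink whose first delivery attempt fails
    public static List<String> deliverWithOneFailure(TxChannelSketch ch) {
        List<String> destination = new ArrayList<>();
        boolean failNext = true;
        while (ch.size() > 0) {
            TakeTx tx = ch.begin();
            List<String> batch = new ArrayList<>();
            String e;
            while ((e = tx.take()) != null) {
                batch.add(e);
            }
            if (failNext) {     // e.g. the destination is briefly unreachable
                failNext = false;
                tx.rollback();  // nothing is lost: the events are back in the channel
            } else {
                destination.addAll(batch);
                tx.commit();
            }
        }
        return destination;
    }

    public static void main(String[] args) {
        TxChannelSketch ch = new TxChannelSketch(3);
        for (String s : new String[] {"e1", "e2", "e3", "e4"}) {
            System.out.println("put " + s + ": " + ch.put(s)); // e4 is rejected: capacity is 3
        }
        System.out.println("delivered " + deliverWithOneFailure(ch)); // [e1, e2, e3]
    }
}
```

In the demo, the first batch is rolled back and then delivered on the second attempt, so the result is the same as if nothing had failed. The only way to lose buffered events with a memory channel is a crash of the process itself.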

Source types supported by Flume
[Figure: Flume source types]

Channel types supported by Flume
[Figure: Flume channel types]

    # Start the Kafka cluster monitoring tool (KafkaOffsetMonitor)
    java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
        com.quantifind.kafka.offsetapp.OffsetGetterWeb \
        --zk master \
        --port 8082 \
        --refresh 10.seconds \
        --retain 2.days &
    [root@slave01 flume-ng]# flume-ng agent -c /etc/flume-ng/conf -f /etc/flume-ng/conf/flume.propervities -Dflume.root.logger=INFO,console -n agent-1
    # Create a Flume project that listens on a TCP port to collect logs
    # Example Flume configuration file
    [root@master conf]# pwd
    /usr/git-repo/bootcamp/flume-tutorial/test_flume/src/main/java/cn/chinahadoop/flume/conf
    [root@master conf]# cat example.conf
    # example.conf: A single-node Flume configuration
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Describe the sink
    a1.sinks.k1.type = logger
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    [root@slave01 conf]# pwd
    /usr/git-repo/bootcamp/flume-tutorial/test_flume/src/main/java/cn/chinahadoop/flume/conf
    [root@slave01 conf]# cp example.conf /etc/flume-ng/conf/
    # Start flume-ng; --name a1 must match the agent name defined in example.conf
    [root@slave01 ~]# flume-ng agent --conf conf --conf-file /etc/flume-ng/conf/example.conf --name a1 -Dflume.root.logger=INFO,console
    # In another terminal, check that port 44444 is being listened on
    [root@slave01 ~]# netstat -antp|grep 44444
    tcp 0 0 ::ffff:127.0.0.1:44444 :::* LISTEN 25220/java
    # Connect to port 44444 with telnet and send some data
    [root@slave01 ~]# telnet localhost 44444
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    hello flume
    OK
    this is a log......................
    OK
    # The received log data shows up in the flume-ng console
    # Each line sent becomes one Event
    16/01/29 14:27:14 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 66 6C 75 6D 65 0D hello flume. }
    16/01/29 14:28:39 INFO sink.LoggerSink: Event: { headers:{} body: 74 68 69 73 20 69 73 20 61 20 6C 6F 67 2E 2E 2E this is a log... }
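Two details of the console output above can be reproduced in a few lines. The first event ends with the byte 0D, which suggests that the netcat source cuts the TCP stream at '\n' and keeps the '\r' that telnet sends; the sketch assumes this. The dump mimics the logger sink printing only the first 16 body bytes; the second event, for example, is cut off after "this is a log...". This is an approximation, not Flume's LoggerSink code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Approximates what the console above shows (not Flume's code):
// 1. a netcat-style source cuts the byte stream at '\n'; telnet sends "\r\n",
//    so the '\r' (0x0D) stays in the event body;
// 2. a logger-style sink prints at most 16 body bytes as hex plus their printable chars.
public class NetcatLoggerSketch {
    static final int MAX_BYTES_TO_LOG = 16;

    // Split a raw stream into event bodies at '\n' (the '\n' itself is dropped)
    public static List<byte[]> splitLines(byte[] stream) {
        List<byte[]> bodies = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < stream.length; i++) {
            if (stream[i] == '\n') {
                byte[] body = new byte[i - start];
                System.arraycopy(stream, start, body, 0, body.length);
                bodies.add(body);
                start = i + 1;
            }
        }
        return bodies; // an unterminated trailing fragment is not emitted
    }

    // Hex bytes, then the same bytes as text with non-printable chars shown as '.'
    public static String dump(byte[] body) {
        int n = Math.min(body.length, MAX_BYTES_TO_LOG);
        StringBuilder hex = new StringBuilder();
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < n; i++) {
            int b = body[i] & 0xFF;
            hex.append(String.format("%02X ", b));
            text.append(b >= 0x20 && b < 0x7F ? (char) b : '.');
        }
        return hex.toString() + text;
    }

    public static void main(String[] args) {
        byte[] typed = "hello flume\r\nthis is a log......\r\n".getBytes(StandardCharsets.US_ASCII);
        for (byte[] body : splitLines(typed)) {
            System.out.println("body: " + dump(body));
        }
    }
}
```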

[Screenshot]

Flume Interceptor
Interceptor: before events are written to the channel, they pass through an interceptor chain, which can modify or drop them.

Add the following interceptor configuration to example.conf:

    # interceptors
    a1.sources.r1.interceptors = i1 i2 i3
    a1.sources.r1.interceptors.i1.type = timestamp
    a1.sources.r1.interceptors.i2.type = host
    a1.sources.r1.interceptors.i3.type = static
    a1.sources.r1.interceptors.i3.key = datacenter
    a1.sources.r1.interceptors.i3.value = NEW_YORK

Note: type can be an alias or a fully qualified class name; see InterceptorType.java for the aliases.
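The interceptor chain configured above can be modelled as a list of functions applied in order. Each function returns the event, possibly with new headers, or null to drop it. SimpleInterceptor and Event below are illustrative types, not Flume's Interceptor interface. The timestamp and host values are passed in so that the demo is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of an interceptor chain (not Flume's Interceptor API):
// each interceptor returns the (possibly modified) event, or null to drop it.
public class InterceptorChainSketch {

    public static final class Event {
        public final Map<String, String> headers = new LinkedHashMap<>();
        public final String body;

        public Event(String body) {
            this.body = body;
        }
    }

    public interface SimpleInterceptor {
        Event intercept(Event e); // return null to drop the event
    }

    // Like type = timestamp: adds the event time in epoch milliseconds
    public static SimpleInterceptor timestamp(long nowMillis) {
        return e -> {
            e.headers.put("timestamp", Long.toString(nowMillis));
            return e;
        };
    }

    // Like type = host: adds the agent's host address
    public static SimpleInterceptor host(String hostAddress) {
        return e -> {
            e.headers.put("host", hostAddress);
            return e;
        };
    }

    // Like type = static: adds a fixed key/value pair
    public static SimpleInterceptor staticHeader(String key, String value) {
        return e -> {
            e.headers.put(key, value);
            return e;
        };
    }

    // Runs every event through the chain in order; a null result removes the event
    public static List<Event> apply(List<SimpleInterceptor> chain, List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event cur = event;
            for (SimpleInterceptor interceptor : chain) {
                cur = interceptor.intercept(cur);
                if (cur == null) {
                    break;
                }
            }
            if (cur != null) {
                out.add(cur);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Mirrors interceptors = i1 i2 i3 above, with fixed clock and host values
        List<SimpleInterceptor> chain = Arrays.asList(
                timestamp(1454050308809L),
                host("192.168.53.231"),
                staticHeader("datacenter", "NEW_YORK"));
        for (Event e : apply(chain, Arrays.asList(new Event("test interceptor")))) {
            // prints {timestamp=1454050308809, host=192.168.53.231, datacenter=NEW_YORK} test interceptor
            System.out.println(e.headers + " " + e.body);
        }
    }
}
```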

Send some log lines again and check what is collected:

    [root@slave01 conf]# flume-ng agent --conf conf --conf-file /etc/flume-ng/conf/flume_kafka.conf --name producer -Dflume.root.logger=INFO,console
    ...
    test interceptor log.............
    OK
    ...
    16/01/29 14:51:48 INFO sink.LoggerSink: Event: { headers:{timestamp=1454050308809, host=192.168.53.231, datacenter=NEW_YORK} body: 74 65 73 74 20 69 6E 74 65 72 63 65 70 74 6F 72 test interceptor }

[Screenshot]

[Screenshot]

Built-in interceptors: timestamp, host, static, uuid, regex_filter, regex_extractor
Custom interceptors: implement the Interceptor interface; see TimestampInterceptor.java for reference
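Flume's Interceptor interface has the methods initialize(), intercept(Event), intercept(List<Event>), and close(), plus a nested Builder. In the configuration, the type of a custom interceptor is the fully qualified name of that Builder (for example com.example.MyInterceptor$Builder, a hypothetical name). The plain-Java sketch below shows the kind of filtering logic such an interceptor might contain, modelled on the two settings of the built-in regex_filter: regex and excludeEvents. Bodies are plain Strings, and the class is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical filtering logic for a custom interceptor, modelled on regex_filter's
// settings (regex, excludeEvents). Bodies are Strings here; a real interceptor works on
// org.apache.flume.Event and its jar must be on Flume's classpath.
public class RegexFilterSketch {
    private final Pattern regex;
    private final boolean excludeEvents;

    public RegexFilterSketch(String regex, boolean excludeEvents) {
        this.regex = Pattern.compile(regex);
        this.excludeEvents = excludeEvents;
    }

    // Single event: return it to keep it, null to drop it.
    // In this sketch a match anywhere in the body counts (Matcher.find()).
    public String intercept(String body) {
        boolean matches = regex.matcher(body).find();
        // excludeEvents = false keeps matching events; true drops them
        return matches != excludeEvents ? body : null;
    }

    // Batch form: filters a list and keeps the original order
    public List<String> intercept(List<String> bodies) {
        List<String> kept = new ArrayList<>();
        for (String body : bodies) {
            String result = intercept(body);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("INFO start", "DEBUG x=1", "WARN disk 91%");
        System.out.println(new RegexFilterSketch("^DEBUG", true).intercept(lines));  // [INFO start, WARN disk 91%]
        System.out.println(new RegexFilterSketch("^DEBUG", false).intercept(lines)); // [DEBUG x=1]
    }
}
```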

    # Build the jar with Maven, then copy it into Flume's lib directory on every node that runs an agent
    [root@master flume-ng]# scp /opt/cloudera/parcels/CDH-5.3.8-1.cdh5.3.8.p0.5/lib/flume-ng/lib/test_flume-0.0.1-SNAPSHOT.jar root@slave01:/opt/cloudera/parcels/CDH-5.3.8-1.cdh5.3.8.p0.5/lib/flume-ng/lib
    test_flume-0.0.1-SNAPSHOT.jar

Two Flume fan-out strategies
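These two strategies correspond to Flume's two channel selectors, which decide how one source fans events out to several channels. The replicating selector (the default) copies every event to all of the source's channels. The multiplexing selector routes each event by the value of one header. The sketch below models both, with the equivalent configuration in the comments. The agent, channel, and header names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of Flume's two channel selectors (not Flume's ChannelSelector classes).
// Config equivalents, with hypothetical agent/channel/header names:
//   replicating:  a1.sources.r1.selector.type = replicating
//   multiplexing: a1.sources.r1.selector.type = multiplexing
//                 a1.sources.r1.selector.header = state
//                 a1.sources.r1.selector.mapping.CZ = c1
//                 a1.sources.r1.selector.mapping.US = c2 c3
//                 a1.sources.r1.selector.default = c4
public class FanOutSketch {

    // replicating: every event goes to every channel of the source
    public static List<String> replicating(List<String> channels) {
        return channels;
    }

    // multiplexing: the header value selects the channels; unmapped or missing values use the default
    public static List<String> multiplexing(Map<String, String> headers, String header,
                                            Map<String, List<String>> mapping, List<String> defaults) {
        String value = headers.get(header);
        List<String> target = (value == null) ? null : mapping.get(value);
        return (target != null) ? target : defaults;
    }

    public static void main(String[] args) {
        Map<String, List<String>> mapping = new HashMap<>();
        mapping.put("CZ", Arrays.asList("c1"));
        mapping.put("US", Arrays.asList("c2", "c3"));
        List<String> defaults = Arrays.asList("c4");

        Map<String, String> headers = new HashMap<>();
        headers.put("state", "US");
        System.out.println(replicating(Arrays.asList("c1", "c2", "c3", "c4"))); // [c1, c2, c3, c4]
        System.out.println(multiplexing(headers, "state", mapping, defaults));  // [c2, c3]
        headers.put("state", "FR");
        System.out.println(multiplexing(headers, "state", mapping, defaults));  // [c4]
    }
}
```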

A simple case: Flume --> Kafka --> HDFS

Prepare the Kafka cluster
[Screenshot]

Collect logs with Flume NG and load them into Kafka.
1) Configure flume-kafka.properties

    [root@master tmp]# ls flumeng-kafka-plugin.jar
    flumeng-kafka-plugin.jar
    [root@master conf]# cp /usr/application/tmp/flumeng-kafka-plugin.jar /opt/cloudera/parcels/CDH-5.3.8-1.cdh5.3.8.p0.5/lib/flume-ng/lib/
    [root@slave01 data]# pwd
    /usr/git-repo/bootcamp/practise/sogouquery/data
    [root@slave01 conf]# vim flume-kafka.properties
    # agent
    producer.sources = sources1
    producer.channels = channel1
    producer.sinks = sink1
    # Flume spooldir source: watch the directory where the logs are generated
    producer.sources.sources1.type = spooldir
    producer.sources.sources1.channels = channel1
    producer.sources.sources1.spoolDir = /usr/git-repo/bootcamp/practise/sogouquery/data/generated
    producer.sources.sources1.fileHeader = true
    # Flume sink: where the data goes. The sink sends the messages, i.e. it acts as the Kafka producer
    #producer.sinks.r.type = org.apache.flume.sink.kafka.KafkaSink
    #producer.sinks.r.type = cn.chinahadoop.flume.KafkaSink
    #producer.sinks.r.type = com.flume.flume2kafka.Flume2Kafka
    producer.sinks.sink1.type = org.apache.flume.plugins.KafkaSink
    #producer.sinks.r.metadata.broker.list=192.168.53.230:9092,192.168.53.231:9092,192.168.53.232:9092,192.168.53.233:9092
    producer.sinks.sink1.metadata.broker.list = 192.168.53.230:9092
    #producer.sinks.r.kafkaSink.brokerList=192.168.53.230:9092
    producer.sinks.sink1.partitioner.class = org.apache.flume.plugins.SinglePartition
    producer.sinks.sink1.serializer.class = kafka.serializer.StringEncoder
    producer.sinks.sink1.key.serializer.class = kafka.serializer.StringEncoder
    producer.sinks.sink1.request.required.acks = 0
    producer.sinks.sink1.max.message.size = 100000000
    producer.sinks.sink1.producer.type = async
    producer.sinks.sink1.custom.encoding = UTF-8
    producer.sinks.sink1.custom.topic.name = topic1
    producer.sinks.sink1.channel = channel1
    # Channel
    producer.channels.channel1.type = memory
    producer.channels.channel1.capacity = 1000
    # Start Flume
    [root@slave01 conf]# flume-ng agent -n producer -c conf -f /etc/flume-ng/conf/flume-kafka.properties -Dflume.root.logger=INFO,console
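The -n producer option above selects the agent whose name prefixes the keys in flume-kafka.properties. To show the key layout, the sketch below reads such a file with java.util.Properties. It is an illustrative parser, not Flume's configuration loader. If -n does not match the prefix, the agent finds no sources, channels, or sinks to start.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.SortedMap;
import java.util.TreeMap;

// Illustrative reader for the key layout of a Flume properties file (not Flume's loader):
//   <agent>.sources | .channels | .sinks = space-separated component names
//   <agent>.<kind>.<component>.<property> = value
// The agent name is the prefix that -n / --name selects.
public class AgentConfigSketch {

    public static Properties load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    // e.g. names(p, "producer", "sinks") -> [sink1]; empty if the agent name does not match
    public static List<String> names(Properties p, String agent, String kind) {
        String value = p.getProperty(agent + "." + kind);
        if (value == null || value.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(value.trim().split("\\s+"));
    }

    // All properties of one component, with the "<agent>.<kind>.<name>." prefix removed
    public static SortedMap<String, String> props(Properties p, String agent, String kind, String name) {
        String prefix = agent + "." + kind + "." + name + ".";
        SortedMap<String, String> out = new TreeMap<>();
        for (String key : p.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), p.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties p = load("producer.sources = sources1\n"
                + "producer.channels = channel1\n"
                + "producer.sinks = sink1\n"
                + "producer.sinks.sink1.type = org.apache.flume.plugins.KafkaSink\n"
                + "producer.sinks.sink1.channel = channel1\n");
        for (String kind : Arrays.asList("sources", "channels", "sinks")) {
            for (String name : names(p, "producer", kind)) {
                System.out.println(kind + "/" + name + " " + props(p, "producer", kind, name));
            }
        }
        // Wrong agent name (e.g. -n a1 with this file): nothing is found
        System.out.println(names(p, "a1", "sinks")); // []
    }
}
```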

Because this setup uses a custom sink (producer.sinks.sink1.type = org.apache.flume.plugins.KafkaSink), put the downloaded flumeng-kafka-plugin.jar into /opt/cloudera/parcels/CDH-5.3.8-1.cdh5.3.8.p0.5/lib/flume-ng/lib/. Otherwise flume-ng reports that the class cannot be found and fails to start.
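The jar matters because of how a component's type is resolved. The type is either a short alias for a built-in class (such as logger or memory) or a fully qualified class name, and Flume then loads the class by name. The sketch below shows that lookup with only two sink aliases listed; it is not Flume's factory code. Run outside a Flume installation, the plugin class reports as not loadable, which is the same situation as a missing jar.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Illustrative type lookup (not Flume's factory code): a short alias maps to a built-in
// class, anything else is taken as a fully qualified class name, and the class is then
// loaded by name. Only two sink aliases are listed here.
public class TypeResolutionSketch {
    private static final Map<String, String> SINK_ALIASES = new HashMap<>();

    static {
        SINK_ALIASES.put("logger", "org.apache.flume.sink.LoggerSink");
        SINK_ALIASES.put("hdfs", "org.apache.flume.sink.hdfs.HDFSEventSink");
    }

    public static String resolve(String type) {
        String builtIn = SINK_ALIASES.get(type.toLowerCase(Locale.ROOT));
        return builtIn != null ? builtIn : type;
    }

    // false when no jar on the classpath provides the class: the "class not found" case
    public static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String type : new String[] {"logger", "org.apache.flume.plugins.KafkaSink"}) {
            String cls = resolve(type);
            System.out.println(type + " -> " + cls + ", loadable here: " + isLoadable(cls));
        }
    }
}
```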

2) Prepare a log generator script. Each run creates two log files in /usr/git-repo/bootcamp/practise/sogouquery/data/generated

    # In another terminal
    [root@slave01 log-generator]# pwd
    /usr/git-repo/bootcamp/practise/sogouquery/log-generator
    [root@slave01 log-generator]# vim log-generator.sh
    #!/bin/bash
    bin=`dirname "$0"`
    export bin=`cd "$bin"; pwd`
    export datadir=$bin/../data
    export data=$datadir/SogouQ.reduced
    export generateddir=$datadir/generated
    # Empty the spool directory (this also removes the .COMPLETED files left by Flume)
    rm -rf $generateddir/*
    # -p: no error if the directory already exists (plain mkdir prints "File exists" in that case)
    mkdir -p $generateddir
    echo genrateddir is $generateddir
    i=0
    # Date suffix of the first file; it advances by one day per iteration
    surfix="20150601"
    while(($i<2))
    do
        echo generate the $i file.
        sleep 5
        echo "cat $data > $generateddir/SogouQ.reduced.$surfix"
        cat $data > $generateddir/SogouQ.reduced.$surfix
        # Next day: convert to epoch seconds, add 86400, format as yyyymmdd again
        second=`date -d "$surfix" +%s`
        second=$(($second+86400))
        surfix=`date -d @"$second" +%Y%m%d`
        i=$(($i+1))
    done
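For comparison, here is a Java sketch of the same generator. It deliberately differs from the script in two places. First, it computes the next date with calendar arithmetic (LocalDate.plusDays) rather than epoch seconds plus 86400, which can yield the same date twice in a time zone with daylight saving. Second, it writes each file in full to a staging directory (an addition of this sketch) and then moves it into the spool directory, because the spooldir source expects files not to change after they appear. The 5-second pause of the script is omitted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Java sketch of log-generator.sh with calendar-based date suffixes and
// write-then-move file creation. The staging directory is an addition of this sketch and
// must be on the same file system as the spool directory for the atomic move to work.
public class LogGeneratorSketch {
    private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.BASIC_ISO_DATE;

    // "20150601" -> "20150602", "20151231" -> "20160101"
    public static String nextSuffix(String suffix) {
        return LocalDate.parse(suffix, YYYYMMDD).plusDays(1).format(YYYYMMDD);
    }

    // Creates `count` copies of `data` named <prefix>.<yyyyMMdd> in spoolDir
    public static List<Path> generate(Path data, Path stagingDir, Path spoolDir,
                                      String prefix, String startSuffix, int count) throws IOException {
        Files.createDirectories(stagingDir);
        Files.createDirectories(spoolDir); // like mkdir -p
        List<Path> created = new ArrayList<>();
        String suffix = startSuffix;
        for (int i = 0; i < count; i++) {
            String name = prefix + "." + suffix;
            Path staged = stagingDir.resolve(name);
            Files.copy(data, staged, StandardCopyOption.REPLACE_EXISTING);
            // The file appears in the spool directory only once it is complete
            Path target = spoolDir.resolve(name);
            Files.move(staged, target, StandardCopyOption.ATOMIC_MOVE);
            created.add(target);
            suffix = nextSuffix(suffix);
        }
        return created;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 3) {
            System.err.println("usage: LogGeneratorSketch <data file> <staging dir> <spool dir>");
            return;
        }
        for (Path p : generate(Paths.get(args[0]), Paths.get(args[1]), Paths.get(args[2]),
                "SogouQ.reduced", "20150601", 2)) {
            System.out.println("generated " + p);
        }
    }
}
```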

3) Run the log generator script. In the terminal where Flume is running, you will see the logs being collected and loaded into Kafka.

    [root@slave01 generated]# pwd
    /usr/git-repo/bootcamp/practise/sogouquery/data/generated
    [root@slave01 generated]# ../../log-generator/log-generator.sh
    mkdir: cannot create directory `/usr/git-repo/bootcamp/practise/sogouquery/log-generator/../data/generated': File exists
    genrateddir is /usr/git-repo/bootcamp/practise/sogouquery/log-generator/../data/generated
    generate the 0 file.
    cat /usr/git-repo/bootcamp/practise/sogouquery/log-generator/../data/SogouQ.reduced > /usr/git-repo/bootcamp/practise/sogouquery/log-generator/../data/generated/SogouQ.reduced.20150601
    generate the 1 file.
    cat /usr/git-repo/bootcamp/practise/sogouquery/log-generator/../data/SogouQ.reduced > /usr/git-repo/bootcamp/practise/sogouquery/log-generator/../data/generated/SogouQ.reduced.20150602

[Screenshot]

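Note: in this demo a single Flume sink acts as one Kafka producer. A cluster works the same way, and Kafka keeps messages apart by the topic configured in Flume (producer.sinks.sink1.custom.topic.name = topic1). The consumer commands below read a topic named topic, so use whichever topic name your sink is actually configured with.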

4) In another terminal, consume the topic with the Kafka command-line tools

    # Show the topic details
    [root@slave01 kafka_2.10-0.8.2.0]# /usr/kafka_2.10-0.8.2.0/bin/kafka-topics.sh --describe --zookeeper master:2181 --topic topic
    Topic:topic PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
    # Consume the topic on the console
    [root@slave01 kafka_2.10-0.8.2.0]# /usr/kafka_2.10-0.8.2.0/bin/kafka-console-consumer.sh --zookeeper master:2181 --topic topic
    this is a topic......
    # Files that Flume has finished reading have been renamed with the .COMPLETED suffix
    [root@slave01 generated]# pwd
    /usr/git-repo/bootcamp/practise/sogouquery/data/generated
    [root@slave01 generated]# ls
    SogouQ.reduced.20150601.COMPLETED test.COMPLETED
    SogouQ.reduced.20150602.COMPLETED

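As you can see, whenever a new file is created in the directory that Flume watches, its content is collected by Flume, loaded into Kafka, and finally consumed on the console.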
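As the .COMPLETED names above show, the spooldir source marks each file it has finished by renaming it. The simplified single-pass model below is not Flume's implementation. It processes files in name order, turns each line into an event with a file header (as with fileHeader = true), and renames the file afterwards. The real source renames a file only after its events are committed to the channel, and it also expects file names not to be reused.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Simplified single pass of a spooling-directory source (not Flume's implementation)
public class SpoolDirSketch {
    static final String COMPLETED_SUFFIX = ".COMPLETED";

    public static final class Event {
        public final Map<String, String> headers = new LinkedHashMap<>();
        public final String body;

        Event(String body) {
            this.body = body;
        }
    }

    // Turns every not-yet-completed file into one event per line, then renames it
    // with the .COMPLETED suffix so that it is never read again
    public static List<Event> poll(Path spoolDir) {
        List<Path> files;
        try (Stream<Path> listing = Files.list(spoolDir)) {
            files = listing.filter(Files::isRegularFile)
                    .filter(p -> !p.getFileName().toString().endsWith(COMPLETED_SUFFIX))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<Event> events = new ArrayList<>();
        for (Path file : files) {
            try {
                for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                    Event ev = new Event(line);
                    ev.headers.put("file", file.toAbsolutePath().toString()); // like fileHeader = true
                    events.add(ev);
                }
                Files.move(file, file.resolveSibling(file.getFileName() + COMPLETED_SUFFIX));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: SpoolDirSketch <spool dir>");
            return;
        }
        for (Event ev : poll(Paths.get(args[0]))) {
            System.out.println(ev.headers + " " + ev.body);
        }
    }
}
```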

Logs collected in Flume
[Screenshot]

Logs consumed by the Kafka console consumer
[Screenshot]

5) Consuming the topic with the Kafka API

Code structure
[Screenshot]

Parameters

The test consumer class

    package cn.chinahadoop.kafka.consumer;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TestConsumer {
        private final ConsumerConnector consumer;
        private final String topic;
        private ExecutorService executor;

        public TestConsumer(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
        }

        public void shutdown() {
            if (consumer != null)
                consumer.shutdown();
            if (executor != null)
                executor.shutdown();
        }

        // Create a_numThreads streams for the topic and consume each one in its own worker thread
        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, a_numThreads);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            // Thread pool: one thread per stream
            executor = Executors.newFixedThreadPool(a_numThreads);
            // Start consuming
            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new SubTaskConsumer(stream, threadNumber));
                threadNumber++;
            }
        }

        // Build the consumer configuration (ZooKeeper address, consumer group, offset handling)
        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "60000");
            props.put("auto.commit.interval.ms", "1000");
            // With no committed offset for the group yet, start from the earliest message
            props.put("auto.offset.reset", "smallest");
            return new ConsumerConfig(props);
        }

        public static void main(String[] args) throws Exception {
            String zooKeeper = "192.168.53.230:2181";
            String topic = "topic";
            String groupId = "group";
            int threads = 1;
            final TestConsumer example = new TestConsumer(zooKeeper, groupId, topic);
            // Release the connector and the thread pool on Ctrl+C
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    example.shutdown();
                }
            });
            example.run(threads);
        }
    }

The worker task run by each thread

    package cn.chinahadoop.kafka.consumer;

    import java.nio.charset.StandardCharsets;

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    // Worker task: iterates over one stream and prints every message
    public class SubTaskConsumer implements Runnable {
        private final KafkaStream<byte[], byte[]> m_stream;
        private final int m_threadNumber;

        public SubTaskConsumer(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        @Override
        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            // hasNext() blocks until a message arrives and returns false after consumer.shutdown()
            while (it.hasNext()) {
                byte[] by = it.next().message();
                System.out.println("Thread " + m_threadNumber + ": "
                        + new String(by, StandardCharsets.UTF_8) + "-id:" + Thread.currentThread().getId());
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

    # Run the packaged program; the consumed topic messages are printed as well
    [root@master]# java -cp test-kafka-0.0.1-SNAPSHOT.jar cn.chinahadoop.kafka.consumer.TestConsumer
    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProp
    log4j:WARN Please initialize the log4j system properly.
    [2016-02-19 14:25:24,600] INFO Closing socket connection to /192.168.53.230. cessor)
    Thread 0: akdjsfwqer-id:18
    Thread 0: -id:18
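The threading in TestConsumer follows a thread-per-stream pattern. To try the pattern without a broker, the sketch below replaces each KafkaStream with an in-memory queue and models consumer.shutdown() with an end marker; it is an illustration, not the Kafka API. With Kafka 0.8's high-level consumer, a partition is read by at most one stream per consumer group, so threads beyond the partition count stay idle. The topic described earlier has PartitionCount:1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Thread-per-stream pattern with Kafka replaced by in-memory queues (illustrative only).
// Each BlockingQueue stands in for one KafkaStream; END stands in for consumer.shutdown().
public class StreamWorkersSketch {
    private static final String END = "\u0000end-of-stream";

    public static List<String> consume(List<List<String>> partitions) {
        List<String> out = Collections.synchronizedList(new ArrayList<String>());
        if (partitions.isEmpty()) {
            return out;
        }
        ExecutorService executor = Executors.newFixedThreadPool(partitions.size());
        for (int t = 0; t < partitions.size(); t++) {
            BlockingQueue<String> stream = new LinkedBlockingQueue<>(partitions.get(t));
            stream.add(END);
            final int threadNumber = t;
            executor.submit(() -> {
                try {
                    String msg;
                    // like: while (it.hasNext()) { byte[] by = it.next().message(); ... }
                    while (!(msg = stream.take()).equals(END)) {
                        out.add("Thread " + threadNumber + ": " + msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // like TestConsumer.shutdown(): no new tasks, then wait for the workers to finish
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList("a", "b"));
        partitions.add(Arrays.asList("c"));
        for (String line : consume(partitions)) {
            System.out.println(line); // order across threads may vary
        }
    }
}
```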

6) Exporting Kafka messages to HDFS with a Hadoop consumer

    package cn.chinahadoop.kafka.hadoop_consumer;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    /**
     * Consumes a Kafka topic and writes the messages to HDFS.
     * The HDFS FileSystem object is created once here and shared with the worker tasks.
     * @author sasaki
     */
    public class TestHadoopConsumer {
        private final ConsumerConnector consumer;
        private final String topic;
        private final FileSystem hdfs;
        private ExecutorService executor;

        public TestHadoopConsumer(String a_zookeeper, String a_groupId, String a_topic) {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    createConsumerConfig(a_zookeeper, a_groupId));
            this.topic = a_topic;
            try {
                // Reads core-site.xml and hdfs-site.xml from the classpath to find the NameNode
                hdfs = FileSystem.get(new HdfsConfiguration());
            } catch (IOException e) {
                // Fail fast instead of continuing with a null FileSystem
                consumer.shutdown();
                throw new IllegalStateException("Cannot connect to HDFS", e);
            }
        }

        public void shutdown() {
            if (consumer != null)
                consumer.shutdown(); // ends the stream iterators
            if (executor != null) {
                executor.shutdown();
                try {
                    // Give the workers time to close their HDFS output streams
                    executor.awaitTermination(30, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public void run(int a_numThreads) {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, a_numThreads);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
            // Thread pool: one thread per stream
            executor = Executors.newFixedThreadPool(a_numThreads);
            // Start consuming
            int threadNumber = 0;
            for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new SubTaskConsumer(stream, threadNumber, hdfs));
                threadNumber++;
            }
        }

        private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
            Properties props = new Properties();
            props.put("zookeeper.connect", a_zookeeper);
            props.put("group.id", a_groupId);
            props.put("zookeeper.session.timeout.ms", "60000");
            props.put("zookeeper.sync.time.ms", "2000");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "smallest");
            return new ConsumerConfig(props);
        }

        public static void main(String[] args) throws Exception {
            String zooKeeper = "192.168.53.230:2181";
            String topic = "topic";
            String groupId = "group";
            int threads = 1; // every worker writes the same HDFS file, so keep a single thread
            final TestHadoopConsumer example = new TestHadoopConsumer(zooKeeper, groupId, topic);
            // Best effort on Ctrl+C: stop consuming so the workers can close their files
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    example.shutdown();
                }
            });
            example.run(threads);
        }
    }

    package cn.chinahadoop.kafka.hadoop_consumer;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class SubTaskConsumer implements Runnable {
        // Make the written data visible to HDFS readers every FLUSH_EVERY messages
        private static final int FLUSH_EVERY = 100;

        private final KafkaStream<byte[], byte[]> m_stream;
        private final int m_threadNumber;
        private final FileSystem hdfs;

        public SubTaskConsumer(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber, FileSystem fs) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
            hdfs = fs;
        }

        /**
         * Iterates over the stream and appends every message to an HDFS file, one message per line.
         */
        @Override
        public void run() {
            // Every worker uses this same path (create() overwrites it), so run a single worker
            Path path = new Path("/user/root/kafka/consumer.txt");
            FSDataOutputStream dos = null;
            try {
                dos = hdfs.create(path);
                ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                long count = 0;
                while (it.hasNext()) {
                    byte[] by = it.next().message();
                    dos.write(by);
                    dos.write('\n'); // without a separator, consecutive records run together
                    if (++count % FLUSH_EVERY == 0) {
                        dos.hflush();
                    }
                    System.out.println("Thread " + m_threadNumber + ": "
                            + new String(by, StandardCharsets.UTF_8) + "-id:" + Thread.currentThread().getId());
                }
                System.out.println("Shutting down Thread: " + m_threadNumber);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (dos != null) {
                    try {
                        dos.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            // The FileSystem is shared, so it is not closed here; closing it would break other
            // workers, and Hadoop closes cached FileSystem instances when the JVM exits
        }
    }

    [root@master tmp]# java -cp test-kafka-0.0.1-SNAPSHOT.jar cn.chinahadoop.kafka.hadoop_consumer.TestHadoopConsumer

Viewing the messages in the Kafka monitoring page
[Screenshot]

[Screenshot]

The target file has been created in HDFS
[Screenshot]

    # View the end of the final file in HDFS
    [root@slave01 kafka_2.10-0.8.2.0]# hadoop fs -tail /user/root/kafka/consumer.txt
    d=9621&star=100:00:01 7812322229275207 [link:jejie.cn] 1 3 jejie.cn/00:00:01 9998054335177979 [sj+c中文首站] 2 9 chinasj.ifensi.com/archiver/?tid-5992.html00:00:02 7332328301924345 [女犯鞭刑] 6 8 blog.sina.com.cn/u/49f2b53b010007e000:00:02 9026201537815861 [scat] 13 24 www.scatwebmaster.com/00:00:02 1421205460982763 [健美] 8 2 www.mandf.cn/00:00:02 2671748246892677 [哄抢救灾物资] 2 5 pic.news.mop.com/gs/2008/0528/12985.shtml00:00:02

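Note: even when the program is correct, messages may not show up in the HDFS file right away. The HDFS client buffers what it writes. Readers see the data only after a block fills up, after the stream is flushed with hflush(), or after the file is closed (for example when the program exits). The records in the output above also run together because each message was written without a trailing newline.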
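Two problems show up in the output above: the records run together, and written data is not visible right away. Both come down to how the output stream is used. The local sketch below follows each message with '\n' and flushes the stream every N messages and on close. A plain OutputStream stands in for HDFS; with an FSDataOutputStream, the periodic flush would be hflush(). The class name and the flush interval are illustrative.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Local model of the HDFS writer: newline framing plus a flush every N messages.
// With HDFS, `out` would be the FSDataOutputStream and flushNow() would call hflush().
public class LineWriterSketch implements AutoCloseable {
    private final OutputStream out;
    private final int flushEvery;
    private long written = 0;
    private int flushes = 0;

    public LineWriterSketch(OutputStream out, int flushEvery) {
        this.out = out;
        this.flushEvery = flushEvery;
    }

    // One message per line, so records no longer run together in the file
    public void write(byte[] message) throws IOException {
        out.write(message);
        out.write('\n');
        if (++written % flushEvery == 0) {
            flushNow();
        }
    }

    private void flushNow() throws IOException {
        out.flush();
        flushes++;
    }

    public int flushCount() {
        return flushes;
    }

    // Flush whatever is left, then close the stream
    @Override
    public void close() throws IOException {
        flushNow();
        out.close();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (LineWriterSketch w = new LineWriterSketch(buffer, 2)) {
            for (String m : new String[] {"query one", "query two", "query three"}) {
                w.write(m.getBytes(StandardCharsets.UTF_8));
            }
        }
        System.out.print(buffer.toString("UTF-8"));
    }
}
```

A smaller flush interval makes data visible sooner. The cost is more round trips to the DataNodes, so the interval is a trade-off between freshness and throughput.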

Other notes and reference links

Integrating Flume and Kafka
1. Download flumeng-kafka-plugin: https://github.com/beyondj2ee/flumeng-kafka-plugin
2. Extract the flume-conf.properties file from the plugin and change its source section to:
    #source section
    producer.sources.s.type = exec
    producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
    producer.sources.s.channels = c
3. Change every topic value to test
4. Put the edited configuration file into the flume/conf directory

Kafka in Practice: Flume to Kafka http://www.open-open.com/lib/view/open1435884136903.html

High-Availability Hadoop Platform: Flume NG in Practice (Illustrated) http://www.cnblogs.com/smartloli/p/4468708.html

http://www.cnblogs.com/xfly/p/3825804.html

http://blog.csdn.net/xiao_jun_0820/article/details/41576999

http://blog.csdn.net/u014373825/article/details/42711191
