@zhangyy
2018-04-12T14:22:39.000000Z
字数 1596
阅读 168
协作框架
- flume 监控目录操作
监控某个目录,若目录下面产生成符合条件的文件,flume 就抽取它到hdfs 上,目录 下可能有多中文件,比如当文件以log.tmp 结尾表示正在写,对log.tmp 文件设置size 值,就会变成一个以.log 结尾,则已经是完整文件(往往存在短暂),flume 可以抽取其中的数据,以log.completed 结尾则表示flume已经抽取完成,可以删除。
mkdir /home/hadoop/datas/spooling
mkdir /home/hadoop/datas/checkpoint
mkdir /home/hadoop/datas/data
hdfs dfs -mkdir /spool
cd /home/hadoop/datas/spooling/
touch xx.log
touch yy.log
touch zz.log.tmp
cp -p hive-conf.properties test-dir.properties
# example.conf: A single-node Flume configuration
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/hadoop/datas/spooling
a3.sources.r3.ignorePattern = ^(.)*\\.tmp$
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://namenode01.hadoop.com:8020/spool/%Y%m/%d
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10
# 设置二级目录按小时切割
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
# 设置文件回滚条件
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollsize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /home/hadoop/datas/checkpoint
a3.channels.c3.dataDirs = /home/hadoop/datas/data
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
bin/flume-ng agent --conf conf --conf-file conf/test-dir.properties --name a3