@zhangyy
2018-04-12T06:22:39.000000Z
字数 1596
阅读 320
协作框架
- flume 监控目录操作
监控某个目录,若目录下面产生符合条件的文件,flume 就抽取它到 hdfs 上。目录下可能有多种文件:比如文件以 log.tmp 结尾表示正在写入;对 log.tmp 文件设置 size 阈值后,达到阈值就会滚动为一个以 .log 结尾的文件,即已经是完整文件(往往存在时间短暂),flume 可以抽取其中的数据;以 log.completed 结尾则表示 flume 已经抽取完成,可以删除。
# Create the local directories used by the Flume agent:
#   spooling   - watched by the spooldir source
#   checkpoint - file-channel checkpoint storage
#   data       - file-channel data storage
mkdir /home/hadoop/datas/spooling
mkdir /home/hadoop/datas/checkpoint
mkdir /home/hadoop/datas/data

# Create the HDFS target directory for the sink.
hdfs dfs -mkdir /spool

# Drop sample files into the spooling directory; zz.log.tmp will be
# skipped by the source's ignorePattern (still being written).
cd /home/hadoop/datas/spooling/ || exit 1
touch xx.log
touch yy.log
touch zz.log.tmp

# Start the new agent config from an existing template, preserving
# the original file's mode and timestamps (-p).
cp -p hive-conf.properties test-dir.properties
# test-dir.properties: a single-node Flume configuration.
# Watches a local spooling directory and ships completed files to HDFS.

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source: spooldir watches the directory and
# ignores *.tmp files (still being written).
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /home/hadoop/datas/spooling
a3.sources.r3.ignorePattern = ^(.)*\\.tmp$

# Describe the sink: write plain text into a %Y%m/%d partitioned path.
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://namenode01.hadoop.com:8020/spool/%Y%m/%d
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.writeFormat = Text
a3.sinks.k3.hdfs.batchSize = 10

# Round the timestamp down to the hour for the time-based escape
# sequences in hdfs.path (hourly sub-directories).
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour

# File-roll conditions: every 60 s or every ~128 MB, never by event count.
# NOTE: property keys are case-sensitive — the key is hdfs.rollSize
# (the original "hdfs.rollsize" is silently ignored and the 1024-byte
# default would apply).
a3.sinks.k3.hdfs.rollInterval = 60
a3.sinks.k3.hdfs.rollSize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a durable file channel backed by local disk.
a3.channels.c3.type = file
a3.channels.c3.checkpointDir = /home/hadoop/datas/checkpoint
a3.channels.c3.dataDirs = /home/hadoop/datas/data

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
# Launch the Flume agent "a3" (must match the component prefix in the
# properties file) using the config written above.
bin/flume-ng agent --conf conf --conf-file conf/test-dir.properties --name a3
