实时监控一个文件目录下的多个文件
有时我们不需要监控一个文件的追加数据,而是进行检测一个文件目录下是否有新的文件,这是就可以采用spool源进行替代exec那种方式,其实用exec也可以,将命令换一下,但是可能效率较低,因为是一行一行的读取嘛。
采用spool源就可以指定文件目录进行监控,它的作用机制是当我们向该文件夹中传输文件时,flume会将该文件传到sink,然后flume会将传进来的数据加个后缀,用来区别该文件是否上传过,所以当我们把一个文件传输进来后,再次进行更改该文件,flume不会监听的到,因为flume只会检查文件后缀名,只有没有的后缀的文件才会进入sink。
另外有一点就是如果我们将一个带有该区别后缀的文件传到该文件夹时,sink端会收到一些数据,但不是源数据,可能是一些报错等错误信息,也发现该文件夹下该文件也没有改名字,所以推断flume执行顺序是,先把监控的文件传输到sink,然后将该文件加个后缀名,之后就不会再次被监听到。
我们用spool监控本地的文件目录,然后将监听到的新文件上传到HDFS。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = spooldir # 监听的文件夹 a1.sources.r1.spoolDir = /home/hadoop/data2 # 定义监听文件的后缀,最好不要与一些文件重复 a1.sources.r1.fileSuffix = .LOG # 忽略所有以.tmp 结尾的文件,不上传 a1.sources.r1.ignorePattern = ([^ ]*\.tmp) a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path=hdfs://hadoop102:9000/flume/%Y%m%d/%H a1.sinks.k1.hdfs.filePrefix=logs- a1.sinks.k1.hdfs.round=true a1.sinks.k1.hdfs.roundValue=1 a1.sinks.k1.hdfs.roundUnit=hour a1.sinks.k1.hdfs.useLocalTimeStamp=true a1.sinks.k1.hdfs.batchSize=1000 a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.rollInterval=30 a1.sinks.k1.hdfs.rollSize=134217700 a1.sinks.k1.hdfs.rollCount=0 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
我们看一下结果:
我们成功的将data2文件夹中的数据上传到了HDFS,上面我们配置了一个忽略正则的,他就是用于区分不上传什么样的文件,上面配置的意思就是不上传以.tmp结尾的文件,将这种文件传输到该文件夹,flume会忽略该文件。
我们还可以进行下面修改:
a1.sources.r1.includePattern = ([^ ]*\.txt)
它的意思就是只监控以.txt文件结尾的文件,这样就可以将文件进行区别,不至于该文件夹下的每一个文件都被监控然后上传。
还有一点就是如果我们多次上传相同的文件,flume也是会进行上传的,只是传进去后该文件不会被添加后缀,因为已经存在了该文件。
虽然能够上传,但是也会打印报错信息到日志文件中。