1. Extract the archive
[centos@hadoop10 data]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /data/module/
[centos@hadoop10 module]$ mv apache-flume-1.9.0-bin/ flume
2. Delete the guava JAR that causes a JAR conflict
[centos@hadoop10 flume]$ cd ./lib/
[centos@hadoop10 lib]$ rm guava-11.0.2.jar
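As an alternative to deleting the file outright, the JAR can be moved aside so that it is easy to restore. This is a minimal sketch, not the method used in these notes: `stash_guava` and the backup directory are hypothetical names introduced here for illustration.

```shell
# Hypothetical helper: move any guava JAR out of a lib directory instead of
# deleting it, so it can be restored later.
# Usage: stash_guava <lib-dir> <backup-dir>
stash_guava() {
    lib_dir="$1"
    backup_dir="$2"
    mkdir -p "$backup_dir"
    for jar in "$lib_dir"/guava-*.jar; do
        # When nothing matches, the unexpanded pattern is not a file; skip it.
        [ -f "$jar" ] || continue
        mv "$jar" "$backup_dir"/
        echo "moved $(basename "$jar")"
    done
}

# Example (paths are assumptions based on the install location above):
# stash_guava /data/module/flume/lib /data/module/flume/lib_backup
```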
3. Change the log directory
[centos@hadoop10 flume]$ cd ./conf/
[centos@hadoop10 conf]$ vim log4j.properties
Set the log path:
flume.log.dir=/data/module/flume/logs
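The same change can be made without opening an editor. The sketch below assumes the file already contains a line starting with `flume.log.dir=`; `set_flume_log_dir` is a hypothetical helper name, not part of Flume.

```shell
# Hypothetical helper: set flume.log.dir in a log4j.properties file.
# Usage: set_flume_log_dir <properties-file> <log-dir>
set_flume_log_dir() {
    props_file="$1"
    log_dir="$2"
    tmp_file="${props_file}.tmp"
    # Replace the whole line that starts with "flume.log.dir=".
    # "|" is the sed delimiter because the path itself contains "/".
    sed "s|^flume\.log\.dir=.*|flume.log.dir=${log_dir}|" "$props_file" > "$tmp_file" \
        && mv "$tmp_file" "$props_file"
}

# Example (run from the conf directory):
# set_flume_log_dir log4j.properties /data/module/flume/logs
```

Writing to a temporary file and renaming it avoids relying on `sed -i`, whose syntax differs between platforms.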
[centos@hadoop10 conf]$ cp flume-env.sh.template flume-env.sh
Uncomment the JVM options:
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
4. Write the collection job configuration
Configure the source
# Define the components
a1.sources = r1
a1.channels = c1
# Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/applog/log/app.*
a1.sources.r1.positionFile = /data/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
Configure the channel
# Configure the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop10:9092,hadoop11:9092,hadoop12:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
# Bind the source to the channel
a1.sources.r1.channels = c1
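The sink configuration is omitted: the Kafka channel writes events directly to the topic, so this agent runs with no sink.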
5. Create a job directory and paste the configuration from step 4 into a new file
[centos@hadoop10 flume]$ mkdir job
[centos@hadoop10 job]$ vim file_to_kafka.conf
6. Start the agent
[centos@hadoop10 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.logger=info,console
7. Start a console consumer to check the data
[centos@hadoop10 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop10:9092 --topic topic_log
8. Write the interceptor
[source channel sink]
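Issue: the hand-typed configuration was wrong; after copying and pasting the configuration instead, the problem went away.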
2023-08-22 06:58:24,796 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateConfigFilterSet(FlumeConfiguration.java:623)] Agent configuration for 'a1' has no configfilters.
2023-08-22 06:58:24,823 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:809)] Agent configuration for 'a1' has no sinks.
2023-08-22 06:58:24,825 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:163)] Post-validation flume configuration contains configuration for agents: [a1]
2023-08-22 06:58:24,827 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:151)] Creating channels
2023-08-22 06:58:24,840 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type org.apache.flume.channel.kafka.KafkaChannel
2023-08-22 06:58:24,845 (conf-file-poller-0) [INFO - org.apache.flume.channel.kafka.KafkaChannel.configure(KafkaChannel.java:184)] Group ID was not specified. Using flume as the group id.
2023-08-22 06:58:24,855 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1
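These notes do not record which line of the hand-typed file was wrong. One way to find out is to compare it with the copied file that works. The sketch below ignores comments, blank lines, line order, and spacing around `=`, so only real key or value differences remain. `normalize_conf`, `diff_conf`, and the file names in the usage line are hypothetical names introduced for illustration.

```shell
# Hypothetical helper: print the property lines of a Flume config in a
# canonical form (no comments, no blank lines, no spaces around "=", sorted).
normalize_conf() {
    grep -v -e '^[[:space:]]*#' -e '^[[:space:]]*$' "$1" \
        | sed -e 's/[[:space:]]*$//' \
              -e 's/^[[:space:]]*//' \
              -e 's/[[:space:]]*=[[:space:]]*/=/' \
        | sort
}

# Hypothetical helper: diff two configs after normalization.
# Returns 0 when they are equivalent, non-zero otherwise.
diff_conf() {
    left=$(mktemp)
    right=$(mktemp)
    normalize_conf "$1" > "$left"
    normalize_conf "$2" > "$right"
    status=0
    diff "$left" "$right" || status=$?
    rm -f "$left" "$right"
    return "$status"
}

# Example (file names are placeholders):
# diff_conf job/hand_typed.conf job/file_to_kafka.conf
```

A misspelled key such as `a1.chanels` shows up as one removed line and one added line in the diff output.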