3.4 Flume高级组件
高级组件
- Source Interceptors:Source可以指定一个或者多个拦截器按先后顺序依次对采集到的数据进行处 理。
- Channel Selectors:Source发往多个Channel的策略设置,如果source后面接了多个channel,到 底是给所有的channel都发,还是根据规则发送到不同channel,这些是由Channel Selectors来控制 的
- Sink Processors:Sink 发送数据的策略设置,一个channel后面可以接多个sink,channel中的数据 是被哪个sink获取,这个是由Sink Processors控制的
事件
Event是Flume传输数据的基本单位,也是事务的基本单位,在文本文件中,通常一行记录就是一个Event
Event中包含header和body;
- body是采集到的那一行记录的原始内容
- header类型为Map,里面可以存储一些属性信息,方便后面使用
- 我们可以在Source中给每一条数据的header中增加key-value,然后在Channel和Sink中使用header的值
Source Interceptors
常见的有:Timestamp Interceptor、Host Interceptor、Search and Replace Interceptor 、Static Interceptor、Regex Extractor Interceptor
- Timestamp Interceptor:向event中的header里面添加timestamp 时间戳信息
- Host Interceptor:向event中的header里面添加host属性,host的值为当前机器的主机名或者ip
- Search and Replace Interceptor:根据指定的规则查询Event中body里面的数据,然后进行替换, 这个拦截器会修改event中body的值,也就是会修改原始采集到的数据内容
- Static Interceptor:向event中的header里面添加固定的key和value
- Regex Extractor Interceptor:根据指定的规则从Event中的body里面抽取数据,生成key和value, 再把key和value添加到header中
我们event中的header里面添加key-value类型的数据,方便后面的channel和sink组件使用,对采集到的原始数据内容没有任何影响。
Search and Replace Interceptor是会根据规则修改event中body里面的原始数据内容,对header没有任 何影响,使用这个拦截器需要特别小心,因为他会修改原始数据内容。
案例数据准备
video_info {"id":"14943445328940974601","uid":"840717325115457536","lat":"53.530598","lnt":"-2.5620373","hots":0,"title":"0","status":"1","topicId":"0","end_time":"1494344570","watch_num":0,"share_num":"1","replay_url":null,"replay_num":0,"start_time":"1494344544","timestamp":1494344571,"type":"video_info"} user_info {"uid":"861848974414839801","nickname":"mick","usign":"","sex":1,"birthday":"","face":"","big_face":"","email":"abc@qq.com","mobile":"","reg_type":"102","last_login_time":"1494344580","reg_time":"1494344580","last_update_time":"1494344580","status":"5","is_verified":"0","verified_info":"","is_seller":"0","level":1,"exp":0,"anchor_level":0,"anchor_exp":0,"os":"android","timestamp":1494344580,"type":"user_info"} gift_record {"send_id":"834688818270961664","good_id":"223","video_id":"14943443045138661356","gold":"10","timestamp":1494344574,"type":"gift_record"}
首先确定这是一份json格式的数据
包括:视频信息、用户信息、送礼信息
当业务平台运行时,会实时产生这些日志数据,我们希望把这些数据采集到hdfs上进行分目录存储,并且根据type类型来进行区分。
针对这个需求配置agent的话,source使用基于文件的execsource、channle使用基于文件的channel, 我们希望保证数据的完整性和准确性,sink使用hdfssink。
最终要生成的目录如下
hdfs://192.168.197.101:9000/moreType/20200101/videoInfo hdfs://192.168.197.101:9000/moreType/20200101/userInfo hdfs://192.168.197.101:9000/moreType/20200101/giftRecord
这里就要思考一个问题,如何获取数据类型?
答案:通过source的拦截器可以向event的header中添加key-value,然后在 后面的channle或者sink中获取key-value的值
那我们在这就可以通过Regex Extractor Interceptor获取原始数据中的type字段的值,获取出来以后存储 到header中,这样在sink阶段就可以获取到了。
但是这个时候直接获取到的type的值是不满足要求的,需要对type的值进行转换,去掉下划线,转化为驼峰命名形式。
所以最终可以先使用Search and Replace Interceptor对原始数据中type的值进行转换,然后使用Regex Extractor Interceptor指定规则获取type字段的值,添加到header中。
最终流程:
Exec Source -> Search and Replace Interceptor->Regex Extractor Interceptor -> file channel -> hdfs sink
在bigdata04机器上创建 file-to-hdfs-moreType.conf
配置文件如下:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置source组件 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /data/log/moreType.log # 配置拦截器 [多个拦截器按照顺序依次执行] a1.sources.r1.interceptors = i1 i2 i3 i4 a1.sources.r1.interceptors.i1.type = search_replace a1.sources.r1.interceptors.i1.searchPattern = "type":"video_info" a1.sources.r1.interceptors.i1.replaceString = "type":"videoInfo" a1.sources.r1.interceptors.i2.type = search_replace a1.sources.r1.interceptors.i2.searchPattern = "type":"user_info" a1.sources.r1.interceptors.i2.replaceString = "type":"userInfo" a1.sources.r1.interceptors.i3.type = search_replace a1.sources.r1.interceptors.i3.searchPattern = "type":"gift_record" a1.sources.r1.interceptors.i3.replaceString = "type":"giftRecord" a1.sources.r1.interceptors.i4.type = regex_extractor a1.sources.r1.interceptors.i4.regex = "type":"(\\w+)" a1.sources.r1.interceptors.i4.serializers = s1 a1.sources.r1.interceptors.i4.serializers.s1.name = logType # 配置channel组件 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.9.0-bin/data/moreType/checkpoint a1.channels.c1.dataDirs = /data/soft/apache-flume-1.9.0-bin/data/moreType/data # 配置sink组件 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://192.168.197.100:9000/moreType/%Y%m%d/%{logType} a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.useLocalTimeStamp = true #增加文件前缀和后缀 a1.sinks.k1.hdfs.filePrefix = data a1.sinks.k1.hdfs.fileSuffix = .log # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
注意:这里面的拦截器,拦截器可以设置一个或者多个,source采集的每一条数据都会经过所有 的拦截器进行处理,多个拦截器按照顺序执行。
后面也统一设置了hdfs文件的前缀和后缀。
首先我们先将测试数据放入指定目录
cd /data/log vi moreType.log
上传配置文件到flume conf文件夹下
启动bigdata04 Agent并去bigdata01查看结果
启动命令如下:
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file file-to-hdfs-moreType.conf -Dflume.root.logger=INFO,console
成功采集数据
测试成功
Channel Selectors
Channel Selectors类型包括:Replicating Channel Selector 和Multiplexing Channel Selector
官网案例如下:
简单讲:这个案例的意思就是source数据会分发到c1、c2、c3三个channel,flume可以保证c1、c2收到数据,但是c3是否能够收到数据并不保证。
selector.optional参数是一个可选项,可以不用配置
注意这个channel位置是复数,就表明了可以配置多个channel
此外还有Multiplexing Channel Selector,它表示会根据Event中header里面的值将Event发往不同的Channel,官网案例如下
在这个例子的配置中,指定了4个channel,c1、c2、c3、c4 source采集到的数据具体会发送到哪个channel中,会根据event中header里面的state属性的值,这个是通过selector.header控制的
- 如果state属性的值是CZ,则发送给c1
- 如果state属性的值是US,则发送给c2 c3
- 如果state属性的值是其它值,则发送给c4
这些规则是通过selector.mapping和selector.default控制的
案例1
多Channel之Replicating Channel Selector
组件关系图如下:
这张场景还是比较常见的,分析如下:
1.通过Replicating选择器将Source数据分发到两个channel。
2.将不同channel数据分发到两个不同的sink端进行消费。
实际工作中:
一般对接hdfs+kafka。
具体操作如下:
1.在bigdata04增加配置文件 tcp-to-replicatingchannel.conf
内容如下:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 c2 a1.sinks = k1 k2 # 配置source组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 # 配置channle选择器[默认就是replicating,所以可以省略] a1.sources.r1.selector.type = replicating # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # 配置sink组件 a1.sinks.k1.type = logger a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = hdfs://192.168.197.101:9000/replicating a1.sinks.k2.hdfs.fileType = DataStream a1.sinks.k2.hdfs.writeFormat = Text a1.sinks.k2.hdfs.rollInterval = 3600 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 a1.sinks.k2.hdfs.useLocalTimeStamp = true a1.sinks.k2.hdfs.filePrefix = data a1.sinks.k2.hdfs.fileSuffix = .log # 把组件连接起来 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
这里使用netcat方式方便生成测试数据
2.启动bigdata04 Agent
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file tcp-to-replicatingchannel.conf -Dflume.root.logger=INFO,console
3.通过telnet连接到socket 生产测试数据
可以看到我们既在控制台输出了数据,由存储了hdfs数据文件
案例2
Multiplexing Channel Selector
组件关系图如下:
测试数据如下:
{"name":"jack","age":19,"city":"bj"} {"name":"tom","age":26,"city":"sh"}
配置文件内容如下:
在bigdata04上创建新文件 tcp-to-multiplexingchannel.conf
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 c2 a1.sinks = k1 k2 # 配置source组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 # 配置source拦截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = regex_extractor a1.sources.r1.interceptors.i1.regex = "city":"(\\w+)" a1.sources.r1.interceptors.i1.serializers = s1 a1.sources.r1.interceptors.i1.serializers.s1.name = city # 配置channle选择器 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = city a1.sources.r1.selector.mapping.bj = c1 a1.sources.r1.selector.default = c2 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # 配置sink组件 a1.sinks.k1.type = logger a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = hdfs://192.168.197.101:9000/multiplexing a1.sinks.k2.hdfs.fileType = DataStream a1.sinks.k2.hdfs.writeFormat = Text a1.sinks.k2.hdfs.rollInterval = 3600 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 a1.sinks.k2.hdfs.useLocalTimeStamp = true a1.sinks.k2.hdfs.filePrefix = data a1.sinks.k2.hdfs.fileSuffix = .log # 把组件连接起来 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
启动flume agent
../bin/flume-ng agent --name a1 --conf /data/soft/apache-flume-1.9.0-bin/conf/ --conf-file tcp-to-multiplexingchannel.conf -Dflume.root.logger=INFO,console
ctrl + ]退出telnet
发送测试数据
数据采集成功
成功过滤出city是北京的数据
这样就实现了,根据规则把source采集到的数据分发到不同channel中,最终输出到不同存储介质中。 这就是Multiplexing Channel Selector的应用了
Sink Processors
- Default Sink Processor:默认的,不用配置sinkgroup,就是咱们现在使用的这种最普通的形式,一个 channel后面接一个sink的形式
- Load balancing Sink Processor是负载均衡处理器,一个channel后面可以接多个sink,这多个sink属于 一个sink group,根据指定的算法进行轮询或者随机发送,减轻单个sink的压力。
- Failover Sink Processor是故障转移处理器,一个channle后面可以接多个sink,这多个sink属于一个 sink group,按照sink的优先级,默认先让优先级高的sink来处理数据。
负载均衡
组件关系图
processor.sinks:指定这个sink groups中有哪些sink
processor.type:针对负载均衡的sink处理器,这里需要指定load_balance
processor.selector:此参数的值内置支持两个,round_robin和random,round_robin表示轮询,按照 sink的顺序,轮流处理数据,random表示随机
processor.backoff:默认为false,设置为true后,故障的节点会列入黑名单,过一定时间才会再次发送 数据,如果还失败,则等待时间是指数级增长;一直到达到最大的时间。
processor.selector.maxTimeOut:最大的黑名单时间,默认是30秒
故障转移
参考配置:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 k2 # 配置source组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.197.102 a1.sinks.k1.port = 41414 a1.sinks.k1.batch-size = 1 a1.sinks.k2.type = avro a1.sinks.k2.hostname = 192.168.197.103 a1.sinks.k2.port = 41414 a1.sinks.k2.batch-size = 1 # 配置sink策略 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.backoff = true a1.sinkgroups.g1.processor.selector = round_robin # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1
processor.type:针对故障转移的sink处理器,使用
failover processor.priority.:指定sink group中每一个sink组件的优先级,默认情况下channel中的数据会被 优先级比较高的sink取走
processor.maxpenalty:sink发生故障之后,最大等待时间
参考配置文件:
# agent的名称是a1 # 指定source组件、channel组件和Sink组件的名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 k2 # 配置source组件 a1.sources.r1.type = netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 # 配置channel组件 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.type = avro a1.sinks.k1.hostname = 192.168.197.102 a1.sinks.k1.port = 41414 a1.sinks.k1.batch-size = 1 a1.sinks.k2.type = avro a1.sinks.k2.hostname = 192.168.182.102 a1.sinks.k2.port = 41414 a1.sinks.k2.batch-size = 1 # 配置sink策略 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000 # 把组件连接起来 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1