自定义拦截器
在有些情况下,我们需要对采集来的数据进行分类,那么我们就可以采用自定义拦截器配合multiplexing选择器的方式:由拦截器在数据(event)的头部添加键值,再由选择器根据键值选择相应的channel。
我们自定义拦截器需要实现官方的Interceptor接口,并实现其中相应的方法;此外还需要一个实现了Interceptor.Builder的静态内部类,用于创建并返回该Interceptor的实例。
而且在实现intercept方法时,可以丢弃数据:对于不符合条件的数据,直接返回null即可将其抛弃,从而实现简单的过滤。
下面实现的就是:服务器1监听一个文件中的数据,如果某条数据中包含“hello”字符串,就将其发送到服务器2,否则就将其发送到服务器3。
# Agent a1 (server 1): tails a file, lets the custom interceptor tag each
# event with a "type" header, and routes events to one of two avro sinks.
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Source: taildir source reading the file group f1.
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data6

# Interceptor: referenced through its nested Builder class.
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.atguigu.Interceptor.MyInterceptor$Builder

# Channel selector: multiplex on the "type" header (yes -> c1, no -> c2).
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=type
a1.sources.r1.selector.mapping.yes=c1
a1.sources.r1.selector.mapping.no=c2

# Sinks: avro sinks pointing at hadoop103 and hadoop104.
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=hadoop103
a1.sinks.k1.port=4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname=hadoop104
a1.sinks.k2.port=4141

# Channels: in-memory channels.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Wiring: the source feeds both channels; each sink drains one channel.
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
# Agent a2 (server 2): receives avro events on hadoop103:4141 and logs them.
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Source: avro source bound to hadoop103:4141.
a2.sources.r1.type = avro
a2.sources.r1.bind=hadoop103
a2.sources.r1.port=4141

# Sink: logger sink.
a2.sinks.k1.type = logger

# Channel: in-memory channel.
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Wiring: source -> c1 -> sink.
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
# Agent a3 (server 3): receives avro events on hadoop104:4141 and logs them.
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Source: avro source bound to hadoop104:4141.
a3.sources.r1.type = avro
a3.sources.r1.bind=hadoop104
a3.sources.r1.port=4141

# Sink: logger sink.
a3.sinks.k1.type = logger

# Channel: in-memory channel.
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Wiring: source -> c1 -> sink.
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
package com.atguigu.Interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Flume interceptor that tags every event with a {@code type} header.
 *
 * <p>The header value is {@code "yes"} when the event body contains the
 * substring {@code "hello"} and {@code "no"} otherwise, so that a channel
 * selector keyed on the {@code type} header can route the two kinds of
 * event to different channels.
 */
public class MyInterceptor implements Interceptor {

    /** Header key written onto every event. */
    private static final String HEADER_KEY = "type";

    /** Substring looked for in the event body. */
    private static final String KEYWORD = "hello";

    /** No set-up is required; the interceptor keeps no state between calls. */
    @Override
    public void initialize() {
    }

    /**
     * Tags a single event according to whether its body contains "hello".
     *
     * @param event the event to inspect; its headers are modified in place
     * @return the same event, with the {@code type} header set
     */
    @Override
    public Event intercept(Event event) {
        // Decode with an explicit charset so the result does not depend on
        // the platform default.
        String body = new String(event.getBody(), StandardCharsets.UTF_8);

        // Header "type" = "yes" when the body contains the keyword, else "no".
        String value = body.contains(KEYWORD) ? "yes" : "no";
        event.getHeaders().put(HEADER_KEY, value);
        return event;
    }

    /**
     * Tags every event of a batch by delegating to {@link #intercept(Event)}.
     *
     * <p>A fresh list is built on each call, so a list returned earlier is
     * never cleared or overwritten by a later call.
     *
     * @param list the incoming batch of events
     * @return a new list holding the tagged events, in the original order
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> tagged = new ArrayList<>(list.size());
        for (Event event : list) {
            Event result = intercept(event);
            // A null result means "drop this event"; never pass null downstream.
            if (result != null) {
                tagged.add(result);
            }
        }
        return tagged;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Builder through which the interceptor is created; the agent
     * configuration references it as {@code MyInterceptor$Builder}.
     */
    public static class Builder implements Interceptor.Builder {

        /** @return a new {@link MyInterceptor} instance */
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        /** This interceptor reads no configuration properties. */
        @Override
        public void configure(Context context) {
        }
    }
}