【Flume中间件】(12)自定义拦截器

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 【Flume中间件】(12)自定义拦截器

自定义拦截器

在有些情况下,我们需要对采集来的数据进行分类,那么我们就可以采用multiplexing拦截器的方式,在数据中添加头部的键值,根据键值选择相应的channel。

我们自定义拦截器需要实现官方的Interceptor,实现相应的方法,而且还需要一个静态内部类,用于返回Interceptor类。

而且在实现interceptor方法时,可以进行丢失据,就是有些不符合条件的数据就抛弃掉,可以进行简单的过滤,直接返回null即可。

下面实现的就是服务器1进行监听一个数据,如果该数据中存在“hello”字符串,就将其传入服务器2,否则就将其传入服务器3.

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data6
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.atguigu.Interceptor.MyInterceptor$Builder
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=type
a1.sources.r1.selector.mapping.yes=c1
a1.sources.r1.selector.mapping.no=c2
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=hadoop103
a1.sinks.k1.port=4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname=hadoop104
a1.sinks.k2.port=4141
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1  
a1.sinks.k2.channel = c2
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind=hadoop103
a2.sources.r1.port=4141
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1      
a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind=hadoop104
a3.sources.r1.port=4141
a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1      
package com.atguigu.Interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
public class MyInterceptor implements Interceptor {
    ArrayList<Event> list_event;
    @Override
    public void initialize() {
        list_event = new ArrayList<>();
    }
    @Override
    public Event intercept(Event event) {
        # 获取时间封装成字符串
        String string = new String(event.getBody());
        # 判断数据中是否存在hello,如果存在封装键值对,type-yes,type就是header
        if (string.contains("hello")) {
            event.getHeaders().put("type", "yes");
        } else {
            event.getHeaders().put("type", "no");
        }
        return event;
    }
    # 操纵事件集合
    @Override
    public List<Event> intercept(List<Event> list) {
        # 将上一次的集合清空
        list_event.clear();
        for (Event event : list) {
            # 调用上面的方法,将数据加工头部,存到list中
            list_event.add(intercept(event));
        }
        return list_event;
    }
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }
        @Override
        public void configure(Context context) {
        }
    }
}


目录
相关文章
|
存储 开发框架 安全
ASP.NET Core 中间件的使用(三):全局异常处理机制(Filter拦截器对比)
ASP.NET Core 中间件的使用(三):全局异常处理机制(Filter拦截器对比)
|
2月前
|
数据采集 中间件 开发者
Scrapy爬虫框架-自定义中间件
Scrapy爬虫框架-自定义中间件
64 1
|
7月前
|
数据采集 消息中间件 存储
Flume 快速入门【概述、安装、拦截器】
Apache Flume 是一个开源的数据采集工具,用于从各种数据源(如日志、网络数据、消息队列)收集大规模数据,并将其传输和加载到数据存储系统(如 HDFS、HBase、Hive)。Flume 由数据源(Source)、通道(Channel)、拦截器(Interceptor)和接收器(Sink)组成,支持灵活配置以适应不同的数据流处理需求。安装 Flume 包括解压软件包、配置环境变量和调整日志及内存设置。配置文件定义数据源、通道、拦截器和接收器,拦截器允许预处理数据。Flume 适用于构建数据管道,整合分散数据到中心存储系统,便于分析和报告。
1235 3
|
7月前
|
存储 数据采集 监控
Flume 拦截器概念及自定义拦截器的运用
Apache Flume 的拦截器是事件处理组件,位于Source和Channel之间,用于在写入Channel前对数据进行转换、提取或删除。它们支持数据处理和转换、数据增强、数据过滤以及监控和日志功能。要创建自定义拦截器,需实现Interceptor接口,包含initialize、intercept、intercept(List&lt;Event&gt;)和close方法。配置拦截器时,通过Builder模式实现Interceptor.Builder接口。在Flume配置文件中指定拦截器全类名,如`TestInterceptor$Builder`,然后启动Flume进行测试。
241 0
|
开发框架 中间件 .NET
ASP.NET CORE 自定义中间件
ASP.NET CORE 自定义中间件
66 0
|
开发框架 中间件 .NET
asp.net core 自定义中间件【以dapper为例】
asp.net core 自定义中间件【以dapper为例】
185 7
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
中间件
gin注册自定义中间件失效
gin注册自定义中间件失效
268 8
|
中间件
【Flume中间件】(14)自定义Sink
【Flume中间件】(14)自定义Sink
109 7
|
数据采集 Java Shell
flume自定义拦截器
向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。 #### 分析: 需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。
211 0
flume自定义拦截器