Flume 拦截器概念及自定义拦截器的运用

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: Apache Flume 的拦截器是事件处理组件,位于Source和Channel之间,用于在写入Channel前对数据进行转换、提取或删除。它们支持数据处理和转换、数据增强、数据过滤以及监控和日志功能。要创建自定义拦截器,需实现Interceptor接口,包含initialize、intercept、intercept(List<Event>)和close方法。配置拦截器时,通过Builder模式实现Interceptor.Builder接口。在Flume配置文件中指定拦截器全类名,如`TestInterceptor$Builder`,然后启动Flume进行测试。

@[toc]

Flume 拦截器

在 Flume 中,拦截器(Interceptors)是一种可以在事件传输过程中拦截、处理和修改事件的组件。

位于 Source 与 Channel 之间,在写入Channel 之前,拦截器可以对数据进行转换、提取或删除,以满足特定的需求。每个拦截器只处理同一个 Source 接收到的事件,你也可以同时配置多个拦截器,它们会按顺序执行。

拦截器的作用

  • 数据处理和转换: 拦截器可以对事件数据进行处理和转换。例如,可以对原始日志进行解析、过滤、格式化等操作,以便后续处理或存储。

  • 数据增强: 拦截器可以为事件数据添加额外的信息或元数据。例如,可以添加时间戳、主机信息、标签等,以丰富事件数据的内容。

  • 数据过滤: 拦截器可以根据特定条件过滤掉不需要的事件数据,减少数据传输的量或过滤掉无效数据。

  • 监控和日志: 拦截器可以用于监控数据流的运行情况,记录日志信息或统计数据流中的事件数量、处理速率等指标,帮助用户进行性能分析和故障排查。

拦截器运用

1.创建项目

创建一个 Maven 工程项目,引入 Flume 依赖。

在 IDEA 中创建 Maven 项目想必大家都会,这里不再赘述。

根据集群中的 Flume 版本,引入 Flume 依赖,如下所示:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.10.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

无需将该依赖打包进最后的 JAR 包中,故将其作用域设置为 provided

当一个依赖项的 scope 被设置为 compile 时,它将在编译和运行时都可用,并包含在最终的项目包中。而 provided 范围的依赖项仅在编译和测试阶段需要,运行时不包括。

2.实现拦截器接口

创建测试类 TestInterceptor 实现拦截器 Interceptor,注意,导包时不要导错了。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        return null;
    }

    @Override
    public void close() {

    }
}

在上面的代码中,我们实现了 Flume 拦截器接口 Interceptor,并重写了其中的四个方法:

  • initialize() 方法:初始化拦截器操作,读取配置信息、建立连接等。

  • intercept(Event event) 方法:用于拦截单个事件,并对事件进行处理。接收一个事件对象作为输入,并返回一个修改后的事件对象。

  • intercept(List<Event> list) 方法:事件批处理,拦截事件列表,并对事件列表进行处理。

  • close() 方法:关闭拦截器,在这里释放资源、关闭连接等。

3.编写事件处理逻辑

在这里做个简单的事件处理,如果数据中包含字符串 hello 则进行过滤操作,这样我们可以直观感受到拦截器的存在,下面来进行逻辑设计。

import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
   
   

    @Override
    public void initialize() {
   
   

    }

    @Override
    public Event intercept(Event event) {
   
   
        // 获取事件数据
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);

        // 检查事件数据中是否包含指定字符串
        if (eventData.contains("hello")) {
   
   
            // 如果包含指定字符串,则过滤掉该事件,返回 null
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
   
   
        // 创建一个新的列表,存储处理过后的事件
        List<Event> interceptedEvents = new ArrayList<>();

        for (Event event : events) {
   
   
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
   
   
                interceptedEvents.add(interceptedEvent);
            }
        }

        return interceptedEvents;
    }

    @Override
    public void close() {
   
   

    }

}

intercept(List<Event> events) 方法用于对事件列表进行批量处理。这个方法会遍历传入的事件列表,并对每一个事件调用 intercept(Event event) 方法来进行单独处理。

注意,如果只有 intercept(Event event) 方法被重写了,而没有实现 intercept(List<Event> events) 批处理方法,那么在处理事件时会以单个事件的方式进行处理。

在不需要进行初始化和释放资源的情况下,我们可以选择不重写 initializeclose 方法。

4.拦截器构建

在编写完事件处理逻辑后,我们还需要对拦截器进行构建。

在 Flume 中,拦截器的创建和配置通常是通过 Builder 模式来完成的。

在程序中,我们可以定义一个静态内部类 Builder,实现 Interceptor.Builder 接口来对拦截器进行构建,如下所示:

    public static class Builder implements Interceptor.Builder {
   
   

        @Override
        public void configure(Context context) {
   
   
            // 配置操作,可留空
        }

        @Override
        public Interceptor build() {
   
   
            // 返回构建的拦截器类
        }

    }

在我们这个案例中,完整的代码如下所示:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TestInterceptor implements Interceptor {
   
   

    @Override
    public void initialize() {
   
   

    }

    @Override
    public Event intercept(Event event) {
   
   
        // 获取事件数据
        String eventData = new String(event.getBody(), StandardCharsets.UTF_8);

        // 检查事件数据中是否包含指定字符串
        if (eventData.contains("hello")) {
   
   
            // 如果包含指定字符串,则过滤掉该事件,返回 null
            return null;
        }

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
   
   
        List<Event> interceptedEvents = new ArrayList<>();

        for (Event event : events) {
   
   
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
   
   
                interceptedEvents.add(interceptedEvent);
            }
        }

        return interceptedEvents;
    }

    @Override
    public void close() {
   
   

    }

    // 拦截器构建
    public static class Builder implements Interceptor.Builder {
   
   

        @Override
        public void configure(Context context) {
   
   

        }

        @Override
        public Interceptor build() {
   
   
            return new TestInterceptor();
        }

    }

}

5.打包与上传

将写好的项目进行打包,并上传到集群中,进行测试。

注意,需要将打包好的拦截器包放在 Flume 安装目录下的 lib 文件夹中。

image.png

6.编写配置文件

这里为了验证拦截器的作用,通过一个 Flume 采集案例来进行体现。

如果你不知道如何编写配置文件,可以看我写的这篇文章 —— Flume 配置文件编写技巧(包会的,抄就完了)

这个配置案例是将发送到 HTTP 源中的数据采集到 HDFS 上,将本地文件作为缓冲通道,该配置文件命名为 httpToHDFS.conf

# 声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source 源配置
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.bind = localhost

# 拦截器配置
# 拦截器定义
a1.sources.r1.interceptors = i1
# 拦截器全类名
a1.sources.r1.interceptors.i1.type = TestInterceptor$Builder

# Sink 处理/存储配置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Channel 通道配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/software/flume/checkpoint
a1.channels.c1.dataDirs = /opt/software/flume/data

# 组装/绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

拦截器全类名配置那里需要注意,格式为 拦截器的全类名 + $Builder

在 IDEA 中获取全类名的方式:右击需要引用的类,依次选择【File——>Copy Path/Reference...——>Copy Reference】即可复制。

你可以根据你的需要对该配置文件进行修改。

7.测试运行

因为我们是将数据采集到 HDFS 上,所以需要先启动 Hadoop,然后再进行操作。

# 运行 Flume
cd $FLUME_HOME

# 注意引用路径,需要修改成你自己的
./bin/flume-ng agent -n a1 -c conf/ -f job/httpToHDFS.conf -Dflume.root.logger=INFO,console

Flume 启动完成后,如下所示:

image.png

我们通过其它窗口,使用 curl 命令向 HTTP 源发送两条模拟数据:

curl -X POST -d '[{"body":"hello body"}]'  http://localhost:5140

curl -X POST -d '[{"body":"HELLO FLUME"}]'  http://localhost:5140

image.png

数据发送完成后,Flume 会采集到该数据,并存储到 HDFS 上。

image.png

通过命令,查看 HDFS 中存储的内容,验证拦截器是否生效:

hdfs dfs -text /flume/events/2024-04-04/1630/00/ev*

结果如下所示:

image.png

可以看到,我们在上面分别发送了两条数据 hello bodyHELLO FLUME,但最终 HDFS 中只存储了一条数据。

这是因为我们设置的拦截器生效了,它对数据中包含 hello 字符串的事件进行了过滤,故只存储了一条数据。

Flume 拦截器就是起到这样的效果,对数据进行处理、转换、删除等操作,是不是很简单呀。(同学,包会的呀)。

相关文章
|
7月前
|
数据采集 消息中间件 存储
Flume 快速入门【概述、安装、拦截器】
Apache Flume 是一个开源的数据采集工具,用于从各种数据源(如日志、网络数据、消息队列)收集大规模数据,并将其传输和加载到数据存储系统(如 HDFS、HBase、Hive)。Flume 由数据源(Source)、通道(Channel)、拦截器(Interceptor)和接收器(Sink)组成,支持灵活配置以适应不同的数据流处理需求。安装 Flume 包括解压软件包、配置环境变量和调整日志及内存设置。配置文件定义数据源、通道、拦截器和接收器,拦截器允许预处理数据。Flume 适用于构建数据管道,整合分散数据到中心存储系统,便于分析和报告。
1235 3
|
7月前
|
消息中间件 存储 SQL
Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
【2月更文挑战第18天】Flume【基础知识 01】简介 + 基本架构及核心概念 + 架构模式 + Agent内部原理 + 配置格式(一篇即可入门Flume)
2032 0
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
数据采集 消息中间件 分布式计算
大数据数据采集的数据采集(收集/聚合)的Flume之概念
在大数据应用中,数据采集是非常重要的一步。Flume是一个开源的分布式系统,可以帮助企业完成数据采集、收集和聚合等操作,并将它们发送到后续处理系统中。
303 0
|
中间件
【Flume中间件】(12)自定义拦截器
【Flume中间件】(12)自定义拦截器
109 5
|
数据采集 Java Shell
flume自定义拦截器
向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。 #### 分析: 需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。
211 0
flume自定义拦截器
|
中间件
【Flume中间件】(14)自定义Sink
【Flume中间件】(14)自定义Sink
107 7
|
中间件 Java 数据库连接
【Flume中间件】(13)自定义Source
【Flume中间件】(13)自定义Source
145 1
|
存储 消息中间件 缓存
Flume核心概念
Flume 是一个分布式、可靠、高可用的服务,它能够将不同数据源的海量日志数据进行高效收集、汇聚、移动,最后存储到一个中心化数据存储系统(HDFS、 HBase等)中,它是一个轻量级的工具,简单、灵活、容易部署,适应各种方式日志收集并支持 failover 和负载均衡。
|
7月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析