flume自定义拦截器

简介: 向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。#### 分析:需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。

简单需求:

向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。

分析:

需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。

步骤:

1.启动一个maven工程,导入下面依赖的jar包
<repositories>
   <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
   </repository>
 </repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0-cdh5.14.2</version>
    </dependency>
</dependencies>


<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
        </plugins>
</build>
2.自定义flume的拦截器
新建package com.kkb.flume.interceptor
新建类及内部类,分别是 MyInterceptorMyBuilder
package com.book.flume.interceptor;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
    /**
     * my_name:指定姓名
     */
    private final String my_name;

    /**
     * 提供构建方法,后期可以接受配置文件中的参数
     *
     * @param my_name
     */
    public MyInterceptor(String my_name) {
        this.my_name = my_name;
    }

    /**
     *
     * 单个event拦截逻辑
     */
    public Event intercept(Event event) {
        if (event == null) {
            return null;
        }

        try {
            String line = new String(event.getBody(), Charsets.UTF_8);

            String newLine = this.my_name + line;

            event.setBody(newLine.getBytes(Charsets.UTF_8));
            return event;
        } catch (Exception e) {
            return event;
        }

    }

    /**
     * 批量event拦截逻辑
     */
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    public void close() {

    }

    public void initialize() {

    }

    /**
     * 相当于自定义Interceptor的工厂类
     */
    public static class MyBuilder implements Interceptor.Builder {

        /**
         * my_name:指定姓名
         */
        private String my_name;

        public void configure(Context context) {
            this.my_name = context.getString("my_name", "");
        }

        /*
         * @see org.apache.flume.interceptor.Interceptor.Builder#build()
         */
        public MyInterceptor build() {
            return new MyInterceptor(my_name);
        }
    }
}

MyInterceptor类实现Interceptor接口的单个拦截器和多个拦截器方法、以及close()、initialize()。根据需求我们的主要逻辑在单个拦截器内。

3.将程序打包成jar包,将original-flumedemo-1.0-SNAPSHOT.jar发送到采集服务器安装flume目录的lib目录下。

在这里插入图片描述

4.编写配置flume的配置文件
# 指定sources、sinks、channels,可以指定多个,a1为agent的名称,以此配置启动时会指定此名称(符合规定的命名都可以)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 指定sources的配置,此为tpye指定命令行执行
a1.sources.r1.type = exec
# 指定命令行执行的命令,根据需求查看这个/misc_file/flume_date_log/date_log文件下新增的数据
a1.sources.r1.command = tail -F /misc_file/flume_date_log/date_log
# 指定使用的拦截器名称
a1.sources.r1.interceptors = i1
# 指定使用的拦截器的类型,使用刚才打包jar的执行的路径的启动的类
a1.sources.r1.interceptors.i1.type = com.book.flume.interceptor.MyInterceptor$MyBuilder
# 指定向jar中传递的参数,即需求中的名字
a1.sources.r1.interceptors.i1.my_name = lihongcheng

# sinks的方式,此处根据需求是输出到控制台上
a1.sinks.k1.type = logger

# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
# channel中存储的event的最大个数
a1.channels.c1.capacity = 1000
# channel每次从source获得的event最多个数或一次发往sink的event最多个数
a1.channels.c1.transactionCapacity = 100

# 将source和sink与channel连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.启动一个flume代理
# cd到你自己flume的安装目录执行
cd /install/apache-flume-1.6.0-cdh5.14.2-bin
# -c指定执行conf配置文件,-f指定执行的配置文件的路径,-name为指定此agent的名称,即为配置文件中的a1,-Dflume.root.logger=DEBUG,console指定输出日志级别以及输出到哪(此处指定控制台)
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
6.准备向/misc_file/flume_date_log/date_log文件中添加数据。

使用如下shell,编写一个shell文件运行:

#!/bin/bash
while true
do
 date >> /misc_file/flume_date_log/date_log;
  sleep 0.5;
done

使用如下命令运行:

# 修改对应shell脚本文件的权限,使其可以执行
chmod u+x tail-file.sh 
# 运行脚本
sh /shells/tail-file.sh

最终在启动flume的服务器控制台可以看到如下结果:
在这里插入图片描述

目录
相关文章
|
8月前
|
数据采集 消息中间件 存储
Flume 快速入门【概述、安装、拦截器】
Apache Flume 是一个开源的数据采集工具,用于从各种数据源(如日志、网络数据、消息队列)收集大规模数据,并将其传输和加载到数据存储系统(如 HDFS、HBase、Hive)。Flume 由数据源(Source)、通道(Channel)、拦截器(Interceptor)和接收器(Sink)组成,支持灵活配置以适应不同的数据流处理需求。安装 Flume 包括解压软件包、配置环境变量和调整日志及内存设置。配置文件定义数据源、通道、拦截器和接收器,拦截器允许预处理数据。Flume 适用于构建数据管道,整合分散数据到中心存储系统,便于分析和报告。
1295 3
|
8月前
|
存储 数据采集 监控
Flume 拦截器概念及自定义拦截器的运用
Apache Flume 的拦截器是事件处理组件,位于Source和Channel之间,用于在写入Channel前对数据进行转换、提取或删除。它们支持数据处理和转换、数据增强、数据过滤以及监控和日志功能。要创建自定义拦截器,需实现Interceptor接口,包含initialize、intercept、intercept(List&lt;Event&gt;)和close方法。配置拦截器时,通过Builder模式实现Interceptor.Builder接口。在Flume配置文件中指定拦截器全类名,如`TestInterceptor$Builder`,然后启动Flume进行测试。
264 0
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
中间件
【Flume中间件】(12)自定义拦截器
【Flume中间件】(12)自定义拦截器
112 6
|
中间件
【Flume中间件】(14)自定义Sink
【Flume中间件】(14)自定义Sink
113 8
|
中间件 Java 数据库连接
【Flume中间件】(13)自定义Source
【Flume中间件】(13)自定义Source
146 2
|
存储 Java Linux
Apache Flume-- 自定义 sink(扩展)--数据写入本地|学习笔记
Apache Flume-- 自定义 sink(扩展)--数据写入本地
Apache Flume-- 自定义 sink(扩展)--数据写入本地|学习笔记
|
监控 Java 关系型数据库
Apache Flume-自定义 source(扩展)--功能测试实现|学习笔记
快速学习 Apache Flume-自定义 source(扩展)--功能测试实现
 Apache Flume-自定义 source(扩展)--功能测试实现|学习笔记
|
SQL 监控 关系型数据库
Apache Flume-自定义 source(扩展)|学习笔记
快速学习 Apache Flume-自定义 source(扩展),具体实现代码逻辑 Flume 提供了很多内置的 source、sink、channel。但是在某些场合下,它自带的组件可能不满足需求,为此 Flume 官方也提供了相关的接口,我们可以按照它的接口和规范进行开发,实现自己的需求。
Apache Flume-自定义 source(扩展)|学习笔记
|
缓存 监控 Java
Apache Flume-自定义拦截器-功能实现|学习笔记
快速学习 Apache Flume-自定义拦截器-功能实现
Apache Flume-自定义拦截器-功能实现|学习笔记