flume自定义拦截器

简介: 向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。#### 分析:需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。

简单需求:

向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。

分析:

需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。

步骤:

1.启动一个maven工程,导入下面依赖的jar包
<repositories>
   <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
   </repository>
 </repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0-cdh5.14.2</version>
    </dependency>
</dependencies>


<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--    <verbal>true</verbal>-->
                </configuration>
            </plugin>
        </plugins>
</build>
2.自定义flume的拦截器
新建package com.kkb.flume.interceptor
新建类及内部类,分别是 MyInterceptorMyBuilder
package com.book.flume.interceptor;

import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
    /**
     * my_name:指定姓名
     */
    private final String my_name;

    /**
     * 提供构建方法,后期可以接受配置文件中的参数
     *
     * @param my_name
     */
    public MyInterceptor(String my_name) {
        this.my_name = my_name;
    }

    /**
     *
     * 单个event拦截逻辑
     */
    public Event intercept(Event event) {
        if (event == null) {
            return null;
        }

        try {
            String line = new String(event.getBody(), Charsets.UTF_8);

            String newLine = this.my_name + line;

            event.setBody(newLine.getBytes(Charsets.UTF_8));
            return event;
        } catch (Exception e) {
            return event;
        }

    }

    /**
     * 批量event拦截逻辑
     */
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<Event>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    public void close() {

    }

    public void initialize() {

    }

    /**
     * 相当于自定义Interceptor的工厂类
     */
    public static class MyBuilder implements Interceptor.Builder {

        /**
         * my_name:指定姓名
         */
        private String my_name;

        public void configure(Context context) {
            this.my_name = context.getString("my_name", "");
        }

        /*
         * @see org.apache.flume.interceptor.Interceptor.Builder#build()
         */
        public MyInterceptor build() {
            return new MyInterceptor(my_name);
        }
    }
}

MyInterceptor类实现Interceptor接口的单个拦截器和多个拦截器方法、以及close()、initialize()。根据需求我们的主要逻辑在单个拦截器内。

3.将程序打包成jar包,将original-flumedemo-1.0-SNAPSHOT.jar发送到采集服务器安装flume目录的lib目录下。

在这里插入图片描述

4.编写配置flume的配置文件
# 指定sources、sinks、channels,可以指定多个,a1为agent的名称,以此配置启动时会指定此名称(符合规定的命名都可以)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 指定sources的配置,此为tpye指定命令行执行
a1.sources.r1.type = exec
# 指定命令行执行的命令,根据需求查看这个/misc_file/flume_date_log/date_log文件下新增的数据
a1.sources.r1.command = tail -F /misc_file/flume_date_log/date_log
# 指定使用的拦截器名称
a1.sources.r1.interceptors = i1
# 指定使用的拦截器的类型,使用刚才打包jar的执行的路径的启动的类
a1.sources.r1.interceptors.i1.type = com.book.flume.interceptor.MyInterceptor$MyBuilder
# 指定向jar中传递的参数,即需求中的名字
a1.sources.r1.interceptors.i1.my_name = lihongcheng

# sinks的方式,此处根据需求是输出到控制台上
a1.sinks.k1.type = logger

# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
# channel中存储的event的最大个数
a1.channels.c1.capacity = 1000
# channel每次从source获得的event最多个数或一次发往sink的event最多个数
a1.channels.c1.transactionCapacity = 100

# 将source和sink与channel连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.启动一个flume代理
# cd到你自己flume的安装目录执行
cd /install/apache-flume-1.6.0-cdh5.14.2-bin
# -c指定执行conf配置文件,-f指定执行的配置文件的路径,-name为指定此agent的名称,即为配置文件中的a1,-Dflume.root.logger=DEBUG,console指定输出日志级别以及输出到哪(此处指定控制台)
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
6.准备向/misc_file/flume_date_log/date_log文件中添加数据。

使用如下shell,编写一个shell文件运行:

#!/bin/bash
while true
do
 date >> /misc_file/flume_date_log/date_log;
  sleep 0.5;
done

使用如下命令运行:

# 修改对应shell脚本文件的权限,使其可以执行
chmod u+x tail-file.sh 
# 运行脚本
sh /shells/tail-file.sh

最终在启动flume的服务器控制台可以看到如下结果:
在这里插入图片描述

目录
相关文章
|
中间件
【Flume中间件】(12)自定义拦截器
【Flume中间件】(12)自定义拦截器
76 0
|
缓存 监控 Java
Apache Flume-自定义拦截器-功能实现|学习笔记
快速学习 Apache Flume-自定义拦截器-功能实现
144 0
Apache Flume-自定义拦截器-功能实现|学习笔记
|
算法 Java 编译器
Apache Flume-自定义拦截器-代码逻辑梳理|学习笔记
快速学习 Apache Flume-自定义拦截器-代码逻辑梳理
217 0
Apache Flume-自定义拦截器-代码逻辑梳理|学习笔记
|
存储 监控 大数据
Apache Flume- 自定义拦截器-需求描述|学习笔记
快速学习 Apache Flume- 自定义拦截器-需求描述
156 0
 Apache Flume- 自定义拦截器-需求描述|学习笔记
|
缓存 分布式计算 监控
Apache Flume-静态拦截器-功能实现|学习笔记
快速学习 Apache Flume-静态拦截器-功能实现
103 0
 Apache Flume-静态拦截器-功能实现|学习笔记
|
应用服务中间件 Apache nginx
Apache Flume- 静态拦截器-案例业务需求描述|学习笔记
快速学习 Apache Flume- 静态拦截器-案例业务需求描述,本节课讲 Flume 当中拦截器的一个实战实用。
91 0
Apache Flume- 静态拦截器-案例业务需求描述|学习笔记
|
5天前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
5天前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
9月前
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)