【Flume中间件】(13)自定义Source

本文涉及的产品
云原生网关 MSE Higress,422元/月
性能测试 PTS,5000VUM额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
简介: 【Flume中间件】(13)自定义Source

自定义Source

有时候,flume中的source不符合我们的需求,这时就可以进行自己定义Source。

自定义Source的流程就是首先继承并实现官方类,然后实现相应的方法,重点是读取数据的方法,在该内部可以定义jdbc或者是IO流进行读取数据。

然后将数据封装成事件,交给channel处理器。

处理器的内部流程是先将该事件交给拦截器进行处理(封装头部信息等),然后判断是否为空,不为空,将其将给选择器,将该事件交给自己对应的channel。

自定义的Source类型为自己编写的代码的全类名。

注意要将写好的代码打成jar包丢到flume的lib目录下。

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = com.atguigu.Source.MySource
a1.sources.r1.prefix=log
a1.sources.r1.suffix=mod
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
package com.atguigu.Source;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
public class MySource extends AbstractSource implements Configurable, PollableSource {
    // 定义键值对
    String prefix;
    String suffix;
    // 根据上下文获取flume中配置的值
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix", ".www");
    }
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // 读取数据,可以自定义jdbc或IO流
        try {
            for (int i = 0; i < 10; i++) {
                SimpleEvent event = new SimpleEvent();
                event.setBody((prefix + "-->" + i + suffix).getBytes());
                // 将事件交给处理器
                getChannelProcessor().processEvent(event);
                // 设置提交状态
                status = Status.READY;
            }
        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;
        }
        // 定义延时,也可以设置成配置参数
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return status;
    }
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}


目录
相关文章
|
23天前
|
SQL 分布式计算 监控
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
52 3
|
23天前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
28 1
|
27天前
|
数据采集 中间件 开发者
Scrapy爬虫框架-自定义中间件
Scrapy爬虫框架-自定义中间件
46 1
|
23天前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
35 1
|
3月前
|
数据采集 存储 Apache
Flume核心组件大揭秘:Agent、Source、Channel、Sink,一文掌握数据采集精髓!
【8月更文挑战第24天】Flume是Apache旗下的一款顶级服务工具,专为大规模日志数据的收集、聚合与传输而设计。其架构基于几个核心组件:Agent、Source、Channel及Sink。Agent作为基础执行单元,整合Source(数据采集)、Channel(数据暂存)与Sink(数据传输)。本文通过实例深入剖析各组件功能与配置,包括Avro、Exec及Spooling Directory等多种Source类型,Memory与File Channel方案以及HDFS、Avro和Logger等Sink选项,旨在提供全面的Flume应用指南。
90 1
|
6月前
|
存储 数据采集 监控
Flume 拦截器概念及自定义拦截器的运用
Apache Flume 的拦截器是事件处理组件,位于Source和Channel之间,用于在写入Channel前对数据进行转换、提取或删除。它们支持数据处理和转换、数据增强、数据过滤以及监控和日志功能。要创建自定义拦截器,需实现Interceptor接口,包含initialize、intercept、intercept(List&lt;Event&gt;)和close方法。配置拦截器时,通过Builder模式实现Interceptor.Builder接口。在Flume配置文件中指定拦截器全类名,如`TestInterceptor$Builder`,然后启动Flume进行测试。
152 0
|
6月前
|
监控 Apache
【Flume】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
【4月更文挑战第4天】 Flume 区别分析:ExecSource、Spooldir Source、Taildir Source
|
中间件
如何开发一个 SAP UI5 Tools 的自定义中间件扩展 - Custom Middleware Extension
如何开发一个 SAP UI5 Tools 的自定义中间件扩展 - Custom Middleware Extension
|
开发框架 中间件 .NET
ASP.NET CORE 自定义中间件
ASP.NET CORE 自定义中间件
62 0
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink