【Flume中间件】(14)自定义Sink

本文涉及的产品
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
性能测试 PTS,5000VUM额度
简介: 【Flume中间件】(14)自定义Sink

自定义Sink

自定义Sink的流程就是:

首先需要或取sink对应的channel,然后从指定的channel中获取事务,然后再从channel中拉取事件,将事件进行处理,根据业务逻辑将数据写出,然后提交事务,如果成功,channel将该事件清除,否则进行回滚

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data6
# 自定义的Sink
a1.sinks.k1.type = com.atguigu.Sink.MySink
a1.sinks.k1.prefix=999
a1.sinks.k1.suffix=ttt
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
package com.atguigu.Sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
    // 配置前后缀
    String prefix;
    String suffix;
    Logger logger = LoggerFactory.getLogger(MySink.class);
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix", "666");
    }
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // 获取channel
        Channel channel = getChannel();
        // 获取事务
        Transaction transaction = channel.getTransaction();
        try {
            // 开启事务
            transaction.begin();
            // 从channel中获取事件
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody());
                logger.info(prefix + "-->" + body + "-->" + suffix);
//                System.out.println(body);
            }
            // 提交事件
            transaction.commit();
            status = Status.READY;
        } catch (ChannelException e) {
            e.printStackTrace();
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            // 关闭事务
            transaction.close();
        }
        return status;
    }
}


目录
相关文章
|
1月前
|
分布式计算 Java Hadoop
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
Hadoop-18 Flume HelloWorld 第一个Flume尝试!编写conf实现Source+Channel+Sink 控制台查看收集到的数据 流式收集
31 1
|
1月前
|
数据采集 中间件 开发者
Scrapy爬虫框架-自定义中间件
Scrapy爬虫框架-自定义中间件
|
1月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
44 1
|
3月前
|
数据采集 存储 Apache
Flume核心组件大揭秘:Agent、Source、Channel、Sink,一文掌握数据采集精髓!
【8月更文挑战第24天】Flume是Apache旗下的一款顶级服务工具,专为大规模日志数据的收集、聚合与传输而设计。其架构基于几个核心组件:Agent、Source、Channel及Sink。Agent作为基础执行单元,整合Source(数据采集)、Channel(数据暂存)与Sink(数据传输)。本文通过实例深入剖析各组件功能与配置,包括Avro、Exec及Spooling Directory等多种Source类型,Memory与File Channel方案以及HDFS、Avro和Logger等Sink选项,旨在提供全面的Flume应用指南。
128 1
|
6月前
|
存储 数据采集 监控
Flume 拦截器概念及自定义拦截器的运用
Apache Flume 的拦截器是事件处理组件,位于Source和Channel之间,用于在写入Channel前对数据进行转换、提取或删除。它们支持数据处理和转换、数据增强、数据过滤以及监控和日志功能。要创建自定义拦截器,需实现Interceptor接口,包含initialize、intercept、intercept(List<Event>)和close方法。配置拦截器时,通过Builder模式实现Interceptor.Builder接口。在Flume配置文件中指定拦截器全类名,如`TestInterceptor$Builder`,然后启动Flume进行测试。
176 0
|
开发框架 中间件 .NET
ASP.NET CORE 自定义中间件
ASP.NET CORE 自定义中间件
63 0
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
数据采集 缓存 大数据
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink Processor的Failover Sink Processor
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink Processor模块是实现数据输出和处理的核心模块之一。本文将介绍Flume中的Failover Sink Processor,讲解其数据采集流程。
99 0
|
数据采集 缓存 负载均衡
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink Processor的Load Balancing Sink Processor
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink Processor模块是实现数据输出和处理的核心模块之一。本文将介绍Flume中的Load Balancing Sink Processor,讲解其数据采集流程。
139 0
|
数据采集 缓存 负载均衡
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink Processor的Default Sink Processor
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink Processor模块是实现数据输出和处理的核心模块之一。本文将介绍Flume中的Default Sink Processor,讲解其数据采集流程。
104 0