【Flume中间件】(14)自定义Sink

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【Flume中间件】(14)自定义Sink

自定义Sink

自定义Sink的流程就是:

首先需要或取sink对应的channel,然后从指定的channel中获取事务,然后再从channel中拉取事件,将事件进行处理,根据业务逻辑将数据写出,然后提交事务,如果成功,channel将该事件清除,否则进行回滚

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data6
# 自定义的Sink
a1.sinks.k1.type = com.atguigu.Sink.MySink
a1.sinks.k1.prefix=999
a1.sinks.k1.suffix=ttt
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
package com.atguigu.Sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
    // 配置前后缀
    String prefix;
    String suffix;
    Logger logger = LoggerFactory.getLogger(MySink.class);
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix", "666");
    }
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // 获取channel
        Channel channel = getChannel();
        // 获取事务
        Transaction transaction = channel.getTransaction();
        try {
            // 开启事务
            transaction.begin();
            // 从channel中获取事件
            Event event = channel.take();
            if (event != null) {
                String body = new String(event.getBody());
                logger.info(prefix + "-->" + body + "-->" + suffix);
//                System.out.println(body);
            }
            // 提交事件
            transaction.commit();
            status = Status.READY;
        } catch (ChannelException e) {
            e.printStackTrace();
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            // 关闭事务
            transaction.close();
        }
        return status;
    }
}


目录
相关文章
|
7月前
|
开发框架 中间件 .NET
ASP.NET CORE 自定义中间件
ASP.NET CORE 自定义中间件
43 0
|
7月前
|
中间件
如何开发一个 SAP UI5 Tools 的自定义中间件扩展 - Custom Middleware Extension
如何开发一个 SAP UI5 Tools 的自定义中间件扩展 - Custom Middleware Extension
76 1
|
9月前
|
存储 Java 分布式数据库
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
Flume学习---3、自定义Interceptor、自定义Source、自定义Sink
|
11月前
|
数据采集 缓存 大数据
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink Processor的Failover Sink Processor
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink Processor模块是实现数据输出和处理的核心模块之一。本文将介绍Flume中的Failover Sink Processor,讲解其数据采集流程。
65 0
|
11月前
|
数据采集 缓存 负载均衡
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink Processor的Load Balancing Sink Processor
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink Processor模块是实现数据输出和处理的核心模块之一。本文将介绍Flume中的Load Balancing Sink Processor,讲解其数据采集流程。
108 0
|
11月前
|
数据采集 缓存 负载均衡
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink Processor的Default Sink Processor
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink Processor模块是实现数据输出和处理的核心模块之一。本文将介绍Flume中的Default Sink Processor,讲解其数据采集流程。
77 0
|
11月前
|
数据采集 存储 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink的Avro Sink
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink模块是实现数据输出和存储的核心模块之一。本文将介绍Flume中的Avro Sink,讲解其数据采集流程。
132 0
|
11月前
|
数据采集 存储 缓存
大数据数据采集的数据采集(收集/聚合)的Flume之数据采集流程的Sink的HDFS Sink
在大数据处理和管理中,数据采集是非常重要的一环。为了更加高效地进行数据采集,Flume作为一种流式数据采集工具得到了广泛的应用。其中,Flume的Sink模块是实现数据输出和存储的核心模块之一。本文将介绍Flume中的HDFS Sink,讲解其数据采集流程。
100 0
|
12月前
|
存储 数据采集 消息中间件
大数据数据采集的数据采集(收集/聚合)的Flume之基本组件的Sink:从Channel中取数据
在Flume中,Sink是数据采集和传输过程中的最终组件。它负责从Channel缓冲区中获取数据并将其存储到目标存储系统中。
200 0
|
开发框架 中间件 .NET
asp.net core 自定义中间件【以dapper为例】
asp.net core 自定义中间件【以dapper为例】
126 0