Flume(二)【Flume 进阶使用】(4)

简介: Flume(二)【Flume 进阶使用】

Flume(二)【Flume 进阶使用】(3)https://developer.aliyun.com/article/1532355

6、自定义 Source

自定义 source 用的还是比较少的,毕竟 flume 已经提供了很多常用的了。

1)介绍

       Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。

官方也提供了自定义 source 的接口: https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义 MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。

实现相应方法:

  • getBackOffSleepIncrement() //backoff 步长,当从数据源拉取数据时,拉取不到数据的话它不会一直再去拉取,而是等待,之后每一次再=如果还拉取不到,就会比上一次多等待步长单位个时间。
  • getMaxBackOffSleepInterval()  //backoff 最长时间,如果不设置最长等待时间,它最终会无限等待,所以需要指定。
  • configure(Context context)  //初始化 context(读取配置文件内容)
  • process()  //获取数据封装成 event 并写入 channel,这个方法将被循环调用。

使用场景:读取 MySQL 数据或者其他文件系统。

2)需求

使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文

件中配置。

3)分析

4)需求实现

代码

package com.lyh.source;
 
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
 
import java.util.HashMap;
import java.util.Map;
 
public class MySource extends AbstractSource implements Configurable, PollableSource {
 
    // 定义配置文件将来要读取的字段
    private Long delay;
    private String field;
 
    @Override
    public Status process() throws EventDeliveryException {
        try {
            // 创建事件头信息
            Map<String,String> headerMap = new HashMap<>();
            // 创建事件
            SimpleEvent event = new SimpleEvent();
            // 循环封装事件
            for (int i = 0; i < 5; i++) {
                // 给事件设置头信息
                event.setHeaders(headerMap);
                // 给事件设置内容
                event.setBody((field + i).getBytes());
                // 将事件写入 channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Status.READY;
    }
 
    // 步长
    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }
 
    // 最大间隔时间
    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
 
    // 初始化配置信息
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field","Hello");
    }
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = com.lyh.source.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = lyh
 
# Describe the sink
a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console

运行结果:

7、自定义 Sink

1)介绍

       Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

       Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

       Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、 自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

       官方也提供了自定义 sink 的接口:

https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。实现相应方法:

  • configure(Context context)//初始化 context(读取配置文件内容)
  • process()//从 Channel 读取获取数据(event),这个方法将被循环调用。

使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

2)需求分析

使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。

流程分析:

3)需求实现

package com.lyh.sink;
 
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
public class MySink extends AbstractSink implements Configurable{
 
    private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
 
    private String prefix;
    private String suffix;
 
    @Override
    public Status process() throws EventDeliveryException {
 
        // 声明返回值状态信息
        Status status;
 
        // 获取当前 sink 绑定的 channel
        Channel channel = getChannel();
 
        // 获取事务
        Transaction txn = channel.getTransaction();
 
        // 声明事件
        Event event;
 
        // 开启事务
        txn.begin();
 
        // 读取 channel 中的事件、直到读取事件结束循环
        while (true){
            event = channel.take();
            if (event!=null) break;
        }
 
        try {
            // 打印事件
            LOG.info(prefix + new String(event.getBody()) + suffix);
 
            // 事务提交
            txn.commit();
            status = Status.READY;
        }catch (Exception e){
            // 遇到异常回滚事务
            txn.rollback();
            status = Status.BACKOFF;
        }finally {
            // 关闭事务
            txn.close();
        }
 
        return null;
    }
 
    // 初始化配置信息
    @Override
    public void configure(Context context) {
        // 带默认值
        prefix = context.getString("prefix","hello");
        // 不带默认值
        suffix = context.getString("suffix");
    }
}

配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
 
# Describe the sink
a1.sinks.k1.type = com.atguigu.MySink
a1.sinks.k1.prefix = lyh:
a1.sinks.k1.suffix = :lyh
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4)测试

bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console

运行结果:

总结

       自此,flume 的学习基本也完了,这一篇虽然不多但也用了大概3天时间。相比较 kafka、flink,flume 这个框架还是非常简单的,比如我们自己实现一些 source、sink,都是很简单的,没有太多复杂的理解的东西。

       总之 flume 这个工具还是多看官网。

相关文章
|
4月前
|
监控 负载均衡
Flume(二)【Flume 进阶使用】(2)
Flume(二)【Flume 进阶使用】
|
4月前
|
SQL 存储 负载均衡
Flume(二)【Flume 进阶使用】(1)
Flume(二)【Flume 进阶使用】
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合
|
监控 负载均衡
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(二)
|
SQL 存储 分布式计算
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
Flume学习---2、Flume进阶(事务)、负载均衡、故障转移、聚合(一)
|
5月前
|
存储 分布式计算 监控
【Flume】Flume 监听日志文件案例分析
【4月更文挑战第4天】【Flume】Flume 监听日志文件案例分析
|
5月前
|
存储 运维 监控
【Flume】flume 日志管理中的应用
【4月更文挑战第4天】【Flume】flume 日志管理中的应用
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
2月前
|
存储 数据采集 数据处理
【Flume拓扑揭秘】掌握Flume的四大常用结构,构建强大的日志收集系统!
【8月更文挑战第24天】Apache Flume是一个强大的工具,专为大规模日志数据的收集、聚合及传输设计。其核心架构包括源(Source)、通道(Channel)与接收器(Sink)。Flume支持多样化的拓扑结构以适应不同需求,包括单层、扇入(Fan-in)、扇出(Fan-out)及复杂多层拓扑。单层拓扑简单直观,适用于单一数据流场景;扇入结构集中处理多源头数据;扇出结构则实现数据多目的地分发;复杂多层拓扑提供高度灵活性,适合多层次数据处理。通过灵活配置,Flume能够高效构建各种规模的数据收集系统。
33 0