Flume(二)【Flume 进阶使用】(3)https://developer.aliyun.com/article/1532355
6、自定义 Source
自定义 source 用的还是比较少的,毕竟 flume 已经提供了很多常用的了。
1)介绍
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence、generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。
官方也提供了自定义 source 的接口: https://flume.apache.org/FlumeDeveloperGuide.html#source 根据官方说明自定义 MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
- getBackOffSleepIncrement() //backoff 步长,当从数据源拉取数据时,拉取不到数据的话它不会一直再去拉取,而是等待,之后每一次再=如果还拉取不到,就会比上一次多等待步长单位个时间。
- getMaxBackOffSleepInterval() //backoff 最长时间,如果不设置最长等待时间,它最终会无限等待,所以需要指定。
- configure(Context context) //初始化 context(读取配置文件内容)
- process() //获取数据封装成 event 并写入 channel,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统。
2)需求
使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文
件中配置。
3)分析
4)需求实现
代码
package com.lyh.source; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.util.HashMap; import java.util.Map; public class MySource extends AbstractSource implements Configurable, PollableSource { // 定义配置文件将来要读取的字段 private Long delay; private String field; @Override public Status process() throws EventDeliveryException { try { // 创建事件头信息 Map<String,String> headerMap = new HashMap<>(); // 创建事件 SimpleEvent event = new SimpleEvent(); // 循环封装事件 for (int i = 0; i < 5; i++) { // 给事件设置头信息 event.setHeaders(headerMap); // 给事件设置内容 event.setBody((field + i).getBytes()); // 将事件写入 channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (InterruptedException e) { e.printStackTrace(); } return Status.READY; } // 步长 @Override public long getBackOffSleepIncrement() { return 0; } // 最大间隔时间 @Override public long getMaxBackOffSleepInterval() { return 0; } // 初始化配置信息 @Override public void configure(Context context) { delay = context.getLong("delay"); field = context.getString("field","Hello"); } }
配置文件
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.lyh.source.MySource a1.sources.r1.delay = 1000 a1.sources.r1.field = lyh # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/custom-source.conf -Dflume.root.logger=INFO,console
运行结果:
7、自定义 Sink
1)介绍
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、 自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
官方也提供了自定义 sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。实现相应方法:
- configure(Context context)//初始化 context(读取配置文件内容)
- process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。
2)需求分析
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
流程分析:
3)需求实现
package com.lyh.sink; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySink extends AbstractSink implements Configurable{ private final static Logger LOG = LoggerFactory.getLogger(AbstractSink.class); private String prefix; private String suffix; @Override public Status process() throws EventDeliveryException { // 声明返回值状态信息 Status status; // 获取当前 sink 绑定的 channel Channel channel = getChannel(); // 获取事务 Transaction txn = channel.getTransaction(); // 声明事件 Event event; // 开启事务 txn.begin(); // 读取 channel 中的事件、直到读取事件结束循环 while (true){ event = channel.take(); if (event!=null) break; } try { // 打印事件 LOG.info(prefix + new String(event.getBody()) + suffix); // 事务提交 txn.commit(); status = Status.READY; }catch (Exception e){ // 遇到异常回滚事务 txn.rollback(); status = Status.BACKOFF; }finally { // 关闭事务 txn.close(); } return null; } // 初始化配置信息 @Override public void configure(Context context) { // 带默认值 prefix = context.getString("prefix","hello"); // 不带默认值 suffix = context.getString("suffix"); } }
配置文件
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = com.atguigu.MySink a1.sinks.k1.prefix = lyh: a1.sinks.k1.suffix = :lyh # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
4)测试
bin/flume-ng agent -n a1 -c conf/ -f job/custom-sink.conf -Dflume.root.logger=INFO,console
运行结果:
总结
自此,flume 的学习基本也完了,这一篇虽然不多但也用了大概3天时间。相比较 kafka、flink,flume 这个框架还是非常简单的,比如我们自己实现一些 source、sink,都是很简单的,没有太多复杂的理解的东西。
总之 flume 这个工具还是多看官网。