自定义Sink
自定义Sink的流程就是:
首先需要获取sink对应的channel,然后从该channel中获取事务,再从channel中拉取事件并进行处理,根据业务逻辑将数据写出;最后提交事务——如果成功,channel将该事件清除,否则进行回滚。
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = taildir
a1.sources.r1.positionFile = /opt/module/flume/position/position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data3/data6

# 自定义的Sink
a1.sinks.k1.type = com.atguigu.Sink.MySink
a1.sinks.k1.prefix = 999
a1.sinks.k1.suffix = ttt

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
package com.atguigu.Sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/**
 * Custom Flume sink that logs each event body wrapped with a configurable
 * prefix and suffix.
 *
 * <p>Configuration properties:
 * <ul>
 *   <li>{@code prefix} — text prepended to each event body (no default; may be null)</li>
 *   <li>{@code suffix} — text appended to each event body (defaults to "666")</li>
 * </ul>
 */
public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOGGER = LoggerFactory.getLogger(MySink.class);

    // Configured prefix/suffix wrapped around every event body when logging.
    private String prefix;
    private String suffix;

    /**
     * Reads the {@code prefix} and {@code suffix} properties from the agent
     * configuration. Called once by Flume before the sink is started.
     *
     * @param context the sink's configuration context
     */
    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix", "666");
    }

    /**
     * Takes one event from the channel inside a transaction, logs it, and
     * commits. Rolls back on any failure so the event stays in the channel.
     *
     * @return {@link Status#READY} when an event was processed,
     *         {@link Status#BACKOFF} when the channel was empty or an error occurred
     * @throws EventDeliveryException never thrown directly here; declared by the contract
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            transaction.begin();

            Event event = channel.take();
            if (event != null) {
                // Specify the charset explicitly; the platform default is not portable.
                String body = new String(event.getBody(), StandardCharsets.UTF_8);
                LOGGER.info("{}-->{}-->{}", prefix, body, suffix);
                status = Status.READY;
            } else {
                // Empty channel: signal BACKOFF so Flume pauses instead of busy-spinning.
                status = Status.BACKOFF;
            }

            // Commit regardless of whether an event was taken — the
            // transaction must always be completed.
            transaction.commit();
        } catch (Exception e) {
            // Catch broadly: ANY failure must roll back so the event is not lost.
            LOGGER.error("Failed to process event; rolling back transaction", e);
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return status;
    }
}