自定义Source
有时候,flume中的source不符合我们的需求,这时就可以进行自己定义Source。
自定义Source的流程就是首先继承并实现官方类,然后实现相应的方法,重点是读取数据的方法,在该内部可以定义jdbc或者是IO流进行读取数据。
然后将数据封装成事件,交给channel处理器。
处理器的内部流程是先将该事件交给拦截器进行处理(封装头部信息等),然后判断是否为空,不为空,将其将给选择器,将该事件交给自己对应的channel。
自定义的Source类型为自己编写的代码的全类名。
注意要将写好的代码打成jar包丢到flume的lib目录下。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = com.atguigu.Source.MySource a1.sources.r1.prefix=log a1.sources.r1.suffix=mod a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
package com.atguigu.Source; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; public class MySource extends AbstractSource implements Configurable, PollableSource { // 定义键值对 String prefix; String suffix; // 根据上下文获取flume中配置的值 @Override public void configure(Context context) { prefix = context.getString("prefix"); suffix = context.getString("suffix", ".www"); } @Override public Status process() throws EventDeliveryException { Status status = null; // 读取数据,可以自定义jdbc或IO流 try { for (int i = 0; i < 10; i++) { SimpleEvent event = new SimpleEvent(); event.setBody((prefix + "-->" + i + suffix).getBytes()); // 将事件交给处理器 getChannelProcessor().processEvent(event); // 设置提交状态 status = Status.READY; } } catch (Exception e) { e.printStackTrace(); status = Status.BACKOFF; } // 定义延时,也可以设置成配置参数 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return status; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }