简单需求:
向文件中定时新增日期数据,采集该文件, 通过自定义source拦截器给日期数据加上自己姓名作为前缀,输出到控制台。
分析:
需求很简单,主要在于练习flume自定义拦截器的流程,我们需要使用java来写flume拦截器的流程需求,然后使用maven将程序打包成jar包。放到采集服务器的flume安装路径的/lib路径下,然后运行。
步骤:
1.启动一个maven工程,导入下面依赖的jar包
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0-cdh5.14.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
</plugins>
</build>
2.自定义flume的拦截器
新建packagecom.kkb.flume.interceptor
新建类及内部类,分别是MyInterceptor
、MyBuilder
package com.book.flume.interceptor;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
public class MyInterceptor implements Interceptor {
/**
* my_name:指定姓名
*/
private final String my_name;
/**
* 提供构建方法,后期可以接受配置文件中的参数
*
* @param my_name
*/
public MyInterceptor(String my_name) {
this.my_name = my_name;
}
/**
*
* 单个event拦截逻辑
*/
public Event intercept(Event event) {
if (event == null) {
return null;
}
try {
String line = new String(event.getBody(), Charsets.UTF_8);
String newLine = this.my_name + line;
event.setBody(newLine.getBytes(Charsets.UTF_8));
return event;
} catch (Exception e) {
return event;
}
}
/**
* 批量event拦截逻辑
*/
public List<Event> intercept(List<Event> events) {
List<Event> out = new ArrayList<Event>();
for (Event event : events) {
Event outEvent = intercept(event);
if (outEvent != null) {
out.add(outEvent);
}
}
return out;
}
public void close() {
}
public void initialize() {
}
/**
* 相当于自定义Interceptor的工厂类
*/
public static class MyBuilder implements Interceptor.Builder {
/**
* my_name:指定姓名
*/
private String my_name;
public void configure(Context context) {
this.my_name = context.getString("my_name", "");
}
/*
* @see org.apache.flume.interceptor.Interceptor.Builder#build()
*/
public MyInterceptor build() {
return new MyInterceptor(my_name);
}
}
}
MyInterceptor类实现Interceptor接口的单个拦截器和多个拦截器方法、以及close()、initialize()。根据需求我们的主要逻辑在单个拦截器内。
3.将程序打包成jar包,将original-flumedemo-1.0-SNAPSHOT.jar发送到采集服务器安装flume目录的lib目录下。
4.编写配置flume的配置文件
# 指定sources、sinks、channels,可以指定多个,a1为agent的名称,以此配置启动时会指定此名称(符合规定的命名都可以)
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 指定sources的配置,此为tpye指定命令行执行
a1.sources.r1.type = exec
# 指定命令行执行的命令,根据需求查看这个/misc_file/flume_date_log/date_log文件下新增的数据
a1.sources.r1.command = tail -F /misc_file/flume_date_log/date_log
# 指定使用的拦截器名称
a1.sources.r1.interceptors = i1
# 指定使用的拦截器的类型,使用刚才打包jar的执行的路径的启动的类
a1.sources.r1.interceptors.i1.type = com.book.flume.interceptor.MyInterceptor$MyBuilder
# 指定向jar中传递的参数,即需求中的名字
a1.sources.r1.interceptors.i1.my_name = lihongcheng
# sinks的方式,此处根据需求是输出到控制台上
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
# channel中存储的event的最大个数
a1.channels.c1.capacity = 1000
# channel每次从source获得的event最多个数或一次发往sink的event最多个数
a1.channels.c1.transactionCapacity = 100
# 将source和sink与channel连接
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
5.启动一个flume代理
# cd到你自己flume的安装目录执行
cd /install/apache-flume-1.6.0-cdh5.14.2-bin
# -c指定执行conf配置文件,-f指定执行的配置文件的路径,-name为指定此agent的名称,即为配置文件中的a1,-Dflume.root.logger=DEBUG,console指定输出日志级别以及输出到哪(此处指定控制台)
bin/flume-ng agent -c conf -f conf/spool-interceptor-hdfs.conf -name a1 -Dflume.root.logger=DEBUG,console
6.准备向/misc_file/flume_date_log/date_log文件中添加数据。
使用如下shell,编写一个shell文件运行:
#!/bin/bash
while true
do
date >> /misc_file/flume_date_log/date_log;
sleep 0.5;
done
使用如下命令运行:
# 修改对应shell脚本文件的权限,使其可以执行
chmod u+x tail-file.sh
# 运行脚本
sh /shells/tail-file.sh
最终在启动flume的服务器控制台可以看到如下结果: