离线数仓(二)【用户行为日志采集平台搭建】(1)https://developer.aliyun.com/article/1532360
还有 sink:
- hdfs:写出到 hdfs
- kafka:写出到 kafka,相当于 kafka 消费者
- avro:写出到 avro 系统(avro 是一个 rpc 数据序列化系统)
为了保证数据的可靠性(以及 Kafak 具有削峰解耦的功能),我们当然是选择 Kafak Channel 作为 channel ,那么对于 Kafak Channel 的三种结构,我们应该如何选择呢?
首先第一种结构 - 结合 source 和 sink,这种结构不可取。因为 Kafka Channel 有一个参数 parseAsFlumeEvent(默认为 true) ,它的意思是把日志数据以 Flume Event 的格式存储到 Kafka,这就包括了 Event Header 和 Event Body。对于离线数仓而言,因为它本来 sink 用的就是 flume 的 sink 从 kafka channel 中读取,它可以把 event 中的 body 解析出来。但是对于实时数仓,flink 是从 kafka 中直接读取数据的,所以它读到的就是包含 header 和 body 的完整 event 数据,而 header 对于实时数仓来说根本没有,所以没有必要存储。
既然 header 不需要存储我们能不能把 parseAsFlumeEvent 设为 false 然后继续使用这种结构呢,我们知道拦截器是在 source 和 channel 之间起作用的,如果不使用 flume event 的形式存储数据就不能使用拦截器,而 channel 和 sink 之间又不能设置拦截器。所以最好的办法就是使用第二种结构 - Kafak Channel 结合 Source 使用,同时设置 parseAsFlumeEvent 参数为 false ,这样存储进 Kafka 的数据就只有 body,虽然没有了 sink,但是我们可以把存储在 Kafak 中的数据当做一个数据源,下游再用一个 Source 读取,这样我们就可以再设置拦截器了。
2、作业脚本配置
编写 flume 配置文件
file_to_kafka.conf
#定义组件 a1.sources = r1 a1.channels = c1 #配置source a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json #配置channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic = topic_log a1.channels.c1.parseAsFlumeEvent = false #组装 a1.sources.r1.channels = c1
测试
# 启动 zookeeper 和 kafka zk start kf.sh start # 开启flume作业 bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/file_to_kafka.conf -Dflume.root.logger=INFO,console # 开启kafka消费者 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log # 开启日志生成器 mklog.sh
运行结果:
至此,flume 的 source -> kafka channel 配置完毕,接下来我们需要在 flume 这里进行一个简单的 ETL 数据清洗,把一些脏数据去除掉,而且这里不能做复杂的拦截器,比如添加多个拦截器,因为flume 毕竟只是一个传递数据的管道,如果这里的 ETL工作太复杂会导致数据堆积在拦截器这里,影响效率。
添加拦截器
引入依赖和打包插件:
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Json 工具类:
package com.lyh.gmall.utils; import com.alibaba.fastjson.JSONObject; public class JSONUtil { // 通过异常捕获来校验json是否合法 public static boolean isJsonValidate(String body) { try { JSONObject.parseObject(body); return true; }catch (Exception e){ return false; } } }
自定义拦截器:
package com.lyh.gmall.interceptor; import com.lyh.gmall.utils.JSONUtil; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; public class ETLInterceptor implements Interceptor { @Override public void initialize() { } // 单个事件的拦截器 @Override public Event intercept(Event event) { // 1. 获取body当中的数据 String body = new String(event.getBody(), StandardCharsets.UTF_8); // 2. 判断数据是否合法,如果合法直接返回,否则返回 null return JSONUtil.isJsonValidate(body)?event:null; } // 多个事件的拦截器 @Override public List<Event> intercept(List<Event> list) { for (int i = 0; i < list.size(); i++) { if (intercept(list.get(i)) == null){ list.remove(i); i--; } } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new ETLInterceptor(); } @Override public void configure(Context context) { } } }
打包后会生成两个jar包,选择带依赖的放到 flume 的 lib 目录下,在 flume 作业配置文件中添加以下配置来添加拦截器:
a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.ETLInterceptor$Builder
测试:
因为 flume 监听的是 applog/log/app-log-xxx.log 文件,所以我们直接 echo 写入来模拟错误日志(不完整的 json 信息):
可以看到,不合法的 json 数据被过滤掉了,至此,我们的数据采集通道可用了。根据我们的项目架构图可以看到,我们需要在 hadoop102 和 hadoop103 上采集,所以 hadoop103 也许要配置:
flume 采集脚本
#!/bin/bash case $1 in "start"){ for i in hadoop102 hadoop103 do echo " -------- $i flume开始采集 -------" ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/warehouse/file_to_kafka.conf >/dev/null 2>&1 &" done };; "stop"){ for i in hadoop102 hadoop103 do echo " -------- $i flume停止采集-------" ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 " done };; esac
总结
至此,用户行为日志的采集环境搭建好了,只要输入命令 f1.sh start 就可以让在 hadoop102 和 hadoop103 上的 flume开启采集日志了。接下来就是业务数据的采集了。
这一部分还算简单,对于 flume 的操作无非就是拦截器的编写,逻辑也没多复杂,flume 的 source 使用了 taildir 监听由脚本不断生成的日志文件,channel 使用了 Kafka source ,sink 估计不会采用 flume 的 sink。