离线数仓(二)【用户行为日志采集平台搭建】(2)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 离线数仓(二)【用户行为日志采集平台搭建】

离线数仓(二)【用户行为日志采集平台搭建】(1)https://developer.aliyun.com/article/1532360

还有 sink:

  • hdfs:写出到 hdfs
  • kafka:写出到 kafka,相当于 kafka 消费者
  • avro:写出到 avro 系统(avro 是一个 rpc 数据序列化系统)

为了保证数据的可靠性(以及 Kafak 具有削峰解耦的功能),我们当然是选择 Kafak Channel 作为 channel ,那么对于 Kafak Channel 的三种结构,我们应该如何选择呢?

首先第一种结构 - 结合 source 和 sink,这种结构不可取。因为 Kafka Channel 有一个参数 parseAsFlumeEvent(默认为 true) ,它的意思是把日志数据以 Flume Event  的格式存储到 Kafka,这就包括了 Event Header 和 Event Body。对于离线数仓而言,因为它本来 sink 用的就是 flume 的 sink 从 kafka channel 中读取,它可以把 event 中的 body 解析出来。但是对于实时数仓,flink 是从 kafka 中直接读取数据的,所以它读到的就是包含 header 和 body 的完整 event 数据,而 header 对于实时数仓来说根本没有,所以没有必要存储。

既然 header 不需要存储我们能不能把 parseAsFlumeEvent 设为 false 然后继续使用这种结构呢,我们知道拦截器是在 source 和 channel 之间起作用的,如果不使用 flume event 的形式存储数据就不能使用拦截器,而 channel 和 sink 之间又不能设置拦截器。所以最好的办法就是使用第二种结构 - Kafak Channel 结合 Source 使用,同时设置 parseAsFlumeEvent 参数为 false ,这样存储进 Kafka 的数据就只有 body,虽然没有了 sink,但是我们可以把存储在 Kafak 中的数据当做一个数据源,下游再用一个 Source 读取,这样我们就可以再设置拦截器了。

2、作业脚本配置

编写 flume 配置文件

file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1
 
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
 
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
 
#组装 
a1.sources.r1.channels = c1

测试

# 启动 zookeeper 和 kafka
zk start
kf.sh start
# 开启flume作业
bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/file_to_kafka.conf -Dflume.root.logger=INFO,console
# 开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
# 开启日志生成器
mklog.sh

运行结果:

至此,flume 的 source -> kafka channel 配置完毕,接下来我们需要在 flume 这里进行一个简单的 ETL 数据清洗,把一些脏数据去除掉,而且这里不能做复杂的拦截器,比如添加多个拦截器,因为flume 毕竟只是一个传递数据的管道,如果这里的 ETL工作太复杂会导致数据堆积在拦截器这里,影响效率。

添加拦截器

引入依赖和打包插件:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Json 工具类:

package com.lyh.gmall.utils;
 
import com.alibaba.fastjson.JSONObject;
 
public class JSONUtil {
 
    // 通过异常捕获来校验json是否合法
    public static boolean isJsonValidate(String body) {
        try {
            JSONObject.parseObject(body);
            return true;
        }catch (Exception e){
            return false;
        }
    }
}

自定义拦截器:

package com.lyh.gmall.interceptor;
 
import com.lyh.gmall.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
 
public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
 
    }
 
    // 单个事件的拦截器
    @Override
    public Event intercept(Event event) {
        // 1. 获取body当中的数据
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // 2. 判断数据是否合法,如果合法直接返回,否则返回 null
        return JSONUtil.isJsonValidate(body)?event:null;
    }
 
    // 多个事件的拦截器
    @Override
    public List<Event> intercept(List<Event> list) {
        for (int i = 0; i < list.size(); i++) {
            if (intercept(list.get(i)) == null){
                list.remove(i);
                i--;
            }
        }
        return list;
    }
 
    @Override
    public void close() {
 
    }
 
    public static class Builder implements Interceptor.Builder {
 
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
 
        @Override
        public void configure(Context context) {
 
        }
    }
}

打包后会生成两个jar包,选择带依赖的放到 flume 的 lib 目录下,在 flume 作业配置文件中添加以下配置来添加拦截器:

a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.ETLInterceptor$Builder

测试:

因为 flume 监听的是 applog/log/app-log-xxx.log 文件,所以我们直接 echo 写入来模拟错误日志(不完整的 json 信息):

可以看到,不合法的 json 数据被过滤掉了,至此,我们的数据采集通道可用了。根据我们的项目架构图可以看到,我们需要在 hadoop102 和 hadoop103 上采集,所以 hadoop103 也许要配置:

flume 采集脚本

#!/bin/bash
 
case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume开始采集 -------"
                ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/warehouse/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume停止采集-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done
};;
esac

总结

       至此,用户行为日志的采集环境搭建好了,只要输入命令 f1.sh start 就可以让在 hadoop102 和 hadoop103 上的 flume开启采集日志了。接下来就是业务数据的采集了。

       这一部分还算简单,对于 flume 的操作无非就是拦截器的编写,逻辑也没多复杂,flume 的 source 使用了 taildir 监听由脚本不断生成的日志文件,channel 使用了 Kafka source ,sink 估计不会采用 flume 的 sink。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
23天前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
35 1
|
2月前
|
Kubernetes API Docker
跟着iLogtail学习容器运行时与K8s下日志采集方案
iLogtail 作为开源可观测数据采集器,对 Kubernetes 环境下日志采集有着非常好的支持,本文跟随 iLogtail 的脚步,了解容器运行时与 K8s 下日志数据采集原理。
|
3月前
|
消息中间件 监控 关系型数据库
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
Serverless 应用的监控与调试问题之实时离线数仓一体化常用的解决方案有什么问题
|
4月前
|
消息中间件 Kubernetes Kafka
日志采集/分析
日志采集/分析
78 7
|
3月前
|
存储 Kubernetes Java
在k8S中,容器内日志是怎么采集的?
在k8S中,容器内日志是怎么采集的?
|
4月前
|
存储 DataWorks Java
DataWorks产品使用合集之开发离线数仓时,需要多个工作空间的情况有哪些
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
3月前
|
数据采集 监控 Kubernetes
Job类日志采集问题之iLogtail以减小容器发现和开始采集的延时如何优化
Job类日志采集问题之iLogtail以减小容器发现和开始采集的延时如何优化
|
3月前
|
数据采集 Kubernetes Java
Job类日志采集问题之在日志中添加容器的元信息标签,如何操作
Job类日志采集问题之在日志中添加容器的元信息标签,如何操作
|
3月前
|
存储 容器
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
|
3月前
|
容器
Job类日志采集问题之ECI产品采集方式对于弹性扩缩容是如何支持的
Job类日志采集问题之ECI产品采集方式对于弹性扩缩容是如何支持的