离线数仓(二)【用户行为日志采集平台搭建】(2)

简介: 离线数仓(二)【用户行为日志采集平台搭建】

离线数仓(二)【用户行为日志采集平台搭建】(1)https://developer.aliyun.com/article/1532360

还有 sink:

  • hdfs:写出到 hdfs
  • kafka:写出到 kafka,相当于 kafka 消费者
  • avro:写出到 avro 系统(avro 是一个 rpc 数据序列化系统)

为了保证数据的可靠性(以及 Kafak 具有削峰解耦的功能),我们当然是选择 Kafak Channel 作为 channel ,那么对于 Kafak Channel 的三种结构,我们应该如何选择呢?

首先第一种结构 - 结合 source 和 sink,这种结构不可取。因为 Kafka Channel 有一个参数 parseAsFlumeEvent(默认为 true) ,它的意思是把日志数据以 Flume Event  的格式存储到 Kafka,这就包括了 Event Header 和 Event Body。对于离线数仓而言,因为它本来 sink 用的就是 flume 的 sink 从 kafka channel 中读取,它可以把 event 中的 body 解析出来。但是对于实时数仓,flink 是从 kafka 中直接读取数据的,所以它读到的就是包含 header 和 body 的完整 event 数据,而 header 对于实时数仓来说根本没有,所以没有必要存储。

既然 header 不需要存储我们能不能把 parseAsFlumeEvent 设为 false 然后继续使用这种结构呢,我们知道拦截器是在 source 和 channel 之间起作用的,如果不使用 flume event 的形式存储数据就不能使用拦截器,而 channel 和 sink 之间又不能设置拦截器。所以最好的办法就是使用第二种结构 - Kafak Channel 结合 Source 使用,同时设置 parseAsFlumeEvent 参数为 false ,这样存储进 Kafka 的数据就只有 body,虽然没有了 sink,但是我们可以把存储在 Kafak 中的数据当做一个数据源,下游再用一个 Source 读取,这样我们就可以再设置拦截器了。

2、作业脚本配置

编写 flume 配置文件

file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1
 
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
 
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
 
#组装 
a1.sources.r1.channels = c1

测试

# 启动 zookeeper 和 kafka
zk start
kf.sh start
# 开启flume作业
bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/file_to_kafka.conf -Dflume.root.logger=INFO,console
# 开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
# 开启日志生成器
mklog.sh

运行结果:

至此,flume 的 source -> kafka channel 配置完毕,接下来我们需要在 flume 这里进行一个简单的 ETL 数据清洗,把一些脏数据去除掉,而且这里不能做复杂的拦截器,比如添加多个拦截器,因为flume 毕竟只是一个传递数据的管道,如果这里的 ETL工作太复杂会导致数据堆积在拦截器这里,影响效率。

添加拦截器

引入依赖和打包插件:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Json 工具类:

package com.lyh.gmall.utils;
 
import com.alibaba.fastjson.JSONObject;
 
public class JSONUtil {
 
    // 通过异常捕获来校验json是否合法
    public static boolean isJsonValidate(String body) {
        try {
            JSONObject.parseObject(body);
            return true;
        }catch (Exception e){
            return false;
        }
    }
}

自定义拦截器:

package com.lyh.gmall.interceptor;
 
import com.lyh.gmall.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
 
public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
 
    }
 
    // 单个事件的拦截器
    @Override
    public Event intercept(Event event) {
        // 1. 获取body当中的数据
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // 2. 判断数据是否合法,如果合法直接返回,否则返回 null
        return JSONUtil.isJsonValidate(body)?event:null;
    }
 
    // 多个事件的拦截器
    @Override
    public List<Event> intercept(List<Event> list) {
        for (int i = 0; i < list.size(); i++) {
            if (intercept(list.get(i)) == null){
                list.remove(i);
                i--;
            }
        }
        return list;
    }
 
    @Override
    public void close() {
 
    }
 
    public static class Builder implements Interceptor.Builder {
 
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
 
        @Override
        public void configure(Context context) {
 
        }
    }
}

打包后会生成两个jar包,选择带依赖的放到 flume 的 lib 目录下,在 flume 作业配置文件中添加以下配置来添加拦截器:

a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.ETLInterceptor$Builder

测试:

因为 flume 监听的是 applog/log/app-log-xxx.log 文件,所以我们直接 echo 写入来模拟错误日志(不完整的 json 信息):

可以看到,不合法的 json 数据被过滤掉了,至此,我们的数据采集通道可用了。根据我们的项目架构图可以看到,我们需要在 hadoop102 和 hadoop103 上采集,所以 hadoop103 也许要配置:

flume 采集脚本

#!/bin/bash
 
case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume开始采集 -------"
                ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/warehouse/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume停止采集-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done
};;
esac

总结

       至此,用户行为日志的采集环境搭建好了,只要输入命令 f1.sh start 就可以让在 hadoop102 和 hadoop103 上的 flume开启采集日志了。接下来就是业务数据的采集了。

       这一部分还算简单,对于 flume 的操作无非就是拦截器的编写,逻辑也没多复杂,flume 的 source 使用了 taildir 监听由脚本不断生成的日志文件,channel 使用了 Kafka source ,sink 估计不会采用 flume 的 sink。

相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
14天前
|
存储 数据采集 JavaScript
深入理解数仓开发(一)数据技术篇之日志采集
深入理解数仓开发(一)数据技术篇之日志采集
|
7天前
|
存储 监控 安全
《SelectDB 新一代日志存储分析平台解决方案》白皮书重磅发布|立即下载
作为基于 Apache Doris 打造的现代化数据仓库,SelectDB 不拘泥于传统数仓的限制,针对日志数据的特点引入了多项创新性技术,使用户可基于 SelectDB 构建开放、高性能、低成本、统一的日志存储分析平台, 截至目前已在近百家行业内知名企业中落地。
《SelectDB 新一代日志存储分析平台解决方案》白皮书重磅发布|立即下载
|
14天前
|
机器学习/深度学习 人工智能 DataWorks
人工智能平台PAI产品使用合集之在使用行调用时遇到一直卡在ps job的问题,并且无法在DataWorks上查看到相关日志,是什么导致的
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
14天前
|
存储 消息中间件 NoSQL
Flink 实时数仓(一)【实时数仓&离线数仓对比】(2)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
14天前
|
存储 消息中间件 Kafka
Flink 实时数仓(一)【实时数仓&离线数仓对比】(1)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
18天前
|
SQL
离线数仓(十)【ADS 层开发】(5)
离线数仓(十)【ADS 层开发】
离线数仓(十)【ADS 层开发】(4)
离线数仓(十)【ADS 层开发】
离线数仓(十)【ADS 层开发】(3)
离线数仓(十)【ADS 层开发】
|
15天前
|
Cloud Native 数据管理 OLAP
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
337 2
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区
|
15天前
|
SQL Cloud Native 关系型数据库
云原生数据仓库AnalyticDB产品使用合集之如何进行一键诊断
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
350 7