离线数仓(二)【用户行为日志采集平台搭建】(2)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 离线数仓(二)【用户行为日志采集平台搭建】

离线数仓(二)【用户行为日志采集平台搭建】(1)https://developer.aliyun.com/article/1532360

还有 sink:

  • hdfs:写出到 hdfs
  • kafka:写出到 kafka,相当于 kafka 消费者
  • avro:写出到 avro 系统(avro 是一个 rpc 数据序列化系统)

为了保证数据的可靠性(以及 Kafak 具有削峰解耦的功能),我们当然是选择 Kafak Channel 作为 channel ,那么对于 Kafak Channel 的三种结构,我们应该如何选择呢?

首先第一种结构 - 结合 source 和 sink,这种结构不可取。因为 Kafka Channel 有一个参数 parseAsFlumeEvent(默认为 true) ,它的意思是把日志数据以 Flume Event  的格式存储到 Kafka,这就包括了 Event Header 和 Event Body。对于离线数仓而言,因为它本来 sink 用的就是 flume 的 sink 从 kafka channel 中读取,它可以把 event 中的 body 解析出来。但是对于实时数仓,flink 是从 kafka 中直接读取数据的,所以它读到的就是包含 header 和 body 的完整 event 数据,而 header 对于实时数仓来说根本没有,所以没有必要存储。

既然 header 不需要存储我们能不能把 parseAsFlumeEvent 设为 false 然后继续使用这种结构呢,我们知道拦截器是在 source 和 channel 之间起作用的,如果不使用 flume event 的形式存储数据就不能使用拦截器,而 channel 和 sink 之间又不能设置拦截器。所以最好的办法就是使用第二种结构 - Kafak Channel 结合 Source 使用,同时设置 parseAsFlumeEvent 参数为 false ,这样存储进 Kafka 的数据就只有 body,虽然没有了 sink,但是我们可以把存储在 Kafak 中的数据当做一个数据源,下游再用一个 Source 读取,这样我们就可以再设置拦截器了。

2、作业脚本配置

编写 flume 配置文件

file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1
 
#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume-1.9.0/taildir_position.json
 
#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
 
#组装 
a1.sources.r1.channels = c1

测试

# 启动 zookeeper 和 kafka
zk start
kf.sh start
# 开启flume作业
bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/file_to_kafka.conf -Dflume.root.logger=INFO,console
# 开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log
# 开启日志生成器
mklog.sh

运行结果:

至此,flume 的 source -> kafka channel 配置完毕,接下来我们需要在 flume 这里进行一个简单的 ETL 数据清洗,把一些脏数据去除掉,而且这里不能做复杂的拦截器,比如添加多个拦截器,因为flume 毕竟只是一个传递数据的管道,如果这里的 ETL工作太复杂会导致数据堆积在拦截器这里,影响效率。

添加拦截器

引入依赖和打包插件:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>
 
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Json 工具类:

package com.lyh.gmall.utils;
 
import com.alibaba.fastjson.JSONObject;
 
public class JSONUtil {
 
    // 通过异常捕获来校验json是否合法
    public static boolean isJsonValidate(String body) {
        try {
            JSONObject.parseObject(body);
            return true;
        }catch (Exception e){
            return false;
        }
    }
}

自定义拦截器:

package com.lyh.gmall.interceptor;
 
import com.lyh.gmall.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
 
public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
 
    }
 
    // 单个事件的拦截器
    @Override
    public Event intercept(Event event) {
        // 1. 获取body当中的数据
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        // 2. 判断数据是否合法,如果合法直接返回,否则返回 null
        return JSONUtil.isJsonValidate(body)?event:null;
    }
 
    // 多个事件的拦截器
    @Override
    public List<Event> intercept(List<Event> list) {
        for (int i = 0; i < list.size(); i++) {
            if (intercept(list.get(i)) == null){
                list.remove(i);
                i--;
            }
        }
        return list;
    }
 
    @Override
    public void close() {
 
    }
 
    public static class Builder implements Interceptor.Builder {
 
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
 
        @Override
        public void configure(Context context) {
 
        }
    }
}

打包后会生成两个jar包,选择带依赖的放到 flume 的 lib 目录下,在 flume 作业配置文件中添加以下配置来添加拦截器:

a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.lyh.gmall.interceptor.ETLInterceptor$Builder

测试:

因为 flume 监听的是 applog/log/app-log-xxx.log 文件,所以我们直接 echo 写入来模拟错误日志(不完整的 json 信息):

可以看到,不合法的 json 数据被过滤掉了,至此,我们的数据采集通道可用了。根据我们的项目架构图可以看到,我们需要在 hadoop102 和 hadoop103 上采集,所以 hadoop103 也许要配置:

flume 采集脚本

#!/bin/bash
 
case $1 in
"start"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume开始采集 -------"
                ssh $i "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/job/warehouse/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};; 
"stop"){
        for i in hadoop102 hadoop103
        do
                echo " -------- $i flume停止采集-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done
};;
esac

总结

       至此,用户行为日志的采集环境搭建好了,只要输入命令 f1.sh start 就可以让在 hadoop102 和 hadoop103 上的 flume开启采集日志了。接下来就是业务数据的采集了。

       这一部分还算简单,对于 flume 的操作无非就是拦截器的编写,逻辑也没多复杂,flume 的 source 使用了 taildir 监听由脚本不断生成的日志文件,channel 使用了 Kafka source ,sink 估计不会采用 flume 的 sink。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
目录
打赏
0
0
0
0
37
分享
相关文章
Dataphin离线数仓搭建深度测评:数据工程师的实战视角
作为一名金融行业数据工程师,我参与了阿里云Dataphin智能研发版的评测。通过《离线数仓搭建》实践,体验了其在数据治理中的核心能力。Dataphin在环境搭建、管道开发和任务管理上显著提效,如测试环境搭建从3天缩短至2小时,复杂表映射效率提升50%。产品支持全链路治理、智能提效和架构兼容,帮助企业降低40%建设成本,缩短60%需求响应周期。建议加强行业模板库和移动适配功能,进一步提升使用体验。
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
589 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
【重磅发布】AllData数据中台核心功能:湖仓一体化平台
杭州奥零数据科技有限公司成立于2023年,专注于数据中台业务,维护开源项目AllData并提供商业版解决方案。AllData提供数据集成、存储、开发、治理及BI展示等一站式服务,支持AI大模型应用,助力企业高效利用数据价值。
【重磅发布】AllData数据中台核心功能:湖仓一体化平台
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
197 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
湖仓分析|浙江霖梓基于 Doris + Paimon 打造实时/离线一体化湖仓架构
浙江霖梓早期基于 Apache Doris 进行整体架构与表结构的重构,并基于湖仓一体和查询加速展开深度探索与实践,打造了 Doris + Paimon 的实时/离线一体化湖仓架构,实现查询提速 30 倍、资源成本节省 67% 等显著成效。
湖仓分析|浙江霖梓基于 Doris + Paimon 打造实时/离线一体化湖仓架构
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
实时数仓 Hologres 产品介绍:一体化实时湖仓平台
本次方案的主题是实时数仓 Hologres 产品介绍:一体化实时湖仓平台,介绍了 Hologres 湖仓存储一体,多模式计算一体、分析服务一体和 Data+AI 一体四方面一体化场景,并对其运维监控方面及客户案例进行一定讲解。 1. Hologres :面向未来的一体化实时湖仓 2. 运维监控 3. 客户案例 4. 总结
170 14
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。

热门文章

最新文章