离线数仓(四)【数仓数据同步策略】(4)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,8核32GB 100GB 1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 离线数仓(四)【数仓数据同步策略】

离线数仓(四)【数仓数据同步策略】(3)https://developer.aliyun.com/article/1532384

sink 配置:

这里除了设置输出的 hdfs 路径必须包含日期之外,主要就是滚动策略的配置,我们要防止小文件的问题。

编写拦截器:

package com.lyh.gmall.interceptor;
 
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
 
public class TimestampAndTableNameInterceptor implements Interceptor {
 
    @Override
    public void initialize() {
 
    }
 
    @Override
    public Event intercept(Event event) {
        // 1. 把 body 中的 timestamp 和 table 字段提取出来 放到 header
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
 
        // 2. 解析 log 中的 ts 和 table 字段
        JSONObject json = JSONObject.parseObject(log);
        String ts = json.getString("ts");
        String table = json.getString("table");
 
        // 3. 把 ts 和 table 字段放到 header 中的 tableName 和 timestamp 字段
        headers.put("tableName",table);
        headers.put("timestamp",ts + "000");
 
        return event;
    }
 
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event: list)
            intercept(event);
        return list;
    }
 
    @Override
    public void close() {
 
    }
 
    public static class Builder implements Interceptor.Builder{
 
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }
 
        @Override
        public void configure(Context context) {
 
        }
    }
 
}

打包放到 hadoop104 上 flume 的 lib 目录下,开始测试:

打通通道

myhadoop start
zk start
kf.sh start
mxw.sh start

启动 flume 作业:

[lyh@hadoop104 flume-1.9.0]$ bin/flume-ng agent -n a1 -c conf/ -f job/warehouse/kafka_to_hdfs_db.conf -Dflume.root.logger=INFO,console

模拟业务数据生成:

cd /opt/module/db_log/
java -jar gmall2020-mock-db-2021-11-14.jar

查看 hdfs:

可以看到,其中带 inc 后缀的都是我们增量同步进来的数据。

增量同步文件数 =  总文件数 - 全量同步文件数 = 27 - 15 = 12 ,没有问题

这里存在一个问题:我们之前在拦截器中设置了 event header 中的 timestamp 为 kafka 中的数据t  ts 字段的时间信息,但是这里却依然是我们机器的时间,这是因为我们 java -jar 操作数据库的时间就是我们服务器当前的时间,所以导致 Maxwelll 读取 binlog 后的数据就是当前服务器的时间。具体解决办法看下面的 Maxwell 配置。

2. 编写增量数据同步脚本
vim f3.sh
#!/bin/bash
 
case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf -f /opt/module/flume-1.9.0/job/warehouse/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
 
        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
3. Maxwell 配置

这里主要是解决时间戳的问题:

生产环境中是不会有这个问题的,这里我们用的是 经过修改源码的 Maxwell,所以只需要修改一下配置文件即可:

cd /opt/module/maxwell-1.29.2/
vim config.properties

添加配置:

mock_date=2020-06-14

4. 增量表首日全量同步

       增量表本来就存在一些数据,但是 Maxwell 在监听的 binlog 的时候是不知道的,所以我们还需要全量同步一次增量表中的历史数据。但是我们用哪个工具呢,我们知道,Maxwell 也可以做全量,DataX也可以。这里我们选择 Maxwell ,因为 DataX 同步到 HDFS 的文件是一个以特定字符分割的文件,而 Maxwell 同步到 HDFS 的文件是 json 格式的,所以我们肯定是希望保存到 HDFS 后的数据格式都是一致的,那我们就自然会联想到学习 Maxwell 说的 bootstrap,它是 Maxwell  的一张元数据表。

编写初始化脚本:

vim mysql_to_kafka_inc_init.sh
#!/bin/bash
 
# 该脚本的作用是初始化所有的增量表,只需执行一次
 
MAXWELL_HOME=/opt/module/maxwell-1.29.2
 
import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}
 
case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac

测试:

f3.sh start 
mysql_to_hdfs_full_init.sh all

这里需要牢记 Maxwell 可以既做全量又做增量为什么还需要 DataX,这是因为 DataX 对于全量同步更加专业,因为它可以进行一些流控,而且支持更多的数据源并且支持并发。所以 Maxwell 只在初始化同步历史数据的时候用一下,所以不用担心它的性能问题。

2.3、采集通道启/停脚本

这里只是为了方便学习的时候用的,生产环境千万不敢用:

#!/bin/bash
 
case $1 in
"start"){
        echo ================== 启动 集群 ==================
 
        #启动 Zookeeper集群
        zk start
 
        #启动 Hadoop集群
        myhadoop start
 
        #启动 Kafka采集集群
        kf.sh start
 
        #启动采集 Flume
        f1.sh start
 
  #启动日志消费 Flume
        f2.sh start
 
  #启动业务消费 Flume
        f3.sh start
 
  #启动 maxwell
        mxw.sh start
 
        };;
"stop"){
        echo ================== 停止 集群 ==================
 
  #停止 Maxwell
        mxw.sh stop
 
  #停止 业务消费Flume
        f3.sh stop
 
  #停止 日志消费Flume
        f2.sh stop
 
  #停止 日志采集Flume
        f1.sh stop
 
        #停止 Kafka采集集群
        kf.sh stop
 
        #停止 Hadoop集群
        myhadoop stop
 
        #停止 Zookeeper集群
        zk stop
 
};;
esac

总结

       现在是2024-2-27 19:28 。

       到这里,我们的数仓数据同步工作就都做完了,包括全量用户行为日志的同步(用户行为日志数据并没有增量同步)、增量业务数据的同步、全量业务数据的同步以及业务数据的历史数据初始化全量同步。

       接下来就是关于数仓的知识的学习了,这部分也将是最最重要的!不管是理论还是建模方法和编程实践。

       今天额外的好消息就是四级终于过了,这就剩下了很多时间去专心技术啦!

相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
17天前
|
消息中间件 关系型数据库 Kafka
深入理解数仓开发(二)数据技术篇之数据同步
深入理解数仓开发(二)数据技术篇之数据同步
|
20天前
|
数据挖掘
离线数仓(十)【ADS 层开发】(2)
离线数仓(十)【ADS 层开发】
|
17天前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之离线同步任务中,把表数据同步到POLARDB,显示所有数据都是脏数据,报错信息:ERROR JobContainer - 运行scheduler 模式[local]出错.是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
17天前
|
存储 消息中间件 NoSQL
Flink 实时数仓(一)【实时数仓&离线数仓对比】(2)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
17天前
|
存储 消息中间件 Kafka
Flink 实时数仓(一)【实时数仓&离线数仓对比】(1)
Flink 实时数仓(一)【实时数仓&离线数仓对比】
|
20天前
|
SQL
离线数仓(十)【ADS 层开发】(5)
离线数仓(十)【ADS 层开发】
离线数仓(十)【ADS 层开发】(4)
离线数仓(十)【ADS 层开发】
离线数仓(十)【ADS 层开发】(3)
离线数仓(十)【ADS 层开发】
|
20天前
|
存储 SQL 数据可视化
离线数仓(十)【ADS 层开发】(1)
离线数仓(十)【ADS 层开发】
|
17天前
|
Cloud Native 数据管理 OLAP
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
340 2
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区

热门文章

最新文章