4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)(二)

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 4、离线数仓数据同步策略(全量表数据同步、增量表数据同步、首日同步、采集通道脚本)(二)

脚本内容如下

#!/bin/bash
python ~/bin/gen_import_config.py -d gmall -t activity_info
python ~/bin/gen_import_config.py -d gmall -t activity_rule
python ~/bin/gen_import_config.py -d gmall -t base_category1
python ~/bin/gen_import_config.py -d gmall -t base_category2
python ~/bin/gen_import_config.py -d gmall -t base_category3
python ~/bin/gen_import_config.py -d gmall -t base_dic
python ~/bin/gen_import_config.py -d gmall -t base_province
python ~/bin/gen_import_config.py -d gmall -t base_region
python ~/bin/gen_import_config.py -d gmall -t base_trademark
python ~/bin/gen_import_config.py -d gmall -t cart_info
python ~/bin/gen_import_config.py -d gmall -t coupon_info
python ~/bin/gen_import_config.py -d gmall -t sku_attr_value
python ~/bin/gen_import_config.py -d gmall -t sku_info
python ~/bin/gen_import_config.py -d gmall -t sku_sale_attr_value
python ~/bin/gen_import_config.py -d gmall -t spu_info

(3)为gen_import_config.sh脚本增加执行权限

chmod 777 ~/bin/gen_import_config.sh

(4)执行gen_import_config.sh脚本,生成配置文件

gen_import_config.sh

(5)观察生成的配置文件

5、 测试生成的DataX配置文件

以activity_info为例,测试用脚本生成的配置文件是否可用。

1、创建目标路径

由于DataX同步任务要求目标路径提前存在,故需手动创建路径,当前activity_info表的目标路径应为/origin_data/gmall/db/activity_info_full/2020-06-14

hadoop fs -mkdir /origin_data/gmall/db/activity_info_full/2020-06-14

2、执行DataX同步命令

python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2020-06-14" /opt/module/datax/job/import/gmall.activity_info.json

3、观察同步结果

6、全量表数据同步脚本

为方便使用以及后续的任务调度,此处编写一个全量表数据同步脚本。

(1)在~/bin目录创建mysql_to_hdfs_full.sh

脚本内容如下

#!/bin/bash
DATAX_HOME=/opt/module/datax
# 如果传入日期则do_date等于传入的日期,否则等于前一天日期
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi
#处理目标路径,此处的处理逻辑是,如果目标路径不存在,则创建;若存在,则清空,目的是保证同步任务可重复执行
handle_targetdir() {
  hadoop fs -test -e $1
  if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
  else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
  fi
}
#数据同步
import_data() {
  datax_config=$1
  target_dir=$2
  handle_targetdir $target_dir
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}
case $1 in
"activity_info")
  import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
  ;;
"activity_rule")
  import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
  ;;
"base_category1")
  import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
  ;;
"base_category2")
  import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
  ;;
"base_category3")
  import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
  ;;
"base_dic")
  import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
  ;;
"base_province")
  import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
  ;;
"base_region")
  import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
  ;;
"base_trademark")
  import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
  ;;
"cart_info")
  import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
  ;;
"coupon_info")
  import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
  ;;
"sku_attr_value")
  import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
  ;;
"sku_info")
  import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
  ;;
"sku_sale_attr_value")
  import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
  ;;
"spu_info")
  import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
  ;;
"all")
  import_data /opt/module/datax/job/import/gmall.activity_info.json /origin_data/gmall/db/activity_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.activity_rule.json /origin_data/gmall/db/activity_rule_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category1.json /origin_data/gmall/db/base_category1_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category2.json /origin_data/gmall/db/base_category2_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_category3.json /origin_data/gmall/db/base_category3_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_dic.json /origin_data/gmall/db/base_dic_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_province.json /origin_data/gmall/db/base_province_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_region.json /origin_data/gmall/db/base_region_full/$do_date
  import_data /opt/module/datax/job/import/gmall.base_trademark.json /origin_data/gmall/db/base_trademark_full/$do_date
  import_data /opt/module/datax/job/import/gmall.cart_info.json /origin_data/gmall/db/cart_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.coupon_info.json /origin_data/gmall/db/coupon_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_attr_value.json /origin_data/gmall/db/sku_attr_value_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_info.json /origin_data/gmall/db/sku_info_full/$do_date
  import_data /opt/module/datax/job/import/gmall.sku_sale_attr_value.json /origin_data/gmall/db/sku_sale_attr_value_full/$do_date
  import_data /opt/module/datax/job/import/gmall.spu_info.json /origin_data/gmall/db/spu_info_full/$do_date
  ;;
esac

(2)为mysql_to_hdfs_full.sh增加执行权限

chmod 777 ~/bin/mysql_to_hdfs_full.sh

(3)测试同步脚本

mysql_to_hdfs_full.sh all 2020-06-14

(4)检查同步结果

查看HDFS目表路径是否出现全量表数据,全量表共15张。

2.2.6 增量表数据同步

1、数据通道


7b7c78b7788a44608aeb3a8aeccb10a2.png

2、 Flume配置

(1)Flume配置概述

Flume需要将Kafka中topic_db主题的数据传输到HDFS,故其需选用KafkaSource以及HDFSSink,Channel选用FileChannel。

需要注意的是, HDFSSink需要将不同mysql业务表的数据写到不同的路径,并且路径中应当包含一层日期,用于区分每天的数据。关键配置如下:

a4cc72f2cdc9492db04f181ad4b66125.png

具体数据示例如下:


f6afa477560b4bf4a82aa6a58f07906a.png

(2)Flume配置实操

(a)创建Flume配置文件

在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf

配置内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zhm.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Builder
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

(b)编写拦截器

新建一个Maven项目,并在pom.xml文件中加入如下配置

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在com.zhm.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类

package com.zhm.gmall.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimestampAndTableNameInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }
    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
String log = new String(event.getBody(), StandardCharsets.UTF_8);
    JSONObject jsonObject = JSONObject.parseObject(log);
    Long ts = jsonObject.getLong("ts");
    //Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒
    String timeMills = String.valueOf(ts * 1000);
    String tableName = jsonObject.getString("table");
    headers.put("timestamp", timeMills);
    headers.put("tableName", tableName);
    return event;
    }
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }
    @Override
    public void close() {
    }
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor ();
        }
        @Override
        public void configure(Context context) {
        }
    }
}

重新打包。

将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下

(3)通道测试

(a)启动Zookeeper、Kafka集群

(b)启动hadoop104的Flume

(c)生成模拟数据

(d)观察HDFS上的目标路径是否有数据出现

若HDFS上的目标路径已有增量表的数据出现了,就证明数据通道已经打通。

(e)数据目标路径的日期说明

仔细观察,会发现目标路径中的日期,并非模拟数据的业务日期,而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值,是数据的变动日期。而真实场景下,数据的业务日期与变动日期应当是一致的。

(4)编写Flume启停脚本

在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh

填写以下内容

#!/bin/bash
case $1 in
"start")
        echo " --------启动 hadoop104 业务数据flume-------"
        ssh hadoop104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")
        echo " --------停止 hadoop104 业务数据flume-------"
        ssh hadoop104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

增加脚本执行权限

chmod 777 f3.sh

3、MaxWell配置

1、Maxwell时间戳问题

720d8aa84a16487d81061ab7f1b2a424.png

修改Maxwell配置文件config.properties,增加mock_date参数,如下

log_level=info
producer=kafka
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
#kafka topic配置
kafka_topic=topic_db
#注:该参数仅在maxwell教学版中存在,修改该参数后重启Maxwell才可生效
mock_date=2020-06-14
# mysql login info
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

重启Maxwell

重新生成模拟数据

4、增量表首日全量同步

通常情况下,增量表需要在首日进行一次全量同步,后续每日再进行增量同步,首日全量同步可以使用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。

(1)在~/bin目录创建mysql_to_kafka_inc_init.sh

脚本内容如下

#!/bin/bash
# 该脚本的作用是初始化所有的增量表,只需执行一次
MAXWELL_HOME=/opt/module/maxwell
import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}
case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac

(2)为mysql_to_kafka_inc_init.sh增加执行权限

chmod 777 ~/bin/mysql_to_kafka_inc_init.sh

(3)测试同步脚本

(a)清理历史数据

hadoop fs -ls /origin_data/gmall/db | grep _inc | awk '{print $8}' | xargs hadoop fs -rm -r -f

(b)执行同步脚本

mysql_to_kafka_inc_init.sh all

(4)检查同步结果

观察HDFS上是否重新出现增量表数据。

2.3 采集通道启动/停止脚本

1、在/home/atguigu/bin目录下创建脚本cluster.sh

在脚本中填写如下内容

#!/bin/bash
case $1 in
"start"){
        echo ================== 启动 集群 ==================
        #启动 Zookeeper集群
        zk.sh start
        #启动 Hadoop集群
        hdp.sh start
        #启动 Kafka采集集群
        kf.sh start
        #启动采集 Flume
        f1.sh start
#启动日志消费 Flume
        f2.sh start
#启动业务消费 Flume
        f3.sh start
#启动 maxwell
        mxw.sh start
        };;
"stop"){
        echo ================== 停止 集群 ==================
#停止 Maxwell
        mxw.sh stop
#停止 业务消费Flume
        f3.sh stop
#停止 日志消费Flume
        f2.sh stop
#停止 日志采集Flume
        f1.sh stop
        #停止 Kafka采集集群
        kf.sh stop
        #停止 Hadoop集群
        hdp.sh stop
        #停止 Zookeeper集群
        zk.sh stop
};;
esac

2、增加脚本执行权限

chmod 777 cluster.sh

3、数仓环境准备

Hive的安装和部署

就是安装配置一下Hive就行

相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
2月前
练习手动立即同步,将深圳的数据同步到北京
练习手动立即同步,将深圳的数据同步到北京
17 0
|
2月前
|
关系型数据库 MySQL Java
flink cdc 同步问题之多表数据如何同步
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
107 0
|
2月前
|
存储 JSON 大数据
大数据离线数仓---金融审批数仓
大数据离线数仓---金融审批数仓
133 1
|
2月前
|
存储 关系型数据库 MySQL
数据同步大事务同步延迟
数据同步大事务同步延迟
26 6
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks提供的数据同步类型不仅包括整库离线同步
【2月更文挑战第31天】DataWorks提供的数据同步类型不仅包括整库离线同步
22 8
|
2月前
|
存储 编解码 算法
【ffmpeg音视频同步】解决ffmpeg音视频中多线程之间的数据同步问题
【ffmpeg音视频同步】解决ffmpeg音视频中多线程之间的数据同步问题
41 2
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks不仅提供单表离线模式,还支持多种数据同步任务类型。
【2月更文挑战第31天】DataWorks不仅提供单表离线模式,还支持多种数据同步任务类型。这些类型包括整库离线同步(一次性全量同步、周期性全量同步、离线全增量同步、一次性增量同步、周期性增量同步)以及一键实时同步(一次性全量同步,实时增量同步)。此外,DataWorks还提供了数据类型转换的功能,您可以选择在源端和目标端使用相同的数据类型以避免数据类型转换,或者在源端和目标端使用不同的数据类型,然后在同步时手动转换数据类型。
24 6
|
2月前
|
SQL API 数据库
flink cdc 同步问题之将Flink CDC 4.x中的数据同步到Doris如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
存储 SQL HIVE
金融审批数仓(离线)--DWD层、ADS层
金融审批数仓(离线)--DWD层、ADS层
|
3月前
|
SQL 分布式计算 数据库
离线数仓--大数据技术之DolphinScheduler
离线数仓--大数据技术之DolphinScheduler
154 2

热门文章

最新文章