实时数据及离线数据上云方案
1. 创建实验资源
开始实验之前,您需要先创建ECS实例资源。
- 在实验室页面,单击创建资源。
- (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。
说明 :资源创建过程需要3~5分钟。
2. 实时数据同步
3~11章节为实时数据同步操作步骤。
详细实验步骤,请点击下一页。
3. 前置条件(无需学员操作)
- 开启 MySQL Binlog
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- MySQL中创建canal账号
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
- 创建需要采集的表
CREATE DATABASE `canal_demo` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci; CREATE TABLE canal_demo.`student` ( `id` int(8) NOT NULL auto_increment comment '主键id', `name` varchar(64) comment '名字', `age` int(4) comment '名字', `gender` varchar(8) comment '名字', PRIMARY KEY (`id`) ) comment '学生表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- CANAL的MySQL source配置(conf/example/instance.properties)
canal.instance.dbUsername = canal canal.instance.dbPassword = canal
4. MaxCompute中建表(用于接收CANAL采集的数据)
- 双击打开远程桌面的Chromium 浏览器。
注:因Firefox ESR浏览器版本较低,无法完全适配产品功能,因此建议全程实验使用Chromium 浏览器。
- 在RAM用户登录框中单击下一步,并复制粘贴页面左上角的子用户密码到用户密码输入框,单击登录。
登录成功后会跳转到阿里云控制台。
- 在阿里云控制台页面顶部的搜索框中,搜索并进入大数据开发治理平台DataWorks。
- 在左侧导航栏,单击工作空间列表。在工作空间列表页面顶部菜单栏中,选择资源所在地域,然后找到实验室分配的DataWorks资源,单击右侧操作列下的数据开发。
说明:你可在云产品资源列表中查看到DataWorks资源所在地域和项目名称。
- 在数据开发页面,单击临时查询。
- 在临时查询页面,单击 图标,选择新建>ODPS SQL。在新建节点对话框中,路径选择临时查询,名称输入canal,单击提交。
- 在canal节点编辑页面,输入如下建表语句,然后单击运行。
create table ods_student_canal_sink ( canal_info string comment 'canal采集信息 json字段' ) comment 'canal采集信息表' partitioned by (ds string,hh string,mm string);
5. 确认DataHub项目
- 阿里云控制台页面顶部的搜索框中,搜索DataHub,单击数据总线DataHub。
- 在概览页面左侧导航栏中,单击项目管理。
- 在项目管理页面,选择资源所在地域,然后实验室分配的数据总线资源,单击名称。
说明:您可在云产品资源列表中查看到数据总线资源的名称。
6. 新建Topic
- 在DataHub项目页面,单击新建Topic。
- 在新建Topic面板,参考下图配置参数,然后单击创建。
说明:关于拓展模式。
Kafka的Topic扩容方式和DataHub的topic扩容方式不同,为了适配Kafka的topic扩容方式,DataHub创建topic时需要将扩容方式选为扩展模式。扩展模式的topic,不再支持分裂/合并操作,而是添加shard的方式,暂不支持减少shard。
schema我们命名为canal_info,它canal采集的信息,是一个json。
7. DataHub数据同步配置
- 在DataHub项目页面的Topic列表页签,单击刚刚创建的Topic名称。
- 在Topic页面,单击同步。
- 在新建connector面板,单击MaxCompute。
- 在新建connector面板,参考如下说明填写参数,单击创建。
参数说明:
- Project名称:填写云产品资源列表中的DataWorks项目名称。
- Table名称:填写ods_student_canal_sink。
- AccessID:填写云产品资源列表中的AK ID。
- AccessKey Secret:填写云产品资源列表中的AK Sercret。
- Timestamp Unit:选择SECOND。
8. 进行CANAL中关于DataHub的配置
- 在实验室页面右侧,单击 图标,切换至Web Terminal,连接到ECS云服务器。
- 执行如下命令,进入canal目录。
cd /usr/local/canal/
- 执行如下命令,修改instance配置。
vim conf/example/instance.properties
- 按i键,进入编辑模式。将canal.mq.topic的参数值修改为Datahub项目名称.student。将canal.instance.master.address的参数值修改为MySQL ip:MySQL port。
说明:MySQL ip为云产品资源列表中云服务器ECS的弹性IP,MySQL port为3306。
- 按ESC键退出。输入:wq,按下回车键保存。
- 执行如下命令,同时重启MySQL。
/etc/init.d/mysql restart
- 执行如下命令,修改canal配置文件。
vim conf/canal.properties
- 按i键,进入编辑模式。将kafka.bootstrap.servers的参数值修改为VPC ECS Endpoint。 说明:这里的参数需要学员根据自己的地域进行配置。可参考下方Kafka域名列表。
Kafka域名列表
地区 |
Region |
外网Endpoint |
经典网络ECS Endpoint |
VPC ECS Endpoint |
华东1(杭州) |
cn-hangzhou |
dh-cn-hangzhou.aliyuncs.com:9092 |
dh-cn-hangzhou.aliyun-inc.com:9093 |
dh-cn-hangzhou-int-vpc.aliyuncs.com:9094 |
华东2(上海) |
cn-shanghai |
dh-cn-shanghai.aliyuncs.com:9092 |
dh-cn-shanghai.aliyun-inc.com:9093 |
dh-cn-shanghai-int-vpc.aliyuncs.com:9094 |
华北2(北京) |
cn-beijing |
dh-cn-beijing.aliyuncs.com:9092 |
dh-cn-beijing.aliyun-inc.com:9093 |
dh-cn-beijing-int-vpc.aliyuncs.com:9094 |
华南1(深圳) |
cn-shenzhen |
dh-cn-shenzhen.aliyuncs.com:9092 |
dh-cn-shenzhen.aliyun-inc.com:9093 |
dh-cn-shenzhen-int-vpc.aliyuncs.com:9094 |
华北3(张家口) |
cn-zhangjiakou |
dh-cn-zhangjiakou.aliyuncs.com:9092 |
dh-cn-zhangjiakou.aliyun-inc.com:9093 |
dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094 |
亚太东南1(新加坡) |
ap-southeast-1 |
dh-ap-southeast-1.aliyuncs.com:9092 |
dh-ap-southeast-1.aliyun-inc.com:9093 |
dh-ap-southeast-1-int-vpc.aliyuncs.com:9094 |
亚太东南3(吉隆坡) |
ap-southeast-3 |
dh-ap-southeast-3.aliyuncs.com:9092 |
dh-ap-southeast-3.aliyun-inc.com:9093 |
dh-ap-southeast-3-int-vpc.aliyuncs.com:9094 |
亚太南部1(孟买) |
ap-south-1 |
dh-ap-south-1.aliyuncs.com:9092 |
dh-ap-south-1.aliyun-inc.com:9093 |
dh-ap-south-1-int-vpc.aliyuncs.com:9094 |
欧洲中部1(法兰克福) |
eu-central-1 |
dh-eu-central-1.aliyuncs.com:9092 |
dh-eu-central-1.aliyun-inc.com:9093 |
dh-eu-central-1-int-vpc.aliyuncs.com:9094 |
上海金融云 |
cn-shanghai-finance-1 |
dh-cn-shanghai-finance-1.aliyuncs.com:9092 |
dh-cn-shanghai-finance-1.aliyun-inc.com:9093 |
dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094 |
中国香港 |
cn-hongkong |
dh-cn-hongkong.aliyuncs.com:9092 |
dh-cn-hongkong.aliyun-inc.com:9093 |
dh-cn-hongkong-int-vpc.aliyuncs.com:9094 |
- 按ESC键退出。输入:wq,按下回车键保存。
- 执行如下命令,修改jass配置文件 。
vim conf/kafka_client_producer_jaas.conf
- 按i键,进入编辑模式。将username和password的参数值修改为实验室分配的AK ID和AK Secret。
- 按ESC键退出。输入:wq,按下回车键保存。
9. 启动CANAL
经过上述七个步骤,我们的配置工作就完成了。
- 启动canal。
执行如下命令,先关闭,在启动。
bin/stop.sh bin/startup.sh
- 依次执行如下命令,查看日志。
tail -F -n 100 logs/canal/canal.log
ctrl+z退出日志查看。
tail -F -n 100 logs/example/example.log
ctrl+z退出日志查看。
tail -F -n 100 logs/example/meta.log
ctrl+z退出日志查看。
数据库的每次增删改操作,都会在meta.log中生成一条记录,查看该日志可以确认canal是否有采集到数据。
10. 查看采集数据
分别向MySQL student表中插入数据,修改数据,删除数据,清空表
- 执行如下命令,登录MySQL。
mysql -hlocalhost -P3306 -ucanal -pcanal -Dcanal_demo show tables;
- 执行如下SQL语句,插入数据。
insert into canal_demo.student (id,name, age, gender) values(1,'张三', 20, '男');
- 执行如下SQL语句,更改数据。
update canal_demo.student set age = 21 where id = 1;
- 执行如下SQL语句,删除数据。
delete from canal_demo.student where id = 1;
- 执行如下SQL语句,清空表。
truncate table canal_demo.student;
ctrl+z退出MySQL。
- 执行如下SQL语句,查看CANAL日志。
tail -F -n 100 logs/example/meta.log
返回结果如下,ctrl+z退出日志。
- 查看DataHub中Topic的数据量。
如果数据量大于0,证明已经采集到数据了。
- 耐心等待15分钟,你可以去刷小视频了。
- 查看MaxCompute的ods_student_canal_sink表中是否有数据产生。
进入dataworks,找到我们在步骤五第六步中的临时查询文件。
输入如下SQL语句,运行查询。
select * from ods_student_canal_sink where ds <> '';
返回结果如下,您会发现有数据产生。
11. 附:数据格式
本章节仅作数据格式展示,不需要操作。
- insert
{ "data":[ { "id":"1", "name":"张三", "age":"20", "gender":"男" } ], "database":"canal_demo", "es":1658286498000, "id":3, "isDdl":false, "mysqlType":{ "id":"int(8)", "name":"varchar(64)", "age":"int(4)", "gender":"varchar(8)" }, "old":null, "pkNames":[ "id" ], "sql":"", "sqlType":{ "id":4, "name":12, "age":4, "gender":12 }, "table":"student", "ts":1658286498272, "type":"INSERT" }
- update
{ "data":[ { "id":"1", "name":"张三", "age":"21", "gender":"男" } ], "database":"canal_demo", "es":1658286810000, "id":4, "isDdl":false, "mysqlType":{ "id":"int(8)", "name":"varchar(64)", "age":"int(4)", "gender":"varchar(8)" }, "old":[ { "age":"20" } ], "pkNames":[ "id" ], "sql":"", "sqlType":{ "id":4, "name":12, "age":4, "gender":12 }, "table":"student", "ts":1658286810709, "type":"UPDATE" }
- delete
{ "data":[ { "id":"1", "name":"张三", "age":"21", "gender":"男" } ], "database":"canal_demo", "es":1658286829000, "id":5, "isDdl":false, "mysqlType":{ "id":"int(8)", "name":"varchar(64)", "age":"int(4)", "gender":"varchar(8)" }, "old":null, "pkNames":[ "id" ], "sql":"", "sqlType":{ "id":4, "name":12, "age":4, "gender":12 }, "table":"student", "ts":1658286829637, "type":"DELETE" }
- truncate
{ "data":null, "database":"canal_demo", "es":1658286836000, "id":6, "isDdl":true, "mysqlType":null, "old":null, "pkNames":null, "sql":"truncate table canal_demo.student", "sqlType":null, "table":"student", "ts":1658286836381, "type":"TRUNCATE" }
12. 离线数据同步
13~17章节为离线数据同步操作步骤。
详细实验步骤,请点击下一页。
13. 离线数据同步前置知识
Sakila是一个在线 DVD 出租商店数据库,是MySQL 官方网站提供的示例数据库。
Sakila 数据库的模式结构如下图所示:
Sakila 数据库提供了以下数据表:
- actor,演员信息表。通过 film_actor 表和 film 表进行关联。
- film,电影信息表。film 引用了 language 表,同时被 film_category、film_actor 以及 inventory 表引用。
- film_actor,电影演员表。film 表和 actor 表之间的多对多关系。
- film_category,电影分类表。film 表和 category 表之间的多对多关系。
- category,分类表。通过 film_category 表和 film 表进行关联。
- film_text,电影描述表。包含了 film 表中的 film_id、title 以及 description 三个字段,通过 film 表上的触发器进行数据同步。
- language,语言信息表。language 表被 film 表引用。
- rental,租赁信息表,每个 DVD 每次被租赁的信息。引用了 inventory、customer 以及 staff 表,同时被 payment 表引用。
- payment,付款信息表。引用了 customer、staff 以及 rental 表。
- customer,客户信息表。引用了 address 和 store 表,同时被 payment 和 rental 表引用。
- staff,员工信息表。引用了 store 和 address 表,同时被 rental、payment 以及 store 表引用。
- store,商店信息表,引用了 staff 和 address 表,同时被 staff、customer 以及 inventory 表引用。
- address,地址信息表。其中主键字段 address_id 是 customer、staff 以及 store 表上的外键引用字段,同时引用了 city 表。
- city,城市信息表。引用了 country 表,同时被 address 表引用。
- country,国家信息表。country 表被 city 表引用。
- inventory,电影库存表。每部电影在不同商店里的库存,被 rental 表引用。
14. 数据源配置
- 进入DataWorks控制台,在左侧导航栏单击工作空间列表。在工作空间列表页面,选择资源所在地域,单击数据集成。
- 在数据集成页面,单击数据源。
- 在数据源管理页面,单击新增数据源。
- 在新增数据源对话框中,单击MySQL。
- 在新增MySQL数据源对话框中,参考如下说明配置数据源。
参数说明:
- 数据源类型:选择连接串模式。
- 数据源名称:输入imp_sakila。
- JDBC URL:输入jdbc:mysql://ECS弹性IP:3306/sakila。您可在云产品资源列表中查看ECS弹性IP。
- 用户名:输入canal。
- 密码:输入canal。
- 在新增MySQL数据源对话框中,单击更多选项。
- 在新增MySQL数据源对话框的公共资源组区域,单击测试连通性。
- 在新增MySQL数据源对话框中,待连接状态为可连通后,单击完成。
15. MaxCompute建表
- 切换到数据开发页面。
- 在临时查询页面,单击 图标,选择新建>ODPS SQL。在新建节点对话框中,路径选择临时查询,名称输入create_table,单击提交。
- 在create_table节点编辑页面,输入如下建表语句,然后单击运行。
-- 全量 CREATE TABLE IF NOT EXISTS ods_sakila_actor_df( actor_id BIGINT ,first_name STRING ,last_name STRING ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_address_df( address_id BIGINT ,address STRING ,address2 STRING ,district STRING ,city_id BIGINT ,postal_code STRING ,phone STRING ,location STRING ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_category_df( category_id BIGINT ,name STRING ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_city_df( city_id BIGINT ,city STRING ,country_id BIGINT ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_country_df( country_id BIGINT ,country STRING ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_customer_df( customer_id BIGINT ,store_id BIGINT ,first_name STRING ,last_name STRING ,email STRING ,address_id BIGINT ,active BIGINT ,create_date DATETIME ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_film_df( film_id BIGINT ,title STRING ,description STRING ,release_year STRING ,language_id BIGINT ,original_language_id BIGINT ,rental_duration BIGINT ,rental_rate DECIMAL ,length BIGINT ,replacement_cost DECIMAL ,rating STRING ,special_features STRING ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_film_actor_df( actor_id BIGINT ,film_id BIGINT ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_film_category_df( film_id BIGINT ,category_id BIGINT ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_film_text_df( film_id BIGINT ,title STRING ,description STRING ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_inventory_df( inventory_id BIGINT ,film_id BIGINT ,store_id BIGINT ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_language_df( language_id BIGINT ,name STRING ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_payment_df( payment_id BIGINT ,customer_id BIGINT ,staff_id BIGINT ,rental_id BIGINT ,amount DECIMAL ,payment_date DATETIME ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_staff_df( staff_id BIGINT ,first_name STRING ,last_name STRING ,address_id BIGINT ,picture STRING ,email STRING ,store_id BIGINT ,active BIGINT ,username STRING ,password STRING ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_store_df( store_id BIGINT ,manager_staff_id BIGINT ,address_id BIGINT ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- -- 增量 CREATE TABLE IF NOT EXISTS stg_sakila_rental_di( rental_id BIGINT ,rental_date DATETIME ,inventory_id BIGINT ,customer_id BIGINT ,return_date DATETIME ,staff_id BIGINT ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; --------------------------------------------------------------- CREATE TABLE IF NOT EXISTS ods_sakila_rental_df( rental_id BIGINT ,rental_date DATETIME ,inventory_id BIGINT ,customer_id BIGINT ,return_date DATETIME ,staff_id BIGINT ,last_update DATETIME ,etl_date DATETIME ) PARTITIONED BY (ds STRING) lifecycle 60; ---------------------------------------------------------------
- 在参数对话框中,单击确定。
- 在费用预估对话框中,单击运行。
16. 全量数据上云
全量数据上云一共15张表,以actor表上云为例。
- 在数据开发页面,选择新建>新建业务流程。
- 在新建业务流程对话框中,业务名称输入全量上云,单击新建。
- 在数据开发页面,右键全量上云,选择新建节点>离线同步。
- 在新建节点对话框中,名称输入imp_ods_sakila_actor_df,单击提交。
说明:全量表上云任务命名规范:imp_ods_${database_name}_${table_name}_df,因此actor表上云任务命名为imp_ods_sakila_actor_df。
注意:若创建完成,出现此操纵页面建议按照数据源配置流程进行数据源检查,若数据源连通性为正常,则进行以下操作:
a.数据来源选择Mysql,数据源名称选择imp_sakila
b.选择《更多选项》-《公共资源组(调试资源组)》
c.数据去向选择MaxCompute,数据源名称选择odps_first
d.单击下一步,继续按照后续步骤进行操作。
- 在imp_ods_sakila_actor_df离线同步节点的编辑页面,根据下图选择数据源。
- 在imp_ods_sakila_actor_df离线同步节点的编辑页面,根据下图配置字段映射。
- 在imp_ods_sakila_actor_df离线同步节点的编辑页面右侧,单击数据集成资源组配置。
- 在数据集成资源组配置面板,单击更多选项。
- 在警告对话框中,单击确认,
- 在数据集成资源组配置面板,单击调试资源组。
- 在imp_ods_sakila_actor_df离线同步节点的编辑页面,单击 保存图标,然后单击 带参数运行图标。
- 在参数对话框中,单击确定。
返回
- 根据上述3~12步骤操作,完成剩余14张表(address、category、city、country、customer、film、film_actor、film_category、film_text、inventory、language、payment、staff、store)。
- 全量上云任务总览。
- 数据查看。
在create_table临时查询编辑页面,输入如下语句,单击带参运行。
select 'ods_sakila_actor_df' as table_name, count(1) as record_num from ods_sakila_actor_df where ds = ${bizdate} union all select 'ods_sakila_address_df' as table_name, count(1) as record_num from ods_sakila_address_df where ds = ${bizdate} union all select 'ods_sakila_category_df' as table_name, count(1) as record_num from ods_sakila_category_df where ds = ${bizdate} union all select 'ods_sakila_city_df' as table_name, count(1) as record_num from ods_sakila_city_df where ds = ${bizdate} union all select 'ods_sakila_country_df' as table_name, count(1) as record_num from ods_sakila_country_df where ds = ${bizdate} union all select 'ods_sakila_customer_df' as table_name, count(1) as record_num from ods_sakila_customer_df where ds = ${bizdate} union all select 'ods_sakila_film_df' as table_name, count(1) as record_num from ods_sakila_film_df where ds = ${bizdate} union all select 'ods_sakila_film_actor_df' as table_name, count(1) as record_num from ods_sakila_film_actor_df where ds = ${bizdate} union all select 'ods_sakila_film_category_df' as table_name, count(1) as record_num from ods_sakila_film_category_df where ds = ${bizdate} union all select 'ods_sakila_film_text_df' as table_name, count(1) as record_num from ods_sakila_film_text_df where ds = ${bizdate} union all select 'ods_sakila_inventory_df' as table_name, count(1) as record_num from ods_sakila_inventory_df where ds = ${bizdate} union all select 'ods_sakila_language_df' as table_name, count(1) as record_num from ods_sakila_language_df where ds = ${bizdate} union all select 'ods_sakila_payment_df' as table_name, count(1) as record_num from ods_sakila_payment_df where ds = ${bizdate} union all select 'ods_sakila_staff_df' as table_name, count(1) as record_num from ods_sakila_staff_df where ds = ${bizdate} union all select 'ods_sakila_store_df' as table_name, count(1) as record_num from ods_sakila_store_df where ds = ${bizdate} ;
返回结果如下所示。
- 注意、注意、注意
重新打开离线同步节点的时候,getdate()函数会消失,需要重新添加一遍。
17. 增量数据上云
- 增量数据上云说明。
增量数据上云包含三部分操作:
A:全量初始化。
B:增量抽取。
C:全量merge。
- 在数据开发页面,选择新建>新建业务流程。
- 在新建业务流程对话框中,业务名称输入增量上云,单击新建。
- 在数据开发页面,右键增量上云,选择新建节点>离线同步。
- 在新建节点对话框中,名称输入imp_stg_sakila_rental_di,单击提交。
- 在imp_stg_sakila_rental_di离线同步节点的编辑页面,根据下图配置全量初始化(参照全量上云)。
- 在imp_stg_sakila_rental_di离线同步节点的编辑页面,单击 保存图标,然后单击 带参数运行图标。
- 在参数对话框中,选择bizdate,单击确定。
说明:例如bizdate是20220723,初始化应该是bizdate的前一天,因此运行选择的日期为20220722。
到这里,我们的全量初始化工作就完成了。
- 模拟数据变化。
9.1 切换到Web Terminal页面。执行如下命令,服务器登录MySQL。
mysql -hlocalhost -P3306 -ucanal -pcanal -Dsakila
9.2 执行如下命令,模拟数据变化。
update rental set last_update = '2022-07-23 11:11:11' where mod(rental_id,100) = 0;
- 增量抽取。
在imp_stg_sakila_rental_di离线同步节点的编辑页面,修改imp_stg_sakila_rental_di任务,在数据过滤中添加date_format(last_update,'%Y%m%d')='${bizdate}'。
- 在imp_stg_sakila_rental_di离线同步节点的编辑页面,,单击 保存图标,然后单击 带参数运行图标。
- 查看数据。
在create_table临时查询编辑页面,输入如下语句,单击运行。
select 'stg_sakila_rental_di' as table_name,count(1) as record_num from stg_sakila_rental_di where ds = '20220723' union all select 'ods_sakila_rental_df' as table_name,count(1) as record_num from ods_sakila_rental_df where ds = '20220722'
到这里,我们的增量抽取工作就完成了。
- 全量merge。
13.1 新建opds节点。
13.2 全量merge。
在ods_sakila_rental_df节点编辑页面,输入以下语句,单击带参运行。
INSERT overwrite table ods_sakila_rental_df partition(ds = ${bizdate}) SELECT a.rental_id as rental_id ,a.rental_date as rental_date ,a.inventory_id as inventory_id ,a.customer_id as customer_id ,a.return_date as return_date ,a.staff_id as staff_id ,a.last_update as last_update ,getdate() as etl_date FROM ods_sakila_rental_df a --全量表 LEFT ANTI JOIN stg_sakila_rental_di b --增量表 ON a.rental_id = b.rental_id AND b.ds = '${bizdate}' WHERE a.ds = '${last_day}' UNION ALL SELECT b.rental_id as rental_id ,b.rental_date as rental_date ,b.inventory_id as inventory_id ,b.customer_id as customer_id ,b.return_date as return_date ,b.staff_id as staff_id ,b.last_update as last_update ,getdate() as etl_date FROM stg_sakila_rental_di b WHERE b.ds = '${bizdate}' ;
- 查看数据。
在ods_sakila_rental_df节点编辑页面,输入以下语句,单击带参运行。
select distinct last_update from ods_sakila_rental_df where ds = ${bizdate};
到这里,我们的全量初始化工作就完成了。
实验链接:https://developer.aliyun.com/adc/scenario/8650c921ec8b4851b2f2ab3964b7588f