实时数据及离线数据上云方案

简介: 本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。

实时数据及离线数据上云方案

1. 创建实验资源

开始实验之前,您需要先创建ECS实例资源。

  1. 在实验室页面,单击创建资源
  2. (可选)在实验室页面左侧导航栏中,单击云产品资源列表,可查看本次实验资源相关信息(例如IP地址、用户信息等)。

说明 :资源创建过程需要3~5分钟。


2. 实时数据同步

3~11章节为实时数据同步操作步骤。

详细实验步骤,请点击下一页。


3. 前置条件(无需学员操作)

  1. 开启 MySQL Binlog
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  1. MySQL中创建canal账号
CREATE USER canal IDENTIFIED BY 'canal'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
  1. 创建需要采集的表
CREATE DATABASE `canal_demo` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
CREATE TABLE canal_demo.`student` (
`id`  int(8) NOT NULL auto_increment comment '主键id',
`name`  varchar(64)    comment '名字',
`age`   int(4)     comment '名字',
`gender`  varchar(8)       comment '名字',
PRIMARY KEY (`id`)
) comment '学生表' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  1. CANAL的MySQL source配置(conf/example/instance.properties)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal


4. MaxCompute中建表(用于接收CANAL采集的数据)

  1. 双击打开远程桌面的Chromium 浏览器

注:因Firefox ESR浏览器版本较低,无法完全适配产品功能,因此建议全程实验使用Chromium 浏览器。

  1. 在RAM用户登录框中单击下一步,并复制粘贴页面左上角的子用户密码用户密码输入框,单击登录

登录成功后会跳转到阿里云控制台。

  1. 在阿里云控制台页面顶部的搜索框中,搜索并进入大数据开发治理平台DataWorks

  1. 在左侧导航栏,单击工作空间列表。在工作空间列表页面顶部菜单栏中,选择资源所在地域,然后找到实验室分配的DataWorks资源,单击右侧操作列下的数据开发

说明:你可在云产品资源列表中查看到DataWorks资源所在地域和项目名称。

  1. 数据开发页面,单击临时查询

  1. 临时查询页面,单击 图标,选择新建>ODPS SQL。在新建节点对话框中,路径选择临时查询名称输入canal,单击提交

  1. 在canal节点编辑页面,输入如下建表语句,然后单击运行
create table ods_student_canal_sink
(
canal_info string comment 'canal采集信息 json字段'
) comment 'canal采集信息表'
partitioned by (ds string,hh string,mm string);


5. 确认DataHub项目

  1. 阿里云控制台页面顶部的搜索框中,搜索DataHub,单击数据总线DataHub。

  1. 概览页面左侧导航栏中,单击项目管理

  1. 项目管理页面,选择资源所在地域,然后实验室分配的数据总线资源,单击名称

说明:您可在云产品资源列表中查看到数据总线资源的名称。


6. 新建Topic

  1. 在DataHub项目页面,单击新建Topic

  1. 新建Topic面板,参考下图配置参数,然后单击创建

说明:关于拓展模式。

Kafka的Topic扩容方式和DataHub的topic扩容方式不同,为了适配Kafka的topic扩容方式,DataHub创建topic时需要将扩容方式选为扩展模式。扩展模式的topic,不再支持分裂/合并操作,而是添加shard的方式,暂不支持减少shard。

schema我们命名为canal_info,它canal采集的信息,是一个json。


7. DataHub数据同步配置

  1. 在DataHub项目页面的Topic列表页签,单击刚刚创建的Topic名称。

  1. 在Topic页面,单击同步

  1. 新建connector面板,单击MaxCompute

  1. 新建connector面板,参考如下说明填写参数,单击创建

参数说明:

  • Project名称:填写云产品资源列表中的DataWorks项目名称。
  • Table名称:填写ods_student_canal_sink。
  • AccessID:填写云产品资源列表中的AK ID。
  • AccessKey Secret:填写云产品资源列表中的AK Sercret。
  • Timestamp Unit:选择SECOND。


8. 进行CANAL中关于DataHub的配置

  1. 在实验室页面右侧,单击 图标,切换至Web Terminal,连接到ECS云服务器。

  1. 执行如下命令,进入canal目录。
cd /usr/local/canal/

  1. 执行如下命令,修改instance配置。
vim conf/example/instance.properties
  1. i键,进入编辑模式。将canal.mq.topic的参数值修改为Datahub项目名称.student。将canal.instance.master.address的参数值修改为MySQL ip:MySQL port。

说明:MySQL ip为云产品资源列表中云服务器ECS的弹性IP,MySQL port为3306。

  1. ESC键退出。输入:wq,按下回车键保存。
  2. 执行如下命令,同时重启MySQL。
/etc/init.d/mysql restart

  1. 执行如下命令,修改canal配置文件。
vim conf/canal.properties
  1. i键,进入编辑模式。将kafka.bootstrap.servers的参数值修改为VPC ECS Endpoint。 说明:这里的参数需要学员根据自己的地域进行配置。可参考下方Kafka域名列表。

Kafka域名列表

地区

Region

外网Endpoint

经典网络ECS Endpoint

VPC ECS Endpoint

华东1(杭州)

cn-hangzhou

dh-cn-hangzhou.aliyuncs.com:9092

dh-cn-hangzhou.aliyun-inc.com:9093

dh-cn-hangzhou-int-vpc.aliyuncs.com:9094

华东2(上海)

cn-shanghai

dh-cn-shanghai.aliyuncs.com:9092

dh-cn-shanghai.aliyun-inc.com:9093

dh-cn-shanghai-int-vpc.aliyuncs.com:9094

华北2(北京)

cn-beijing

dh-cn-beijing.aliyuncs.com:9092

dh-cn-beijing.aliyun-inc.com:9093

dh-cn-beijing-int-vpc.aliyuncs.com:9094

华南1(深圳)

cn-shenzhen

dh-cn-shenzhen.aliyuncs.com:9092

dh-cn-shenzhen.aliyun-inc.com:9093

dh-cn-shenzhen-int-vpc.aliyuncs.com:9094

华北3(张家口)

cn-zhangjiakou

dh-cn-zhangjiakou.aliyuncs.com:9092

dh-cn-zhangjiakou.aliyun-inc.com:9093

dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094

亚太东南1(新加坡)

ap-southeast-1

dh-ap-southeast-1.aliyuncs.com:9092

dh-ap-southeast-1.aliyun-inc.com:9093

dh-ap-southeast-1-int-vpc.aliyuncs.com:9094

亚太东南3(吉隆坡)

ap-southeast-3

dh-ap-southeast-3.aliyuncs.com:9092

dh-ap-southeast-3.aliyun-inc.com:9093

dh-ap-southeast-3-int-vpc.aliyuncs.com:9094

亚太南部1(孟买)

ap-south-1

dh-ap-south-1.aliyuncs.com:9092

dh-ap-south-1.aliyun-inc.com:9093

dh-ap-south-1-int-vpc.aliyuncs.com:9094

欧洲中部1(法兰克福)

eu-central-1

dh-eu-central-1.aliyuncs.com:9092

dh-eu-central-1.aliyun-inc.com:9093

dh-eu-central-1-int-vpc.aliyuncs.com:9094

上海金融云

cn-shanghai-finance-1

dh-cn-shanghai-finance-1.aliyuncs.com:9092

dh-cn-shanghai-finance-1.aliyun-inc.com:9093

dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094

中国香港

cn-hongkong

dh-cn-hongkong.aliyuncs.com:9092

dh-cn-hongkong.aliyun-inc.com:9093

dh-cn-hongkong-int-vpc.aliyuncs.com:9094

  1. ESC键退出。输入:wq,按下回车键保存。
  2. 执行如下命令,修改jass配置文件 。
vim conf/kafka_client_producer_jaas.conf
  1. i键,进入编辑模式。将username和password的参数值修改为实验室分配的AK ID和AK Secret。

  1. ESC键退出。输入:wq,按下回车键保存。


9. 启动CANAL

经过上述七个步骤,我们的配置工作就完成了。

  1. 启动canal。

执行如下命令,先关闭,在启动。

bin/stop.sh
bin/startup.sh

  1. 依次执行如下命令,查看日志。
tail -F -n 100 logs/canal/canal.log

ctrl+z退出日志查看。

tail -F -n 100 logs/example/example.log

ctrl+z退出日志查看。

tail -F -n 100 logs/example/meta.log

ctrl+z退出日志查看。

数据库的每次增删改操作,都会在meta.log中生成一条记录,查看该日志可以确认canal是否有采集到数据。


10. 查看采集数据

分别向MySQL student表中插入数据,修改数据,删除数据,清空表

  1. 执行如下命令,登录MySQL。
mysql -hlocalhost -P3306 -ucanal -pcanal -Dcanal_demo
show tables;

  1. 执行如下SQL语句,插入数据。
insert into canal_demo.student (id,name, age, gender) values(1,'张三', 20, '男');

  1. 执行如下SQL语句,更改数据。
update canal_demo.student set age = 21 where id = 1;

  1. 执行如下SQL语句,删除数据。
delete from canal_demo.student where id = 1;

  1. 执行如下SQL语句,清空表。
truncate table canal_demo.student;

ctrl+z退出MySQL。

  1. 执行如下SQL语句,查看CANAL日志。
tail -F -n 100 logs/example/meta.log

返回结果如下,ctrl+z退出日志。

  1. 查看DataHub中Topic的数据量。

如果数据量大于0,证明已经采集到数据了。

  1. 耐心等待15分钟,你可以去刷小视频了。
  2. 查看MaxCompute的ods_student_canal_sink表中是否有数据产生。

进入dataworks,找到我们在步骤五第六步中的临时查询文件。

输入如下SQL语句,运行查询。

select * from ods_student_canal_sink where ds <> '';

返回结果如下,您会发现有数据产生。


11. 附:数据格式

本章节仅作数据格式展示,不需要操作。

  1. insert
{
    "data":[
{
"id":"1",
"name":"张三",
"age":"20",
"gender":"男"
}
],
    "database":"canal_demo",
    "es":1658286498000,
    "id":3,
    "isDdl":false,
    "mysqlType":{
"id":"int(8)",
"name":"varchar(64)",
"age":"int(4)",
"gender":"varchar(8)"
},
    "old":null,
    "pkNames":[
"id"
],
    "sql":"",
    "sqlType":{
"id":4,
"name":12,
"age":4,
"gender":12
},
    "table":"student",
    "ts":1658286498272,
    "type":"INSERT"
}
  1. update
{
"data":[
{
"id":"1",
"name":"张三",
"age":"21",
"gender":"男"
}
],
"database":"canal_demo",
"es":1658286810000,
"id":4,
"isDdl":false,
"mysqlType":{
"id":"int(8)",
"name":"varchar(64)",
"age":"int(4)",
"gender":"varchar(8)"
},
"old":[
{
"age":"20"
}
],
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":4,
"name":12,
"age":4,
"gender":12
},
"table":"student",
"ts":1658286810709,
"type":"UPDATE"
}
  1. delete
{
    "data":[
{
"id":"1",
"name":"张三",
"age":"21",
"gender":"男"
}
],
    "database":"canal_demo",
    "es":1658286829000,
    "id":5,
    "isDdl":false,
    "mysqlType":{
"id":"int(8)",
"name":"varchar(64)",
"age":"int(4)",
"gender":"varchar(8)"
},
    "old":null,
    "pkNames":[
"id"
],
    "sql":"",
    "sqlType":{
"id":4,
"name":12,
"age":4,
"gender":12
},
    "table":"student",
    "ts":1658286829637,
    "type":"DELETE"
}
  1. truncate
{
    "data":null,
    "database":"canal_demo",
    "es":1658286836000,
    "id":6,
    "isDdl":true,
    "mysqlType":null,
    "old":null,
    "pkNames":null,
    "sql":"truncate table canal_demo.student",
    "sqlType":null,
    "table":"student",
    "ts":1658286836381,
    "type":"TRUNCATE"
}


12. 离线数据同步

13~17章节为离线数据同步操作步骤。

详细实验步骤,请点击下一页。


13. 离线数据同步前置知识

Sakila是一个在线 DVD 出租商店数据库,是MySQL 官方网站提供的示例数据库。

Sakila 数据库的模式结构如下图所示:

Sakila 数据库提供了以下数据表:

  • actor,演员信息表。通过 film_actor 表和 film 表进行关联。
  • film,电影信息表。film 引用了 language 表,同时被 film_category、film_actor 以及 inventory 表引用。
  • film_actor,电影演员表。film 表和 actor 表之间的多对多关系。
  • film_category,电影分类表。film 表和 category 表之间的多对多关系。
  • category,分类表。通过 film_category 表和 film 表进行关联。
  • film_text,电影描述表。包含了 film 表中的 film_id、title 以及 description 三个字段,通过 film 表上的触发器进行数据同步。
  • language,语言信息表。language 表被 film 表引用。
  • rental,租赁信息表,每个 DVD 每次被租赁的信息。引用了 inventory、customer 以及 staff 表,同时被 payment 表引用。
  • payment,付款信息表。引用了 customer、staff 以及 rental 表。
  • customer,客户信息表。引用了 address 和 store 表,同时被 payment 和 rental 表引用。
  • staff,员工信息表。引用了 store 和 address 表,同时被 rental、payment 以及 store 表引用。
  • store,商店信息表,引用了 staff 和 address 表,同时被 staff、customer 以及 inventory 表引用。
  • address,地址信息表。其中主键字段 address_id 是 customer、staff 以及 store 表上的外键引用字段,同时引用了 city 表。
  • city,城市信息表。引用了 country 表,同时被 address 表引用。
  • country,国家信息表。country 表被 city 表引用。
  • inventory,电影库存表。每部电影在不同商店里的库存,被 rental 表引用。


14. 数据源配置

  1. 进入DataWorks控制台,在左侧导航栏单击工作空间列表。在工作空间列表页面,选择资源所在地域,单击数据集成

  1. 数据集成页面,单击数据源

  1. 数据源管理页面,单击新增数据源

  1. 新增数据源对话框中,单击MySQL

  1. 新增MySQL数据源对话框中,参考如下说明配置数据源。

参数说明:

  • 数据源类型:选择连接串模式。
  • 数据源名称:输入imp_sakila。
  • JDBC URL:输入jdbc:mysql://ECS弹性IP:3306/sakila。您可在云产品资源列表中查看ECS弹性IP。
  • 用户名:输入canal。
  • 密码:输入canal。

  1. 新增MySQL数据源对话框中,单击更多选项

  1. 新增MySQL数据源对话框的公共资源组区域,单击测试连通性

  1. 新增MySQL数据源对话框中,待连接状态为可连通后,单击完成


15. MaxCompute建表

  1. 切换到数据开发页面。

  1. 临时查询页面,单击 图标,选择新建>ODPS SQL。在新建节点对话框中,路径选择临时查询名称输入create_table,单击提交

  1. 在create_table节点编辑页面,输入如下建表语句,然后单击运行
-- 全量
CREATE TABLE IF NOT EXISTS ods_sakila_actor_df(
actor_id BIGINT
,first_name STRING
,last_name STRING
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_address_df(
address_id BIGINT
,address STRING
,address2 STRING
,district STRING
,city_id BIGINT
,postal_code STRING
,phone STRING
,location STRING
,last_update DATETIME 
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_category_df(
category_id BIGINT
,name STRING
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_city_df(
city_id BIGINT
,city STRING
,country_id BIGINT
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_country_df(
country_id BIGINT
,country STRING
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_customer_df(
customer_id BIGINT
,store_id BIGINT
,first_name STRING
,last_name STRING
,email STRING
,address_id BIGINT
,active BIGINT
,create_date DATETIME
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_film_df(
film_id BIGINT
,title STRING
,description STRING
,release_year STRING
,language_id BIGINT
,original_language_id BIGINT
,rental_duration BIGINT
,rental_rate DECIMAL
,length BIGINT
,replacement_cost DECIMAL
,rating STRING
,special_features STRING
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_film_actor_df(
actor_id BIGINT
,film_id BIGINT
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_film_category_df(
film_id BIGINT
,category_id BIGINT
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_film_text_df(
film_id BIGINT
,title STRING
,description STRING
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_inventory_df(
inventory_id BIGINT
,film_id BIGINT
,store_id BIGINT
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_language_df(
language_id BIGINT
,name STRING
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_payment_df(
payment_id BIGINT
,customer_id BIGINT
,staff_id BIGINT
,rental_id BIGINT
,amount DECIMAL
,payment_date DATETIME
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_staff_df(
staff_id BIGINT
,first_name STRING
,last_name STRING
,address_id BIGINT
,picture STRING
,email STRING
,store_id BIGINT
,active BIGINT
,username STRING
,password STRING
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_store_df(
store_id BIGINT
,manager_staff_id BIGINT
,address_id BIGINT
,last_update DATETIME
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
-- 增量
CREATE TABLE IF NOT EXISTS stg_sakila_rental_di(
rental_id BIGINT 
,rental_date DATETIME 
,inventory_id BIGINT 
,customer_id BIGINT 
,return_date DATETIME 
,staff_id BIGINT 
,last_update DATETIME 
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------
CREATE TABLE IF NOT EXISTS ods_sakila_rental_df(
rental_id BIGINT 
,rental_date DATETIME 
,inventory_id BIGINT 
,customer_id BIGINT 
,return_date DATETIME 
,staff_id BIGINT 
,last_update DATETIME 
,etl_date DATETIME
)
PARTITIONED BY (ds STRING) 
lifecycle 60;
---------------------------------------------------------------

  1. 参数对话框中,单击确定

  1. 费用预估对话框中,单击运行


16. 全量数据上云

全量数据上云一共15张表,以actor表上云为例。

  1. 数据开发页面,选择新建>新建业务流程

  1. 新建业务流程对话框中,业务名称输入全量上云,单击新建

  1. 数据开发页面,右键全量上云,选择新建节点>离线同步

  1. 新建节点对话框中,名称输入imp_ods_sakila_actor_df,单击提交

说明:全量表上云任务命名规范:imp_ods_${database_name}_${table_name}_df,因此actor表上云任务命名为imp_ods_sakila_actor_df。

注意:若创建完成,出现此操纵页面建议按照数据源配置流程进行数据源检查,若数据源连通性为正常,则进行以下操作:

a.数据来源选择Mysql,数据源名称选择imp_sakila

b.选择《更多选项》-《公共资源组(调试资源组)》

c.数据去向选择MaxCompute,数据源名称选择odps_first

d.单击下一步,继续按照后续步骤进行操作。

  1. imp_ods_sakila_actor_df离线同步节点的编辑页面,根据下图选择数据源。

  1. imp_ods_sakila_actor_df离线同步节点的编辑页面,根据下图配置字段映射。

  1. imp_ods_sakila_actor_df离线同步节点的编辑页面右侧,单击数据集成资源组配置

  1. 数据集成资源组配置面板,单击更多选项

  1. 警告对话框中,单击确认

  1. 数据集成资源组配置面板,单击调试资源组

  1. imp_ods_sakila_actor_df离线同步节点的编辑页面,单击 保存图标,然后单击 带参数运行图标。

  1. 参数对话框中,单击确定

返回

  1. 根据上述3~12步骤操作,完成剩余14张表(address、category、city、country、customer、film、film_actor、film_category、film_text、inventory、language、payment、staff、store)。
  2. 全量上云任务总览。

  1. 数据查看。

create_table临时查询编辑页面,输入如下语句,单击带参运行

select 'ods_sakila_actor_df' as table_name, count(1) as record_num from ods_sakila_actor_df where ds = ${bizdate}
union all
select 'ods_sakila_address_df' as table_name, count(1) as record_num from ods_sakila_address_df where ds = ${bizdate}
union all
select 'ods_sakila_category_df' as table_name, count(1) as record_num from ods_sakila_category_df where ds = ${bizdate}
union all
select 'ods_sakila_city_df' as table_name, count(1) as record_num from ods_sakila_city_df where ds = ${bizdate}
union all
select 'ods_sakila_country_df' as table_name, count(1) as record_num from ods_sakila_country_df where ds = ${bizdate}
union all
select 'ods_sakila_customer_df' as table_name, count(1) as record_num from ods_sakila_customer_df where ds = ${bizdate}
union all
select 'ods_sakila_film_df' as table_name, count(1) as record_num from ods_sakila_film_df where ds = ${bizdate}
union all
select 'ods_sakila_film_actor_df' as table_name, count(1) as record_num from ods_sakila_film_actor_df where ds = ${bizdate}
union all
select 'ods_sakila_film_category_df' as table_name, count(1) as record_num from ods_sakila_film_category_df where ds = ${bizdate}
union all
select 'ods_sakila_film_text_df' as table_name, count(1) as record_num from ods_sakila_film_text_df where ds = ${bizdate}
union all
select 'ods_sakila_inventory_df' as table_name, count(1) as record_num from ods_sakila_inventory_df where ds = ${bizdate}
union all
select 'ods_sakila_language_df' as table_name, count(1) as record_num from ods_sakila_language_df where ds = ${bizdate}
union all
select 'ods_sakila_payment_df' as table_name, count(1) as record_num from ods_sakila_payment_df where ds = ${bizdate}
union all
select 'ods_sakila_staff_df' as table_name, count(1) as record_num from ods_sakila_staff_df where ds = ${bizdate}
union all
select 'ods_sakila_store_df' as table_name, count(1) as record_num from ods_sakila_store_df where ds = ${bizdate}
;

返回结果如下所示。

  1. 注意、注意、注意

重新打开离线同步节点的时候,getdate()函数会消失,需要重新添加一遍。


17. 增量数据上云

  1. 增量数据上云说明。

增量数据上云包含三部分操作:

A:全量初始化。

B:增量抽取。

C:全量merge。

  1. 数据开发页面,选择新建>新建业务流程

  1. 新建业务流程对话框中,业务名称输入增量上云,单击新建

  1. 数据开发页面,右键增量上云,选择新建节点>离线同步

  1. 新建节点对话框中,名称输入imp_stg_sakila_rental_di,单击提交

  1. imp_stg_sakila_rental_di离线同步节点的编辑页面,根据下图配置全量初始化(参照全量上云)。

  1. imp_stg_sakila_rental_di离线同步节点的编辑页面,单击 保存图标,然后单击 带参数运行图标。

  1. 在参数对话框中,选择bizdate,单击确定

说明:例如bizdate是20220723,初始化应该是bizdate的前一天,因此运行选择的日期为20220722。

到这里,我们的全量初始化工作就完成了。

  1. 模拟数据变化。

9.1 切换到Web Terminal页面。执行如下命令,服务器登录MySQL。

mysql -hlocalhost -P3306 -ucanal -pcanal -Dsakila

9.2 执行如下命令,模拟数据变化。

update rental set last_update = '2022-07-23 11:11:11' where mod(rental_id,100) = 0;

  1. 增量抽取。

imp_stg_sakila_rental_di离线同步节点的编辑页面,修改imp_stg_sakila_rental_di任务,在数据过滤中添加date_format(last_update,'%Y%m%d')='${bizdate}'。

  1. imp_stg_sakila_rental_di离线同步节点的编辑页面,,单击 保存图标,然后单击 带参数运行图标。

  1. 查看数据。

create_table临时查询编辑页面,输入如下语句,单击运行

select 'stg_sakila_rental_di' as table_name,count(1) as record_num from stg_sakila_rental_di where ds = '20220723'
union all 
select 'ods_sakila_rental_df' as table_name,count(1) as record_num from ods_sakila_rental_df where ds = '20220722'

到这里,我们的增量抽取工作就完成了。

  1. 全量merge。

13.1 新建opds节点。

13.2 全量merge。

ods_sakila_rental_df节点编辑页面,输入以下语句,单击带参运行

INSERT overwrite table ods_sakila_rental_df partition(ds = ${bizdate})
SELECT a.rental_id as rental_id
,a.rental_date as rental_date
,a.inventory_id as inventory_id
,a.customer_id as customer_id
,a.return_date as return_date
,a.staff_id as staff_id
,a.last_update as last_update
,getdate() as etl_date
FROM ods_sakila_rental_df a --全量表
LEFT ANTI JOIN stg_sakila_rental_di b --增量表
ON a.rental_id = b.rental_id
AND b.ds = '${bizdate}'
WHERE a.ds = '${last_day}'
UNION ALL
SELECT b.rental_id as rental_id
,b.rental_date as rental_date
,b.inventory_id as inventory_id
,b.customer_id as customer_id
,b.return_date as return_date
,b.staff_id as staff_id
,b.last_update as last_update
,getdate() as etl_date
FROM stg_sakila_rental_di b
WHERE b.ds = '${bizdate}'
;

  1. 查看数据。

ods_sakila_rental_df节点编辑页面,输入以下语句,单击带参运行

select distinct last_update from ods_sakila_rental_df where ds = ${bizdate};

到这里,我们的全量初始化工作就完成了。

实验链接:https://developer.aliyun.com/adc/scenario/8650c921ec8b4851b2f2ab3964b7588f

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
20天前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
22天前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
43 3
|
22天前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
41 2
|
1天前
|
存储 安全 大数据
大数据隐私保护:用户数据的安全之道
【10月更文挑战第31天】在大数据时代,数据的价值日益凸显,但用户隐私保护问题也愈发严峻。本文探讨了大数据隐私保护的重要性、面临的挑战及有效解决方案,旨在为企业和社会提供用户数据安全的指导。通过加强透明度、采用加密技术、实施数据最小化原则、加强访问控制、采用隐私保护技术和提升用户意识,共同推动大数据隐私保护的发展。
|
5天前
|
SQL 存储 大数据
大数据中数据提取
【10月更文挑战第19天】
13 2
|
21天前
|
SQL 消息中间件 大数据
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(一)
33 1
|
21天前
|
SQL 大数据 Apache
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
大数据-159 Apache Kylin 构建Cube 准备和测试数据(二)
59 1
|
22天前
|
存储 NoSQL 大数据
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
大数据-51 Redis 高可用方案CAP-AP 主从复制 一主一从 全量和增量同步 哨兵模式 docker-compose测试
29 3
|
21天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
44 1
|
22天前
|
消息中间件 缓存 NoSQL
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
大数据-49 Redis 缓存问题中 穿透、雪崩、击穿、数据不一致、HotKey、BigKey
38 2

热门文章

最新文章