库仓一体实时数据分析
1. 创建源库和账号,开启跨库查询功能
创建源库和账号,开启跨库查询功能
- 登录数据库产品控制台,创建数据库。
- RDS控制台:单击实例名,进入实例详情后,单击数据库管理-创建数据库,完成数据库的创建。
- PolarDB MySQL控制台:单击集群名,进入集群详情后,单击数据库管理-创建数据库,完成数据库的创建。
- 创建账号和密码。
- RDS控制台:在实例详情页,单击账号管理-创建账号,完成高权限账号创建。
- PolarDB MySQL控制台:在集群详情页,单击账号管理-创建账号,完成高权限账号创建。
- 访问DMS控制台,搜索创建的RDS实例和PolarDB实例,输入数据库账号和密码,分别登录RDS和PolarDB数据库。
- 分别选中RDS和PolarDB实例名称,右键选择编辑实例,开启安全协同,并开启跨库查询。
说明:高级信息中涉及dblink的开启,自定义名称。本案例中,RDS和PolarDB的dblink名称为rds_dblink,poalrdb_dblink。
2. 构建RDS-AnalyticDB数仓链路
构建RDS-AnalyticDB数仓链路
本步骤在DMS控制台操作,可完成从数据库RDS MySQL到AnalyticDB MySQL的快速建仓。
- 进入DMS控制台,单击数据资产-数据入仓,打开数据分析窗口。
- 在数据分析窗口,选择RDS数据库为数据来源,选择已有ADB实例,选择具体实例信息,单击提交申请。
说明:
- 高级配置中的参数,建议保持默认值,如下:
- 目标库名:自动创建同名库。
- 同步方式:结构初始化。
- 同步范围:全库。
3. 创建数据库表,导入模拟数据
创建数据库表,导入模拟数据
本步骤在DMS控制台操作,完成在RDS和PolarDB数据库中的快速建表,并导入模拟数据。
- 在DMS上,打开RDS数据库的SQL Console窗口,输入以下语句,依次建立表:lineitem,nation,orders,part,partsupp,region,supplier。
CREATE TABLE `lineitem` ( `L_ORDERKEY` int(11) NOT NULL, `L_PARTKEY` int(11) NOT NULL, `L_SUPPKEY` int(11) NOT NULL, `L_LINENUMBER` int(11) NOT NULL, `L_QUANTITY` decimal(15,2) NOT NULL, `L_EXTENDEDPRICE` decimal(15,2) NOT NULL, `L_DISCOUNT` decimal(15,2) NOT NULL, `L_TAX` decimal(15,2) NOT NULL, `L_RETURNFLAG` char(1) NOT NULL, `L_LINESTATUS` char(1) NOT NULL, `L_SHIPDATE` date NOT NULL, `L_COMMITDATE` date NOT NULL, `L_RECEIPTDATE` date NOT NULL, `L_SHIPINSTRUCT` char(25) NOT NULL, `L_SHIPMODE` char(10) NOT NULL, `L_COMMENT` varchar(44) NOT NULL, PRIMARY KEY (`L_ORDERKEY`,`L_LINENUMBER`) ); CREATE TABLE `nation` ( `N_NATIONKEY` int(11) NOT NULL, `N_NAME` char(25) NOT NULL, `N_REGIONKEY` int(11) NOT NULL, `N_COMMENT` varchar(152) DEFAULT NULL, PRIMARY KEY (`N_NATIONKEY`) ); CREATE TABLE `orders` ( `O_ORDERKEY` int(11) NOT NULL, `O_CUSTKEY` int(11) NOT NULL, `O_ORDERSTATUS` char(1) NOT NULL, `O_TOTALPRICE` decimal(15,2) NOT NULL, `O_ORDERDATE` datetime NOT NULL, `O_ORDERPRIORITY` char(15) NOT NULL, `O_CLERK` char(15) NOT NULL, `O_SHIPPRIORITY` int(11) NOT NULL, `O_COMMENT` varchar(79) NOT NULL, PRIMARY KEY (`O_ORDERKEY`) ) COMMENT='业务订单表'; CREATE TABLE `part` ( `P_PARTKEY` int(11) NOT NULL, `P_NAME` varchar(55) NOT NULL, `P_MFGR` char(25) NOT NULL, `P_BRAND` char(10) NOT NULL, `P_TYPE` varchar(25) NOT NULL, `P_SIZE` int(11) NOT NULL, `P_CONTAINER` char(10) NOT NULL, `P_RETAILPRICE` decimal(15,2) NOT NULL, `P_COMMENT` varchar(23) NOT NULL, PRIMARY KEY (`P_PARTKEY`) ); CREATE TABLE `partsupp` ( `PS_PARTKEY` int(11) NOT NULL, `PS_SUPPKEY` int(11) NOT NULL, `PS_AVAILQTY` int(11) NOT NULL, `PS_SUPPLYCOST` decimal(15,2) NOT NULL, `PS_COMMENT` varchar(199) NOT NULL, PRIMARY KEY (`PS_PARTKEY`,`PS_SUPPKEY`) ); CREATE TABLE `region` ( `R_REGIONKEY` int(11) NOT NULL, `R_NAME` char(25) NOT NULL, `R_COMMENT` varchar(152) DEFAULT NULL, PRIMARY KEY (`R_REGIONKEY`) ); CREATE TABLE `supplier` ( `S_SUPPKEY` int(11) NOT NULL, `S_NAME` char(25) NOT NULL, `S_ADDRESS` varchar(40) NOT NULL, `S_NATIONKEY` int(11) NOT NULL, `S_PHONE` char(15) NOT NULL, `S_ACCTBAL` decimal(15,2) NOT NULL, `S_COMMENT` varchar(101) NOT NULL, PRIMARY KEY (`S_SUPPKEY`) );
- 在DMS上,打开PolarDB数据库的SQL Console窗口,输入以下语句,建立表:customer。
CREATE TABLE `customer` ( `C_CUSTKEY` int(11) NOT NULL, `C_FIRST_NAME` varchar(25) NOT NULL, `C_LAST_NAME` varchar(25) NOT NULL, `C_ADDRESS` varchar(40) NOT NULL, `C_NATIONKEY` int(11) NOT NULL, `C_PHONE` char(15) NOT NULL, `C_ACCTBAL` decimal(15,2) NOT NULL, `C_MKTSEGMENT` char(10) NOT NULL, `C_COMMENT` varchar(117) NOT NULL, PRIMARY KEY (`C_CUSTKEY`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户表信息'
- 在DMS上,打开AnalyticDB数据库的SQL Console窗口,输入以下语句,建立表:customer1。
Create Table `customer1` ( `C_CUSTKEY` int NOT NULL, `C_FIRST_NAME` varchar NOT NULL, `C_LAST_NAME` varchar NOT NULL, `C_CUSTOMER_NAME` varchar NOT NULL, `C_ADDRESS` varchar NOT NULL, `C_NATIONKEY` int NOT NULL, `C_PHONE` varchar NOT NULL, `C_ACCTBAL` decimal(12, 2) NOT NULL, `C_MKTSEGMENT` varchar NOT NULL, `C_COMMENT` varchar NOT NULL, primary key (`c_custkey`) ) COMMENT='’用户表’'
- 向RDS MySQL和PolarDB MySQL中导入模拟数据:单击数据库开发-数据导入,选择目标数据库,选择导入模式为安全模式,文件类型为csv格式,设置目标表名,上传数据附件,单击提交申请。
说明:各表的模拟数据如下,点击链接获取。
- 执行以下SQL,完成在AnalyticDB数据库的表中更新数据。
update orders set o_orderdate = date_add(date(now()), interval round(rand() * 86400) second) where o_orderkey < 60000 and o_orderkey >= 30000
4. 配置ETL数据清洗加工流程
配置ETL数据清洗加工流程
本步骤在DMS控制台操作,配置ETL数据清洗加工流程。
本案例以在目标库的表中增加字段为例:
将PolarDB的表customer同步到ADB的表customer1,用简单字段处理C_FIRST_NAME 和 C_LAST_NAME ,映射到AnalyticDB中新添加的C_CUSTOMER_NAME字段中。
- 单击传输与加工-ETL-创建任务,打开ETL画板。
- 通过拖拽的方式,绘制从输入到输出的ETL数据加工清洗流程。
- 单击PolarDB MySQL节点,配置数据库PolarDB MySQL的区域信息,建立连接信息,选择表customer。
说明:连接模板是指该节点与指定数据库(PolarDB MySQL)的关联。
- 单击字段计算器节点-新增字段,设置字段名为C_CUSTOMER_NAME,字段类型为varchar,使用concat函数配置目标字段的取值:CONCAT(C_FIRST_NAME,C_LAST_NAME )。
- 单击AnalyticDB MySQL 3.0节点,配置数据库的区域信息,建立连接信息,选择表customer1。
说明:连接模板是指该节点与指定数据库(AnalyticDB MySQL)的关联。
- 单击下一步保存任务并预检查,根据界面提示进行操作。
5. 配置定时任务,设置数据调度周期
本步骤在DMS控制台操作。完成数据同步后,可配置定时任务,在数仓内对数据进一步加工,产出每小时的新增销售额。
本案例的配置调度周期为1小时。
- 单击传输与加工-任务编排,打开任务编排窗口;单击新增任务流,自定义任务名称,进入任务流绘制界面。
说明:本案例中,任务名称为testtask1。
- 创建3个单实例SQL节点(分别命名为“日表结构检查”、“月表结构检查”、“1小时累计当月订单”)和1个跨库SQL节点(命名为“跨源统计当日订单”),如下图排列构建任务流。
- 设置各节点的数据库信息和SQL脚本。
说明:“日表结构检查”、“月表结构检查”、“1小时累计当月订单”对应数据库均为AnalyticDB MySQL。
- 日表结构检查
create table if not exists items_revenue ( dt date, stats_end string, order_num bigint, revenue decimal(32,4), PRIMARY KEY(`dt`, `stats_end`) ) PARTITION by VALUE(date_format(dt, '%Y%m')) LIFECYCLE 200;
- 跨源统计当日订单
insert into `test_dblink`.`dbsfromrds`.`items_revenue` (dt, stats_end, order_num, revenue) select '${today_zero}' as dt, cast(date_format('${now_time}', '%H:%i') as varchar(255)) as stats_end, count(distinct o_orderkey) as order_num, cast(sum(l_extendedprice*(1-l_discount)) as decimal(32,4)) as revenue from `polardb_link1`.`polardbtest_new`.`customer`, `test_dblink`.`dbsfromrds`.`orders`, `test_dblink`.`dbsfromrds`.`lineitem` where c_mktsegment = 'MACHINERY' and c_custkey = o_custkey and l_orderkey = o_orderkey and o_orderdate < '${now_time}' and o_orderdate >= '${today_zero}' group by '${today_zero}', cast(date_format('${now_time}', '%H:%i') as varchar(255));
说明:
- 本案例中,polardb_link1为PolarDB的跨库dblink名称,polardbtest_new为PolarDB数据库名称。
- 本案例中,test_dblink为AnalyticDB的跨库dblink名称,dbsfromrds为AnalyticDB数据库名称。
- 节点“跨源统计当日订单”,需设置变量now_time和today_time。
- 月表结构检查
create table if not exists items_revenue_month ( month_key string, order_num bigint, revenue DECIMAL(1000,4), PRIMARY KEY (`month_key`) ) PARTITION by `month_key` LIFECYCLE 200;
- 1小时累计当日订单
* 删除当月已统计的累计订单量和订单金额 * **/ delete from items_revenue_month where month_key = '${this_month_start}'; /** * * 统计当月的订单量、订单金额 * **/ insert into items_revenue_month select '${this_month_start}' as month_key, sum(order_num), sum(revenue) from ( select dt, max(order_num) as order_num, max(revenue) as revenue from items_revenue where dt >= '${this_month_start}' and dt < '${next_month_start}' group by dt ) x group by month_key;
说明:节点“1小时累计当月订单”,需设置变量this_month_start和next_month_start。
- 调度配置:单击任务编排区域空白处,可打开任务流信息页面。开启调度,调度周期为小时,定时调度间隔为1小时。
说明:产品支持5分钟为周期进行调度。
- 在AnalyticDB数据库的SQL窗口中,输入以下命令:
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-03-01','23234','41252311.0000'); INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-10-01','45523','61252311.2200'); INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-11-01','23234','71252311.2200'); INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-02-01','45523','61252311.0000'); INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-04-01','3412','31252311.2200'); INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-09-01','6453','50252311.0000'); INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-01-01','6453','44242335.0000'); INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-12-01','3412','21252311.2200');
6. 生成统计报表
生成“当日销售额统计表”
在AnalyticDB数据库中,执行对应SQL,依次生成统计报表。
- 在AnalyticDB的SQL Console中,执行如下命令,单击数据可视化。
SELECT `dt`, stats_end , `order_num` as 订单量 , `revenue` as 销售额 FROM `items_revenue`
- 在数据可视化界面,依次设置如下信息后,单击保存。
- 表名:当日销售额统计(每小时)
- 报表类型:折线图
- 纬度:stats_end
- 度量:[总计]销售额
说明:单击stats_end左侧的向下箭头,单击排序,可对数据进行升降序排序。本案例以升序为例。
生成“当日订单量统计表”
- 在AnalyticDB的SQL Console中,执行如下命令后,单击数据可视化。
SELECT `dt`, stats_end , `order_num` as 订单量 , `revenue` as 销售额 FROM `items_revenue`
- 在数据可视化界面,依次设置如下信息后,单击保存。
- 表名:当日订单量统计(每小时)
- 报表类型:折线图
- 纬度:stats_end
- 度量:[总计]订单量
说明:单击stats_end左侧的向下箭头,单击排序,可对数据进行升降序排序。本案例以升序为例。
生成“地区日销售额统计表”
- 在AnalyticDB的SQL Console中,执行如下命令后,单击数据可视化。
SELECT `dt` , `region_name` as 地区, `totalprice` as 销售额 FROM `sum_region_price`
- 在数据可视化界面,依次设置如下信息后,单击保存。
- 表名:地区日销售额统计
- 报表类型:饼图
- 度量:[总计]销售额
- 分组:地区
说明:在样式页,开启添加标签,可在饼图上展示每个区域的数值。
生成“月销售额统计表”
- 在AnalyticDB的SQL Console中,执行如下命令后,单击数据可视化。
SELECT month_key,`order_num` as 订单量 , `revenue` as 销售额 FROM `items_revenue_month`
- 在数据可视化界面,依次设置如下信息后,单击保存。
- 表名:月销售额统计
- 报表类型:柱状图
- 纬度:month_key
- 度量:[总计]销售额
生成统计看板
- 进入数据可视化-数据展示,新增名为订单统计看板的仪表盘。
- 单击订单统计看板,进入看板配置页,单击右上角 “+ ”号,勾选上述生成的四张表格。
- 单击下一步-保存,生成看板。
实验链接:https://developer.aliyun.com/adc/scenario/216c7156266240a29eedffd310fd3f6d