库仓一体实时数据分析

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 通过RDS MySQL/PolarDB MySQL+DMS+AnalyticDB MySQL的轻量级架构,可实现实时数仓数据分析,生成相应数据报表,助力商家及时查看运营情况,实时调整运营策略。

库仓一体实时数据分析


1. 创建源库和账号,开启跨库查询功能

创建源库和账号,开启跨库查询功能

  1. 登录数据库产品控制台,创建数据库。
  • RDS控制台:单击实例名,进入实例详情后,单击数据库管理-创建数据库,完成数据库的创建。
  • PolarDB MySQL控制台:单击集群名,进入集群详情后,单击数据库管理-创建数据库,完成数据库的创建。


  1. 创建账号和密码。
  • RDS控制台:在实例详情页,单击账号管理-创建账号,完成高权限账号创建。
  • PolarDB MySQL控制台:在集群详情页,单击账号管理-创建账号,完成高权限账号创建。


  1. 访问DMS控制台,搜索创建的RDS实例和PolarDB实例,输入数据库账号和密码,分别登录RDS和PolarDB数据库。


  1. 分别选中RDS和PolarDB实例名称,右键选择编辑实例,开启安全协同,开启跨库查询


说明:高级信息中涉及dblink的开启,自定义名称。本案例中,RDS和PolarDB的dblink名称为rds_dblink,poalrdb_dblink。



2. 构建RDS-AnalyticDB数仓链路

构建RDS-AnalyticDB数仓链路

本步骤在DMS控制台操作,可完成从数据库RDS MySQL到AnalyticDB MySQL的快速建仓。

  1. 进入DMS控制台,单击数据资产-数据入仓,打开数据分析窗口。

  1. 在数据分析窗口,选择RDS数据库为数据来源,选择已有ADB实例,选择具体实例信息,单击提交申请

说明:

  • 高级配置中的参数,建议保持默认值,如下:
  • 目标库名:自动创建同名库。
  • 同步方式:结构初始化。
  • 同步范围:全库。


3. 创建数据库表,导入模拟数据

创建数据库表,导入模拟数据

本步骤在DMS控制台操作,完成在RDS和PolarDB数据库中的快速建表,并导入模拟数据。

  1. 在DMS上,打开RDS数据库的SQL Console窗口,输入以下语句,依次建立表:lineitem,nation,orders,part,partsupp,region,supplier。
CREATE TABLE `lineitem` (
  `L_ORDERKEY` int(11) NOT NULL,
  `L_PARTKEY` int(11) NOT NULL,
  `L_SUPPKEY` int(11) NOT NULL,
  `L_LINENUMBER` int(11) NOT NULL,
  `L_QUANTITY` decimal(15,2) NOT NULL,
  `L_EXTENDEDPRICE` decimal(15,2) NOT NULL,
  `L_DISCOUNT` decimal(15,2) NOT NULL,
  `L_TAX` decimal(15,2) NOT NULL,
  `L_RETURNFLAG` char(1) NOT NULL,
  `L_LINESTATUS` char(1) NOT NULL,
  `L_SHIPDATE` date NOT NULL,
  `L_COMMITDATE` date NOT NULL,
  `L_RECEIPTDATE` date NOT NULL,
  `L_SHIPINSTRUCT` char(25) NOT NULL,
  `L_SHIPMODE` char(10) NOT NULL,
  `L_COMMENT` varchar(44) NOT NULL,
  PRIMARY KEY (`L_ORDERKEY`,`L_LINENUMBER`)
);
CREATE TABLE `nation` (
  `N_NATIONKEY` int(11) NOT NULL,
  `N_NAME` char(25) NOT NULL,
  `N_REGIONKEY` int(11) NOT NULL,
  `N_COMMENT` varchar(152) DEFAULT NULL,
  PRIMARY KEY (`N_NATIONKEY`)
);
CREATE TABLE `orders` (
  `O_ORDERKEY` int(11) NOT NULL,
  `O_CUSTKEY` int(11) NOT NULL,
  `O_ORDERSTATUS` char(1) NOT NULL,
  `O_TOTALPRICE` decimal(15,2) NOT NULL,
  `O_ORDERDATE` datetime NOT NULL,
  `O_ORDERPRIORITY` char(15) NOT NULL,
  `O_CLERK` char(15) NOT NULL,
  `O_SHIPPRIORITY` int(11) NOT NULL,
  `O_COMMENT` varchar(79) NOT NULL,
  PRIMARY KEY (`O_ORDERKEY`)
) COMMENT='业务订单表';
CREATE TABLE `part` (
  `P_PARTKEY` int(11) NOT NULL,
  `P_NAME` varchar(55) NOT NULL,
  `P_MFGR` char(25) NOT NULL,
  `P_BRAND` char(10) NOT NULL,
  `P_TYPE` varchar(25) NOT NULL,
  `P_SIZE` int(11) NOT NULL,
  `P_CONTAINER` char(10) NOT NULL,
  `P_RETAILPRICE` decimal(15,2) NOT NULL,
  `P_COMMENT` varchar(23) NOT NULL,
  PRIMARY KEY (`P_PARTKEY`)
);
CREATE TABLE `partsupp` (
  `PS_PARTKEY` int(11) NOT NULL,
  `PS_SUPPKEY` int(11) NOT NULL,
  `PS_AVAILQTY` int(11) NOT NULL,
  `PS_SUPPLYCOST` decimal(15,2) NOT NULL,
  `PS_COMMENT` varchar(199) NOT NULL,
  PRIMARY KEY (`PS_PARTKEY`,`PS_SUPPKEY`)
); 
CREATE TABLE `region` (
  `R_REGIONKEY` int(11) NOT NULL,
  `R_NAME` char(25) NOT NULL,
  `R_COMMENT` varchar(152) DEFAULT NULL,
  PRIMARY KEY (`R_REGIONKEY`)
); 
CREATE TABLE `supplier` (
  `S_SUPPKEY` int(11) NOT NULL,
  `S_NAME` char(25) NOT NULL,
  `S_ADDRESS` varchar(40) NOT NULL,
  `S_NATIONKEY` int(11) NOT NULL,
  `S_PHONE` char(15) NOT NULL,
  `S_ACCTBAL` decimal(15,2) NOT NULL,
  `S_COMMENT` varchar(101) NOT NULL,
  PRIMARY KEY (`S_SUPPKEY`)
);
  1. 在DMS上,打开PolarDB数据库的SQL Console窗口,输入以下语句,建立表:customer。
CREATE TABLE `customer` (
  `C_CUSTKEY` int(11) NOT NULL,
  `C_FIRST_NAME` varchar(25) NOT NULL,
  `C_LAST_NAME` varchar(25) NOT NULL,
  `C_ADDRESS` varchar(40) NOT NULL,
  `C_NATIONKEY` int(11) NOT NULL,
  `C_PHONE` char(15) NOT NULL,
  `C_ACCTBAL` decimal(15,2) NOT NULL,
  `C_MKTSEGMENT` char(10) NOT NULL,
  `C_COMMENT` varchar(117) NOT NULL,
  PRIMARY KEY (`C_CUSTKEY`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户表信息'
  1. 在DMS上,打开AnalyticDB数据库的SQL Console窗口,输入以下语句,建立表:customer1。
Create Table `customer1` (
 `C_CUSTKEY` int NOT NULL,
 `C_FIRST_NAME` varchar NOT NULL,
 `C_LAST_NAME` varchar NOT NULL,
 `C_CUSTOMER_NAME` varchar NOT NULL,
 `C_ADDRESS` varchar NOT NULL,
 `C_NATIONKEY` int NOT NULL,
 `C_PHONE` varchar NOT NULL,
 `C_ACCTBAL` decimal(12, 2) NOT NULL,
 `C_MKTSEGMENT` varchar NOT NULL,
 `C_COMMENT` varchar NOT NULL,
 primary key (`c_custkey`)
) COMMENT='’用户表’'
  1. 向RDS MySQL和PolarDB MySQL中导入模拟数据:单击数据库开发-数据导入,选择目标数据库,选择导入模式为安全模式,文件类型为csv格式,设置目标表名,上传数据附件,单击提交申请

说明:各表的模拟数据如下,点击链接获取。

数据库

模拟数据链接

RDS MySQL

lineitem

模拟数据下载地址1

模拟数据下载地址2

说明:

表lineitem的模拟数据包括地址1和地址2的数据。

nation

模拟数据下载地址

orders

模拟数据下载地址

part

模拟数据下载地址

partsupp

模拟数据下载地址

region

模拟数据下载地址

supplier

模拟数据下载地址

PolarDB MySQL

Customer

模拟数据下载地址

  1. 执行以下SQL,完成在AnalyticDB数据库的表中更新数据。
update orders set o_orderdate = date_add(date(now()), interval round(rand() * 86400) second)
where o_orderkey < 60000 and o_orderkey >= 30000


4. 配置ETL数据清洗加工流程

配置ETL数据清洗加工流程

本步骤在DMS控制台操作,配置ETL数据清洗加工流程。

本案例以在目标库的表中增加字段为例:

将PolarDB的表customer同步到ADB的表customer1,用简单字段处理C_FIRST_NAME 和 C_LAST_NAME ,映射到AnalyticDB中新添加的C_CUSTOMER_NAME字段中。

  1. 单击传输与加工-ETL-创建任务,打开ETL画板。

  1. 通过拖拽的方式,绘制从输入到输出的ETL数据加工清洗流程。

  1. 单击PolarDB MySQL节点,配置数据库PolarDB MySQL的区域信息,建立连接信息,选择表customer。

说明:连接模板是指该节点与指定数据库(PolarDB MySQL)的关联。

  1. 单击字段计算器节点-新增字段,设置字段名为C_CUSTOMER_NAME,字段类型为varchar,使用concat函数配置目标字段的取值:CONCAT(C_FIRST_NAME,C_LAST_NAME )。

  1. 单击AnalyticDB MySQL 3.0节点,配置数据库的区域信息,建立连接信息,选择表customer1。

说明:连接模板是指该节点与指定数据库(AnalyticDB MySQL)的关联。

  1. 单击下一步保存任务并预检查,根据界面提示进行操作。


5. 配置定时任务,设置数据调度周期

本步骤在DMS控制台操作。完成数据同步后,可配置定时任务,在数仓内对数据进一步加工,产出每小时的新增销售额。

本案例的配置调度周期为1小时。

  1. 单击传输与加工-任务编排,打开任务编排窗口;单击新增任务流,自定义任务名称,进入任务流绘制界面。

说明:本案例中,任务名称为testtask1。

  1. 创建3个单实例SQL节点(分别命名为“日表结构检查”、“月表结构检查”、“1小时累计当月订单”)和1个跨库SQL节点(命名为“跨源统计当日订单”),如下图排列构建任务流。

  1. 设置各节点的数据库信息和SQL脚本。

说明:“日表结构检查”、“月表结构检查”、“1小时累计当月订单”对应数据库均为AnalyticDB MySQL。

  • 日表结构检查
create table if not exists items_revenue (
    dt date,
    stats_end string,
    order_num bigint,
    revenue decimal(32,4),
    PRIMARY KEY(`dt`, `stats_end`)
) PARTITION by VALUE(date_format(dt, '%Y%m')) LIFECYCLE 200;
  • 跨源统计当日订单
insert into `test_dblink`.`dbsfromrds`.`items_revenue`
 (dt, stats_end, order_num, revenue)
select  
    '${today_zero}' as dt,
    cast(date_format('${now_time}', '%H:%i') as varchar(255)) as stats_end,
    count(distinct o_orderkey) as order_num,
    cast(sum(l_extendedprice*(1-l_discount)) as decimal(32,4)) as revenue
from  
    `polardb_link1`.`polardbtest_new`.`customer`,
    `test_dblink`.`dbsfromrds`.`orders`,
    `test_dblink`.`dbsfromrds`.`lineitem`
where  
    c_mktsegment = 'MACHINERY'
    and c_custkey = o_custkey 
    and l_orderkey = o_orderkey
    and o_orderdate < '${now_time}'
    and o_orderdate >= '${today_zero}'
group by 
    '${today_zero}', cast(date_format('${now_time}', '%H:%i') as varchar(255));

说明:

  • 本案例中,polardb_link1为PolarDB的跨库dblink名称,polardbtest_new为PolarDB数据库名称。
  • 本案例中,test_dblink为AnalyticDB的跨库dblink名称,dbsfromrds为AnalyticDB数据库名称。
  • 节点“跨源统计当日订单”,需设置变量now_time和today_time。

  • 月表结构检查
create table if not exists items_revenue_month (
    month_key string,
    order_num bigint,
    revenue DECIMAL(1000,4),
    PRIMARY KEY (`month_key`)
) PARTITION by `month_key` LIFECYCLE 200;
  • 1小时累计当日订单
* 删除当月已统计的累计订单量和订单金额
*
**/
delete from items_revenue_month where month_key = '${this_month_start}';
/**
*
* 统计当月的订单量、订单金额
*
**/
insert into items_revenue_month
select '${this_month_start}' as month_key, sum(order_num), sum(revenue)
from (
select dt, max(order_num) as order_num, max(revenue) as revenue
from items_revenue
where
    dt >= '${this_month_start}' and dt < '${next_month_start}'
group by
    dt
) x
group by
    month_key;

说明:节点“1小时累计当月订单”,需设置变量this_month_start和next_month_start。

  1. 调度配置:单击任务编排区域空白处,可打开任务流信息页面。开启调度,调度周期为小时,定时调度间隔为1小时

说明:产品支持5分钟为周期进行调度。

  1. 在AnalyticDB数据库的SQL窗口中,输入以下命令:
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-03-01','23234','41252311.0000');
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-10-01','45523','61252311.2200');
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-11-01','23234','71252311.2200');
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-02-01','45523','61252311.0000');
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-04-01','3412','31252311.2200');
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-09-01','6453','50252311.0000');
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2021-01-01','6453','44242335.0000');
INSERT INTO `items_revenue_month` (`month_key`, `order_num`, `revenue`) VALUES ('2020-12-01','3412','21252311.2200');


6. 生成统计报表

生成“当日销售额统计表”

在AnalyticDB数据库中,执行对应SQL,依次生成统计报表。


  1. 在AnalyticDB的SQL Console中,执行如下命令,单击数据可视化
SELECT `dt`, stats_end , `order_num` as 订单量 , `revenue` as 销售额 
FROM `items_revenue`

  1. 在数据可视化界面,依次设置如下信息后,单击保存
  • 表名:当日销售额统计(每小时)
  • 报表类型:折线图
  • 纬度:stats_end
  • 度量:[总计]销售额

说明:单击stats_end左侧的向下箭头,单击排序,可对数据进行升降序排序。本案例以升序为例。

生成“当日订单量统计表”

  1. 在AnalyticDB的SQL Console中,执行如下命令后,单击数据可视化
SELECT `dt`, stats_end , `order_num` as 订单量 , `revenue` as 销售额 
FROM `items_revenue`
  1. 在数据可视化界面,依次设置如下信息后,单击保存
  • 表名:当日订单量统计(每小时)
  • 报表类型:折线图
  • 纬度:stats_end
  • 度量:[总计]订单量

说明:单击stats_end左侧的向下箭头,单击排序,可对数据进行升降序排序。本案例以升序为例。

生成“地区日销售额统计表”

  1. 在AnalyticDB的SQL Console中,执行如下命令后,单击数据可视化
SELECT `dt` , `region_name` as 地区, `totalprice` as 销售额 
FROM `sum_region_price`
  1. 在数据可视化界面,依次设置如下信息后,单击保存
  • 表名:地区日销售额统计
  • 报表类型:饼图
  • 度量:[总计]销售额
  • 分组:地区

说明:在样式页,开启添加标签,可在饼图上展示每个区域的数值。

生成“月销售额统计表”

  1. 在AnalyticDB的SQL Console中,执行如下命令后,单击数据可视化
SELECT month_key,`order_num` as 订单量 , `revenue` as 销售额 
FROM `items_revenue_month`
  1. 在数据可视化界面,依次设置如下信息后,单击保存
  • 表名:月销售额统计
  • 报表类型:柱状图
  • 纬度:month_key
  • 度量:[总计]销售额

生成统计看板

  1. 进入数据可视化-数据展示,新增名为订单统计看板的仪表盘。

  1. 单击订单统计看板,进入看板配置页,单击右上角 “+ ”号,勾选上述生成的四张表格。

  1. 单击下一步-保存,生成看板。

实验链接:https://developer.aliyun.com/adc/scenario/216c7156266240a29eedffd310fd3f6d

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
4月前
|
数据采集 机器学习/深度学习 数据可视化
深入学习NumPy库在数据分析中的应用场景
深入学习NumPy库在数据分析中的应用场景
|
4月前
|
存储 数据可视化 数据挖掘
Python在数据分析中的利器:Pandas库全面解析
【2月更文挑战第7天】 众所周知,Python作为一种简洁、易学且功能强大的编程语言,被广泛运用于数据科学和人工智能领域。而Pandas库作为Python中最受欢迎的数据处理库之一,在数据分析中扮演着举足轻重的角色。本文将全面解析Pandas库的基本功能、高级应用以及实际案例,带您深入了解这个在数据分析领域的利器。
182 1
|
7天前
|
机器学习/深度学习 算法 数据挖掘
Python数据分析革命:Scikit-learn库,让机器学习模型训练与评估变得简单高效!
在数据驱动时代,Python 以强大的生态系统成为数据科学的首选语言,而 Scikit-learn 则因简洁的 API 和广泛的支持脱颖而出。本文将指导你使用 Scikit-learn 进行机器学习模型的训练与评估。首先通过 `pip install scikit-learn` 安装库,然后利用内置数据集进行数据准备,选择合适的模型(如逻辑回归),并通过交叉验证评估其性能。最终,使用模型对新数据进行预测,简化整个流程。无论你是新手还是专家,Scikit-learn 都能助你一臂之力。
44 8
|
2月前
|
机器学习/深度学习 数据采集 算法
数据海洋中的导航者:Scikit-learn库引领Python数据分析与机器学习新航向!
【7月更文挑战第26天】在数据的海洋里,Python以强大的生态成为探索者的首选,尤其Scikit-learn库(简称sklearn),作为一颗璀璨明珠,以高效、灵活、易用的特性引领数据科学家们破浪前行。无论新手还是专家,sklearn提供的广泛算法与工具支持从数据预处理到模型评估的全流程。秉承“简单有效”的设计哲学,它简化了复杂模型的操作,如线性回归等,使用户能轻松比较并选择最优方案。示例代码展示了如何简洁地实现线性回归分析,彰显了sklearn的强大能力。总之,sklearn不仅是数据科学家的利器,也是推动行业进步的关键力量。
43 3
|
2月前
|
数据可视化 数据挖掘 API
数据可视化秘籍聚焦Python的Matplotlib和Seaborn库,它们是数据分析的得力工具。
【7月更文挑战第5天】数据可视化秘籍聚焦Python的Matplotlib和Seaborn库,它们是数据分析的得力工具。Matplotlib是基础库,提供高度自定义的2D图表,而Seaborn在其上构建,提供美观的统计图形。文章介绍了如何用两者画线图、散点图、条形图、饼图和直方图,展示数据趋势和关系。
36 1
|
2月前
|
机器学习/深度学习 算法 数据挖掘
Python数据分析革命:Scikit-learn库,让机器学习模型训练与评估变得简单高效!
【7月更文挑战第27天】在数据驱动时代,Python以丰富的库成为数据科学首选。Scikit-learn因简洁高效而备受青睐,引领数据分析革命。本文引导您使用Scikit-learn简化机器学习流程。首先通过`pip install scikit-learn`安装库。接着使用内置数据集简化数据准备步骤,例如加载Iris数据集。选择合适的模型,如逻辑回归,并初始化与训练模型。利用交叉验证评估模型性能,获取准确率等指标。最后,应用训练好的模型进行新数据预测。Scikit-learn为各阶段提供一站式支持,助力数据分析项目成功。
48 0
|
2月前
|
机器学习/深度学习 算法 数据挖掘
|
3月前
|
数据采集 机器学习/深度学习 数据可视化
利用Python和Pandas库构建高效的数据分析流程
在数据驱动的时代,数据分析已成为企业决策的关键环节。本文介绍如何利用Python编程语言及其强大的数据分析库Pandas,构建一套高效且可扩展的数据分析流程。与常规的数据分析流程不同,本文不仅涵盖数据加载、清洗、转换等基础步骤,还强调数据可视化、模型探索与评估等高级分析技巧,并通过实际案例展示如何在Python中实现这些步骤,为数据分析师提供一套完整的数据分析解决方案。
|
3月前
|
JSON 数据挖掘 API
数据分析实战丨基于pygal与requests分析GitHub最受欢迎的Python库
数据分析实战丨基于pygal与requests分析GitHub最受欢迎的Python库
40 2
|
2月前
|
存储 消息中间件 数据挖掘
Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。
【7月更文挑战第5天】Python实时数据分析:利用丰富的库(如Pandas, PySpark, Kafka)进行流处理,涵盖数据获取、预处理、处理、存储及展示。示例代码展示了从Kafka消费数据,计算社交媒体活跃度和物联网设备状态,并可视化结果。适用于监控、故障检测等场景。通过学习和实践,提升实时数据分析能力。
80 0