【阿里云流计算】- 电商订单和销量统计案例

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
日志服务 SLS,月写入数据量 50GB 1个月
简介:

背景

随着新零售的概念慢慢崛起,互联网电商行业竞争越来越激烈!实时数据信息对于电商行业尤为重要,那如何从实时不断的数据流中获取我们想要的信息呢?以下案例是流计算的合作伙伴袋鼠云用阿里云流计算来解决电商订单管理案例。

场景案例

统计商铺的订单总数和总的销量

业务架构图

undefined | center

业务流程:

  1. 用阿里云的DTS(DTS信息同步)把用户的数据同步到大数据总线(DATAHUB)。
  2. 阿里云流计算订阅大数据总线(DATAHUB)的数据进行实时计算。
  3. 将实时数据插入到RDS的云数据库
  4. 再通过阿里云的DATAV或者是其他的大屏做数据展示。

准备工作

RDS->DataHub 数据实时同步,是将 RDS for MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。
由RDS经过DTS数据同步到大数据总线(DATAHUB)后 DataHub表Schema信息。

可以参考RDS 到 DataHub 数据实时同步

1.订单源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_paytime varchar 订单付款时间
dts_deliveredtime varchar 订单发货时间
dts_storecode varchar 店铺编号
dts_warehousecode varchar 仓库code
dts_cancelled bigint 是否取消
dts_delivered bigint 是否发货
dts_receivercity varchar 收货人城市
dts_receiverprovince varchar 收货人省份
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名
dts_table_name varchar 数据表
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

2.订单详情源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_skuname varchar 商品名字
dts_skucode varchar 商品编号
dts_quantity bigint 数量
dts_dividedamount double 发货金额
dts_salechanneldividedamount double 渠道销售金额
dts_initialcost double 成本
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

业务逻辑

--数据的订单源表
create table orders_real(
     dts_ordercodeofsys   varchar, 
     dts_paytime          varchar, 
     dts_deliveredtime    varchar, 
     dts_storecode        varchar, 
     dts_warehousecode    varchar, 
     dts_cancelled        bigint, 
     dts_delivered        bigint, 
     dts_receivercity     varchar, 
     dts_receiverprovince varchar, 
     dts_record_id        varchar, 
     dts_operation_flag   varchar, 
     dts_instance_id      varchar, 
     dts_db_name          varchar, 
     dts_table_name       varchar, 
     dts_utc_timestamp    varchar, 
     dts_before_flag      varchar, 
     dts_after_flag       varchar 
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 

create table orderdetail_real(
     dts_ordercodeofsys            varchar, 
     dts_skuname                   varchar,
     dts_skucode                   varchar,
     dts_quantity                  bigint ,
     dts_dividedamount             double,
     dts_salechanneldividedamount  double,
     dts_initialcost               double,
     dts_record_id                 varchar,
     dts_operation_flag            varchar,
     dts_instance_id               varchar,
     dts_db_name                   varchar,
     dts_table_name                varchar,
     dts_utc_timestamp             varchar,
     dts_before_flag               varchar,
     dts_after_flag                varchar
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 


create table ads_all_count_amount(
    bill_date     varchar,--下单时间
    bill_count    bigint,--总的订单总数
    qty           bigint,--总的销售量
    primary key (bill_date) 
) with (
  type='rds',
  url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
  tableName='数据库表名',
  userName='数据库的账号',
  password='数据库的密码'
);

--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS
SELECT 
dts_ordercodeofsys, 
MAX(dts_paytime) AS dts_paytime
    FROM orders_real
    GROUP BY dts_ordercodeofsys

--订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real

--订单总单数,总销售量
INSERT INTO ads_all_count_amount
SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd')

难点解析

为了方便大家理解结构化代码和代码维护,我们推荐使用View把业务逻辑差分成三个模块。

模块一

首先根据订单编号做分组,因为同一个编号订单会有多次业务操作(例如下单、付款、发货),会在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。

CREATE VIEW new_paytime AS
SELECT 
    dts_ordercodeofsys, 
    MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys

模块二

数据库日志会获取所有的数据记录的变更,而每个订单是有状态的。如列表所示:

字段名 数据类型 详情
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。

dts_instance_id: 这条增量日志所对应的数据库的 server id。

dts_db_name: 这条增量更新日志更新的表所在的数据库库名。

dts_table_name:这条增量更新日志更新的表。

dts_operation_flag: 标示这条增量日志的操作类型。取值包括:

I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。

dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.

dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:

  1. 操作类型为:insert

    当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y
    undefined | center

  2. 操作类型为:update

    当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。
    第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N
    第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y
    undefined | center

  3. 操作类型为:delete

    当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N
    undefined | center

CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real

怎么判断是有效交易订单呢?

首先是要满足dts_operation_flag=U 或者 dts_operation_flag=I,
然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态;
所以有效交易订单为:

        dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity
  • 为什么THEN -1 * dts_quantity呢?

    订单的取消或者是交易没有成功在总的销量里也会记录;为了保证总销量的正确性,所以把没有成交的订单数量设为负数在计算总的销量会减去这个数量。

模块三

为什么订单源表和订单详情要做JOIN操作?

new_paytime查出的是最新交易的时间的所有的订单编号;new_orderdetail查询的是所有的有效的订单的订单编码、商品名称、商品编号、数量的信息;两张表JOIN是为整合成一张大表,方便用户来统计订单总数和总的销量。


SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON 
    a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd');
目录
相关文章
|
SQL XML Java
程序员都要懂的SQL防注入Mybatis框架SQL防注入
程序员都要懂的SQL防注入Mybatis框架SQL防注入
217 0
|
虚拟化 Windows
VMwareWorkstationPro16的下载与安装,以及vm账号注册的问题
本文介绍了VMware Workstation Pro 16的下载、安装过程以及VMware账号的注册问题,包括如何检查虚拟化支持是否开启、VMware的下载步骤、注册VM账号时的常见问题以及VMware 16的安装步骤。
VMwareWorkstationPro16的下载与安装,以及vm账号注册的问题
|
9月前
|
数据处理 索引 Python
用Python实现数据录入、追加、数据校验并生成表格
本示例展示了如何使用Python和Pandas库实现学生期末考试成绩的数据录入、追加和校验,并生成Excel表格。首先通过`pip install pandas openpyxl`安装所需库,然后定义列名、检查并读取现有数据、用户输入数据、数据校验及保存至Excel文件。程序支持成绩范围验证,确保数据准确性。
353 14
|
存储 Kubernetes Cloud Native
【阿里云云原生专栏】云原生容器存储:阿里云CSI与EBS的高效配合策略
【5月更文挑战第29天】阿里云提供云原生容器存储接口(CSI)和弹性块存储(EBS)解决方案,以应对云原生环境中的数据存储挑战。CSI作为Kubernetes的标准接口简化存储管理,而EBS则提供高性能、高可靠性的块存储服务。二者协同实现动态供应、弹性伸缩及数据备份恢复。示例代码展示了在Kubernetes中使用CSI和EBS创建存储卷的过程。
464 3
|
10月前
|
存储 数据处理 Python
Python科学计算:NumPy与SciPy的高效数据处理与分析
【10月更文挑战第27天】在科学计算和数据分析领域,Python凭借简洁的语法和强大的库支持广受欢迎。NumPy和SciPy作为Python科学计算的两大基石,提供了高效的数据处理和分析工具。NumPy的核心功能是N维数组对象(ndarray),支持高效的大型数据集操作;SciPy则在此基础上提供了线性代数、信号处理、优化和统计分析等多种科学计算工具。结合使用NumPy和SciPy,可以显著提升数据处理和分析的效率,使Python成为科学计算和数据分析的首选语言。
242 3
|
9月前
|
数据采集 监控 数据可视化
数据质量:电商零售数据管理根基
电商零售数据管理是企业数字化发展的核心竞争力。它包括市场洞察、运营优化和客户关系管理,通过数据收集、整理、分析与应用,实现精准决策与高效运营。然而,数据管理面临数据质量、安全与隐私、集成融合及人才短缺等挑战。使用板栗看板等工具,可有效提升数据可视化、实时监控、团队协作与决策优化,助力企业挖掘数据价值,增强市场竞争力。
|
人工智能 监控 Shell
常用的 55 个 Linux Shell 脚本(包括基础案例、文件操作、实用工具、图形化、sed、gawk)
这篇文章提供了55个常用的Linux Shell脚本实例,涵盖基础案例、文件操作、实用工具、图形化界面及sed、gawk的使用。
1939 2
|
设计模式 数据可视化 架构师
从UML类图关系到依赖注入(IoC)
从UML类图关系到依赖注入(IoC)
382 0
从UML类图关系到依赖注入(IoC)
|
存储 开发框架 小程序
【全栈小程序开发路线】手把手教你入门小程序开发,小白必看!
以下内容是结合我项目中实战经验,踩坑记录,大量时间学习小程序的积累,总结分享给大家。 学习路线包括前端基础、小程序开发框架、UI组件库、云开发、周边生态以及插件这几个纬度,学完这些,你也能全栈开发一个属于自己的产品。
977 0
|
人工智能 搜索推荐 vr&ar
安卓系统的发展历程与未来趋势
【2月更文挑战第10天】 安卓系统作为全球手机操作系统市场的领导者,其发展历程和未来趋势备受关注。本文将从安卓系统的起源、发展历程、技术特点以及未来发展趋势等方面进行探讨,分析安卓系统在移动设备领域的影响力以及未来可能的发展方向。

热门文章

最新文章