【阿里云流计算】- 电商订单和销量统计案例

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
日志服务 SLS,月写入数据量 50GB 1个月
简介:

背景

随着新零售的概念慢慢崛起,互联网电商行业竞争越来越激烈!实时数据信息对于电商行业尤为重要,那如何从实时不断的数据流中获取我们想要的信息呢?以下案例是流计算的合作伙伴袋鼠云用阿里云流计算来解决电商订单管理案例。

场景案例

统计商铺的订单总数和总的销量

业务架构图

undefined | center

业务流程:

  1. 用阿里云的DTS(DTS信息同步)把用户的数据同步到大数据总线(DATAHUB)。
  2. 阿里云流计算订阅大数据总线(DATAHUB)的数据进行实时计算。
  3. 将实时数据插入到RDS的云数据库
  4. 再通过阿里云的DATAV或者是其他的大屏做数据展示。

准备工作

RDS->DataHub 数据实时同步,是将 RDS for MySQL 产生的增量数据数据实时同步到 DataHub 中的 topic。
由RDS经过DTS数据同步到大数据总线(DATAHUB)后 DataHub表Schema信息。

可以参考RDS 到 DataHub 数据实时同步

1.订单源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_paytime varchar 订单付款时间
dts_deliveredtime varchar 订单发货时间
dts_storecode varchar 店铺编号
dts_warehousecode varchar 仓库code
dts_cancelled bigint 是否取消
dts_delivered bigint 是否发货
dts_receivercity varchar 收货人城市
dts_receiverprovince varchar 收货人省份
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名
dts_table_name varchar 数据表
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

2.订单详情源表

字段名 数据类型 详情
dts_ordercodeofsys varchar 订单编号
dts_skuname varchar 商品名字
dts_skucode varchar 商品编号
dts_quantity bigint 数量
dts_dividedamount double 发货金额
dts_salechanneldividedamount double 渠道销售金额
dts_initialcost double 成本
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

业务逻辑

--数据的订单源表
create table orders_real(
     dts_ordercodeofsys   varchar, 
     dts_paytime          varchar, 
     dts_deliveredtime    varchar, 
     dts_storecode        varchar, 
     dts_warehousecode    varchar, 
     dts_cancelled        bigint, 
     dts_delivered        bigint, 
     dts_receivercity     varchar, 
     dts_receiverprovince varchar, 
     dts_record_id        varchar, 
     dts_operation_flag   varchar, 
     dts_instance_id      varchar, 
     dts_db_name          varchar, 
     dts_table_name       varchar, 
     dts_utc_timestamp    varchar, 
     dts_before_flag      varchar, 
     dts_after_flag       varchar 
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 

create table orderdetail_real(
     dts_ordercodeofsys            varchar, 
     dts_skuname                   varchar,
     dts_skucode                   varchar,
     dts_quantity                  bigint ,
     dts_dividedamount             double,
     dts_salechanneldividedamount  double,
     dts_initialcost               double,
     dts_record_id                 varchar,
     dts_operation_flag            varchar,
     dts_instance_id               varchar,
     dts_db_name                   varchar,
     dts_table_name                varchar,
     dts_utc_timestamp             varchar,
     dts_before_flag               varchar,
     dts_after_flag                varchar
) with (
  type='datahub',
  endPoint='http://dh-cn-XXXX.com',
  project='项目名',
  topic='表名',
  accessId='自己的ID',
  accessKey='自己的KEY'
); 


create table ads_all_count_amount(
    bill_date     varchar,--下单时间
    bill_count    bigint,--总的订单总数
    qty           bigint,--总的销售量
    primary key (bill_date) 
) with (
  type='rds',
  url='jdbc:mysql://rm-XXXX.mysql.rds.aXXXXcs.com:3306/XXXX',
  tableName='数据库表名',
  userName='数据库的账号',
  password='数据库的密码'
);

--订单源表,最新交易时间的商品编号
CREATE VIEW new_paytime AS
SELECT 
dts_ordercodeofsys, 
MAX(dts_paytime) AS dts_paytime
    FROM orders_real
    GROUP BY dts_ordercodeofsys

--订单详情表,有效的订单的订单编码、商品名称、商品编号、数量的信息
CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real

--订单总单数,总销售量
INSERT INTO ads_all_count_amount
SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd')

难点解析

为了方便大家理解结构化代码和代码维护,我们推荐使用View把业务逻辑差分成三个模块。

模块一

首先根据订单编号做分组,因为同一个编号订单会有多次业务操作(例如下单、付款、发货),会在Binlog日志中形成多条同一订单编号的订单流水记录。使用MAX(dts_paytime)获取同一编号的最后一次操作数据库最终付款交易时间。

CREATE VIEW new_paytime AS
SELECT 
    dts_ordercodeofsys, 
    MAX(dts_paytime) AS dts_paytime
FROM orders_real
GROUP BY dts_ordercodeofsys

模块二

数据库日志会获取所有的数据记录的变更,而每个订单是有状态的。如列表所示:

字段名 数据类型 详情
dts_record_id varchar 记录ID
dts_operation_flag varchar 操作Flag
dts_instance_id varchar 数据库instanceId
dts_db_name varchar 数据库名字
dts_table_name varchar 表名
dts_utc_timestamp varchar 更新时间
dts_before_flag varchar 变更前标识
dts_after_flag varchar 变更后标识

dts_record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为 update,那么增量更新会被拆分成 2 条,一条 Insert,一条 Delete。这两条记录具有相同的 record_id。

dts_instance_id: 这条增量日志所对应的数据库的 server id。

dts_db_name: 这条增量更新日志更新的表所在的数据库库名。

dts_table_name:这条增量更新日志更新的表。

dts_operation_flag: 标示这条增量日志的操作类型。取值包括:

I : insert 操作
D : delete 操作
U : update 操作

dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录 binlog 的时间戳。这个时间戳为 UTC 时间。

dts_before_flag: 表示这条增量日志后面带的各个 column 值是否更新前的值。取值包括:Y 和 N。当后面的 column 为更新前的值时,dts_before_flag=Y, 当后面的 column 值为更新后的值时,dts_before_flag=N.

dts_after_flag:表示这条增量日志后面带的各个 column 值是否更新后的值。取值包括:Y 和 N。 当后面的 column 为更新前的值时,dts_after_flag=N,当后面的 column 值为更新后的值时,dts_after_flag=Y.

对于不同的操作类型,增量日志中的 dts_before_flag 和 dts_after_flag 定义如下:

  1. 操作类型为:insert

    当操作类型为 insert 时,后面的所有 column 值为新插入的记录值,即为更新后的值。所以 before_flag=N, after_flag=Y
    undefined | center

  2. 操作类型为:update

    当操作类型为 update 时,会将 update 操作拆为 2 条增量日志。这两条增量日志的 dts_record_id ,dts_operation_flag 及 dts_utc_timestamp 相同。
    第一条日志记录了更新前的值,所以 dts_before_flag=Y, dts_after_flag=N
    第二条日志记录了更新后的值,所以 dts_before_flag=N, dts_after_flag=Y
    undefined | center

  3. 操作类型为:delete

    当操作类型为 delete 时,后面的所有 column 值为被删除的记录值,即为更新前的值。所以 dts_before_flag=Y, dts_after_flag=N
    undefined | center

CREATE VIEW new_orderdetail AS
SELECT 
dts_ordercodeofsys, 
dts_skuname, 
dts_skucode,
CASE WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'Y'
        AND dts_after_flag = 'N' THEN -1 * dts_quantity 
    WHEN dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity 
    WHEN dts_operation_flag = 'D' THEN -1 * dts_quantity 
    ELSE dts_quantity 
    END AS dts_quantity
        FROM 
        orderdetail_real

怎么判断是有效交易订单呢?

首先是要满足dts_operation_flag=U 或者 dts_operation_flag=I,
然后dts_before_flag代表的是变更前订单状态,dts_after_flag是变更后订单状态;
所以有效交易订单为:

        dts_operation_flag = 'U'
        AND dts_before_flag = 'N'
        AND dts_after_flag = 'Y' THEN dts_quantity
  • 为什么THEN -1 * dts_quantity呢?

    订单的取消或者是交易没有成功在总的销量里也会记录;为了保证总销量的正确性,所以把没有成交的订单数量设为负数在计算总的销量会减去这个数量。

模块三

为什么订单源表和订单详情要做JOIN操作?

new_paytime查出的是最新交易的时间的所有的订单编号;new_orderdetail查询的是所有的有效的订单的订单编码、商品名称、商品编号、数量的信息;两张表JOIN是为整合成一张大表,方便用户来统计订单总数和总的销量。


SELECT 
    from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd') AS bill_date, 
    COUNT(DISTINCT a.dts_ordercodeofsys) AS bill_count, 
    SUM(b.dts_quantity) AS qty
from 
    (new_paytime) a 
join 
    (new_orderdetail) b
ON 
    a.dts_ordercodeofsys = b.dts_ordercodeofsys
GROUP BY 
        from_unixtime(CAST(a.dts_paytime AS bigint) / 1000000, 'yyyyMMdd');
目录
相关文章
|
7月前
|
数据采集 存储 监控
淘宝详情数据采集(商品上货,数据分析,属性详情,价格监控),海量数据值得get
淘宝详情数据采集涉及多个环节,包括商品上货、数据分析、属性详情以及价格监控等。在采集这些数据时,尤其是面对海量数据时,需要采取有效的方法和技术来确保数据的准确性和完整性。以下是一些关于淘宝详情数据采集的建议:
|
7月前
|
数据采集 API 开发者
快手商品数据采集接口
快手商品数据采集接口
|
消息中间件 NoSQL Kafka
118 Storm实时交易金额计算案例分析
118 Storm实时交易金额计算案例分析
88 0
|
4月前
|
存储 监控 API
1688商品评论数据接口实战指南:挖掘电商洞察
要获取1688商品评论数据,先注册1688开放平台并登录,然后用Python等工具调用API获取评论信息,如内容、评分等,并存储或分析这些数据。使用时须遵守平台规定,保障数据安全及隐私,利用接口进行舆情监控、提升品牌形象,并留意接口更新以优化业务流程。
|
7月前
|
安全 数据可视化 数据挖掘
数据分享|AARRR模型淘宝用户行为分析、电商销售分析
数据分享|AARRR模型淘宝用户行为分析、电商销售分析
|
关系型数据库 MySQL 数据库
实现淘宝母婴订单实时查询和实时大屏
本场景中订单和婴儿信息存储在MySQL中,使用Flink实时把它写到Elasticsearch中;数据经过分组聚合后,计算出订单数量和婴儿出生的关系,实时展示到Kibana大屏中。
|
7月前
|
存储 搜索推荐 数据挖掘
淘宝商品详情API:挖掘实时数据金矿,点燃电商增长引擎
随着互联网的快速发展,电子商务在全球范围内得到了广泛应用。作为中国电商市场的领军者,淘宝不仅拥有庞大的用户群体和海量的商品数据,还提供了一系列的API接口,使得第三方开发者可以方便地获取并利用这些数据。其中,淘宝商品详情API是淘宝开放平台中非常重要的一项接口,它能够获取到淘宝网内商品的详细信息,从而帮助开发者更好地服务用户,提升电商业务的运营效率。 本文将详细介绍淘宝商品详情API的应用场景、使用方法和注意事项,并通过示例代码展示如何使用该API获取商品详情数据。同时,本文还将探讨如何利用这些数据实现个性化推荐、提升销售转化率等业务目标。
|
7月前
|
存储 JSON 监控
京东商品详情接口在电商行业中的重要性及实时数据获取实现
随着电子商务的快速发展,电商平台上的商品数量不断增加,竞争也越来越激烈。对于电商企业来说,如何快速、准确地获取商品详情信息变得至关重要。京东作为中国最大的电商平台之一,提供了商品详情接口,为电商企业提供了强大的支持。本文将深入探讨京东商品详情接口在电商行业中的重要性,并通过实例代码介绍如何实现实时数据获取。
|
SQL 关系型数据库 MySQL
实现淘宝母婴订单实时查询和可视化|Flink-Learning实战营
本场景将以 阿里云实时计算Flink版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例、Elasticsearch Connector 连接 Elasticsearch 检索分析服务实例,并以一个淘宝母婴订单实时查询的例子尝试上手 Connector 的数据捕获、数据写入等功能。
570 3
实现淘宝母婴订单实时查询和可视化|Flink-Learning实战营
|
SQL 关系型数据库 MySQL
如何实时统计最近 15 秒的商品销售额|Flink-Learning 实战营
想要了解如何使用 Flink 实时统计最近 15 秒的商品销售额吗?本实验将以阿里云实时计算 Flink 版为基础,使用 Flink 自带的 MySQL Connector 连接 RDS 云数据库实例,并以实时商品销售数据统计的例子,引导开发者上手 Connector 的数据捕获、数据写入等功能。
857 2
如何实时统计最近 15 秒的商品销售额|Flink-Learning 实战营