【阿里云流计算】- 电商实时态势感知/订单地理分布案例-阿里云开发者社区

开发者社区> 梁程> 正文

【阿里云流计算】- 电商实时态势感知/订单地理分布案例

简介:
+关注继续查看

目标

通过这个案例,希望你能掌握下面几个知识点并灵活运用:

1.业务场景描述

假设你是一家国内大型食品电商公司的CEO,你们公司的愿景就是让天下的吃货都吃得好。

但中国之大,众口难调,每个地方都有自己独特的饮食文化:从南到北,口味由咸转淡,从西到东,口味由辣转甜,从陆到海,口味由重转轻……

作为CEO,你自然十分关心一个问题:自己有没有服务好全国各地的吃货?同时又作为数据时代的弄潮儿,你并不屑于去做用户调研,而是准备用数据化的方式来解决这个问题:统计收货人的地理分布,看看哪个多,哪个少,少的是不是因为网站上缺少符合这个地方口味的食品?是的话尽快调整品类分布。

如果能快速的完成这一连串的动作,那么众口不再难调。

动机有了,思路有了,剩下的就让阿里云流计算来帮你吧。

2.流计算解决方案

2.1数据流

系统订单是实时产生的,数据格式如下:
(注:为了聚焦核心逻辑,订单数据格式做了大量精简,只保留了与案例有关的属性)

CREATE TABLE source_order (
    id VARCHAR,-- 订单ID
    seller_id VARCHAR, --卖家ID
    account_id VARCHAR,--买家ID
    receive_address_id VARCHAR,--收货地址ID
    total_price VARCHAR,--订单金额
    pay_time VARCHAR --订单支付时间
) WITH (
    type='datahub',
    endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
    project='xxx',--你的project
    topic='xxx',--你的topic
    roleArn='xxx',--你的roleArn
    batchReadSize='500'
);

订单是一种流数据,源源不断的产生,本案例把这份数据发送到datahub中。

在电商系统里,订单与订单地址一般都是分开存储的(下单人可以给多个地址下单),所以在订单创建时并没有收货地址,只有在订单提交时才真正的知道收货地址。

所以订单地址也是一种流数据,其随着订单的创建而不断增加,同样,这份数据也发送到datahub中。

假设订单地址的数据格式如下:

CREATE TABLE source_order_receive_address ( 
     id VARCHAR,--收货地址ID 
     full_name VARCHAR,--收货人全名 
     mobile_number VARCHAR,--收货人手机号 
     detail_address VARCHAR,--收货详细地址 
     province VARCHAR,--收货省份 
     city_id VARCHAR,--收货城市 
     create_time VARCHAR --创建时间 
 ) WITH ( 
     type='datahub', 
     endPoint='http://dh-cn-hangzhou.aliyun-inc.com', 
     project='xxx',--你的project 
     topic='xxx',--你的topic 
     roleArn='xxx',--你的roleArn 
     batchReadSize='500' 
 ); 

订单地址里保存的是城市的id(city_id),为了获取地理信息,我们还需要一张城市表,这张表存储着城市的地理信息。城市的地理信息是不会变化的,静态的。

假设这张表存储在rds中,格式如下:

CREATE TABLE dim_city ( 
     city_id varchar, 
     city_name varchar,--城市名 
     province_id varchar,--所属省份ID 
     zip_code varchar,--邮编 
     lng varchar,--经度 
     lat varchar,--纬度 
  PRIMARY KEY (city_id), 
  PERIOD FOR SYSTEM_TIME --定义为维表 
 ) WITH ( 
     type= 'rds', 
     url = 'xxxx',--你的数据库url 
     tableName = 'xxx',--你的表名 
     userName = 'xxx',--你的用户名 
     password = 'xxx'--你的密码 
 );

三份数据都有了,其中有两份流式数据,一份静态数据。

我们的目标是按日统计不同地域订单(总销售额)的分布情况,假设结果的数据格式如下:

 CREATE TABLE result_order_city_distribution ( 
     summary_date bigint,--统计日期 
     city_id bigint,--城市ID 
     city_name varchar,--城市名 
     province_id bigint,--所属省份ID 
     gmv double,--总销售额 
     lng varchar,--经度 
     lat varchar,--纬度 
     primary key (summary_date,city_id) 
    ) WITH ( 
        type= 'rds', 
        url = 'xxxx',--你的数据库url 
        tableName = 'xxx',--你的表名 
        userName = 'xxx',--你的用户名 
        password = 'xxx'--你的密码 
    );  

这份数据存于RDS中,并且实时更新。

为了完成我们的目标,设计下面的数据流:
2233
计算分三个步骤:

  1. 订单数据流与订单收货地址数据流join,得到订单的收货地址id;
  2. 根据城市ID和日期统计销售额;
  3. 统计结果与城市信息join,补齐城市name和地理位置信息,得到最终数据;

2.2 数据开发

根据上边的数据流,我们开始进行开发,从模式上讲,这个任务是一个典型的【双流join然后维表join】任务。

代码如下:

--定义输入表:订单数据

   CREATE TABLE source_order ( 
     id VARCHAR,-- 订单ID 
     seller_id VARCHAR, --卖家ID 
     account_id VARCHAR,--买家ID 
     receive_address_id VARCHAR,--收货地址ID 
     total_price VARCHAR,--订单金额 
     pay_time VARCHAR --订单支付时间 
    ) WITH ( 
        type='datahub', 
       endPoint='http://dh-cn-hangzhou.aliyun-inc.com', 
        project='xxx',--你的project 
        topic='xxx',--你的topic 
        roleArn='xxx',--你的roleArn 
        batchReadSize='500' 
    ); 

--定义输入表:订单收货地址

   CREATE TABLE source_order_receive_address ( 
     id VARCHAR,--收货地址ID 
     full_name VARCHAR,--收货人全名 
     mobile_number VARCHAR,--收货人手机号 
     detail_address VARCHAR,--收货详细地址 
     city_id VARCHAR,--收货城市 
     create_time VARCHAR --创建时间 
    ) WITH ( 
        type='datahub', 
       endPoint='http://dh-cn-hangzhou.aliyun-inc.com', 
        project='xxx',#你的project 
        topic='xxx',#你的topic 
        roleArn='xxx',#你的roleArn 
        batchReadSize='500' 
    ); 

--定义维表:城市信息

   CREATE TABLE dim_city ( 
     city_id varchar, 
     city_name varchar,--城市名 
     province_id varchar,--所属省份ID 
     zip_code varchar,--邮编 
     lng varchar,--经度 
     lat varchar,--纬度 
    PRIMARY KEY (city_id), 
    PERIOD FOR SYSTEM_TIME --定义为维表 
    ) WITH ( 
        type= 'rds', 
        url = 'xxxx',--你的数据库url 
        tableName = 'xxx',--你的表名 
        userName = 'xxx',--你的用户名 
        password = 'xxx'--你的密码 
    ); 

--定义结果表:销售额城市分布表

  CREATE TABLE result_order_city_distribution ( 
     summary_date varchar,--统计日期 
     city_id bigint,--城市ID 
     city_name varchar,--城市名 
     province_id bigint,--所属省份ID 
     gmv double,--总销售额 
     lng varchar,--经度 
     lat varchar,--纬度 
     primary key (summary_date,city_id) 
    ) WITH ( 
        type= 'rds', 
        url = 'xxxx',--你的数据库url 
        tableName = 'xxx',--你的表名 
        userName = 'xxx',--你的用户名 
        password = 'xxx'--你的密码 
    ); 

--完成计算,通过SQL这种方式,只用下面几行代码就完成了整个过程,其优越性可见一斑。

 insert into result_order_city_distribution 
 select 
 d.summary_date 
 ,cast(d.city_id as BIGINT) 
 ,e.city_name 
 ,cast(e.province_id as BIGINT) 
 ,d.gmv 
 ,e.lng 
 ,e.lat 
 ,e.lnglat 
 from 
 ( 
         select 
         DISTINCT 
         DATE_FORMAT(a.pay_time,'yyyyMMdd') as summary_date 
         ,b.city_id as city_id 
         ,round(sum(cast(a.total_price as double)),2) as gmv 
         from source_order as a 
         join source_order_receive_address as b on a.receive_address_id =b.id 
         group by DATE_FORMAT(a.pay_time,'yyyyMMdd'),b.city_id 
         --双流join,并根据日期和城市ID得到销售额分布 
 )d join dim_city FOR SYSTEM_TIME AS OF PROCTIME() as e on d.city_id = e.city_id 
 -- join维表,补齐城市地理信息,得到最终结果 
 ; 

2.3 数据准备&调试&上线&运维

这部分可以参考文档,本案例不在赘述。

3.总结

整个案例从逻辑上说非常简单,其覆盖的知识点和各知识点的作用如下:

  • 双流join,详见文档链接 ,完成了订单数据与订单收货地址的关联
  • 维表join,详见文档链接 ,完成了城市信息的补齐
  • group by语句,详见文档链接 ,完成了统计计算

相信你早就掌握了单个的知识点,所以我们把不同的知识点结合起来做了一个案例,通过这个案例,你一定有了不小的提高。

一定要注意体会维表join的应用场景,什么样的数据通过维表来加载,什么样的数据通过流数据来加载,为什么订单收货地址是流式数据。

SQL很简单,流计算很简单,一旦你学会灵活使用,却能展示出强大的能力。

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
4068 0
2017云栖大会·杭州峰会:《在线用户行为分析:基于流式计算的数据处理及应用》Workshop-入口
2017云栖大会·杭州峰会:《在线用户行为分析:基于流式计算的数据处理及应用》Workshop-入口
3099 0
《Kafka Stream》调研:一种轻量级流计算模式
流计算,已经有Storm、Spark,Samza,包括最近新起的Flink,Kafka为什么再自己做一套流计算呢?Kafka Stream 与这些框架比有什么优势?Samza、Consumer Group已经包装了Kafka轻量级的消费功能,难道不够吗? 花了一些时间阅读[docs](http
23845 0
Flink on Zeppelin 流计算处理最佳实践
欢迎钉钉扫描文章底部二维码进入 EMR Studio 用户交流群 直接和讲师交流讨论~ 点击以下链接直接观看直播回放:https://developer.aliyun.com/live/247106
395 0
以物流案例看基于表格存储实时数据流的serverless计算
许多业务有实时数据处理的需求。相较于传统的数据库+流计算+应用服务器方案,使用基于表格存储实时数据流的Serverless计算方案有自动弹性伸缩及开发简单、部署简单等优点。本文通过一个想象的物流案例来说明如何实施“基于表格存储实时数据流的Serverless计算方案”。
6357 0
2017云栖大会·杭州峰会:《在线用户行为分析:基于流式计算的数据处理及应用》实验环境准备
2017云栖大会·杭州峰会:《在线用户行为分析:基于流式计算的数据处理及应用》实验环境准备
5209 0
用PostgreSQL支持含有更新,删除,插入的实时流式计算
大多数的流式计算产品只支持APPEND ONLY的应用场景,也就是只有插入,没有更新和删除操作。如果要实现更新和删除的实时流式计算,在PostgreSQL中可以这样来实现。在此前你可以阅读我以前写的文章来了解PG是如何处理一天一万亿的实时流式计算的:https://yq.aliyun.com/ar.
7300 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
4485 0
流计算框架 Flink 与 Storm 的性能对比
分布式实时计算框架 Flink 与 Storm 进行性能对比,为实时计算平台和业务提供数据参考。
1189 0
+关注
9
文章
0
问答
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载