Dataphin中基于Flink的实时研发

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: 本文将以一个简单的场景来说明如何在Dataphin中进行实时研发



Dataphin从2.7版本开始支持基于Blink的实时研发,提供了流批一体的实时研发功能,支持集团内的实时业务。随着Alibaba Blink逐步向Ververica平台迁移,DataphinV3.2 版本开始Ververica,先在集团内部上线,现在对云上的商用版本也正式发布了。

基于Flink Ververica及开源Flink的Dataphin产品功能与基于Blink的实时研发基本一致,本文将使用一个简单的例子来说明如何创建一个Flink SQL的实时任务。

业务背景


某公司的订单系统使用MySQL作为存储数据库,其中的oms_order为存储订单的表。现在要开始设计一个营销活动,在活动期间基于实时的GMV进行快速的活动方案的调整。

以下为数据流向的示意图:

  1. 订单系统的数据通过Dataphin+Flink 任务的加工,汇总从活动第一天的开始的GMV,并将GMV写入另一个MySQL数据库
  2. QuickBI读取MySQL中的数据,展示实时GMV


接下来我们将在Dataphin上研发这个计算实时GMV的Flink任务,将GMV实时的写入到OLAP分析库中。 注:以下的例子中为简化的模型,非实际业务中的真实模型。

image.png

演示视频



创建数据源

首先我们将创建两个MySQL的数据源

  1. 订单系统
  2. Olap分析


image.png



创建实时元表


创建订单源表(流读)


订单系统中的 oms_order 是包含了订单的信息,我们的实时任务就需要从这张表中实时的抽取增量数据来计算GMV。现在我们需要创建一个订单表对应的实时元表,可以从订单表中读取增量数据。


元表:ods_order

CREATE TEMPORARY TABLE oms_order (`id`  INTNOTNULL,`order_status`  INT,`order_sn`  VARCHAR,`sku_id`  VARCHAR,`sku_quantity`  VARCHAR,`buyer_id`  VARCHAR,`buyer_username`  VARCHAR,`coupon_id` INT,`total_amount`  DOUBLE,`payment_amount`  DOUBLE,`freight_amount`  DOUBLE,`promotion_amount`  DOUBLE,`coupon_amount` DOUBLE,`discount_amount` DOUBLE,`source_type` INT,`gmt_create`  VARCHAR,`gmt_pay` VARCHAR,`gmt_modify`  VARCHAR,PRIMARY KEY (`id`)NOT ENFORCED,`payment_date`  VARCHAR) WITH
('connector'='mysql-cdc','password'='****','hostname'='****','username'='****','port'='3306','database-name'='****','table-name'='oms_order');


创建GMV表


我们需要创建一张结果标,存储GMV的结果

元表:ads_gross_of_day

CREATE TEMPORARY TABLE `ads_gross_of_day` (  `date_id` VARCHARNOTNULL,  `prod_code` VARCHAR,  `sale_amt` DOUBLE,  `sale_num` DOUBLE,  `sale_cost` DOUBLE,  `update_time` VARCHAR,  PRIMARY KEY (`date_id`)NOT ENFORCED
) WITH ('connector'='jdbc','password'='****','username'='****','url'='jdbc:mysql://****:3306/****','table-name'='ads_gross_of_day');

创建Flink SQL流任务

INSERTINTO  ads_gross_of_day
SELECT  DATE_FORMAT(gmt_create ,'yyyy-MM-dd')as date_id
,sku_id as prod_code
,count(sku_quantity)as sale_num
,sum(payment_amount)AS sale_amt
,max(payment_date)as update_time
FROM    ods_order
GROUPBY  DATE_FORMAT(gmt_create ,'yyyy-MM-dd'),sku_id;

现在我们可以开始编写Flink SQL任务了。我们需要从ods_order中读取新增的订单,计算对应的单品的销售总金额。


Flink SQL任务:ads_gross_of_day


Dataphin提供了以下的功能可以帮助简化研发实时计算任务的工作:

  1. 预编译:进行代码的语法正确性、权限有效性及元数据有效性检查
  2. 调试:利用线上的数据或是上传的数据做任务的调试,可快速验证任务的逻辑的正确性。目前该功能仅在Flink VVP引擎下支持。
  3. 测试:在开发环境中试跑任务,读取线上数据,并将结果打印到日志或测试表中。



发布

经过开发环境的测试,我们基本校验了任务的正确性,就可以将任务发布到生产环境中,进行线上试跑了。我们需要将创建的元表 ods_orderads_gross_of_day 和Flink SQL任务 ads_gross_of_day 都发布到生产环境。发布成功后,我们就可以在运维中启动ads_gross_of_day 这个Flink SQL任务了


任务运维

现在我们可以开始启动实时任务了。与离线批任务不同,实时任务是一个常驻的任务,在运维的支持上也与离线任务有所不同。



结语

Dataphin也支持了Flink的流批一体的任务,在某些业务场景中,我们需要使用批任务来校验与修正实时任务的结果,使用流批一体的任务可以让研发和维护更简单。下一篇我们讲用同一个例子来展示如何使用Datpahin实现一个流批一体的任务。

数据的实时性会让数据更有价值,Dataphin未来将持续提升事实研发的性能和体验,支持代码模板,支持自定义数据源,让实时研发更简单。

附录


示例中订单表DDL

CREATETABLE `oms_order` (  `id` bigint(20)NOTNULL AUTO_INCREMENT COMMENT 'id',  `order_status` tinyint(4) DEFAULT NULL COMMENT '订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】',  `order_sn` char(32) DEFAULT NULL COMMENT '订单号',  `sku_id` char(32) DEFAULT NULL COMMENT '商品编号',  `sku_quantity` char(32) DEFAULT NULL COMMENT '商品数量',  `buyer_id` bigint(20) DEFAULT NULL COMMENT '用户ID',  `buyer_username` varchar(200) DEFAULT NULL COMMENT '用户名',  `coupon_id` bigint(20) DEFAULT NULL COMMENT '使用的优惠券',  `total_amount` decimal(18,4) DEFAULT NULL COMMENT '订单总额',  `payment_amount` decimal(18,4) DEFAULT NULL COMMENT '应付总额',  `freight_amount` decimal(18,4) DEFAULT NULL COMMENT '运费金额',  `promotion_amount` decimal(18,4) DEFAULT NULL COMMENT '促销优化金额(促销价、满减、阶梯价)',  `coupon_amount` decimal(18,4) DEFAULT NULL COMMENT '优惠券抵扣金额',  `discount_amount` decimal(18,4) DEFAULT NULL COMMENT '后台调整订单使用的折扣金额',  `source_type` tinyint(4) DEFAULT NULL COMMENT '订单来源[0->PC;1->APP;1->支付宝小程序]',  `gmt_create` datetime DEFAULT NULL COMMENT '创建时间',  `gmt_pay` datetime DEFAULT NULL COMMENT '支付时间',  `gmt_modify` datetime DEFAULT NULL COMMENT '修改时间',  `payment_date` datetime DEFAULT NULL COMMENT '支付时间',  PRIMARY KEY (`id`)) COMMENT='订单';

示例中的GMV表DDL

CREATETABLE `ads_gross_of_day` (  `date_id` char(32)NOTNULL COMMENT '下单日期',  `prod_code` char(32) DEFAULT NULL COMMENT '商品编号',  `sale_amt` decimal(18,4) DEFAULT NULL COMMENT '销售总额',  `sale_num` decimal(18,4) DEFAULT NULL COMMENT '销售数量',  `sale_cost` decimal(18,4) DEFAULT NULL COMMENT '销售总成本',  `update_time` datetime DEFAULT NULL COMMENT '订单支付时间',  PRIMARY KEY (`date_id`)) COMMENT='当日GMV';



插入订单表

INSERTINTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(1,0,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-28 01:00:01');INSERTINTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(2,0,'000000002','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-28 01:15:01');INSERTINTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(3,0,'000000003','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 01:20:01');


更新订单状态


REPLACE INTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(1,2,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 02:01:00');REPLACE INTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES,(1,3,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 03:00:00');REPLACE INTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES,(1,4,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 18:00:00');



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
分布式计算 关系型数据库 MySQL
Dataphin数据研发
学员将在Dataphin(数据治理产品)集成MySQL数据库进行数据上云,然后利用Dataphin针对上云表进行规范建模。并通过规范建模生成的逻辑表针对需求进行指标/标签开发。
179 0
|
3月前
|
消息中间件 Kafka SQL
|
4月前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7709 10
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
5月前
|
资源调度 运维 Kubernetes
Dataphin实时研发任务资源预估与资源配置
在企业用户使用Dataphin的实时研发模块时,有两个基本问题是必须考虑的: 1. 短期上线一个实时业务,需要准备多少资源?企业在未来一年中,需要提前准备多少服务器/云资源? 2. 上线实时任务时,怎么配置需要的资源? 本文对这两个问题做简单介绍,期望企业用户能够快速理解资源预估原理和资源配置方法。
100 5
|
4月前
|
监控 数据可视化 BI
基于Dataphin+Flink构建期货交易监察实时应用
新一代证券交易监察系统利用大数据和实时计算技术强化风险控制、交易数据处理、识别异常交易等能力。通过Dataphin与Flink结合,构建期货交易监察实时数据应用;借助QuickBI用于打造实时看板和预警体系,实现期货交易监察的实时可视化分析和自动化预警。
293 0
|
6月前
|
SQL 运维 调度
Dataphin V3.14 版本升级|研发平台更易用,治理能力更完备,企业级适配更灵活
Dataphin V3.14 重磅升级,平台支持企业级适配,适配企业特色;研发体验易用性提升,数据研发更高效、任务运维更便捷;数据治理能力更完备,支持多对象批量操作,规则级告警配置、分级分类自动继承继承!
413 0
|
12月前
|
运维 算法 安全
带你读《构建企业级好数据(Dataphin智能数据建设与治理白皮书)》——4. 特色研发能力
带你读《构建企业级好数据(Dataphin智能数据建设与治理白皮书)》——4. 特色研发能力
351 1
|
6月前
|
SQL 运维 监控
Dataphin V3.9 版本升级|支持一站式数据汇聚处理、优化研发体验、提升数据治理能力
Dataphin V3.9 版本升级|支持一站式数据汇聚处理、优化研发体验、提升数据治理能力
149 0
|
12月前
|
数据采集 调度 监控
带你读《构建企业级好数据(Dataphin智能数据建设与治理白皮书)》——3. 研发:高效建设,稳定运行
带你读《构建企业级好数据(Dataphin智能数据建设与治理白皮书)》——3. 研发:高效建设,稳定运行
305 0
|
12月前
|
存储 消息中间件 数据可视化
Dataphin实时研发实践—电商场景下的实时数据大屏构建
实时数据大屏是实时计算的重要应用场景之一,广泛应用在电商业务中,用于实时监控和分析电商平台的运营情况。通过大屏展示实时的销售额、订单量、用户活跃度、商品热度等数据指标,帮助业务人员随时了解业务的实时状态,快速发现问题和机会。同时,通过数据可视化和趋势分析,大屏也提供了决策支持和优化运营的功能,帮助业务人员做出及时的决策和调整策略,优化电商业务的运营效果。 下面以电商业务为背景,介绍如何构建经典实时数仓,实现实时数据从业务库到ODS层、DWD层、DWS层全链路流转,基于Dataphin和Quick BI实现实时数据大屏。
496 0

相关产品

  • 智能数据建设与治理 Dataphin