Dataphin从2.7版本开始支持基于Blink的实时研发,提供了流批一体的实时研发功能,支持集团内的实时业务。随着Alibaba Blink逐步向Ververica平台迁移,DataphinV3.2 版本开始Ververica,先在集团内部上线,现在对云上的商用版本也正式发布了。
基于Flink Ververica及开源Flink的Dataphin产品功能与基于Blink的实时研发基本一致,本文将使用一个简单的例子来说明如何创建一个Flink SQL的实时任务。
业务背景
某公司的订单系统使用MySQL作为存储数据库,其中的oms_order为存储订单的表。现在要开始设计一个营销活动,在活动期间基于实时的GMV进行快速的活动方案的调整。
以下为数据流向的示意图:
- 订单系统的数据通过Dataphin+Flink 任务的加工,汇总从活动第一天的开始的GMV,并将GMV写入另一个MySQL数据库
- QuickBI读取MySQL中的数据,展示实时GMV
接下来我们将在Dataphin上研发这个计算实时GMV的Flink任务,将GMV实时的写入到OLAP分析库中。 注:以下的例子中为简化的模型,非实际业务中的真实模型。
演示视频
创建数据源
首先我们将创建两个MySQL的数据源
- 订单系统
- Olap分析
创建实时元表
创建订单源表(流读)
订单系统中的 oms_order 是包含了订单的信息,我们的实时任务就需要从这张表中实时的抽取增量数据来计算GMV。现在我们需要创建一个订单表对应的实时元表,可以从订单表中读取增量数据。
元表:ods_order
CREATE TEMPORARY TABLE oms_order (`id` INTNOTNULL,`order_status` INT,`order_sn` VARCHAR,`sku_id` VARCHAR,`sku_quantity` VARCHAR,`buyer_id` VARCHAR,`buyer_username` VARCHAR,`coupon_id` INT,`total_amount` DOUBLE,`payment_amount` DOUBLE,`freight_amount` DOUBLE,`promotion_amount` DOUBLE,`coupon_amount` DOUBLE,`discount_amount` DOUBLE,`source_type` INT,`gmt_create` VARCHAR,`gmt_pay` VARCHAR,`gmt_modify` VARCHAR,PRIMARY KEY (`id`)NOT ENFORCED,`payment_date` VARCHAR) WITH ('connector'='mysql-cdc','password'='****','hostname'='****','username'='****','port'='3306','database-name'='****','table-name'='oms_order');
创建GMV表
我们需要创建一张结果标,存储GMV的结果
元表:ads_gross_of_day
CREATE TEMPORARY TABLE `ads_gross_of_day` ( `date_id` VARCHARNOTNULL, `prod_code` VARCHAR, `sale_amt` DOUBLE, `sale_num` DOUBLE, `sale_cost` DOUBLE, `update_time` VARCHAR, PRIMARY KEY (`date_id`)NOT ENFORCED ) WITH ('connector'='jdbc','password'='****','username'='****','url'='jdbc:mysql://****:3306/****','table-name'='ads_gross_of_day');
创建Flink SQL流任务
INSERTINTO ads_gross_of_day SELECT DATE_FORMAT(gmt_create ,'yyyy-MM-dd')as date_id ,sku_id as prod_code ,count(sku_quantity)as sale_num ,sum(payment_amount)AS sale_amt ,max(payment_date)as update_time FROM ods_order GROUPBY DATE_FORMAT(gmt_create ,'yyyy-MM-dd'),sku_id;
现在我们可以开始编写Flink SQL任务了。我们需要从ods_order中读取新增的订单,计算对应的单品的销售总金额。
Flink SQL任务:ads_gross_of_day
Dataphin提供了以下的功能可以帮助简化研发实时计算任务的工作:
- 预编译:进行代码的语法正确性、权限有效性及元数据有效性检查
- 调试:利用线上的数据或是上传的数据做任务的调试,可快速验证任务的逻辑的正确性。目前该功能仅在Flink VVP引擎下支持。
- 测试:在开发环境中试跑任务,读取线上数据,并将结果打印到日志或测试表中。
发布
经过开发环境的测试,我们基本校验了任务的正确性,就可以将任务发布到生产环境中,进行线上试跑了。我们需要将创建的元表 ods_order、ads_gross_of_day 和Flink SQL任务 ads_gross_of_day 都发布到生产环境。发布成功后,我们就可以在运维中启动ads_gross_of_day 这个Flink SQL任务了
任务运维
现在我们可以开始启动实时任务了。与离线批任务不同,实时任务是一个常驻的任务,在运维的支持上也与离线任务有所不同。
结语
Dataphin也支持了Flink的流批一体的任务,在某些业务场景中,我们需要使用批任务来校验与修正实时任务的结果,使用流批一体的任务可以让研发和维护更简单。下一篇我们讲用同一个例子来展示如何使用Datpahin实现一个流批一体的任务。
数据的实时性会让数据更有价值,Dataphin未来将持续提升事实研发的性能和体验,支持代码模板,支持自定义数据源,让实时研发更简单。
附录
示例中订单表DDL
CREATETABLE `oms_order` ( `id` bigint(20)NOTNULL AUTO_INCREMENT COMMENT 'id', `order_status` tinyint(4) DEFAULT NULL COMMENT '订单状态【0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单】', `order_sn` char(32) DEFAULT NULL COMMENT '订单号', `sku_id` char(32) DEFAULT NULL COMMENT '商品编号', `sku_quantity` char(32) DEFAULT NULL COMMENT '商品数量', `buyer_id` bigint(20) DEFAULT NULL COMMENT '用户ID', `buyer_username` varchar(200) DEFAULT NULL COMMENT '用户名', `coupon_id` bigint(20) DEFAULT NULL COMMENT '使用的优惠券', `total_amount` decimal(18,4) DEFAULT NULL COMMENT '订单总额', `payment_amount` decimal(18,4) DEFAULT NULL COMMENT '应付总额', `freight_amount` decimal(18,4) DEFAULT NULL COMMENT '运费金额', `promotion_amount` decimal(18,4) DEFAULT NULL COMMENT '促销优化金额(促销价、满减、阶梯价)', `coupon_amount` decimal(18,4) DEFAULT NULL COMMENT '优惠券抵扣金额', `discount_amount` decimal(18,4) DEFAULT NULL COMMENT '后台调整订单使用的折扣金额', `source_type` tinyint(4) DEFAULT NULL COMMENT '订单来源[0->PC;1->APP;1->支付宝小程序]', `gmt_create` datetime DEFAULT NULL COMMENT '创建时间', `gmt_pay` datetime DEFAULT NULL COMMENT '支付时间', `gmt_modify` datetime DEFAULT NULL COMMENT '修改时间', `payment_date` datetime DEFAULT NULL COMMENT '支付时间', PRIMARY KEY (`id`)) COMMENT='订单';
示例中的GMV表DDL
CREATETABLE `ads_gross_of_day` ( `date_id` char(32)NOTNULL COMMENT '下单日期', `prod_code` char(32) DEFAULT NULL COMMENT '商品编号', `sale_amt` decimal(18,4) DEFAULT NULL COMMENT '销售总额', `sale_num` decimal(18,4) DEFAULT NULL COMMENT '销售数量', `sale_cost` decimal(18,4) DEFAULT NULL COMMENT '销售总成本', `update_time` datetime DEFAULT NULL COMMENT '订单支付时间', PRIMARY KEY (`date_id`)) COMMENT='当日GMV';
插入订单表
INSERTINTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(1,0,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-28 01:00:01');INSERTINTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(2,0,'000000002','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-28 01:15:01');INSERTINTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(3,0,'000000003','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 01:20:01');
更新订单状态
REPLACE INTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES(1,2,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 02:01:00');REPLACE INTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES,(1,3,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 03:00:00');REPLACE INTO `dataphin_db`.`oms_order`(`id`, `order_status`, `order_sn`, `sku_id`, `sku_quantity`, `buyer_id`, `buyer_username`, `coupon_id`, `total_amount`, `payment_amount`, `freight_amount`, `promotion_amount`, `coupon_amount`, `discount_amount`, `source_type`, `gmt_create`)VALUES,(1,4,'123456789','1234','1',0001,'张三',0001,12.3,9.9,2,0.4,0.5,0.5,2,'2022-01-29 18:00:00');