【数据计算实践】如何使用Datpahin实现一个流批一体任务

简介: 以计算促销活动期间GMV为例,介绍Dataphin如何基于Flink流批一体的任务开发流程,实现实时数据处理。

Dp黑.png


" 某公司的订单系统使用MySQL存储业务数据和进行简单的查询分析,同时由于MySQL的存储容量和处理速度都存在一定的限制,不适合处理大规模的数据量。为了更好的数据分析和处理,该公司将MySQL的数据同步到Hive中,便于利用Hadoop生态系统中的各种工具来实现离线处理,提高数据分析和处理的效率和速度。其中的order为存储订单的表,products为商品表。


" 该公司需要进行一次促销活动,实时计算和离线计算都在这个过程中发挥重要作用:

实时计算方面,可以通过实时监控用户活动和购买行为,以及实时推荐和搭配商品等方式,提高用户体验和促进消费。例如,当用户浏览某个商品时,系统可以实时分析用户数据,为用户推荐相关的热门商品和促销商品,以提高用户的购买意愿。同时,系统还可以实时计算库存、销售额等数据,以便及时调整库存,保证商品的及时配送。

离线计算方面,可以通过对历史数据的分析和挖掘,进行商品定价、销售策略等方面的决策支持。例如,根据历次促销活动的销售数据、用户行为等信息,系统可以通过离线计算,预测今年的销售量和热门商品,以及优化商品定价和促销策略,以提高销售业绩。


下面以计算促销活动期间GMV为例介绍Dataphin如何基于Flink流批一体的任务开发流程,其中流任务处理的数据是当天的实时数据,批任务处理的数据为前一天的业务数据。运营人员可以通过分析实时GMV进行快速的活动方案的调整,通过分析对比流批任务计算结果的一致性确保历史成交额数据的准确性。


以下为数据流向的示意图:

MySQL订单系统中的业务数据通过Dataphin离线集成到Hive中,同时通过Flink cdc 任务同步到kafka中便于进行实时加工处理,同时将数据处理和数据存储解耦合,dataphin实时研发流批一体支持一套代码两种计算,Flink流任务负责计算活动当天的实时GMV,而Flink批任务则负责计算前一天的活动GMV,最后将GMV写入到一个MySQL数据库中便于后续的查询分析。

如何使用Datpahin实现一个流批一体任务-流程图.jpg

具体的实现思路:

如何使用Datpahin实现一个流批一体任务-流程图 (1).jpg


  • 命令行登录MySQL并建表:
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
CREATE TABLE sink (
   order_id INT  PRIMARY KEY,
   order_date TIMESTAMP(0),
   customer_name VARCHAR(2048),
   `price` VARCHAR(2048),
   product_id INT,
   order_status BOOLEAN,
   name VARCHAR(2048),
   description VARCHAR(2048),
 );


dataphin上创建Kafka和MySQL数据源:

Kafka:

image.png

MySQL:

image.png

dataphin上将MySQL数据集成同步到Hive并一键生成目标表


dataphin上创建元表:

创建MySQL元表: orders_mysql 和 products_mysql,connector选择CDC Source表

创建MySQL元表: mysql_sink,connector选择JDBC读写表

创建Kafka元表:orders_kafka 和 products_kakfa,connector选择upsert-kafka,消息格式选择json

这里需要保证orders_mysql,orders_kafka 与hive表中对应字段的字段类型保持一致

提交发布元表


创建Hive元表:orders_hive 和 products_hive

在V3.9版本中,dataphin实时研发已经支持flink sql访问Hive计算引擎中的物理表,但这里为了安全性、可复用性及灵活性,我们仍采用创建元表的方式。


在dataphin上创建实时计算任务Flink SQL:

orders_to_kafka 和 products_to_kafka,将MySQL中的数据实时同步到kafka中

提交发布计算任务


在dataphin上创建镜像表:mirror_orders 和 mirror_products

mirror_orders:实时表选择 orders_kafka ,离线表选择 orders_hive

mirror_products:实时表选择 products_kakfa,离线表选择 products_hive

提交发布镜像表

在dataphin上创建实时计算任务Flink SQL:

创建计算任务sink,预编译通过并提交

SET mirror_orders.`properties.group.id`='12';
SET mirror_products.`properties.group.id`='12';
--  cast(price as STRING) as  
INSERT into mysql_sink_yuan
select cast(order_id as int) as order_id, cast(order_date as TIMESTAMP) as order_date, customer_name, cast(price as STRING) as price,  cast(product_id as int) as product_id, cast(order_status as TINYINT)as order_status, name, description 
from mirror_orders AS  o 
LEFT JOIN mirror_products AS p
ON o.order_id = p.id;

开启离线模式,并配置好上下游依赖:


  • 提交发布计算任务,会自动提交流和批两个任务,可以到运维页面的周期任务和实时任务中查看:

实时任务,点击启动实时实例,任务能够正常运行


周期任务,在生产环境对批任务补数据,补数据任务能成功运行,也可以查看历史周期实例的运行情况





Dp黑.png

Dataphin 智能数据建设与治理:用中台方法论治理企业级好数据

针对各行业大数据建设、治理及应用诉求,结合数据中台方法论,一站式提供全域数据集成、可视建模及规范定义、数据资产治理等能力,助力企业打造标准统一、准确可信、便捷可消费的数据体系。


Dataphin介绍:https://www.lydaas.com/dataphin

Dataphin咨询钉钉群:23381533

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
6月前
|
存储 数据采集 搜索推荐
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
本篇文章探讨了 Java 大数据在智慧文旅景区中的创新应用,重点分析了如何通过数据采集、情感分析与可视化等技术,挖掘游客情感需求,进而优化景区服务。文章结合实际案例,展示了 Java 在数据处理与智能推荐等方面的强大能力,为文旅行业的智慧化升级提供了可行路径。
Java 大视界 -- Java 大数据在智慧文旅旅游景区游客情感分析与服务改进中的应用实践(226)
|
6月前
|
数据采集 SQL 搜索推荐
大数据之路:阿里巴巴大数据实践——OneData数据中台体系
OneData是阿里巴巴内部实现数据整合与管理的方法体系与工具,旨在解决指标混乱、数据孤岛等问题。通过规范定义、模型设计与工具平台三层架构,实现数据标准化与高效开发,提升数据质量与应用效率。
大数据之路:阿里巴巴大数据实践——OneData数据中台体系
|
7月前
|
数据采集 存储 大数据
大数据之路:阿里巴巴大数据实践——日志采集与数据同步
本资料全面介绍大数据处理技术架构,涵盖数据采集、同步、计算与服务全流程。内容包括Web/App端日志采集方案、数据同步工具DataX与TimeTunnel、离线与实时数仓架构、OneData方法论及元数据管理等核心内容,适用于构建企业级数据平台体系。
|
7月前
|
分布式计算 监控 大数据
大数据之路:阿里巴巴大数据实践——离线数据开发
该平台提供一站式大数据开发与治理服务,涵盖数据存储计算、任务调度、质量监控及安全管控。基于MaxCompute实现海量数据处理,结合D2与DataWorks进行任务开发与运维,通过SQLSCAN与DQC保障代码质量与数据准确性。任务调度系统支持定时、周期、手动运行等多种模式,确保高效稳定的数据生产流程。
大数据之路:阿里巴巴大数据实践——离线数据开发
|
7月前
|
数据采集 分布式计算 DataWorks
ODPS在某公共数据项目上的实践
本项目基于公共数据定义及ODPS与DataWorks技术,构建一体化智能化数据平台,涵盖数据目录、归集、治理、共享与开放六大目标。通过十大子系统实现全流程管理,强化数据安全与流通,提升业务效率与决策能力,助力数字化改革。
239 4
|
7月前
|
分布式计算 DataWorks 数据处理
在数据浪潮中前行:记录一次我与ODPS的实践、思考与展望
本文详细介绍了在 AI 时代背景下,如何利用阿里云 ODPS 平台(尤其是 MaxCompute)进行分布式多模态数据处理的实践过程。内容涵盖技术架构解析、完整操作流程、实际部署步骤以及未来发展方向,同时结合 CSDN 博文深入探讨了多模态数据处理的技术挑战与创新路径,为企业提供高效、低成本的大规模数据处理方案。
380 3
|
7月前
|
机器学习/深度学习 存储 分布式计算
ODPS驱动电商仓储革命:动态需求预测系统的落地实践
本方案基于ODPS构建“预测-仿真-决策”闭环系统,解决传统仓储中滞销积压与爆款缺货问题。通过动态特征工程、时空融合模型与库存仿真引擎,实现库存周转天数下降42%,缺货率下降65%,年损减少5000万以上,显著提升运营效率与GMV。
717 1
|
6月前
|
存储 SQL 分布式计算
大数据之路:阿里巴巴大数据实践——元数据与计算管理
本内容系统讲解了大数据体系中的元数据管理与计算优化。元数据部分涵盖技术、业务与管理元数据的分类及平台工具,并介绍血缘捕获、智能推荐与冷热分级等技术创新。元数据应用于数据标签、门户管理与建模分析。计算管理方面,深入探讨资源调度失衡、数据倾斜、小文件及长尾任务等问题,提出HBO与CBO优化策略及任务治理方案,全面提升资源利用率与任务执行效率。
|
4月前
|
人工智能 Cloud Native 算法
拔俗云原生 AI 临床大数据平台:赋能医学科研的开发者实践
AI临床大数据科研平台依托阿里云、腾讯云,打通医疗数据孤岛,提供从数据治理到模型落地的全链路支持。通过联邦学习、弹性算力与安全合规技术,实现跨机构协作与高效训练,助力开发者提升科研效率,推动医学AI创新落地。(238字)
290 7
|
7月前
|
SQL 人工智能 分布式计算
在数据浪潮中前行:我与ODPS的实践、思考与展望
在数据驱动决策的时代,企业如何高效处理海量数据成为数字化转型关键。本文结合作者实践,深入解析阿里云自研大数据平台 ODPS 的技术优势与应用场景,涵盖 MaxCompute、DataWorks、Hologres 等核心产品,分享从数据治理到实时分析的落地经验,并展望其在 AI 与向量数据时代的发展前景。
277 70