【数据计算实践】如何使用Datpahin实现一个流批一体任务

本文涉及的产品
智能数据建设与治理Dataphin,200数据处理单元
简介: 以计算促销活动期间GMV为例,介绍Dataphin如何基于Flink流批一体的任务开发流程,实现实时数据处理。

Dp黑.png


" 某公司的订单系统使用MySQL存储业务数据和进行简单的查询分析,同时由于MySQL的存储容量和处理速度都存在一定的限制,不适合处理大规模的数据量。为了更好的数据分析和处理,该公司将MySQL的数据同步到Hive中,便于利用Hadoop生态系统中的各种工具来实现离线处理,提高数据分析和处理的效率和速度。其中的order为存储订单的表,products为商品表。


" 该公司需要进行一次促销活动,实时计算和离线计算都在这个过程中发挥重要作用:

实时计算方面,可以通过实时监控用户活动和购买行为,以及实时推荐和搭配商品等方式,提高用户体验和促进消费。例如,当用户浏览某个商品时,系统可以实时分析用户数据,为用户推荐相关的热门商品和促销商品,以提高用户的购买意愿。同时,系统还可以实时计算库存、销售额等数据,以便及时调整库存,保证商品的及时配送。

离线计算方面,可以通过对历史数据的分析和挖掘,进行商品定价、销售策略等方面的决策支持。例如,根据历次促销活动的销售数据、用户行为等信息,系统可以通过离线计算,预测今年的销售量和热门商品,以及优化商品定价和促销策略,以提高销售业绩。


下面以计算促销活动期间GMV为例介绍Dataphin如何基于Flink流批一体的任务开发流程,其中流任务处理的数据是当天的实时数据,批任务处理的数据为前一天的业务数据。运营人员可以通过分析实时GMV进行快速的活动方案的调整,通过分析对比流批任务计算结果的一致性确保历史成交额数据的准确性。


以下为数据流向的示意图:

MySQL订单系统中的业务数据通过Dataphin离线集成到Hive中,同时通过Flink cdc 任务同步到kafka中便于进行实时加工处理,同时将数据处理和数据存储解耦合,dataphin实时研发流批一体支持一套代码两种计算,Flink流任务负责计算活动当天的实时GMV,而Flink批任务则负责计算前一天的活动GMV,最后将GMV写入到一个MySQL数据库中便于后续的查询分析。

如何使用Datpahin实现一个流批一体任务-流程图.jpg

具体的实现思路:

如何使用Datpahin实现一个流批一体任务-流程图 (1).jpg


  • 命令行登录MySQL并建表:
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
CREATE TABLE sink (
   order_id INT  PRIMARY KEY,
   order_date TIMESTAMP(0),
   customer_name VARCHAR(2048),
   `price` VARCHAR(2048),
   product_id INT,
   order_status BOOLEAN,
   name VARCHAR(2048),
   description VARCHAR(2048),
 );


dataphin上创建Kafka和MySQL数据源:

Kafka:

image.png

MySQL:

image.png

dataphin上将MySQL数据集成同步到Hive并一键生成目标表


dataphin上创建元表:

创建MySQL元表: orders_mysql 和 products_mysql,connector选择CDC Source表

创建MySQL元表: mysql_sink,connector选择JDBC读写表

创建Kafka元表:orders_kafka 和 products_kakfa,connector选择upsert-kafka,消息格式选择json

这里需要保证orders_mysql,orders_kafka 与hive表中对应字段的字段类型保持一致

提交发布元表


创建Hive元表:orders_hive 和 products_hive

在V3.9版本中,dataphin实时研发已经支持flink sql访问Hive计算引擎中的物理表,但这里为了安全性、可复用性及灵活性,我们仍采用创建元表的方式。


在dataphin上创建实时计算任务Flink SQL:

orders_to_kafka 和 products_to_kafka,将MySQL中的数据实时同步到kafka中

提交发布计算任务


在dataphin上创建镜像表:mirror_orders 和 mirror_products

mirror_orders:实时表选择 orders_kafka ,离线表选择 orders_hive

mirror_products:实时表选择 products_kakfa,离线表选择 products_hive

提交发布镜像表

在dataphin上创建实时计算任务Flink SQL:

创建计算任务sink,预编译通过并提交

SET mirror_orders.`properties.group.id`='12';
SET mirror_products.`properties.group.id`='12';
--  cast(price as STRING) as  
INSERT into mysql_sink_yuan
select cast(order_id as int) as order_id, cast(order_date as TIMESTAMP) as order_date, customer_name, cast(price as STRING) as price,  cast(product_id as int) as product_id, cast(order_status as TINYINT)as order_status, name, description 
from mirror_orders AS  o 
LEFT JOIN mirror_products AS p
ON o.order_id = p.id;

开启离线模式,并配置好上下游依赖:


  • 提交发布计算任务,会自动提交流和批两个任务,可以到运维页面的周期任务和实时任务中查看:

实时任务,点击启动实时实例,任务能够正常运行


周期任务,在生产环境对批任务补数据,补数据任务能成功运行,也可以查看历史周期实例的运行情况





Dp黑.png

Dataphin 智能数据建设与治理:用中台方法论治理企业级好数据

针对各行业大数据建设、治理及应用诉求,结合数据中台方法论,一站式提供全域数据集成、可视建模及规范定义、数据资产治理等能力,助力企业打造标准统一、准确可信、便捷可消费的数据体系。


Dataphin介绍:https://www.lydaas.com/dataphin

Dataphin咨询钉钉群:23381533

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
3月前
|
SQL 分布式计算 运维
如何对付一个耗时6h+的ODPS任务:慢节点优化实践
本文描述了大数据处理任务(特别是涉及大量JOIN操作的任务)中遇到的性能瓶颈问题及其优化过程。
|
2月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
87 4
|
21天前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1月前
|
存储 分布式计算 监控
大数据增加分区减少单个任务的负担
大数据增加分区减少单个任务的负担
31 1
|
1月前
|
边缘计算 人工智能 搜索推荐
大数据与零售业:精准营销的实践
【10月更文挑战第31天】在信息化社会,大数据技术正成为推动零售业革新的重要驱动力。本文探讨了大数据在零售业中的应用,包括客户细分、个性化推荐、动态定价、营销自动化、预测性分析、忠诚度管理和社交网络洞察等方面,通过实际案例展示了大数据如何帮助商家洞悉消费者行为,优化决策,实现精准营销。同时,文章也讨论了大数据面临的挑战和未来展望。
|
2月前
|
SQL 消息中间件 分布式计算
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(一)
93 0
|
2月前
|
SQL 大数据
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(二)
大数据-143 - ClickHouse 集群 SQL 超详细实践记录!(二)
66 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
56 0
|
2月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
117 0