基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-数据流计算篇

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: 背景在订单系统中,利用 Spark streamimg 或者 Flink 对接数据流并进行数据分析是一种常见请求。常见的场景如下:大促开始后,实时显示当前总成交金额。大促开始后,实时画出成交量走势图。……MySQL 若需要支持此类场景,需要开发应用,解析 binlog 中的数据,对接中间件,开发成本运维成本都会更高,且系统复杂度也会提升。而表格存储提供了通道服务,可以直接对接 Spark stre

背景

在订单系统中,利用 Spark streamimg 或者 Flink 对接数据流并进行数据分析是一种常见请求。常见的场景如下:

  • 大促开始后,实时显示当前总成交金额。
  • 大促开始后,实时画出成交量走势图。
  • ……

MySQL 若需要支持此类场景,需要开发应用,解析 binlog 中的数据,对接中间件,开发成本运维成本都会更高,且系统复杂度也会提升。而表格存储提供了通道服务,可以直接对接 Spark streaming 或 Flink,不仅省去了开发者解析数据、对接中间件的工作,且可以将开发者从复杂的系统架构中解放出来,更加专注于业务逻辑的处理。

本文将一步一步展示,如何利用表格存储通道服务,实现对订单系统实时成交额和订单数的统计工作。

基于通道服务的 Tablestore 解决方案

通道服务说明

通道服务(Tunnel Service)是基于表格存储数据接口上的全增量一体化服务。通道服务提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立数据通道,可以简单地实现对表中历史存量和新增数据的消费处理。具体可参考通道服务概述

我们可以将 Spark streamimg 或者 Flink 和 Tablestore 的通道服务进行对接,对于表格存储中的数据变动,进行实时计算,完成上述场景中的需求。

本文架构 

本文分别使用 Spark streaming 和 Flink 对接 Tablestore 的通道服务,完成对订单数量和成交额的实时统计。原始订单数据由 Java 服务写入到 MySQL,再由 DTS 服务将数据同步到 Tablestore,这一部分内容已经在前面的文章中详细说明。Spark streaming / Flink 通过通道服务拿到实时数据变化,聚合,将统计结果写回到 Tablestore 中的 sink 表中。完整架构如下:

 

准备工作

Tablestore 申请

申请 Tablestore 的过程已经在前面的文章中进行详细描述,这里不再重复。可以参考开通表格存储服务

创建源表并开通 Tunnel 服务

在 Tablestore 中建表的过程不再进行描述。本文,使用 DTS 同步工具将 MySQL 中的数据同步到 Tablestore 中,Tablestore 中的订单表 order_contract 为 DTS 服务自动创建,其表结构如下:

列名称

类型

列说明

字段内容说明

oId

STRING

主键

订单id

c_id

STRING

客户id

c_name

STRING

客户名称

create_time

STRING

订单创建时间

has_paid

INTEGER

是否已经支付

p_brand

STRING

商品品牌

p_count

INTEGER

商品数量

p_id

STRING

商品id

p_name

STRING

商品名称

p_price

DOUBLE

商品单价

pay_time

STRING

支付时间

s_id

STRING

商家id

s_name

STRING

商家名称

total_price

DOUBLE

订单金额

建表后在实例管理页,找到“数据表列表”,点击刚刚创建的表order_contract进入表管理页面。

在表管理页面,选择实时消费通道创建通道

通道名称自定,通道类型选择增量。点击确定完成通道创建。

 

此时可以看到创建的通道的通道 id。

创建 sink 表

sink 表用于存储 Spark streaming 计算后的结果数据。在本文中 sink 表命名为 order_sink,用于存储单位时间段内订单成交数量和成交额。其表结构如下:

列名称

类型

列说明

字段内容说明

order_start

STRING

主键

记录时间段的开始时间

order_end

STRING

记录时间段的结束时间

order_count

INTEGER

记录时间范围内的总订单数

total_price

DOUBLE

记录时间范围内的总成交额

Spark streaming 对接 Tunnel

创建集群

创建阿里云E-MapReduce的Hadoop集群,文档参见创建集群

登录 Spark-sql 客户端

集群管理页面,点击创建的集群。

点击主机列表,点击emr-header-1机器。

点击远程连接

选择立即登录

输入创建集群时设定的密码。登录机器。

在指令行输入以下指令,登录 Spark-sql 客户端,

streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2

进入如下界面。

流计算

在 Spark-sql 中执行 SQL,创建源表,

DROP TABLE IF EXISTS order_contract;
CREATE TABLE order_contract
USING tablestore
OPTIONS(
endpoint="https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="test-20210609",
table.name="order_contract",
tunnel.id="3d71bb67-58da-4c72-b36f-08b79df7c85d",
catalog='{"columns": {"oId": {"col": "oId", "type": "string"}, "total_price": {"col": "total_price", "type": "double"}, "pay_time": {"cols": "pay_time", "type": "long"}}}'
);

执行以下 SQL 创建目标表,

DROP TABLE IF EXISTS order_sink;
CREATE TABLE order_sink
USING tablestore
OPTIONS(
endpoint="https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="",
access.key.secret="",
instance.name="test-20210609",
table.name="order_sink",
catalog='{"columns":{"order_start":{"col":"order_start","type":"string"},"order_end":{"col":"order_end","type":"string"},"order_count":{"col":"order_count","type":"long"},"total_price":{"total_price":"end","type":"double"}}}'
);

在目标表建立视图,

CREATE SCAN order_contract_view ON order_contract USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");

创建 Stream 任务,

CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/job1',
outputMode='update'
)
INSERT INTO order_sink
SELECT CAST(window.start AS String) AS order_start, CAST(window.end AS String) AS order_end, 
count(*) AS order_count, sum(total_price) AS total_price 
FROM order_contract_view 
GROUP BY window(to_timestamp(pay_time / 1000000), "30 seconds");

提交任务后,可以看到机器执行流式任务输出的日志。

计算结果

可以在 order_sink 表中看到实时统计结果如图。

Flink 对接 Tunnel

创建集群

下单后,在控制台,点击创建集群

选择订单号,点击下一步。

填入集群信息。

选择 OSS Bucket、VPC、Zone。若没有可以点击右侧问号新建。点击下一步,点击创建。

等待集群启动,启动成功后,可以点击创建项目

创建项目

点击创建项目,填入参数。点击确定。

可以在项目列表看到新建的项目。

流计算

在项目列表点击新建的项目,点击开发。

新建作业。输入作业名。

创建作业成功后,在页面中填入如下 SQL。

CREATE TABLE order_contract (
    oId VARCHAR,
    c_id VARCHAR,
    c_name VARCHAR,
    create_time VARCHAR,
    has_paid BIGINT,
    p_brand VARCHAR,
    p_count BIGINT,
    p_id VARCHAR,
    p_name VARCHAR,
    p_price DOUBLE,
    pay_time BIGINT,
    s_id VARCHAR,
    s_name VARCHAR,
    total_price DOUBLE,
    ts AS PROCTIME(),
    primary key(oId)
) WITH (
    type = 'ots',
    instanceName = 'test-20210609',
    tableName = 'order_contract',
    accessId = '',
    accessKey = '',
    endPoint = 'https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    tunnelName = 'test20210610'
);


CREATE TABLE order_sink (
    order_start VARCHAR,
    order_end VARCHAR,
    order_count BIGINT,
    total_price DOUBLE,
    primary key(order_start)
) WITH (
    type = 'ots',
    instanceName = 'test-20210609',
    tableName = 'order_sink',
    accessId = '',
    accessKey = '',
    endPoint = 'https://test-20210609.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    valueColumns = 'order_end,order_count,total_price'
);

INSERT INTO order_sink
SELECT 
    DATE_FORMAT(TUMBLE_START(order_contract.ts, INTERVAL '30' SECOND), 'yyyy-MM-dd hh:mm:ss') AS order_start,
    DATE_FORMAT(TUMBLE_END(order_contract.ts, INTERVAL '30' SECOND), 'yyyy-MM-dd hh:mm:ss') AS order_end,
    COUNT(oId) as order_count,
    SUM(total_price) as total_price
FROM order_contract
GROUP BY TUMBLE(ts, INTERVAL '30' SECOND);

点击语法检查,通过后。点击上线

选择系统分配,点击下一步继续。

SQL 检查通过后,继续点击下一步。填入期望事件后,点击下一步,点击上线。

上线成功后,可以在运维页看到作业如图。

计算结果

可以在 order_sink 表中看到实时统计结果如图。

总结

表格存储通过通道服务,对流式数据计算进行了支持。相比于 MySQL 中解析 binlog 方案,使用 Tablestore 通道服务直接对接流式处理工具,运维开发成本更低,系统架构更加简单。本文分别使用了 Spark streaming 和 Flink 对订单系统中典型场景进行了模拟操作,完成了实时统计交易额、订单数的流式作业,整个过程简单易懂。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3天前
|
存储 监控 API
构建高效微服务架构:后端开发的现代实践
【5月更文挑战第9天】 在本文中,我们将深入探讨如何在后端开发中构建一个高效的微服务架构。通过分析不同的设计模式和最佳实践,我们将展示如何提升系统的可扩展性、弹性和维护性。我们还将讨论微服务架构在处理复杂业务逻辑和高并发场景下的优势。最后,我们将分享一些实用的工具和技术,以帮助开发者实现这一目标。
|
12天前
|
监控 Java 测试技术
现代化软件开发中的微服务架构设计与实践
随着软件开发的发展,传统的单体应用架构已经无法满足现代化应用的需求。微服务架构作为一种新的设计理念,为软件开发提供了更灵活、可扩展的解决方案。本文将介绍微服务架构的设计原则、实践方法以及相关技术工具,并结合实例展示其在现代化软件开发中的应用。
|
1天前
|
监控 数据库 开发者
构建高效可靠的微服务架构:策略与实践
【5月更文挑战第11天】在当今软件开发的世界中,微服务架构已经成为构建可扩展、灵活且容错的系统的首选方法。本文深入探讨了设计、部署和维护微服务系统时面临的挑战,并提出了一系列实用的策略和最佳实践。我们将从服务的划分原则出发,讨论如何确保每个微服务的自治性,以及如何通过容器化和编排技术实现服务的高效运行。文章还将涉及监控、日志记录和故障恢复的策略,旨在帮助开发人员构建一个既高效又可靠的微服务环境。
|
2天前
|
缓存 负载均衡 API
微服务架构下的API网关性能优化实践
【5月更文挑战第10天】在微服务架构中,API网关作为前端和后端服务之间的关键枢纽,其性能直接影响到整个系统的响应速度和稳定性。本文将探讨在高并发场景下,如何通过缓存策略、负载均衡、异步处理等技术手段对API网关进行性能优化,以确保用户体验和服务的可靠性。
|
3天前
|
监控 API 持续交付
构建高效可靠的微服务架构:策略与实践
【5月更文挑战第8天】在当今快速演进的软件开发领域,微服务架构已经成为实现敏捷开发、持续交付和系统弹性的关键模式。本文将探讨构建一个高效且可靠的微服务系统所必须的策略和最佳实践。我们将从服务的划分与设计原则出发,讨论如何通过容器化、服务发现、API网关以及断路器模式来优化系统的可伸缩性和鲁棒性。此外,我们还将涉及监控、日志管理以及CI/CD流程在确保微服务架构稳定运行中的作用。
|
4天前
|
敏捷开发 持续交付 API
构建高效微服务架构:后端开发的现代实践
【5月更文挑战第8天】 在数字化转型的浪潮中,微服务架构已成为企业追求敏捷开发、持续交付和系统弹性的关键解决方案。本文将深入探讨微服务的核心概念,包括其设计原则、优缺点以及如何在后端开发中实现高效的微服务架构。我们将通过实际案例分析,展示微服务如何帮助企业快速适应市场变化,同时保持系统的可维护性和扩展性。
|
4天前
|
监控 云计算 开发者
探索云计算中的无服务器架构:从概念到实践
无服务器架构作为云计算领域的新兴技术,正在以其高效、灵活的特性吸引着越来越多的开发者和企业。本文将深入探讨无服务器架构的概念及其在云计算中的应用,通过实际案例展示如何利用无服务器架构构建可靠、可扩展的应用系统。
|
6天前
|
监控 负载均衡 数据安全/隐私保护
探索微服务架构下的服务网格(Service Mesh)实践
【5月更文挑战第6天】 在现代软件工程的复杂多变的开发环境中,微服务架构已成为构建、部署和扩展应用的一种流行方式。随着微服务架构的普及,服务网格(Service Mesh)作为一种新兴技术范式,旨在提供一种透明且高效的方式来管理微服务间的通讯。本文将深入探讨服务网格的核心概念、它在微服务架构中的作用以及如何在实际项目中落地实施服务网格。通过剖析服务网格的关键组件及其与现有系统的协同工作方式,我们揭示了服务网格提高系统可观察性、安全性和可操作性的内在机制。此外,文章还将分享一些实践中的挑战和应对策略,为开发者和企业决策者提供实用的参考。
|
6天前
|
API 持续交付 开发者
构建高效微服务架构:策略与实践
【5月更文挑战第6天】随着现代软件系统的复杂性增加,微服务架构逐渐成为企业开发的首选模式。本文深入分析了构建高效微服务架构的关键策略,并提供了一套实践指南,帮助开发者在保证系统可伸缩性、灵活性和稳定性的前提下,优化后端服务的性能和可维护性。通过具体案例分析,本文将展示如何利用容器化、服务网格、API网关等技术手段,实现微服务的高可用和敏捷部署。
|
7天前
|
存储 前端开发 Java
Android应用开发中的MVP架构模式实践
【5月更文挑战第5天】随着移动应用开发的复杂性增加,传统的MVC(Model-View-Controller)架构在应对大型项目时显得笨重且不灵活。本文将探讨一种更适应现代Android应用开发的架构模式——MVP(Model-View-Presenter),并展示如何在Android项目中实现该模式以提升代码的可维护性和可测试性。通过对比分析MVP与传统MVC的差异,以及提供一个实际案例,读者将能深入了解MVP的优势和实施步骤。