从 0 到 1 通过 Flink + Tablestore 进行大数据处理与分析

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 阿里云实时计算 Flink 版是一套基于 Apache Flink 构建的⼀站式实时大数据分析平台。在大数据场景下,实时计算 Flink 可提供端到端亚秒级实时数据流批处理能力。表格存储 Tablestore (又名 OTS)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储、查询分析服务。表格存储的双引擎架构支持千万TPS和毫秒级延迟的服务能力,可作为大数据计算的极佳上下游存储。

前言

阿里云实时计算 Flink 版是一套基于 Apache Flink 构建的⼀站式实时大数据分析平台。在大数据场景下,实时计算 Flink 可提供端到端亚秒级实时数据流批处理能力。

表格存储 Tablestore (又名 OTS)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储、查询分析服务。表格存储的双引擎架构支持千万TPS和毫秒级延迟的服务能力,可作为大数据计算的极佳上下游存储。

本文章将以商品订单场景为基础,介绍如何从 0 到 1 通过 Flink+Tablestore 进行大数据分析。

场景介绍

某大型连锁超市会实时产生大量的消费数据,通过分析这些数据可获取到商品售卖热度、门店经营状态极具商业价值的信息,便于辅助经营者的商业决策。现需要设计一套方案,获取每分钟不同商品类别的 GMV (商品交易总额)。

本文采用云数据库 RDS 作为商品消费订单的存储库,通过 mysql-cdc connector 作为源表接入实时计算Flink。以表格存储 Tablestore 作为商品元信息的存储库,通过 OTS connector 作为维表接入实时计算Flink。配置流计算作业任务计算商品 GMV ,并写入表格存储 Tablestore 结果表进行保存。方案架构图如下:


实现步骤准备工作

  1. 创建 RDS MySQL 实例。创建步骤请参考创建 RDS MySQL 实例

  2. 开通表格存储服务,并创建按量模式实例。

  3. 开通实时计算服务,并购买Flink全托管集群。

数据源准备

1. 登录RDS控制台,登陆数据管理DMS,创建consume_record表。

CREATE TABLE `consume_record` (
	`consume_id` varchar(20) NOT NULL,
	`product_id` varchar(20) NOT NULL,
	`consume_time` bigint(20) NOT NULL,
	`consume_name` varchar(20) NOT NULL,
	`consume_phone` varchar(20) NOT NULL,
	PRIMARY KEY (`consume_id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8
COMMENT='消费记录数据源表';

2. 登录 Tablestore 控制台。创建 gmv_result、product 两张表。

说明:Tablestore数据表是schema free的,只需要定义主键,无需定义属性列。关于创建表步骤请参考创建数据表

product表。作为流计算任务的维表,存储商品元数据信息。

字段名

字段类型

是否主键

描述

product_ID

STRING

商品ID

price

BIGINT

商品单价

product_type

STRING

商品类别


gmv_result表。作为流计算任务的结果表,存储商品交易总额的计算结果。

字段名

字段类型

是否主键

描述

product_type

STRING

商品类型

gmv_time

STRING

统计时间

total_price

BIGINT

商品交易总额


Flink 作业配置

1. 登陆 Flink 全托管控制台,创建项目并创建作业,作业名 gmv_pre_aggregation。

作业脚本

-- mysql-cdc 源表
CREATE TEMPORARY TABLE consume_record (
  `consume_id` VARCHAR(20),
  `product_id` VARCHAR(20),
  `consume_time` BIGINT,
  `consume_name` VARCHAR(20),
  `consume_phone` VARCHAR(20),
  PRIMARY KEY(consume_id)  NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '3306',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);
-- tablestore 维表
CREATE TEMPORARY TABLE product (
  product_ID STRING,
  price BIGINT,
  product_type STRING,
  PRIMARY KEY (product_ID) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = ''
);
-- tablestore 结果表
CREATE TEMPORARY TABLE gmv_result (
   product_type STRING,
   gmv_time BIGINT,
   total_price BIGINT,
   PRIMARY KEY (product_type) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = '',
  'valueColumns' = 'gmv_time,total_price'
);
INSERT INTO `gmv_result`
select 
  d.product_type, 
  UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as gmv_time,
  sum (d.price) as total_price
from
  `consume_record` as s
  JOIN `product` for system_time as of proctime() as d
  on s.product_id = d.product_ID
GROUP BY d.product_type,UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd')

2. 上线作业。

结果展示

作业 gmv_pre_aggregation 直接写入 gmv 结果到 Tablestore,登录 Tablestore 控制台查询 gmv_result 表 即可获取商品交易总额结果。

方案改进

采用 Flink + Tablestore 方案很好地实现了商品交易总额的计算。然而 Flink 作业中固定了源表与维表参与计算的字段,如果业务需求发生变化,不得不重新设计作业脚本,再次进行作业下线上线操作,非常繁琐。所以将对上述方案做一下改进:Flink 流计算作业中仅负责关联消费记录和商品信息数据并写入 Tablestore 中。再通过Tablestore 的功能之一多元索引进行数据分析。多元索引基于倒排索引、列式存储、空间索引等,可解决大数据的复杂查询、分析聚合等需求。通过在数据表上建立多元索引,可实现全文检索、前缀查询、模糊查询、组合查询、统计聚合等功能。方案二架构图如下:

 

实现步骤

数据源准备

登录 Tablestore 控制台。创建 consume_product 表。

说明:Tablestore数据表是schema free的,只需要定义主键,无需定义属性列。关于创建表步骤请参考创建数据表

consume_product表。作为流计算任务结果表,保存了商品消费信息与商品元数据信息。

字段名

字段类型

是否主键

描述

consume_id

STRING

消费ID(主键)

product_ID

STRING

商品ID

product_num

BIGINT(10)

商品数量

price

DOUBLE

商品单价

consume_time

BIGINT(20)

消费时间

product_type

STRING

商品类别


Flink 作业配置

1. 登陆 Flink 全托管控制台,创建项目并创建作业,作业名 gmv_post_aggregation。

-- mysql-cdc 源表,与之前相同
CREATE TEMPORARY TABLE consume_record (
  `consume_id` VARCHAR(20),
  `product_id` VARCHAR(20),
  `consume_time` BIGINT,
  `consume_name` VARCHAR(20),
  `consume_phone` VARCHAR(20),
  PRIMARY KEY(consume_id)  NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '3306',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);
-- tablestore 维表,与之前相同
CREATE TEMPORARY TABLE product (
  product_ID STRING,
  price BIGINT,
  product_type STRING,
  PRIMARY KEY (product_ID) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = ''
);
-- tablestore 结果表
CREATE TEMPORARY TABLE consume_product (
   consume_id STRING,
   product_id STRING,
   price BIGINT,
   consume_time BIGINT,
   consume_name STRING,
   consume_phone STRING,
   PRIMARY KEY (consume_id,product_id) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = '',
  'valueColumns' = 'price,consume_time,consume_name,consume_phone'
);
insert into consume_product
select s.consume_id,d.product_ID as product_id,d.price,
        UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as consume_time,
        s.consume_name,s.consume_phone
        from `consume_record` as s 
        join `product` for system_time as of proctime() as d
        on s.product_id = d.product_ID

2. 上线作业

创建多元索引

登录 Tablestore 控制台,在 consume_product 表上建立多元索引。可通过控制台 SQL 查询或 SDK 分析获取商品交易总额信息。

说明:多元索引创建步骤请参考创建及使用多元索引

创建索引

结果展示

SQL查询

SDK查询

 SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("consume_product")
                .indexName("consume_product_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.matchAll())
                        .addGroupBy(GroupByBuilders.groupByField("groupByProductID","product_id").addSubAggregation(
                                AggregationBuilders.sum("sumagg","price")
                        ))
                        .build())
                .build();
        SearchResponse searchResponse = syncClient.search(searchRequest);
        for(GroupByFieldResultItem item : searchResponse.getGroupByResults().getAsGroupByFieldResult("groupByProductID").getGroupByFieldResultItems()){
            System.out.println("商品ID:"+item.getKey()+",交易总额:"+item.getSubAggregationResults().getAsSumAggregationResult("sumagg").getValue());
        }

SDK查询结果

商品ID:A001,交易总额:20.0
商品ID:A002,交易总额:40.0
商品ID:A004,交易总额:20.0
商品ID:A003,交易总额:5.0
商品ID:A005,交易总额:15.0
商品ID:A006,交易总额:5.0
商品ID:A008,交易总额:5.0

联系我们

本篇文章演示了基于 Flink + Tablestore 方案在大数据计算场景下的应用。后续,我们会推出 Flink on Tablestore 系列文章,并针对维表和结果表场景推出最佳实践文章。

希望本次文章对你有帮助,如果希望继续交流,可以加入我们的开发者技术交流群,可搜索群号『11789671』或『23307953』,亦可直接扫码加入。


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
10天前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
52 2
|
9天前
|
分布式计算 Hadoop 大数据
Jupyter 在大数据分析中的角色
【8月更文第29天】Jupyter Notebook 提供了一个交互式的开发环境,它不仅适用于 Python 编程语言,还能够支持其他语言,包括 Scala 和 R 等。这种多语言的支持使得 Jupyter 成为大数据分析领域中非常有价值的工具,特别是在与 Apache Spark 和 Hadoop 等大数据框架集成方面。本文将探讨 Jupyter 如何支持这些大数据框架进行高效的数据处理和分析,并提供具体的代码示例。
17 0
|
9天前
|
存储 SQL 大数据
用实时计算释放当下企业大数据潜能
本文整理自阿里云高级产品解决方案架构师王启华(敖北)老师在 Flink Forward Asia 2023 中闭门会的分享。
257 8
用实时计算释放当下企业大数据潜能
|
2天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
33 11
|
10天前
|
分布式计算 数据可视化 大数据
Vaex :突破pandas,快速分析100GB大数据集
Vaex :突破pandas,快速分析100GB大数据集
|
9天前
|
大数据 机器人 数据挖掘
这个云ETL工具配合Python轻松实现大数据集分析,附案例
这个云ETL工具配合Python轻松实现大数据集分析,附案例
|
9天前
|
数据采集 人工智能 安全
AI大数据处理与分析实战--体育问卷分析
本文是关于使用AI进行大数据处理与分析的实战案例,详细记录了对深圳市义务教育阶段学校“每天一节体育课”网络问卷的分析过程,包括数据概览、交互Prompt、代码处理、年级和学校维度的深入分析,以及通过AI工具辅助得出的分析结果和结论。
|
11天前
|
消息中间件 前端开发 安全
第三方数据平台技术选型分析
这篇文章分析了第三方数据平台的技术选型,涵盖了移动统计平台、自助分析平台和BI平台的不同代表厂商,讨论了它们的数据源、使用要求和适用场景。
27 2
|
12天前
|
存储 JSON 关系型数据库
MySQL与JSON的邂逅:开启大数据分析新纪元
MySQL与JSON的邂逅:开启大数据分析新纪元
|
13天前
|
存储 SQL 分布式计算
Hadoop生态系统概述:构建大数据处理与分析的基石
【8月更文挑战第25天】Hadoop生态系统为大数据处理和分析提供了强大的基础设施和工具集。通过不断扩展和优化其组件和功能,Hadoop将继续在大数据时代发挥重要作用。

热门文章

最新文章

下一篇
DDNS