从 0 到 1 通过 Flink + Tablestore 进行大数据处理与分析

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 阿里云实时计算Flink版是一套基于 Apache Flink 构建的⼀站式实时大数据分析平台。在大数据场景下,实时计算 Flink 可提供端到端亚秒级实时数据流批处理能力。表格存储 Tablestore (又名 OTS)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储、查询分析服务。表格存储的双引擎架构支持千万TPS和毫秒级延迟的服务能力,可作为大数据计算的极佳上下游存储。

一、前言

实时计算Flink版是一套基于 Apache Flink 构建的⼀站式实时大数据分析平台。在大数据场景下,实时计算 Flink 可提供端到端亚秒级实时数据流批处理能力。

表格存储 Tablestore (又名 OTS)是阿里云自研的多模型结构化数据存储,可提供海量结构化数据的存储、查询分析服务。表格存储的双引擎架构支持千万TPS和毫秒级延迟的服务能力,可作为大数据计算的极佳上下游存储。

本文章将以商品订单场景为基础,介绍如何从 0 到 1 通过 Flink+Tablestore 进行大数据分析。

二、场景介绍

某大型连锁超市会实时产生大量的消费数据,通过分析这些数据可获取到商品售卖热度、门店经营状态极具商业价值的信息,便于辅助经营者的商业决策。现需要设计一套方案,获取每分钟不同商品类别的 GMV (商品交易总额)。

本文采用云数据库 RDS 作为商品消费订单的存储库,通过 mysql-cdc connector 作为源表接入实时计算Flink。以表格存储 Tablestore 作为商品元信息的存储库,通过 OTS connector 作为维表接入实时计算Flink。配置流计算作业任务计算商品 GMV ,并写入表格存储 Tablestore 结果表进行保存。方案架构图如下:

img

三、实现步骤准备工作

  1. 创建 RDS MySQL 实例。创建步骤请参考创建 RDS MySQL 实例
  2. 开通表格存储服务,并创建按量模式实例。详情请参考[开通表格存储服务]和创建实例
  3. 开通实时计算服务,并购买Flink全托管集群。

1. 数据源准备

1. 登录RDS控制台,登陆数据管理DMS,创建consume_record表。

img

CREATE TABLE `consume_record` (
    `consume_id` varchar(20) NOT NULL,
    `product_id` varchar(20) NOT NULL,
    `consume_time` bigint(20) NOT NULL,
    `consume_name` varchar(20) NOT NULL,
    `consume_phone` varchar(20) NOT NULL,
    PRIMARY KEY (`consume_id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8
COMMENT='消费记录数据源表';

2. 登录 Tablestore 控制台。创建 gmv_result、product 两张表。

说明:Tablestore数据表是schema free的,只需要定义主键,无需定义属性列。关于创建表步骤请参考创建数据表

product表。作为流计算任务的维表,存储商品元数据信息。

字段名 字段类型 是否主键 描述
product_ID STRING 商品ID
price BIGINT 商品单价
product_type STRING 商品类别

gmv_result表。作为流计算任务的结果表,存储商品交易总额的计算结果。

字段名 字段类型 是否主键 描述
product_type STRING 商品类型
gmv_time STRING 统计时间
total_price BIGINT 商品交易总额

img

img

2. Flink 作业配置

1. 登陆 Flink 全托管控制台,创建项目并创建作业,作业名 gmv_pre_aggregation。

img

作业脚本

-- mysql-cdc 源表
CREATE TEMPORARY TABLE consume_record (
  `consume_id` VARCHAR(20),
  `product_id` VARCHAR(20),
  `consume_time` BIGINT,
  `consume_name` VARCHAR(20),
  `consume_phone` VARCHAR(20),
  PRIMARY KEY(consume_id)  NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '3306',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);
-- tablestore 维表
CREATE TEMPORARY TABLE product (
  product_ID STRING,
  price BIGINT,
  product_type STRING,
  PRIMARY KEY (product_ID) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = ''
);
-- tablestore 结果表
CREATE TEMPORARY TABLE gmv_result (
   product_type STRING,
   gmv_time BIGINT,
   total_price BIGINT,
   PRIMARY KEY (product_type) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = '',
  'valueColumns' = 'gmv_time,total_price'
);
INSERT INTO `gmv_result`
select 
  d.product_type, 
  UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as gmv_time,
  sum (d.price) as total_price
from
  `consume_record` as s
  JOIN `product` for system_time as of proctime() as d
  on s.product_id = d.product_ID
GROUP BY d.product_type,UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd')

2. 上线作业。

3. 结果展示

作业 gmv_pre_aggregation 直接写入 gmv 结果到 Tablestore,登录 Tablestore 控制台查询 gmv_result 表 即可获取商品交易总额结果。

img

四、方案改进

采用 Flink + Tablestore 方案很好地实现了商品交易总额的计算。然而 Flink 作业中固定了源表与维表参与计算的字段,如果业务需求发生变化,不得不重新设计作业脚本,再次进行作业下线上线操作,非常繁琐。所以将对上述方案做一下改进:Flink 流计算作业中仅负责关联消费记录和商品信息数据并写入 Tablestore 中。再通过Tablestore 的功能之一多元索引进行数据分析。多元索引基于倒排索引、列式存储、空间索引等,可解决大数据的复杂查询、分析聚合等需求。通过在数据表上建立多元索引,可实现全文检索、前缀查询、模糊查询、组合查询、统计聚合等功能。方案二架构图如下:

img

五、实现步骤

1. 数据源准备

登录 Tablestore 控制台。创建 consume_product 表。

说明:Tablestore数据表是schema free的,只需要定义主键,无需定义属性列。关于创建表步骤请参考创建数据表

consume_product表。作为流计算任务结果表,保存了商品消费信息与商品元数据信息。

字段名 字段类型 是否主键 描述
consume_id STRING 消费ID(主键)
product_ID STRING 商品ID
product_num BIGINT(10) 商品数量
price DOUBLE 商品单价
consume_time BIGINT(20) 消费时间
product_type STRING 商品类别

2. Flink 作业配置

1. 登陆 Flink 全托管控制台,创建项目并创建作业,作业名 gmv_post_aggregation。

-- mysql-cdc 源表,与之前相同
CREATE TEMPORARY TABLE consume_record (
  `consume_id` VARCHAR(20),
  `product_id` VARCHAR(20),
  `consume_time` BIGINT,
  `consume_name` VARCHAR(20),
  `consume_phone` VARCHAR(20),
  PRIMARY KEY(consume_id)  NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '',
  'port' = '3306',
  'username' = '',
  'password' = '',
  'database-name' = '',
  'table-name' = ''
);
-- tablestore 维表,与之前相同
CREATE TEMPORARY TABLE product (
  product_ID STRING,
  price BIGINT,
  product_type STRING,
  PRIMARY KEY (product_ID) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = ''
);
-- tablestore 结果表
CREATE TEMPORARY TABLE consume_product (
   consume_id STRING,
   product_id STRING,
   price BIGINT,
   consume_time BIGINT,
   consume_name STRING,
   consume_phone STRING,
   PRIMARY KEY (consume_id,product_id) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '',
  'instanceName' = '',
  'tableName' = '',
  'accessId' = '',
  'accessKey' = '',
  'valueColumns' = 'price,consume_time,consume_name,consume_phone'
);
insert into consume_product
select s.consume_id,d.product_ID as product_id,d.price,
        UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as consume_time,
        s.consume_name,s.consume_phone
        from `consume_record` as s 
        join `product` for system_time as of proctime() as d
        on s.product_id = d.product_ID

2. 上线作业

3. 创建多元索引

登录 Tablestore 控制台,在 consume_product 表上建立多元索引。可通过控制台 SQL 查询或 SDK 分析获取商品交易总额信息。

说明:多元索引创建步骤请参考创建及使用多元索引

创建索引

img

4. 结果展示

SQL查询

img

SDK查询

 SearchRequest searchRequest = SearchRequest.newBuilder()
                .tableName("consume_product")
                .indexName("consume_product_index")
                .searchQuery(SearchQuery.newBuilder()
                        .query(QueryBuilders.matchAll())
                        .addGroupBy(GroupByBuilders.groupByField("groupByProductID","product_id").addSubAggregation(
                                AggregationBuilders.sum("sumagg","price")
                        ))
                        .build())
                .build();
        SearchResponse searchResponse = syncClient.search(searchRequest);
        for(GroupByFieldResultItem item : searchResponse.getGroupByResults().getAsGroupByFieldResult("groupByProductID").getGroupByFieldResultItems()){
            System.out.println("商品ID:"+item.getKey()+",交易总额:"+item.getSubAggregationResults().getAsSumAggregationResult("sumagg").getValue());
        }

SDK查询结果

商品ID:A001,交易总额:20.0
商品ID:A002,交易总额:40.0
商品ID:A004,交易总额:20.0
商品ID:A003,交易总额:5.0
商品ID:A005,交易总额:15.0
商品ID:A006,交易总额:5.0
商品ID:A008,交易总额:5.0

六、总结

本篇文章演示了基于 Flink + Tablestore 方案在大数据计算场景下的应用。后续,我们会推出 Flink on Tablestore 系列文章,并针对维表和结果表场景推出最佳实践文章。

希望本次文章对你有帮助,如果希望继续交流,可以加入表格存储技术交流群,可搜索群号『11789671』或『23307953』。

本文转载自:https://developer.aliyun.com/article/790005


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制T恤;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
27天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
81 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
16天前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
57 2
|
5天前
|
机器学习/深度学习 存储 大数据
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系
在大数据时代,高维数据处理成为难题,主成分分析(PCA)作为一种有效的数据降维技术,通过线性变换将数据投影到新的坐标系,保留最大方差信息,实现数据压缩、去噪及可视化。本文详解PCA原理、步骤及其Python实现,探讨其在图像压缩、特征提取等领域的应用,并指出使用时的注意事项,旨在帮助读者掌握这一强大工具。
19 4
|
7天前
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
54 5
|
18天前
|
存储 监控 数据挖掘
【Clikhouse 探秘】ClickHouse 物化视图:加速大数据分析的新利器
ClickHouse 的物化视图是一种特殊表,通过预先计算并存储查询结果,显著提高查询性能,减少资源消耗,适用于实时报表、日志分析、用户行为分析、金融数据分析和物联网数据分析等场景。物化视图的创建、数据插入、更新和一致性保证通过事务机制实现。
76 14
|
24天前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
70 2
|
25天前
|
数据采集 机器学习/深度学习 搜索推荐
大数据与社交媒体:用户行为分析
【10月更文挑战第31天】在数字化时代,社交媒体成为人们生活的重要部分,大数据技术的发展使其用户行为分析成为企业理解用户需求、优化产品设计和提升用户体验的关键手段。本文探讨了大数据在社交媒体用户行为分析中的应用,包括用户画像构建、情感分析、行为路径分析和社交网络分析,以及面临的挑战与机遇。
|
25天前
|
机器学习/深度学习 搜索推荐 大数据
大数据与教育:学生表现分析的工具
【10月更文挑战第31天】在数字化时代,大数据成为改善教育质量的重要工具。本文探讨了大数据在学生表现分析中的应用,介绍学习管理系统、智能评估系统、情感分析技术和学习路径优化等工具,帮助教育者更好地理解学生需求,制定个性化教学策略,提升教学效果。尽管面临数据隐私等挑战,大数据仍为教育创新带来巨大机遇。
|
28天前
|
人工智能 供应链 搜索推荐
大数据分析:解锁商业智能的秘密武器
【10月更文挑战第31天】在信息爆炸时代,大数据分析成为企业解锁商业智能的关键工具。本文探讨了大数据分析在客户洞察、风险管理、供应链优化、产品开发和决策支持等方面的应用,强调了明确分析目标、选择合适工具、培养专业人才和持续优化的重要性,并展望了未来的发展趋势。
|
28天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
54 1

相关产品

  • 实时计算 Flink版