每天万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践

本文涉及的产品
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
简介:

背景

横看成岭侧成峰,

远近高低各不同。

不识庐山真面目,

只缘身在此山中。

不同的视角我们所看到的物体是不一样的,

http://t.m.china.com.cn/convert/c_ovWL9w.html

pic

图为墨西哥城放射状的街区广场。

pic

图为西班牙迷宫般的果树漩涡。

地心说和日心说也是视角不同所呈现的。

pic

pic

实际上数据也有这样,我们每天产生海量的数据,有各种属性,以每个属性为视角(分组、归类、聚合),看到的是对应属性为中心的数据。

对应的业务场景也非常多,例如:

1、物联网,

每个传感器有很多属性:ID,地理位置,传感器归属,各类指标等。

以传感器的角度进行观察,观察某个传感器ID在流金岁月中的值。

以归属角度(例如归属于公安、城管、某家公司、。。。)进行观察,

以地理位置为视角进行观察,。。。。

2、车联网、站长网。。。。

按车辆、按客户、按访问者。。。多重视角进行观察

观察在数据库中可以触发两种行为,一种是实时计算的行为,另一种是数据规整的行为。

数据规整指将数据按视角重新归类存放。(例如在云端汇聚了各个网站的被访问记录,站长只关注他自己的网站的被访问记录,当需要向站长提供数据时,可以按网站进行数据规整)。

那么如何在云端实现实时分析、准实时数据归类的需求呢?

1 架构

HybridDB for PostgreSQL是阿里云的一款分析型MPP数据库产品(基于Greenplum开源版本而来,新增了插件功能、云端特性以及内核代码优化),提供了水平扩展的能力以及强大的分析SQL兼容性,同时与云端海量存储OSS进行了深度整合,可以并行读写OSS,将OSS作为数据存储来使用。

实时计算架构

pic

海量数据源,写入OSS,通过HybridDB for PostgreSQL的oss_ext插件,实时分析写入的数据。

OSS带宽指标:目前每个计算节点每个会话约30MB/s的读写速率。

对于列式存储格式,数值类型。1亿记录约381MB,压缩比5:1的话,约76.3MB。

按行换算的性能指标:2048个计算节点,读写吞吐约 805亿行/s。每天处理6900万亿行(当然,按多列进出打个折扣,万亿级别也是没有问题的)。

准实时数据规整架构

pic

实时数据规整的目的是按视角将数据规整,数据进入OSS时,是打乱的。由HybridDB for PostgreSQL对接重分布分组规整后,再写出到OSS,形成规整的数据。

为什么需要重分布?

前面谈到了视角问题,我们可能有多重视角来观察数据,而在数据库中只能选择一种固定的分布键,当视角与之不同时,就需要重分布。

准实时导出的优化:

对于一个视角,可能有少量或多种属性,例如用户实际,假设有100万个用户,如果每个计算节点分别导出100万用户,每个用户对应到OSS的一个规整文件,那么由于文件数过多,导出会较慢。

那么可以对用户重分布,例如1000个节点,每个节点分配到1000个用户的数据,这样的话,并行写出到OSS时,一下子就降低到了每个节点写1000个文件的规模。

如何强制重分布呢?后面讲到。

2 实时计算

HybridDB for PostgreSQL与OSS对接的详细文档请参考:

https://help.aliyun.com/document_detail/35457.html

简略步骤如下:

1、创建OSS用户

2、创建OSS BUCKET,例如每个小时一个BUCKET

3、写入数据(最好写入小文件,数量为HybridDB for PostgreSQL的倍数)

4、在HybridDB for PostgreSQL中创建OSS外部表,例如每个小时一个

5、直接读取OSS外表进行分析

3 准实时数据规整

1、需求

按不同的视觉维度,进行分组,每个视觉属性规整到一个OSS文件。也就是说一个OSS文件不能存在多个对象。

2、查询

比如需要按视角分组,按时间排序输出。

select 聚合函数(t order by 时间) from tbl t group by 视角字段;  

以上SQL,数据库可能不会按视角字段重分布,而是使用两阶段提交的方式。例如

postgres=# create table tbl(uid int, info text, c1 int);  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'uid' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
CREATE TABLE  
表按UID分布  
  
但是查询不按它,看看会不会重分布  
  
postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from tbl t group by c1;  
                                               QUERY PLAN                                                 
--------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.06 rows=1 width=36)  
   ->  GroupAggregate  (cost=0.03..0.06 rows=1 width=36)  
         Group By: c1  
         ->  Sort  (cost=0.03..0.04 rows=1 width=36)  
               Sort Key: c1  
	       -- 按c1重分布  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=36)  
                     Hash Key: c1  
                     ->  Seq Scan on tbl t  (cost=0.00..0.00 rows=1 width=36)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(10 rows)  
  
这条是不被期望的,因为发生了两次聚合。  
我们在将数据写入OSS时,不希望两次聚合。  
  
postgres=# explain select max(c1) from tbl group by info;  
                                            QUERY PLAN                                              
--------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.04..0.05 rows=1 width=36)  
   -- 第二次聚合  
   ->  HashAggregate  (cost=0.04..0.05 rows=1 width=36)  
         Group By: tbl.info  
         -- 重分布  
	 ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.01..0.03 rows=1 width=36)  
               Hash Key: tbl.info  
               -- 第一次聚合  
	       ->  HashAggregate  (cost=0.01..0.01 rows=1 width=36)  
                     Group By: tbl.info  
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=36)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(10 rows)  

强制按指定键重分布

postgres=# explain select row_number() over (partition by c1 order by info), * from tbl;  
                                               QUERY PLAN                                                 
--------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.04 rows=1 width=40)  
   ->  Window  (cost=0.03..0.04 rows=1 width=40)  
         Partition By: c1  
         Order By: info  
         ->  Sort  (cost=0.03..0.04 rows=1 width=40)  
               Sort Key: c1, info  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)  
                     Hash Key: c1  
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(11 rows)  

按强制重分布, 改写SQL

使用窗口查询,将数据强制重分布,然后再进行计算节点的原地聚合。

postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from (select row_number() over (partition by c1 order by info), * from tbl) t group by c1;  
                                                     QUERY PLAN                                                       
--------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.07 rows=1 width=36)  
   -- 计算节点原地聚合  
   ->  GroupAggregate  (cost=0.03..0.07 rows=1 width=36)  
         Group By: t.c1  
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=36)  
               ->  Window  (cost=0.03..0.04 rows=1 width=40)  
                     Partition By: tbl.c1  
                     Order By: tbl.info  
                     -- 按指定要求的顺序排序,例如按时间  
		     ->  Sort  (cost=0.03..0.04 rows=1 width=40)  
                           Sort Key: tbl.c1, tbl.info  
                           -- 按C1重分布  
			   ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)  
                                 Hash Key: tbl.c1  
                                 ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(14 rows)  

3、自定义聚合函数

Greenplum的自定义聚合与单节点聚合不同,一种为单阶段模式,另一种为两阶段聚合模式。

单阶段模式,将数据收到MASTER后进行聚合。流水:

初始值INITCOND,MASTER过程函数SFUNC,MASTER FINAL函数FINALFUNC。

两阶段模式先在数据节点并行执行,然后在MASTER执行第二阶段。流水:

初始值INITCOND,数据节点过程函数SFUNC(数据节点并行执行),MASTER聚合函数PREFUNC,MASTER FINAL函数FINALFUNC。

SFUNC操作流水如下

1、每个节点调用sfunc聚合,输入参数为(input_type数据 , 临时结果stype),输出为stype。处理第一条记录时,临时结果stype为 NULL 或 初始值INITCOND。

postgres=# \h create aggre  
Command:     CREATE AGGREGATE  
Description: define a new aggregate function  
Syntax:  
CREATE AGGREGATE name ( input_data_type [ , ... ] ) (  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , PREFUNC = prefunc ]  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  
  
or the old syntax  
  
CREATE AGGREGATE name (  
    BASETYPE = base_type,  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  

两阶段聚合优化方法如下

在节点调用sfunc聚合,输入参数为(input_type数据 , 临时结果stype),输出为stype

sfunc( internal-state, next-data-values ) ---> next-internal-state    

segment第一阶段收集结果传输到master调用prefunc,输入(stype , stype),得到的结果为stype

prefunc( internal-state, internal-state ) ---> next-internal-state    

最后再将stype转换为聚合的输出类型即可(可选使用finalfunc)。

hll_union_agg 优化例子

CREATE AGGREGATE gp_hll_union_agg (hll) (   
  SFUNC = hll_union,   
  prefunc = hll_union, -- 第二阶段函数  
  STYPE = hll   
);   

hll_add_agg 优化例子

# select hll_empty();  
  hll_empty     
--------------  
 \021\213\177  
(1 row)  
  
CREATE AGGREGATE gp_hll_add_agg (hll_hashval) (  
  SFUNC = hll_add,   
  STYPE = hll,   
  prefunc = hll_union, -- 第二阶段函数  
  initcond='\021\213\177'  -- 初始值  
);   

但是请注意,由于在segment节点sfunc执行完没有断点接口,所以我们无法在SEGMENT节点直接将一阶段聚合的数据写入到OSS。(除非改GPDB代码,加入一个断点接口。)

怎么办呢?

通过UDF函数来实现,并要求它在每个数据节点单独执行。

create or replace function f(gid int, v anyarray) returns void as $$  
declare  
  oss_ext_tbl name;  
begin  
  oss_ext_tbl := 'ext_tbl_'||gid;  
  execute format ('insert into %I select unnest(%L)', oss_ext_tbl, v);  
end;  
$$ language plpgsql strict;  

虽然这是一种方法,但是这种方式依旧不是最高效的,因为还有一次聚合的过程。

更高效率的方法是首先对数据重分布和排序,同时在导出到文件时自动根据上下文的VALUE变化,切换文件,根据新的VALUE命名并写入新文件。

这部分工作需要修改数据库的导出代码来实现。

4、并行写出到OSS

实现了在导出到文件时自动根据上下文的VALUE变化,切换文件,根据新的VALUE命名并写入新文件这部分工作后,规整数据变得异常简单。

1、非规整外部表(来源表)

例子

create external table origin (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)  
.........  -- 外部表OSS位置  
;  

同样需要使用这种方法进行强制重分布

按UID规整,按crt_time排序

postgres=# explain select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;  
                                                  QUERY PLAN                                                    
--------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.05 rows=1 width=32)  
   ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)  
         ->  Window  (cost=0.03..0.04 rows=1 width=44)  
               Partition By: tbl.uid  
               Order By: tbl.crt_time  
               ->  Sort  (cost=0.03..0.04 rows=1 width=44)  
                     Sort Key: tbl.uid, tbl.crt_time  
                     ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)  
                           Hash Key: tbl.uid  
                           ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(12 rows)  

2、创建规整后OSS外部表

参考 阿里云HybridDB for PostgreSQL OSS存储用法

create external table dest (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)  
.........  -- 外部表OSS位置  
;  

3、将数据写入规整后OSS外部表

postgres=# explain insert into dest select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;  
                                                     QUERY PLAN                                                       
--------------------------------------------------------------------------------------------------------------------  
 Insert (slice0; segments: 48)  (rows=1 width=32)  
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)  
               ->  Window  (cost=0.03..0.04 rows=1 width=44)  
                     Partition By: tbl.uid  
                     Order By: tbl.crt_time  
                     ->  Sort  (cost=0.03..0.04 rows=1 width=44)  
                           Sort Key: tbl.uid, tbl.crt_time  
                           ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)  
                                 Hash Key: tbl.uid  
                                 ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(14 rows)  

小结

使用HybridDB for PostgreSQL,同时实现了实时分析,准实时数据规整两个需求。

OSS作为海量数据入口,HDB作为OSS的计算引擎,实现海量数据实时分析。

同时HDB作为数据规整引擎,被规整的数据不需要在数据库本地落地,直接从OSS到OSS,只是用到了HDB的规整能力。

性能可以通过扩展HDB的计算节点线性扩展:

海量数据源,写入OSS,通过HybridDB for PostgreSQL的oss_ext插件,实时分析写入的数据。

OSS带宽指标:目前每个计算节点每个会话约30MB/s的读写速率。

对于列式存储格式,数值类型。1亿记录约381MB,压缩比5:1的话,约76.3MB。

按行换算的性能指标:2048个计算节点,读写吞吐约 805亿行/s。每天处理6900万亿行(当然,按多列进出打个折扣,万亿级别也是没有问题的)。

参考

阿里云HybridDB for PostgreSQL

阿里云HybridDB for PostgreSQL OSS存储用法

《Greenplum 性能评估公式 - 阿里云HybridDB for PostgreSQL最佳实践》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》

《PostgreSQL aggregate function customize》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
1天前
|
存储 消息中间件 容灾
|
2天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之如何实现两个阿里云账号下的Kafka进行数据的互相传输
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
9天前
|
存储 数据挖掘 OLAP
阿里云 EMR Serverless StarRocks OLAP 数据分析场景解析
阿里云 E-MapReduce Serverless StarRocks 版是阿里云提供的 Serverless StarRocks 全托管服务,提供高性能、全场景、极速统一的数据分析体验,具备开箱即用、弹性扩展、监控管理、慢 SQL 诊断分析等全生命周期能力。内核 100% 兼容 StarRocks,性能比传统 OLAP 引擎提升 3-5 倍,助力企业高效构建大数据应用。本篇文章对阿里云EMR Serverless StarRocks OLAP 数据分析场景进行解析、存算分离架构升级以及 Trino 兼容,无缝替换介绍。
118 1
|
15天前
|
关系型数据库 MySQL 数据库
关系型数据库mysql数据增量恢复
【7月更文挑战第3天】
129 2
|
15天前
|
关系型数据库 MySQL Shell
关系型数据库mysql数据完全恢复
【7月更文挑战第3天】
84 2
|
16天前
|
存储 关系型数据库 MySQL
|
20小时前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之同样的表和数据,在PolarDB执行LEFT JOIN查询可以得到结果,但在MaxCompute中却返回为空,是什么原因?
摘要:DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
12天前
|
存储 关系型数据库 MySQL
探索MySQL:关系型数据库的基石
MySQL,作为全球最流行的开源关系型数据库管理系统(RDBMS)之一,广泛应用于各种Web应用、企业级应用和数据仓库中
|
9天前
|
关系型数据库 MySQL 网络安全
Mysql 数据库主从复制
在MySQL主从复制环境中,配置了两台虚拟机:主VM拥有IP1,从VM有IP2。主VM的`my.cnf`设置server-id为1,启用二进制日志;从VM设置server-id为2,开启GTID模式。通过`find`命令查找配置文件,编辑`my.cnf`,在主服务器上创建复制用户,记录二进制日志信息,然后锁定表并备份数据。备份文件通过SCP传输到从服务器,恢复数据并配置复制源,启动复制。检查复制状态确认运行正常。最后解锁表,完成主从同步,新用户在从库中自动更新。
978 6
Mysql 数据库主从复制
|
10天前
|
缓存 运维 关系型数据库
数据库容灾 | MySQL MGR与阿里云PolarDB-X Paxos的深度对比
经过深入的技术剖析与性能对比,PolarDB-X DN凭借其自研的X-Paxos协议和一系列优化设计,在性能、正确性、可用性及资源开销等方面展现出对MySQL MGR的多项优势,但MGR在MySQL生态体系内也占据重要地位,但需要考虑备库宕机抖动、跨机房容灾性能波动、稳定性等各种情况,因此如果想用好MGR,必须配备专业的技术和运维团队的支持。 在面对大规模、高并发、高可用性需求时,PolarDB-X存储引擎以其独特的技术优势和优异的性能表现,相比于MGR在开箱即用的场景下,PolarDB-X基于DN的集中式(标准版)在功能和性能都做到了很好的平衡,成为了极具竞争力的数据库解决方案。

相关产品

  • 云原生数据库 PolarDB