每天万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践

本文涉及的产品
云原生数据库 PolarDB 分布式版,标准版 2核8GB
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云数据库 RDS SQL Server,基础系列 2核4GB
简介:

背景

横看成岭侧成峰,

远近高低各不同。

不识庐山真面目,

只缘身在此山中。

不同的视角我们所看到的物体是不一样的,

http://t.m.china.com.cn/convert/c_ovWL9w.html

pic

图为墨西哥城放射状的街区广场。

pic

图为西班牙迷宫般的果树漩涡。

地心说和日心说也是视角不同所呈现的。

pic

pic

实际上数据也有这样,我们每天产生海量的数据,有各种属性,以每个属性为视角(分组、归类、聚合),看到的是对应属性为中心的数据。

对应的业务场景也非常多,例如:

1、物联网,

每个传感器有很多属性:ID,地理位置,传感器归属,各类指标等。

以传感器的角度进行观察,观察某个传感器ID在流金岁月中的值。

以归属角度(例如归属于公安、城管、某家公司、。。。)进行观察,

以地理位置为视角进行观察,。。。。

2、车联网、站长网。。。。

按车辆、按客户、按访问者。。。多重视角进行观察

观察在数据库中可以触发两种行为,一种是实时计算的行为,另一种是数据规整的行为。

数据规整指将数据按视角重新归类存放。(例如在云端汇聚了各个网站的被访问记录,站长只关注他自己的网站的被访问记录,当需要向站长提供数据时,可以按网站进行数据规整)。

那么如何在云端实现实时分析、准实时数据归类的需求呢?

1 架构

HybridDB for PostgreSQL是阿里云的一款分析型MPP数据库产品(基于Greenplum开源版本而来,新增了插件功能、云端特性以及内核代码优化),提供了水平扩展的能力以及强大的分析SQL兼容性,同时与云端海量存储OSS进行了深度整合,可以并行读写OSS,将OSS作为数据存储来使用。

实时计算架构

pic

海量数据源,写入OSS,通过HybridDB for PostgreSQL的oss_ext插件,实时分析写入的数据。

OSS带宽指标:目前每个计算节点每个会话约30MB/s的读写速率。

对于列式存储格式,数值类型。1亿记录约381MB,压缩比5:1的话,约76.3MB。

按行换算的性能指标:2048个计算节点,读写吞吐约 805亿行/s。每天处理6900万亿行(当然,按多列进出打个折扣,万亿级别也是没有问题的)。

准实时数据规整架构

pic

实时数据规整的目的是按视角将数据规整,数据进入OSS时,是打乱的。由HybridDB for PostgreSQL对接重分布分组规整后,再写出到OSS,形成规整的数据。

为什么需要重分布?

前面谈到了视角问题,我们可能有多重视角来观察数据,而在数据库中只能选择一种固定的分布键,当视角与之不同时,就需要重分布。

准实时导出的优化:

对于一个视角,可能有少量或多种属性,例如用户实际,假设有100万个用户,如果每个计算节点分别导出100万用户,每个用户对应到OSS的一个规整文件,那么由于文件数过多,导出会较慢。

那么可以对用户重分布,例如1000个节点,每个节点分配到1000个用户的数据,这样的话,并行写出到OSS时,一下子就降低到了每个节点写1000个文件的规模。

如何强制重分布呢?后面讲到。

2 实时计算

HybridDB for PostgreSQL与OSS对接的详细文档请参考:

https://help.aliyun.com/document_detail/35457.html

简略步骤如下:

1、创建OSS用户

2、创建OSS BUCKET,例如每个小时一个BUCKET

3、写入数据(最好写入小文件,数量为HybridDB for PostgreSQL的倍数)

4、在HybridDB for PostgreSQL中创建OSS外部表,例如每个小时一个

5、直接读取OSS外表进行分析

3 准实时数据规整

1、需求

按不同的视觉维度,进行分组,每个视觉属性规整到一个OSS文件。也就是说一个OSS文件不能存在多个对象。

2、查询

比如需要按视角分组,按时间排序输出。

select 聚合函数(t order by 时间) from tbl t group by 视角字段;  

以上SQL,数据库可能不会按视角字段重分布,而是使用两阶段提交的方式。例如

postgres=# create table tbl(uid int, info text, c1 int);  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'uid' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
CREATE TABLE  
表按UID分布  
  
但是查询不按它,看看会不会重分布  
  
postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from tbl t group by c1;  
                                               QUERY PLAN                                                 
--------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.06 rows=1 width=36)  
   ->  GroupAggregate  (cost=0.03..0.06 rows=1 width=36)  
         Group By: c1  
         ->  Sort  (cost=0.03..0.04 rows=1 width=36)  
               Sort Key: c1  
	       -- 按c1重分布  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=36)  
                     Hash Key: c1  
                     ->  Seq Scan on tbl t  (cost=0.00..0.00 rows=1 width=36)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(10 rows)  
  
这条是不被期望的,因为发生了两次聚合。  
我们在将数据写入OSS时,不希望两次聚合。  
  
postgres=# explain select max(c1) from tbl group by info;  
                                            QUERY PLAN                                              
--------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.04..0.05 rows=1 width=36)  
   -- 第二次聚合  
   ->  HashAggregate  (cost=0.04..0.05 rows=1 width=36)  
         Group By: tbl.info  
         -- 重分布  
	 ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.01..0.03 rows=1 width=36)  
               Hash Key: tbl.info  
               -- 第一次聚合  
	       ->  HashAggregate  (cost=0.01..0.01 rows=1 width=36)  
                     Group By: tbl.info  
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=36)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(10 rows)  

强制按指定键重分布

postgres=# explain select row_number() over (partition by c1 order by info), * from tbl;  
                                               QUERY PLAN                                                 
--------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.04 rows=1 width=40)  
   ->  Window  (cost=0.03..0.04 rows=1 width=40)  
         Partition By: c1  
         Order By: info  
         ->  Sort  (cost=0.03..0.04 rows=1 width=40)  
               Sort Key: c1, info  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)  
                     Hash Key: c1  
                     ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(11 rows)  

按强制重分布, 改写SQL

使用窗口查询,将数据强制重分布,然后再进行计算节点的原地聚合。

postgres=# explain select string_agg(textin(record_out(t)), chr(10)) from (select row_number() over (partition by c1 order by info), * from tbl) t group by c1;  
                                                     QUERY PLAN                                                       
--------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.07 rows=1 width=36)  
   -- 计算节点原地聚合  
   ->  GroupAggregate  (cost=0.03..0.07 rows=1 width=36)  
         Group By: t.c1  
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=36)  
               ->  Window  (cost=0.03..0.04 rows=1 width=40)  
                     Partition By: tbl.c1  
                     Order By: tbl.info  
                     -- 按指定要求的顺序排序,例如按时间  
		     ->  Sort  (cost=0.03..0.04 rows=1 width=40)  
                           Sort Key: tbl.c1, tbl.info  
                           -- 按C1重分布  
			   ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=40)  
                                 Hash Key: tbl.c1  
                                 ->  Seq Scan on tbl  (cost=0.00..0.00 rows=1 width=40)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(14 rows)  

3、自定义聚合函数

Greenplum的自定义聚合与单节点聚合不同,一种为单阶段模式,另一种为两阶段聚合模式。

单阶段模式,将数据收到MASTER后进行聚合。流水:

初始值INITCOND,MASTER过程函数SFUNC,MASTER FINAL函数FINALFUNC。

两阶段模式先在数据节点并行执行,然后在MASTER执行第二阶段。流水:

初始值INITCOND,数据节点过程函数SFUNC(数据节点并行执行),MASTER聚合函数PREFUNC,MASTER FINAL函数FINALFUNC。

SFUNC操作流水如下

1、每个节点调用sfunc聚合,输入参数为(input_type数据 , 临时结果stype),输出为stype。处理第一条记录时,临时结果stype为 NULL 或 初始值INITCOND。

postgres=# \h create aggre  
Command:     CREATE AGGREGATE  
Description: define a new aggregate function  
Syntax:  
CREATE AGGREGATE name ( input_data_type [ , ... ] ) (  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , PREFUNC = prefunc ]  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  
  
or the old syntax  
  
CREATE AGGREGATE name (  
    BASETYPE = base_type,  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , FINALFUNC = ffunc ]  
    [ , INITCOND = initial_condition ]  
    [ , SORTOP = sort_operator ]  
)  

两阶段聚合优化方法如下

在节点调用sfunc聚合,输入参数为(input_type数据 , 临时结果stype),输出为stype

sfunc( internal-state, next-data-values ) ---> next-internal-state    

segment第一阶段收集结果传输到master调用prefunc,输入(stype , stype),得到的结果为stype

prefunc( internal-state, internal-state ) ---> next-internal-state    

最后再将stype转换为聚合的输出类型即可(可选使用finalfunc)。

hll_union_agg 优化例子

CREATE AGGREGATE gp_hll_union_agg (hll) (   
  SFUNC = hll_union,   
  prefunc = hll_union, -- 第二阶段函数  
  STYPE = hll   
);   

hll_add_agg 优化例子

# select hll_empty();  
  hll_empty     
--------------  
 \021\213\177  
(1 row)  
  
CREATE AGGREGATE gp_hll_add_agg (hll_hashval) (  
  SFUNC = hll_add,   
  STYPE = hll,   
  prefunc = hll_union, -- 第二阶段函数  
  initcond='\021\213\177'  -- 初始值  
);   

但是请注意,由于在segment节点sfunc执行完没有断点接口,所以我们无法在SEGMENT节点直接将一阶段聚合的数据写入到OSS。(除非改GPDB代码,加入一个断点接口。)

怎么办呢?

通过UDF函数来实现,并要求它在每个数据节点单独执行。

create or replace function f(gid int, v anyarray) returns void as $$  
declare  
  oss_ext_tbl name;  
begin  
  oss_ext_tbl := 'ext_tbl_'||gid;  
  execute format ('insert into %I select unnest(%L)', oss_ext_tbl, v);  
end;  
$$ language plpgsql strict;  

虽然这是一种方法,但是这种方式依旧不是最高效的,因为还有一次聚合的过程。

更高效率的方法是首先对数据重分布和排序,同时在导出到文件时自动根据上下文的VALUE变化,切换文件,根据新的VALUE命名并写入新文件。

这部分工作需要修改数据库的导出代码来实现。

4、并行写出到OSS

实现了在导出到文件时自动根据上下文的VALUE变化,切换文件,根据新的VALUE命名并写入新文件这部分工作后,规整数据变得异常简单。

1、非规整外部表(来源表)

例子

create external table origin (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)  
.........  -- 外部表OSS位置  
;  

同样需要使用这种方法进行强制重分布

按UID规整,按crt_time排序

postgres=# explain select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;  
                                                  QUERY PLAN                                                    
--------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=0.03..0.05 rows=1 width=32)  
   ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)  
         ->  Window  (cost=0.03..0.04 rows=1 width=44)  
               Partition By: tbl.uid  
               Order By: tbl.crt_time  
               ->  Sort  (cost=0.03..0.04 rows=1 width=44)  
                     Sort Key: tbl.uid, tbl.crt_time  
                     ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)  
                           Hash Key: tbl.uid  
                           ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(12 rows)  

2、创建规整后OSS外部表

参考 阿里云HybridDB for PostgreSQL OSS存储用法

create external table dest (c1 int, c2 int, c3 int, c4 text, info text, uid int, crt_time timestamp)  
.........  -- 外部表OSS位置  
;  

3、将数据写入规整后OSS外部表

postgres=# explain insert into dest select (t.tbl).* from (select row_number() over (partition by uid order by crt_time) as rn, tbl from origin tbl) t;  
                                                     QUERY PLAN                                                       
--------------------------------------------------------------------------------------------------------------------  
 Insert (slice0; segments: 48)  (rows=1 width=32)  
         ->  Subquery Scan t  (cost=0.03..0.05 rows=1 width=32)  
               ->  Window  (cost=0.03..0.04 rows=1 width=44)  
                     Partition By: tbl.uid  
                     Order By: tbl.crt_time  
                     ->  Sort  (cost=0.03..0.04 rows=1 width=44)  
                           Sort Key: tbl.uid, tbl.crt_time  
                           ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..0.02 rows=1 width=44)  
                                 Hash Key: tbl.uid  
                                 ->  Seq Scan on origin tbl  (cost=0.00..0.00 rows=1 width=44)  
 Settings:  optimizer=off  
 Optimizer status: legacy query optimizer  
(14 rows)  

小结

使用HybridDB for PostgreSQL,同时实现了实时分析,准实时数据规整两个需求。

OSS作为海量数据入口,HDB作为OSS的计算引擎,实现海量数据实时分析。

同时HDB作为数据规整引擎,被规整的数据不需要在数据库本地落地,直接从OSS到OSS,只是用到了HDB的规整能力。

性能可以通过扩展HDB的计算节点线性扩展:

海量数据源,写入OSS,通过HybridDB for PostgreSQL的oss_ext插件,实时分析写入的数据。

OSS带宽指标:目前每个计算节点每个会话约30MB/s的读写速率。

对于列式存储格式,数值类型。1亿记录约381MB,压缩比5:1的话,约76.3MB。

按行换算的性能指标:2048个计算节点,读写吞吐约 805亿行/s。每天处理6900万亿行(当然,按多列进出打个折扣,万亿级别也是没有问题的)。

参考

阿里云HybridDB for PostgreSQL

阿里云HybridDB for PostgreSQL OSS存储用法

《Greenplum 性能评估公式 - 阿里云HybridDB for PostgreSQL最佳实践》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》

《PostgreSQL aggregate function customize》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
12月前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错之往GREENPLUM 6 写数据,用postgresql-42.2.9.jar 报 ON CONFLICT (uuid) DO UPDATE SET 语法有问题。怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
SQL 关系型数据库 PostgreSQL
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
【YashanDB 知识库】从 PostgreSQL 迁移到 YashanDB 如何进行数据行数比对
|
2月前
|
SQL Oracle 关系型数据库
【YashanDB知识库】从PostgreSQL迁移到YashanDB如何进行数据行数比对
本文介绍了通过Oracle视图`v$sql`和`v$sql_plan`分析SQL性能的方法。首先,可通过`plan_hash_value`从`v$sql_plan`获取SQL执行计划,结合示例展示了具体查询方式。文章还创建了一个UDF函数`REPEAT`用于格式化输出,便于阅读复杂执行计划。最后,通过实例展示了如何根据`plan_hash_value`获取SQL文本及其内存中的执行计划,帮助优化性能问题。
|
5月前
|
存储 关系型数据库 数据库
【赵渝强老师】PostgreSQL的数据文件
PostgreSQL的物理存储结构主要包括数据文件、日志文件等。数据文件按oid命名,超过1G时自动拆分。通过查询数据库和表的oid,可定位到具体的数据文件。例如,查询数据库oid后,再查询特定表的oid及relfilenode,即可找到该表对应的数据文件位置。
156 1
|
11月前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
1081 0
|
11月前
|
DataWorks 安全 关系型数据库
DataWorks产品使用合集之使用Flink CDC读取PostgreSQL数据时如何指定编码格式
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
151 0
|
Cloud Native 关系型数据库 OLAP
云原生数据仓库产品使用合集之阿里云云原生数据仓库AnalyticDB PostgreSQL版的重分布时间主要取决的是什么
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
|
7月前
|
人工智能 自然语言处理 关系型数据库
阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成
近日,阿里云云原生数据仓库 AnalyticDB PostgreSQL 版已完成和开源LLMOps平台Dify官方集成。
|
9月前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
949 0
|
9月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版