PostgreSQL 10 自定义并行计算聚合函数的原理与实践

本文涉及的产品
RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
推荐场景:
对影评进行热评分析
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS SQL Server,基础系列 2核4GB
简介:

标签

PostgreSQL , 聚合函数 , 自定义 , AGGREGATE , 并行 , COMBINEFUNC


背景

PostgreSQL 9.6开始就支持并行计算了,意味着聚合、扫描、排序、JOIN等都开始支持并行计算。对于聚合操作来说,并行计算与非并行计算是有差异的。

例如avg聚合,对一张表进行计算时,一个任务中操作和多个并行任务操作,算法是不一样的。

PostgreSQL提供了一套标准的接口,可以支持聚合函数的并行操作。

自定义并行聚合的原理和例子

创建聚合函数的语法如下:

CREATE AGGREGATE name ( [ argmode ] [ argname ] arg_data_type [ , ... ] ) (  
    SFUNC = sfunc,  
    STYPE = state_data_type  
    [ , SSPACE = state_data_size ]  
    [ , FINALFUNC = ffunc ]  
    [ , FINALFUNC_EXTRA ]  
    [ , COMBINEFUNC = combinefunc ]  
    [ , SERIALFUNC = serialfunc ]  
    [ , DESERIALFUNC = deserialfunc ]  
    [ , INITCOND = initial_condition ]  
    [ , MSFUNC = msfunc ]  
    [ , MINVFUNC = minvfunc ]  
    [ , MSTYPE = mstate_data_type ]  
    [ , MSSPACE = mstate_data_size ]  
    [ , MFINALFUNC = mffunc ]  
    [ , MFINALFUNC_EXTRA ]  
    [ , MINITCOND = minitial_condition ]  
    [ , SORTOP = sort_operator ]  
    [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]  
)  

相比非并行,多了一个过程,那就是combinefunc的过程(也叫partial agg)。

非并行模式的聚合流程大致如下:

循环  
sfunc( internal-state, next-data-values ) ---> next-internal-state  
  
最后调用一次(可选)  
ffunc( internal-state ) ---> aggregate-value  

pic

并行模式的聚合流程大致如下,如果没有写combinefunc,那么实际上聚合过程并没有实现并行而只是扫描并行:

pic

下面这个例子,我们可以观察到一个COUNT操作的并行聚合。

postgres=# set max_parallel_workers=4;  
SET  
postgres=# set max_parallel_workers_per_gather =4;  
SET  
postgres=# set parallel_setup_cost =0;  
SET  
postgres=# set parallel_tuple_cost =0;  
SET  
postgres=# alter table test set (parallel_workers =4);  
ALTER TABLE  
postgres=# explain (analyze,verbose,timing,costs,buffers) select count(*) from test;  
                                                                  QUERY PLAN                                                                     
-----------------------------------------------------------------------------------------------------------------------------------------------  
 -- final并行,可有可无,看具体的聚合算法  
 Finalize Aggregate  (cost=15837.02..15837.03 rows=1 width=8) (actual time=57.296..57.296 rows=1 loops=1)  
   Output: count(*)  
   Buffers: shared hit=3060  
   ->  Gather  (cost=15837.00..15837.01 rows=4 width=8) (actual time=57.287..57.292 rows=5 loops=1)  
         Output: (PARTIAL count(*))  
         Workers Planned: 4  
         Workers Launched: 4  
         Buffers: shared hit=3060  
           
	 -- 一下就是combinefunc完成的聚合并行(显示为PARTIAL agg)  
	 ->  Partial Aggregate  (cost=15837.00..15837.01 rows=1 width=8) (actual time=52.333..52.333 rows=1 loops=5)  
               Output: PARTIAL count(*)  
               Buffers: shared hit=12712  
               Worker 0: actual time=50.917..50.918 rows=1 loops=1  
                 Buffers: shared hit=2397  
               Worker 1: actual time=51.293..51.294 rows=1 loops=1  
                 Buffers: shared hit=2423  
               Worker 2: actual time=51.062..51.063 rows=1 loops=1  
                 Buffers: shared hit=2400  
               Worker 3: actual time=51.436..51.436 rows=1 loops=1  
                 Buffers: shared hit=2432  
               ->  Parallel Seq Scan on public.test  (cost=0.00..15212.00 rows=250000 width=0) (actual time=0.010..30.499 rows=200000 loops=5)  
                     Buffers: shared hit=12712  
                     Worker 0: actual time=0.013..30.343 rows=190269 loops=1  
                       Buffers: shared hit=2397  
                     Worker 1: actual time=0.010..30.401 rows=192268 loops=1  
                       Buffers: shared hit=2423  
                     Worker 2: actual time=0.013..30.467 rows=190350 loops=1  
                       Buffers: shared hit=2400  
                     Worker 3: actual time=0.009..30.221 rows=192861 loops=1  
                       Buffers: shared hit=2432  
 Planning time: 0.074 ms  
 Execution time: 60.169 ms  
(31 rows)  

了解了并行聚合的原理后,我们就可以写自定义聚合函数的并行计算了。

例子

例如我们要支持一个数组的聚合,并且在聚合过程中我们要实现对元素去重。

1、创建测试表

create table test(id int, col int[]);  

2、生成测试数据

CREATE OR REPLACE FUNCTION public.gen_arr(integer, integer)  
 RETURNS integer[]  
 LANGUAGE sql  
 STRICT  
AS $function$  
  select array(select ($1*random())::int from generate_series(1,$2));  
$function$;  
  
insert into test select random()*1000, gen_arr(500,10) from generate_series(1,10000);  

3、创建聚合函数

例子1,没有combinefunc,只支持扫描并行。

数组去重函数

postgres=# create or replace function uniq(int[]) returns int[] as $$  
  select array( select unnest($1) group by 1);  
$$ language sql strict parallel safe;  
CREATE FUNCTION  

数组合并与去重函数

postgres=# create or replace function array_uniq_cat(anyarray,anyarray) returns anyarray as $$  
  select uniq(array_cat($1,$2));   
$$ language sql strict parallel safe;  
CREATE FUNCTION  

聚合函数(不带COMBINEFUNC)

create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, PARALLEL=safe);  

并行查询例子:

postgres=# set max_parallel_workers=4;  
SET  
postgres=# set max_parallel_workers_per_gather =4;  
SET  
postgres=# set parallel_setup_cost =0;  
SET  
postgres=# set parallel_tuple_cost =0;  
SET  
postgres=# alter table test set (parallel_workers =4);  
ALTER TABLE  
postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  

很明显没有设置COMBINEFUNC时,未使用并行聚合。

postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  
                                                            QUERY PLAN                                                               
-----------------------------------------------------------------------------------------------------------------------------------  
 HashAggregate  (cost=4139.74..4141.74 rows=200 width=36) (actual time=602.957..603.195 rows=1001 loops=1)  
   Output: id, arragg(col)  
   Group Key: test.id  
   Buffers: shared hit=6  
   ->  Gather  (cost=0.00..163.37 rows=15748 width=36) (actual time=0.328..43.734 rows=10000 loops=1)  
         Output: id, col  
         Workers Planned: 4  
         Workers Launched: 4  
         Buffers: shared hit=6  
         -- 只有并行扫描,没有并行聚合。  
	 ->  Parallel Seq Scan on public.test  (cost=0.00..163.37 rows=3937 width=36) (actual time=0.017..0.891 rows=2000 loops=5)  
               Output: id, col  
               Buffers: shared hit=124  
               Worker 0: actual time=0.019..0.177 rows=648 loops=1  
                 Buffers: shared hit=8  
               Worker 1: actual time=0.022..0.180 rows=648 loops=1  
                 Buffers: shared hit=8  
               Worker 2: actual time=0.017..3.772 rows=7570 loops=1  
                 Buffers: shared hit=94  
               Worker 3: actual time=0.015..0.189 rows=648 loops=1  
                 Buffers: shared hit=8  
 Planning time: 0.084 ms  
 Execution time: 603.450 ms  
(22 rows)  

例子2,有combinefunc,支持并行聚合。

drop aggregate arragg(anyarray);  
  
create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, COMBINEFUNC = array_uniq_cat, PARALLEL=safe);   

使用了并行聚合。

postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;  
                                                               QUERY PLAN                                                                  
-----------------------------------------------------------------------------------------------------------------------------------------  
 Finalize HashAggregate  (cost=1361.46..1363.46 rows=200 width=36) (actual time=285.489..285.732 rows=1001 loops=1)  
   Output: id, arragg(col)  
   Group Key: test.id  
   Buffers: shared hit=36  
   ->  Gather  (cost=1157.46..1159.46 rows=800 width=36) (actual time=63.654..74.163 rows=4297 loops=1)  
         Output: id, (PARTIAL arragg(col))  
         Workers Planned: 4  
         Workers Launched: 4  
         Buffers: shared hit=36  
         -- 并行聚合  
	 ->  Partial HashAggregate  (cost=1157.46..1159.46 rows=200 width=36) (actual time=57.367..57.727 rows=859 loops=5)  
               Output: id, PARTIAL arragg(col)  
               Group Key: test.id  
               Buffers: shared hit=886  
               Worker 0: actual time=54.788..54.997 rows=857 loops=1  
                 Buffers: shared hit=213  
               Worker 1: actual time=56.881..57.255 rows=861 loops=1  
                 Buffers: shared hit=213  
               Worker 2: actual time=55.415..55.813 rows=856 loops=1  
                 Buffers: shared hit=212  
               Worker 3: actual time=56.453..56.854 rows=838 loops=1  
                 Buffers: shared hit=212  
               ->  Parallel Seq Scan on public.test  (cost=0.00..163.37 rows=3937 width=36) (actual time=0.011..0.736 rows=2000 loops=5)  
                     Output: id, col  
                     Buffers: shared hit=124  
                     Worker 0: actual time=0.009..0.730 rows=1981 loops=1  
                       Buffers: shared hit=25  
                     Worker 1: actual time=0.012..0.773 rows=2025 loops=1  
                       Buffers: shared hit=25  
                     Worker 2: actual time=0.015..0.741 rows=1944 loops=1  
                       Buffers: shared hit=24  
                     Worker 3: actual time=0.012..0.751 rows=1944 loops=1  
                       Buffers: shared hit=24  
 Planning time: 0.073 ms  
 Execution time: 285.949 ms  
(34 rows)  

实际上并行聚合与分布式数据库聚合阶段原理是一样的,分布式数据库自定义聚合可以参考末尾的文章。

参考

https://www.postgresql.org/docs/10/static/sql-createaggregate.html

https://www.postgresql.org/docs/10/static/xaggr.html#XAGGR-PARTIAL-AGGREGATES

《PostgreSQL aggregate function customize》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
8月前
|
关系型数据库 MySQL 数据库
第十四章 演示MYSQL自定义values.yaml绑定PV和PVC和数据库用户密码
第十四章 演示MYSQL自定义values.yaml绑定PV和PVC和数据库用户密码
91 0
|
关系型数据库 Go PostgreSQL
golang pgx自定义PostgreSQL类型
golang的pgx驱动提供了大约70种PostgreSQL类型支持,但还是有一些类型没有涵盖,本文介绍如何自己编写代码支持特殊的类型。
|
8月前
|
关系型数据库 MySQL 流计算
Flink自定义sink写入mysql
Flink自定义sink写入mysql
118 0
|
消息中间件 关系型数据库 MySQL
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
Flink--6、输出算子(连接到外部系统、文件、kafka、MySQL、自定义Sink)
|
3月前
|
存储 自然语言处理 关系型数据库
MySQL 自定义变量并声明字符编码
MySQL 自定义变量并声明字符编码
147 1
|
3月前
|
存储 SQL 关系型数据库
MySQL 给查询结果增列并自定义列数据
MySQL 给查询结果增列并自定义列数据
712 2
|
6月前
|
SQL 关系型数据库 MySQL
Mysql:如何自定义导出表结构
通过以上方法,你可以灵活地自定义导出MySQL中的表结构,以满足不同的需求和场景。在进行操作的时候要注意权限问题以及路径问题,确保MySQL用户有权限写入指定的文件路径。在执行导出任务之前,还应确保你对数据库及其内容有足够的了解,以避免不必要的数据丢失或损坏。
103 1
|
7月前
|
自然语言处理 关系型数据库 数据库
技术经验解读:【转】PostgreSQL的FTI(TSearch)与中文全文索引的实践
技术经验解读:【转】PostgreSQL的FTI(TSearch)与中文全文索引的实践
78 0
|
关系型数据库 数据管理 Go
《PostgreSQL数据分区:原理与实战》
《PostgreSQL数据分区:原理与实战》
211 0
|
8月前
|
关系型数据库 MySQL 数据库
docker自定义安装mysql 5.7
docker自定义安装mysql 5.7
157 0

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版