Greenplum methods for manual multi-stage aggregation - connecting directly to segments (PGOPTIONS='-c gp_session_role=utility') or gp_dist_random('gp_id') or the multi-stage aggregation prefunc


Tags

PostgreSQL , multi-stage aggregation , direct segment connection , gp_dist_random('gp_id')


Background

Aggregation is one of the most common operations in analytical workloads. In Greenplum the data is stored distributed across segments, so an aggregate has to be executed in multiple stages.

In fact, since PostgreSQL 9.6 introduced parallel aggregation, its approach to aggregation is quite similar to the multi-stage aggregation used by distributed databases.

When you create an aggregate function, you must implement the multi-stage API for the aggregation to truly run in parallel.
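For reference, a minimal sketch of such a definition (a trivial int8 sum named my_sum, not taken from the original article): in Greenplum the merge step is declared with PREFUNC, while PostgreSQL 9.6+ uses COMBINEFUNC together with PARALLEL = SAFE.

create aggregate my_sum (int8) (  
  sfunc    = int8pl,   -- per-row transition function, runs on each segment in parallel  
  stype    = int8,     -- transition state type  
  prefunc  = int8pl,   -- merges two partial states on the master (Greenplum two-stage aggregation)  
  initcond = '0'  
);  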

Some extensions, for one reason or another, have not implemented multi-stage aggregation, for example:

《Greenplum roaring bitmap与业务场景 (类阿里云RDS PG varbitx, 应用于海量用户 实时画像和圈选、透视)》

So what other methods are there to make aggregation run in parallel?

1. gp_dist_random('gp_id')

2. Connecting directly to segments

3. The gpdb MapReduce interface

1 gp_dist_random('gp_id')

Greenplum has an internal function interface; the optimizer dispatches any QUERY that selects from it directly to the SEGMENTs for execution.

It is also used internally when computing a database's size:

select sum(pg_database_size('%s'))::int8 from gp_dist_random('gp_id');    

The source code is as follows:

Datum      
pg_database_size_name(PG_FUNCTION_ARGS)      
{      
        int64           size = 0;      
        Name            dbName = PG_GETARG_NAME(0);      
        Oid                     dbOid = get_database_oid(NameStr(*dbName));      
    
        if (!OidIsValid(dbOid))      
                ereport(ERROR,      
                                (errcode(ERRCODE_UNDEFINED_DATABASE),      
                                 errmsg("database \"%s\" does not exist",      
                                                NameStr(*dbName))));      
    
        size = calculate_database_size(dbOid);      
    
        if (Gp_role == GP_ROLE_DISPATCH)      
        {      
                StringInfoData buffer;      
    
                initStringInfo(&buffer);      
    
                appendStringInfo(&buffer, "select sum(pg_database_size('%s'))::int8 from gp_dist_random('gp_id');", NameStr(*dbName));      
    
                size += get_size_from_segDBs(buffer.data);      
        }      
    
        PG_RETURN_INT64(size);      
}      
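A hedged illustration of the fan-out: gp_dist_random('gp_id') returns one row per primary segment, so any expression in the target list is evaluated once on every segment and only the final merge runs on the master (the query below is an example of that pattern, not part of the original article).

select count(*)                                        as segments,  
       sum(pg_database_size(current_database()))::int8 as total_bytes  
from gp_dist_random('gp_id');  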

1. Take the rb (roaring bitmap) extension as an example. Its aggregate is currently single-stage: aggregation only starts after all rows have been gathered to the MASTER, so it is slow.

test=# explain select rb_and_cardinality_agg(b) from testpay1;  
                                           QUERY PLAN                                              
-------------------------------------------------------------------------------------------------  
 Aggregate  (cost=908857.80..908857.81 rows=1 width=4)  
   ->  Gather Motion 256:1  (slice1; segments: 256)  (cost=0.00..907979.68 rows=351246 width=37)  
         ->  Seq Scan on testpay1  (cost=0.00..5277.46 rows=1373 width=37)  
 Settings:  effective_cache_size=8GB; gp_statistics_use_fkeys=on  
 Optimizer status: legacy query optimizer  
(5 rows)  

2. To achieve parallel multi-stage aggregation with gp_dist_random('gp_id'), we define a function interface that performs the prefunc step, i.e. aggregates once on each SEGMENT first.

test=> create or replace function get_rb(v_sql text) returns roaringbitmap as $$  
declare  
  res roaringbitmap;  
begin  
  execute v_sql into res;  
  return res;  
end;  
$$ language plpgsql strict;  
CREATE FUNCTION  

3. Unfortunately, GPDB has an internal safeguard: if a UDF accesses a table that is not a system (catalog) table (i.e. not a replicated, full-copy table but a distributed one), the database refuses to execute it directly on the segments.

(The goal: run rb_and_agg directly on every segment, return the rb value to the MASTER, and let the MASTER perform the final cardinality computation, thereby achieving parallelism.)

test=> explain analyze select get_rb($$select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')$$) from gp_dist_random('gp_id');  
NOTICE:  function cannot execute on segment because it accesses relation "public.testpay1" (functions.c:155)  (seg3 slice1 11.180.113.94:3068 pid=54354) (cdbdisp.c:1326)  
DETAIL:    
SQL statement "select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')"  
PL/pgSQL function "get_rb" line 4 at execute statement  
  
  
test=> explain analyze select rb_and_cardinality_agg(get_rb($$select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')$$)) from gp_dist_random('gp_id');  
NOTICE:  query plan with multiple segworker groups is not supported (cdbdisp.c:302)  
HINT:  likely caused by a function that reads or modifies data in a distributed table  
CONTEXT:  SQL statement "select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')"  
PL/pgSQL function "get_rb" line 4 at execute statement  

4. If the UDF only accesses system (catalog) tables (replicated, full-copy tables, not distributed tables), the database does allow it to execute directly on the segments.

create or replace function get_catalog(v_sql text) returns int8 as $$  
declare  
  res int8;  
begin  
  execute v_sql into res;  
  return res;  
end;  
$$ language plpgsql strict;  
test=> explain analyze select get_catalog($$select max(oid::int8) from pg_class$$) from gp_dist_random('gp_id');  
                                                                QUERY PLAN                                                                  
------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 256:1  (slice1; segments: 256)  (cost=0.00..1.01 rows=1 width=0)  
   Rows out:  256 rows at destination with 2.887 ms to first row, 6.589 ms to end, start offset by 1.203 ms.  
   ->  Seq Scan on gp_id  (cost=0.00..1.01 rows=1 width=0)  
         Rows out:  Avg 1.0 rows x 256 workers.  Max 1 rows (seg0) with 1.243 ms to first row, 1.244 ms to end, start offset by 3.534 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 495K bytes.  
   (slice1)    Executor memory: 139K bytes avg x 256 workers, 139K bytes max (seg0).  
 Statement statistics:  
   Memory used: 2047000K bytes  
 Settings:  effective_cache_size=8GB; gp_statistics_use_fkeys=on  
 Optimizer status: legacy query optimizer  
 Total runtime: 8.015 ms  
(12 rows)  
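Since only catalog relations are allowed, the pattern is still handy for per-segment catalog introspection. A hypothetical query reusing the get_catalog wrapper above (not from the original article) reads each segment's own pg_class:

select gp_segment_id,  
       get_catalog($$select reltuples::int8 from pg_class where relname = 'pg_class'$$) as est_rows  
from gp_dist_random('gp_id');  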

5. The protection code is as follows:

src/backend/executor/functions.c

    110 /**  
    111  * Walker for querytree_safe_for_segment.   
    112  */  
    113 bool querytree_safe_for_segment_walker(Node *expr, void *context)  
    114 {  
    115         Assert(context == NULL);  
    116           
    117         if (!expr)  
    118         {  
    119                 /**  
    120                  * Do not end recursion just because we have reached one leaf node.  
    121                  */  
    122                 return false;  
    123         }  
    124   
    125         switch(nodeTag(expr))  
    126         {  
    127                 case T_Query:  
    128                         {  
    129                                 Query *q = (Query *) expr;  
    130                                   
    131                                 if (!allow_segment_DML &&  
    132                                         (q->commandType != CMD_SELECT  
    133                                          || q->intoClause != NULL  
    134                                          || q->resultRelation > 0))  
    135                                 {  
    136                                         ereport(ERROR,  
    137                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),  
    138                                                          errmsg("function cannot execute on segment because it issues a non-SELECT statement")));  
    139                                 }  
    140                                   
    141                                 ListCell * f = NULL;  
    142                                 foreach(f,q->rtable)  
    143                                 {  
    144                                         RangeTblEntry *rte = (RangeTblEntry *) lfirst(f);  
    145   
    146                                         if (rte->rtekind == RTE_RELATION)  
    147                                         {  
    148                                                 Assert(rte->relid != InvalidOid);  
    149                                                   
    150                                                 Oid namespaceId = get_rel_namespace(rte->relid);  
    151   
    152                                                 Assert(namespaceId != InvalidOid);  
    153                                                   
    154                                                 if (!(IsSystemNamespace(namespaceId) ||  
    155                                                           IsToastNamespace(namespaceId) ||  
    156                                                           IsAoSegmentNamespace(namespaceId)))  
    157                                                 {  
    158                                                         ereport(ERROR,  
    159                                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),  
    160                                                                          errmsg("function cannot execute on segment because it accesses relation \"%s.%s\"",  
    161                                                                                         quote_identifier(get_namespace_name(namespaceId)),  
    162                                                                                         quote_identifier(get_rel_name(rte->relid)))));  
    163                                                 }  
    164                                         }  
    165                                 }  
    166                                 query_tree_walker(q, querytree_safe_for_segment_walker, context, 0);  
    167                                 break;  
    168                         }  
    169                 default:  
    170                         break;  
    171         }  
    172           
    173         return expression_tree_walker(expr, querytree_safe_for_segment_walker, context);  
    174 }  

src/backend/cdb/dispatcher/cdbdisp.c

     36 /*  
     37  * cdbdisp_dispatchToGang:  
     38  * Send the strCommand SQL statement to the subset of all segdbs in the cluster  
     39  * specified by the gang parameter. cancelOnError indicates whether an error  
     40  * occurring on one of the qExec segdbs should cause all still-executing commands to cancel  
     41  * on other qExecs. Normally this would be true. The commands are sent over the libpq  
     42  * connections that were established during cdblink_setup.      They are run inside of threads.  
     43  * The number of segdbs handled by any one thread is determined by the  
     44  * guc variable gp_connections_per_thread.  
     45  *  
     46  * The caller must provide a CdbDispatchResults object having available  
     47  * resultArray slots sufficient for the number of QEs to be dispatched:  
     48  * i.e., resultCapacity - resultCount >= gp->size.      This function will  
     49  * assign one resultArray slot per QE of the Gang, paralleling the Gang's  
     50  * db_descriptors array. Success or failure of each QE will be noted in  
     51  * the QE's CdbDispatchResult entry; but before examining the results, the  
     52  * caller must wait for execution to end by calling CdbCheckDispatchResult().  
     53  *  
     54  * The CdbDispatchResults object owns some malloc'ed storage, so the caller  
     55  * must make certain to free it by calling cdbdisp_destroyDispatcherState().  
     56  *  
     57  * When dispatchResults->cancelOnError is false, strCommand is to be  
     58  * dispatched to every connected gang member if possible, despite any  
     59  * cancellation requests, QE errors, connection failures, etc.  
     60  *  
     61  * NB: This function should return normally even if there is an error.  
     62  * It should not longjmp out via elog(ERROR, ...), ereport(ERROR, ...),  
     63  * PG_THROW, CHECK_FOR_INTERRUPTS, etc.  
     64  */  
     65 void  
     66 cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,  
     67                                            struct Gang *gp,  
     68                                            int sliceIndex,  
     69                                            CdbDispatchDirectDesc *disp_direct)  
     70 {  
     71         struct CdbDispatchResults *dispatchResults = ds->primaryResults;  
     72   
     73         Assert(Gp_role == GP_ROLE_DISPATCH);  
     74         Assert(gp && gp->size > 0);  
     75         Assert(dispatchResults && dispatchResults->resultArray);  
     76   
     77         if (dispatchResults->writer_gang)  
     78         {  
     79                 /*  
     80                  * Are we dispatching to the writer-gang when it is already busy ?  
     81                  */  
     82                 if (gp == dispatchResults->writer_gang)  
     83                 {  
     84                         if (dispatchResults->writer_gang->dispatcherActive)  
     85                         {  
     86                                 ereport(ERROR,  
     87                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),  
     88                                                  errmsg("query plan with multiple segworker groups is not supported"),  
     89                                                  errhint("likely caused by a function that reads or modifies data in a distributed table")));  
     90                         }  
     91   
     92                         dispatchResults->writer_gang->dispatcherActive = true;  
     93                 }  
     94         }  
     95   
     96         /*  
     97          * WIP: will use a function pointer for implementation later, currently just use an internal function to move dispatch  
     98          * thread related code into a separate file.  
     99          */  
    100         (pDispatchFuncs->dispatchToGang)(ds, gp, sliceIndex, disp_direct);  
    101 }  

6. If your GPDB does not have the RB extension, you can reproduce the problem with ordinary types:

postgres=# create table test(id int, info text, crt_time timestamp);  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
CREATE TABLE  
  
postgres=# insert into test select id, md5(random()::text), clock_timestamp() from generate_series(1,1000000) t(id);  
INSERT 0 1000000  
  
  
create or replace function get_max(v_sql text) returns int as $$  
declare  
  res int;  
begin  
  execute v_sql into res;  
  return res;  
end;  
$$ language plpgsql strict;  
  
  
  
postgres=# \set VERBOSITY verbose  
postgres=# select get_max($$select max(id) from test$$) from gp_dist_random('gp_id');  
ERROR:  0A000: function cannot execute on segment because it accesses relation "public.test"  (seg0 slice1 127.0.0.1:25432 pid=1443)  
DETAIL:    
SQL statement "select max(id) from test"  
PL/pgSQL function "get_max" line 4 at EXECUTE statement  
LOCATION:  cdbdisp_finishCommand, cdbdisp.c:254  

7. You cannot trick GPDB by tampering with the metadata, because the check is not made at the metadata level but at execution time.

postgres=# create table tmp_gp_distribution_policy as select * from gp_distribution_policy where localoid='test'::regclass;  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'localoid' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
SELECT 1  
  
postgres=# set allow_system_table_mods=DML;  
SET  
  
postgres=# delete from gp_distribution_policy where localoid='test'::regclass;  
DELETE 1  
  
Still the same error:  
  
postgres=# \set VERBOSITY verbose  
postgres=# select get_max($$select max(id) from test$$) from gp_dist_random('gp_id');  
ERROR:  0A000: function cannot execute on segment because it accesses relation "public.test"  (seg0 slice1 127.0.0.1:25432 pid=1443)  
DETAIL:    
SQL statement "select max(id) from test"  
PL/pgSQL function "get_max" line 4 at EXECUTE statement  
LOCATION:  cdbdisp_finishCommand, cdbdisp.c:254  

2 Connecting directly to SEGMENTs

《Greenplum segment节点直接读写配置与性能》

《Greenplum & PostgreSQL UPSERT udf 实现 - 2 batch批量模式》

《Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式》

This approach works, but it is rather cumbersome: every segment has to be queried directly. A sketch of automating it with dblink follows the transcript below.

postgres=# select * from gp_segment_configuration where content<>'-1' and role='p';  
 dbid | content | role | preferred_role | mode | status | port  |        hostname         |  address  | replication_port   
------+---------+------+----------------+------+--------+-------+-------------------------+-----------+------------------  
    2 |       0 | p    | p              | s    | u      | 25432 | iZbp13nu0s9j3x3op4zpd4Z | localhost |                   
    3 |       1 | p    | p              | s    | u      | 25433 | iZbp13nu0s9j3x3op4zpd4Z | localhost |                   
(2 rows)  
PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25432   
digoal@iZbp13nu0s9j3x3op4zpd4Z-> PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25432 -c "select max(id) from test"  
   max     
---------  
 1000000  
(1 row)  
  
digoal@iZbp13nu0s9j3x3op4zpd4Z-> PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25433 -c "select max(id) from test"  
  max     
--------  
 999999  
(1 row)  
  
digoal@iZbp13nu0s9j3x3op4zpd4Z-> psql -c "select greatest(1000000,999999)"  
 greatest   
----------  
  1000000  
(1 row)  
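A hypothetical way to make this less manual (not from the original article, and assuming the dblink extension is installed on the master): loop over gp_segment_configuration, open a utility-mode connection to each primary segment via dblink, run the query there, and merge the per-segment results on the master. The helper name seg_max is made up for this sketch.

create or replace function seg_max(v_sql text) returns int as $$  
declare  
  r   record;  
  v   int;  
  res int;  
begin  
  for r in select hostname, port from gp_segment_configuration  
            where content <> -1 and role = 'p'  
  loop  
    -- utility-mode connection straight to the primary segment  
    select t.m into v  
      from dblink('host=' || r.hostname || ' port=' || r.port::text  
                  || ' dbname=' || current_database()  
                  || ' options=''-c gp_session_role=utility''',  
                  v_sql) as t(m int);  
    res := greatest(coalesce(res, v), v);   -- merge the per-segment maxima  
  end loop;  
  return res;  
end;  
$$ language plpgsql strict;  
  
-- usage: select seg_max('select max(id) from test');  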

Summary

1. The gp_dist_random('gp_id') approach, because of the internal safeguard, currently only works for replicated (catalog) tables, not for distributed tables. (Visible to the user.)

2. Connecting directly to the SEGMENTs works, but the procedure is tedious and requires the user to connect to each segment. (Visible to the user.)

3. The best solution is still for the aggregate interface itself to support the prefunc API, so that multi-stage parallelism happens internally. (Transparent to the user.)

References

Direct segment connection

《Greenplum segment节点直接读写配置与性能》

《Greenplum & PostgreSQL UPSERT udf 实现 - 2 batch批量模式》

《Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式》

Multi-stage aggregation

《PostgreSQL 11 preview - 多阶段并行聚合array_agg, string_agg》

《PostgreSQL 10 自定义并行计算聚合函数的原理与实践 - (含array_agg合并多个数组为单个一元数组的例子)》

《HybridDB PostgreSQL "Sort、Group、distinct 聚合、JOIN" 不惧怕数据倾斜的黑科技和原理 - 多阶段聚合》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》

《PostgreSQL aggregate function customize》

《Greenplum roaring bitmap与业务场景 (类阿里云RDS PG varbitx, 应用于海量用户 实时画像和圈选、透视)》
