Citus 分布式 PostgreSQL 集群 - SQL Reference(查询分布式表 SQL)

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: Citus 分布式 PostgreSQL 集群 - SQL Reference(查询分布式表 SQL)

如前几节所述,Citus 是一个扩展,它扩展了最新的 PostgreSQL 以进行分布式执行。这意味着您可以在 Citus 协调器上使用标准 PostgreSQL SELECT 查询进行查询。 Citus 将并行化涉及复杂选择、分组和排序以及 JOINSELECT 查询,以加快查询性能。在高层次上,CitusSELECT 查询划分为更小的查询片段,将这些查询片段分配给 worker,监督他们的执行,合并他们的结果(如果需要,对它们进行排序),并将最终结果返回给用户。


  • SELECT

在以下部分中,我们将讨论您可以使用 Citus 运行的不同类型的查询。


 聚合函数


Citus 支持和并行化 PostgreSQL 支持的大多数聚合函数,包括自定义用户定义的聚合。聚合使用以下三种方法之一执行,优先顺序如下:


  1. 当聚合按表的分布列分组时,Citus 可以将整个查询的执行下推到每个 worker。在这种情况下支持所有聚合,并在 worker 上并行执行。(任何正在使用的自定义聚合都必须安装在 worker 身上。)
  2. 当聚合没有按表的分布列分组时,Citus 仍然可以根据具体情况进行优化。Citussum()avg()count(distinct) 等某些聚合有内部规则,允许它重写查询以对 worker 进行部分聚合。例如,为了计算平均值,Citus 从每个 worker 那里获得一个总和和一个计数,然后 coordinator 节点计算最终的平均值。特殊情况聚合的完整列表:


avg, min, max, sum, count, array_agg, jsonb_agg, jsonb_object_agg, json_agg, json_object_agg, bit_and, bit_or, bool_and, bool_or, every, hll_add_agg, hll_union_agg, topn_add_agg, topn_union_agg, any_value, var_pop(float4), var_pop(float8), var_samp(float4), var_samp(float8), variance(float4), variance(float8) stddev_pop(float4), stddev_pop(float8), stddev_samp(float4), stddev_samp(float8) stddev(float4), stddev(float8) tdigest(double precision, int), tdigest_percentile(double precision, int, double precision), tdigest_percentile(double precision, int, double precision[]), tdigest_percentile(tdigest, double precision), tdigest_percentile(tdigest, double precision[]), tdigest_percentile_of(double precision, int, double precision), tdigest_percentile_of(double precision, int, double precision[]), tdigest_percentile_of(tdigest, double precision), tdigest_percentile_of(tdigest, double precision[])


  1. 最后的手段:从 worker 中提取所有行并在 coordinator 节点上执行聚合。如果聚合未在分布列上分组,并且不是预定义的特殊情况之一,则 Citus 会退回到这种方法。它会导致网络开销,并且如果要聚合的数据集太大,可能会耗尽 coordinator 的资源。(可以禁用此回退,见下文。)

请注意,查询中的微小更改可能会改变执行模式,从而导致潜在的令人惊讶的低效率。例如,按非分布列分组的 sum(x) 可以使用分布式执行,而 sum(distinct x) 必须将整个输入记录集拉到 coordinator


SELECT sum(value1), sum(distinct value2) FROM distributed_table;


为避免意外将数据拉到 coordinator,可以设置一个 GUC


SET citus.coordinator_aggregation_strategy TO 'disabled';


请注意,禁用 coordinator 聚合策略将完全阻止 “类型三”(最后的手段) 聚合查询工作。


Count (Distinct) 聚合



Citus 以多种方式支持 count(distinct) 聚合。如果 count(distinct) 聚合在分布列上,Citus 可以直接将查询下推给 worker。如果不是,Citus 对每个 worker 运行 select distinct 语句, 并将列表返回给 coordinator,从中获取最终计数。

请注意,当 worker 拥有更多 distinct 项时,传输此数据会变得更慢。对于包含多个 count(distinct) 聚合的查询尤其如此,例如:


-- multiple distinct counts in one query tend to be slow
SELECT count(distinct a), count(distinct b), count(distinct c)
FROM table_abc;


对于这类查询,worker 上产生的 select distinct 语句本质上会产生要传输到 coordinator 的行的 cross-product(叉积)

为了提高性能,您可以选择进行近似计数。请按照以下步骤操作:


  1. 在所有 PostgreSQL 实例(coordinator 和所有 worker)上下载并安装 hll 扩展。有关获取扩展的详细信息,请访问 PostgreSQL hll github 存储库。
  1. 只需从 coordinator 运行以下命令,即可在所有 PostgreSQL 实例上创建 hll 扩展


CREATE EXTENSION hll;


  1. 通过设置 Citus.count_distinct_error_rate 配置值启用计数不同的近似值。此配置设置的较低值预计会提供更准确的结果,但需要更多时间进行计算。我们建议将其设置为 0.005


SET citus.count_distinct_error_rate to 0.005;


  1. 在这一步之后,count(distinct) 聚合会自动切换到使用 HLL,而无需对您的查询进行任何更改。您应该能够在表的任何列上运行近似 count distinct 查询。


HyperLogLog 列


某些用户已经将他们的数据存储为 HLL 列。在这种情况下,他们可以通过调用 hll_union_agg(hll_column) 动态汇总这些数据。


估计 Top N 个项



通过应用 countsortlimit 来计算集合中的前 n 个元素很简单。然而,随着数据大小的增加,这种方法变得缓慢且资源密集。使用近似值更有效。

Postgres 的开源 TopN 扩展可以快速获得 “top-n” 查询的近似结果。该扩展将 top 值具体化为 JSON 数据类型。TopN 可以增量更新这些 top 值,或者在不同的时间间隔内按需合并它们。


  • TopN 扩展


基本操作


在查看 TopN 的实际示例之前,让我们看看它的一些原始操作是如何工作的。首先 topn_add 更新一个 JSON 对象,其中包含一个 key 被看到的次数:


select topn_add('{}', 'a');
-- => {"a": 1}
-- record the sighting of another "a"
select topn_add(topn_add('{}', 'a'), 'a');
-- => {"a": 2}


该扩展还提供聚合以扫描多个值:


-- for normal_rand
create extension tablefunc;
-- count values from a normal distribution
SELECT topn_add_agg(floor(abs(i))::text)
  FROM normal_rand(1000, 5, 0.7) i;
-- => {"2": 1, "3": 74, "4": 420, "5": 425, "6": 77, "7": 3}


如果 distinct 值的数量超过阈值,则聚合会丢弃那些最不常见的信息。这可以控制空间使用。阈值可以由 topn.number_of_counters GUC 控制。它的默认值为 1000


现实例子


现在来看一个更现实的例子,说明 TopN 在实践中是如何工作的。让我们提取 2000 年的亚马逊产品评论,并使用 TopN 快速查询。首先下载数据集:


curl -L https://examples.citusdata.com/customer_reviews_2000.csv.gz | \
  gunzip > reviews.csv


接下来,将其摄取到分布式表中:


CREATE TABLE customer_reviews
(
    customer_id TEXT,
    review_date DATE,
    review_rating INTEGER,
    review_votes INTEGER,
    review_helpful_votes INTEGER,
    product_id CHAR(10),
    product_title TEXT,
    product_sales_rank BIGINT,
    product_group TEXT,
    product_category TEXT,
    product_subcategory TEXT,
    similar_product_ids CHAR(10)[]
);
SELECT create_distributed_table('customer_reviews', 'product_id');
\COPY customer_reviews FROM 'reviews.csv' WITH CSV


接下来我们将添加扩展,创建一个目标表来存储 TopN 生成的 json 数据,并应用我们之前看到的 topn_add_agg 函数。


-- run below command from coordinator, it will be propagated to the worker nodes as well
CREATE EXTENSION topn;
-- a table to materialize the daily aggregate
CREATE TABLE reviews_by_day
(
  review_date date unique,
  agg_data jsonb
);
SELECT create_reference_table('reviews_by_day');
-- materialize how many reviews each product got per day per customer
INSERT INTO reviews_by_day
  SELECT review_date, topn_add_agg(product_id)
  FROM customer_reviews
  GROUP BY review_date;


现在,我们无需在 customer_reviews 上编写复杂的窗口函数,只需将 TopN 应用于 reviews_by_day。例如,以下查询查找前五天中每一天最常被评论的产品:


SELECT review_date, (topn(agg_data, 1)).*
FROM reviews_by_day
ORDER BY review_date
LIMIT 5;


┌─────────────┬────────────┬───────────┐
│ review_date │    item    │ frequency │
├─────────────┼────────────┼───────────┤
│ 2000-01-01  │ 0939173344 │        12 │
│ 2000-01-02  │ B000050XY8 │        11 │
│ 2000-01-03  │ 0375404368 │        12 │
│ 2000-01-04  │ 0375408738 │        14 │
│ 2000-01-05  │ B00000J7J4 │        17 │
└─────────────┴────────────┴───────────┘


TopN 创建的 json 字段可以与 topn_uniontopn_union_agg 合并。我们可以使用后者来合并整个第一个月的数据,并列出该期间最受好评的五个产品。


SELECT (topn(topn_union_agg(agg_data), 5)).*
FROM reviews_by_day
WHERE review_date >= '2000-01-01' AND review_date < '2000-02-01'
ORDER BY 2 DESC;


┌────────────┬───────────┐
│    item    │ frequency │
├────────────┼───────────┤
│ 0375404368 │       217 │
│ 0345417623 │       217 │
│ 0375404376 │       217 │
│ 0375408738 │       217 │
│ 043936213X │       204 │
└────────────┴───────────┘


有关更多详细信息和示例,请参阅 TopN readme


百分位计算


在大量行上找到精确的百分位数可能会非常昂贵, 因为所有行都必须转移到 coordinator 以进行最终排序和处理。另一方面,找到近似值可以使用所谓的 sketch 算法在 worker 节点上并行完成。 coordinator 节点然后将压缩摘要组合到最终结果中,而不是读取完整的行。


一种流行的百分位数 sketch 算法使用称为 t-digest 的压缩数据结构,可在 tdigest 扩展中用于 PostgreSQLCitus 集成了对此扩展的支持。


以下是在 Citus 中使用 t-digest 的方法:

  1. 在所有 PostgreSQL 节点( coordinator 和所有 worker)上下载并安装 tdigest 扩展。tdigest 扩展 github 存储库有安装说明。
  1. 在数据库中创建 tdigest 扩展。在 coordinator 上运行以下命令:


CREATE EXTENSION tdigest;


  1. coordinator 也会将命令传播给 worker

当在查询中使用扩展中定义的任何聚合时,Citus 将重写查询以将部分 tdigest 计算下推到适用的 worker


T-digest 精度可以通过传递给聚合的 compression 参数来控制。权衡是准确性与 workercoordinator 之间共享的数据量。有关如何在 tdigest 扩展中使用聚合的完整说明,请查看官方 tdigest github 存储库中的文档。


 限制下推


Citus 还尽可能将限制条款下推到 worker 的分片,以最大限度地减少跨网络传输的数据量。


但是,在某些情况下,带有 LIMIT 子句的 SELECT 查询可能需要从每个分片中获取所有行以生成准确的结果。例如,如果查询需要按聚合列排序,则需要所有分片中该列的结果来确定最终聚合值。由于大量的网络数据传输,这会降低 LIMIT 子句的性能。在这种情况下,如果近似值会产生有意义的结果,Citus 提供了一种用于网络高效近似 LIMIT 子句的选项。


LIMIT 近似值默认禁用,可以通过设置配置参数 citus.limit_clause_row_fetch_count 来启用。在这个配置值的基础上,Citus 会限制每个任务返回的行数,用于在 coordinator 上进行聚合。由于这个 limit,最终结果可能是近似的。增加此 limit 将提高最终结果的准确性,同时仍提供从 worker 中提取的行数的上限。


SET citus.limit_clause_row_fetch_count to 10000;


 分布式表的视图


Citus 支持分布式表的所有视图。有关视图的语法和功能的概述,请参阅 CREATE VIEW 的 PostgreSQL 文档。


请注意,某些视图导致查询计划的效率低于其他视图。有关检测和改进不良视图性能的更多信息,请参阅子查询/CTE 网络开销。(视图在内部被视为子查询。)

Citus 也支持物化视图,并将它们作为本地表存储在 coordinator 节点上。


 连接(Join)


Citus 支持任意数量的表之间的 equi-JOIN,无论它们的大小和分布方法如何。查询计划器根据表的分布方式选择最佳连接方法和 join 顺序。它评估几个可能的 join 顺序并创建一个 join 计划,该计划需要通过网络传输最少的数据。


共置连接


当两个表共置时,它们可以在它们的公共分布列上有效地 joinco-located join(共置连接)join 两个大型分布式表的最有效方式。


注意

确保表分布到相同数量的分片中,并且每个表的分布列具有完全匹配的类型。尝试加入类型略有不同的列(例如 `int` 和 `bigint`)可能会导致问题。

引用表连接


引用表可以用作“维度”表, 以有效地与大型“事实”表连接。因为引用表在所有 worker 上完全复制, 所以 reference join 可以分解为每个 worker 上的本地连接并并行执行。 reference join 就像一个更灵活的 co-located join 版本, 因为引用表没有分布在任何特定的列上,并且可以自由地 join 到它们的任何列上。


引用表也可以与 coordinator 节点本地的表连接。


重新分区连接


在某些情况下,您可能需要在除分布列之外的列上连接两个表。对于这种情况,Citus 还允许通过动态重新分区查询的表来连接非分布 key 列。


在这种情况下,要分区的表由查询优化器根据分布列、连接键和表的大小来确定。使用重新分区的表,可以确保只有相关的分片对相互连接,从而大大减少了通过网络传输的数据量。


通常,co-located joinrepartition join 更有效,因为 repartition join 需要对数据进行混洗。因此,您应该尽可能通过 common join 键来分布表。

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
4月前
|
存储 关系型数据库 分布式数据库
PolarDB 并行查询问题之分布式查询执行过程中的数据分发如何解决
PolarDB 并行查询问题之分布式查询执行过程中的数据分发如何解决
52 1
|
1月前
|
关系型数据库 分布式数据库 数据库
PostgreSQL+Citus分布式数据库
PostgreSQL+Citus分布式数据库
64 15
|
3月前
|
SQL 关系型数据库 C语言
PostgreSQL SQL扩展 ---- C语言函数(三)
可以用C(或者与C兼容,比如C++)语言编写用户自定义函数(User-defined functions)。这些函数被编译到动态可加载目标文件(也称为共享库)中并被守护进程加载到服务中。“C语言函数”与“内部函数”的区别就在于动态加载这个特性,二者的实际编码约定本质上是相同的(因此,标准的内部函数库为用户自定义C语言函数提供了丰富的示例代码)
|
4月前
|
SQL 存储 关系型数据库
PostgreSQL核心之SQL基础学习
PostgreSQL核心之SQL基础学习
60 3
|
4月前
|
SQL 安全 关系型数据库
PostgreSQL SQL注入漏洞(CVE-2018-10915)--处理
【8月更文挑战第8天】漏洞描述:PostgreSQL是一款自由的对象关系型数据库管理系统,支持多种SQL标准及特性。存在SQL注入漏洞,源于应用未有效验证外部输入的SQL语句,允许攻击者执行非法命令。受影响版本包括10.5及更早版本等。解决方法为升级PostgreSQL
304 2
|
4月前
|
存储 SQL 运维
“震撼发布!PolarDB-X:云原生分布式数据库巨擘,超高并发、海量存储、复杂查询,一网打尽!错过等哭!”
【8月更文挑战第7天】PolarDB-X 是面向超高并发、海量存储和复杂查询场景设计的云原生分布式数据库系统
116 1
|
4月前
|
SQL 关系型数据库 MySQL
SQL Server、MySQL、PostgreSQL:主流数据库SQL语法异同比较——深入探讨数据类型、分页查询、表创建与数据插入、函数和索引等关键语法差异,为跨数据库开发提供实用指导
【8月更文挑战第31天】SQL Server、MySQL和PostgreSQL是当今最流行的关系型数据库管理系统,均使用SQL作为查询语言,但在语法和功能实现上存在差异。本文将比较它们在数据类型、分页查询、创建和插入数据以及函数和索引等方面的异同,帮助开发者更好地理解和使用这些数据库。尽管它们共用SQL语言,但每个系统都有独特的语法规则,了解这些差异有助于提升开发效率和项目成功率。
483 0
|
5月前
|
负载均衡 数据管理
ClickHouse的分布式查询流程
ClickHouse的分布式查询流程
|
5月前
|
SQL 监控 关系型数据库
实时计算 Flink版操作报错合集之在设置监控PostgreSQL数据库时,将wal_level设置为logical,出现一些表更新和删除操作报错,怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
SQL Cloud Native 关系型数据库
ADBPG(AnalyticDB for PostgreSQL)是阿里云提供的一种云原生的大数据分析型数据库
ADBPG(AnalyticDB for PostgreSQL)是阿里云提供的一种云原生的大数据分析型数据库
1302 1