Citus 分布式 PostgreSQL 集群 - SQL Reference(查询处理)

本文涉及的产品
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
简介: Citus 分布式 PostgreSQL 集群 - SQL Reference(查询处理)

一个 Citus 集群由一个 coordinator 实例和多个 worker 实例组成。数据在 worker 上进行分片和复制,而 coordinator 存储有关这些分片的元数据。向集群发出的所有查询都通过 coordinator 执行。 coordinator 将查询划分为更小的查询片段,其中每个查询片段可以在分片上独立运行。然后协调器将查询片段分配给 worker,监督他们的执行,合并他们的结果,并将最终结果返回给用户。查询处理架构可以通过下图进行简要描述。


Citus 的查询处理管道涉及两个组件:

  • 分布式查询计划器和执行器
  • PostgreSQL 计划器和执行器

我们将在后续部分中更详细地讨论它们。


 分布式查询计划器


Citus 的分布式查询计划器接收 SQL 查询并规划它以进行分布式执行。

对于 SELECT 查询,计划器首先创建输入查询的计划树,并将其转换为可交换和关联形式,以便可以并行化。它还应用了一些优化以确保以可扩展的方式执行查询,并最大限度地减少网络 I/O


接下来,计划器将查询分为两部分 - 在 coordinator 上运行的 coordinator 查询和在 worker 上的各个分片上运行的 worker 查询片段。然后,计划器将这些查询片段分配给 worker,以便有效地使用他们的所有资源。在这一步之后,分布式查询计划被传递给分布式执行器执行。


分布列上的键值查找或修改查询的规划过程略有不同,因为它们恰好命中一个分片。一旦计划器收到传入的查询,它需要决定查询应该路由到的正确分片。为此,它提取传入行中的分布列并查找元数据以确定查询的正确分片。然后,计划器重写该命令的 SQL 以引用分片表而不是原始表。然后将该重写的计划传递给分布式执行器。


 分布式查询执行器


Citus 的分布式执行器运行分布式查询计划并处理故障。执行器非常适合快速响应涉及过滤器、聚合和共置连接的查询,以及运行具有完整 SQL 覆盖的单租户查询。它根据需要为每个分片打开一个与 woker 的连接,并将所有片段查询发送给他们。然后它从每个片段查询中获取结果,合并它们,并将最终结果返回给用户。


子查询/CTE Push-Pull 执行


如有必要,Citus 可以将来自子查询和 CTE 的结果收集到 coordinator 节点中,然后将它们推送回 worker 以供外部查询使用。这允许 Citus 支持更多种类的 SQL 构造。


例如,在 WHERE 子句中包含子查询有时不能与主查询同时执行内联,而必须单独执行。假设 Web 分析应用程序维护一个按 page_id 分区的 page_views 表。要查询前 20 个访问量最大的页面上的访问者主机数,我们可以使用子查询来查找页面列表,然后使用外部查询来计算主机数。


SELECT page_id, count(distinct host_ip)
FROM page_views
WHERE page_id IN (
  SELECT page_id
  FROM page_views
  GROUP BY page_id
  ORDER BY count(*) DESC
  LIMIT 20
)
GROUP BY page_id;


执行器希望通过 page_id 对每个分片运行此查询的片段,计算不同的 host_ips,并在 coordinator 上组合结果。但是,子查询中的 LIMIT 意味着子查询不能作为片段的一部分执行。通过递归规划查询,Citus 可以单独运行子查询,将结果推送给所有 worker,运行主片段查询,并将结果拉回 coordinatorpush-pull(推拉) 设计支持上述子查询。


让我们通过查看此查询的 EXPLAIN 输出来了解这一点。它相当参与:


GroupAggregate  (cost=0.00..0.00 rows=0 width=0)
  Group Key: remote_scan.page_id
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)
    Sort Key: remote_scan.page_id
    ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
      ->  Distributed Subplan 6_1
        ->  Limit  (cost=0.00..0.00 rows=0 width=0)
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)
              Group Key: remote_scan.page_id
              ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
                Task Count: 32
                Tasks Shown: One of 32
                ->  Task
                  Node: host=localhost port=9701 dbname=postgres
                  ->  HashAggregate  (cost=54.70..56.70 rows=200 width=12)
                    Group Key: page_id
                    ->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=4)
      Task Count: 32
      Tasks Shown: One of 32
      ->  Task
        Node: host=localhost port=9701 dbname=postgres
        ->  HashAggregate  (cost=84.50..86.75 rows=225 width=36)
          Group Key: page_views.page_id, page_views.host_ip
          ->  Hash Join  (cost=17.00..78.88 rows=1124 width=36)
            Hash Cond: (page_views.page_id = intermediate_result.page_id)
            ->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=36)
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)
                Group Key: intermediate_result.page_id
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)


让我们把它拆开并检查每一块。


GroupAggregate  (cost=0.00..0.00 rows=0 width=0)
  Group Key: remote_scan.page_id
  ->  Sort  (cost=0.00..0.00 rows=0 width=0)
    Sort Key: remote_scan.page_id


树的 rootcoordinator 节点对 worker 的结果所做的事情。在这种情况下,它正在对它们进行分组,并且 GroupAggregate 要求首先对它们进行排序。


->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
      ->  Distributed Subplan 6_1
.


自定义扫描有两个大子树,从“分布式子计划”开始。


->  Limit  (cost=0.00..0.00 rows=0 width=0)
          ->  Sort  (cost=0.00..0.00 rows=0 width=0)
            Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(remote_scan.worker_column_2))::bigint, '0'::bigint))))::bigint, '0'::bigint) DESC
            ->  HashAggregate  (cost=0.00..0.00 rows=0 width=0)
              Group Key: remote_scan.page_id
              ->  Custom Scan (Citus Adaptive)  (cost=0.00..0.00 rows=0 width=0)
                Task Count: 32
                Tasks Shown: One of 32
                ->  Task
                  Node: host=localhost port=9701 dbname=postgres
                  ->  HashAggregate  (cost=54.70..56.70 rows=200 width=12)
                    Group Key: page_id
                    ->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=4)
.


工作节点为 32 个分片中的每一个运行上述内容(Citus 正在选择一个代表进行显示)。我们可以识别 IN (...) 子查询的所有部分:排序、分组和限制。当所有 worker 完成此查询后,他们会将其输出发送回 coordinatorcoordinator 将其组合为“中间结果”。


Task Count: 32
      Tasks Shown: One of 32
      ->  Task
        Node: host=localhost port=9701 dbname=postgres
        ->  HashAggregate  (cost=84.50..86.75 rows=225 width=36)
          Group Key: page_views.page_id, page_views.host_ip
          ->  Hash Join  (cost=17.00..78.88 rows=1124 width=36)
            Hash Cond: (page_views.page_id = intermediate_result.page_id)
.


Citus 在第二个子树中启动另一个执行器作业。它将在 page_views 中计算不同的主机。它使用 JOIN 连接中间结果。中间结果将帮助它限制在前二十页。


->  Seq Scan on page_views_102008 page_views  (cost=0.00..43.47 rows=2247 width=36)
            ->  Hash  (cost=14.50..14.50 rows=200 width=4)
              ->  HashAggregate  (cost=12.50..14.50 rows=200 width=4)
                Group Key: intermediate_result.page_id
                ->  Function Scan on read_intermediate_result intermediate_result  (cost=0.00..10.00 rows=1000 width=4)
.


工作人员使用 read_intermediate_result 函数在内部检索中间结果,该函数从 coordinator 节点复制的文件中加载数据。


这个例子展示了 Citus 如何使用分布式子计划在多个步骤中执行查询,以及如何使用 EXPLAIN 来了解分布式查询执行。


 PostgreSQL 计划器和执行器


一旦分布式执行器将查询片段发送给 worker,它们就会像常规 PostgreSQL 查询一样被处理。该 worker 上的 PostgreSQL 计划程序选择在相应分片表上本地执行该查询的最佳计划。 PostgreSQL 执行器然后运行该查询并将查询结果返回给分布式执行器。您可以从 PostgreSQL 手册中了解有关 PostgreSQL 计划器和执行器的更多信息。最后,分布式执行器将结果传递给 coordinator 进行最终聚合。


  • 计划器
  • 执行器
相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
9天前
|
关系型数据库 分布式数据库 数据库
PostgreSQL+Citus分布式数据库
PostgreSQL+Citus分布式数据库
39 15
|
3天前
|
SQL 监控 关系型数据库
SQL语句当前及历史信息查询-performance schema的使用
本文介绍了如何使用MySQL的Performance Schema来获取SQL语句的当前和历史执行信息。Performance Schema默认在MySQL 8.0中启用,可以通过查询相关表来获取详细的SQL执行信息,包括当前执行的SQL、历史执行记录和统计汇总信息,从而快速定位和解决性能瓶颈。
|
14天前
|
SQL 存储 缓存
如何优化SQL查询性能?
【10月更文挑战第28天】如何优化SQL查询性能?
59 10
|
8天前
|
SQL 关系型数据库 MySQL
|
22天前
|
SQL 数据库 开发者
功能发布-自定义SQL查询
本期主要为大家介绍ClkLog九月上线的新功能-自定义SQL查询。
|
18天前
|
SQL 关系型数据库 MySQL
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
mysql编写sql脚本:要求表没有主键,但是想查询没有相同值的时候才进行插入
30 0
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
110 2
基于Redis的高可用分布式锁——RedLock
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
7天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
40 16