前言
分布式数据库能够解决海量数据存储、超高并发吞吐、大表瓶颈以及复杂计算效率等单机数据库瓶颈难题,当业务体量即将突破单机数据库承载极限和单表过大导致性能、维护问题时,分布式数据库是解决上述问题的高性价比方案。数据库作为分布式改造的最大难点,就是"和使用单机数据库一样使用分布式数据库",这也一直是广大用户的核心诉求。
而实现一款分布式数据库,绕不过的难题有很多:数据如何分片?分布式SQL?分布式事务?数据倾斜的问题?数据如何迁移?的确,这些都是分布式领域绕不开的难题。
既要实现分布式数据库的功能,又要减少用户使用的学习成本、做到应用层对分布式架构无感,这就要做到功能和易用性的取舍,如何做到这种平衡?
Citus是基于PostgreSQL插件实现的一款开源分布式数据库,是Azure在分布式PG上的商业化实践,是一款以OLTP为主,提供部分OLAP能力的数据库。今天我们将从Citus入手,从源码级别揭秘如何基于PostgreSQL做一款分布式数据库,解决分布式场景的数据分片、分布式SQL、分布式事务、数据倾斜、数据迁移等难点问题,理解分布式领域设计的”取“与”舍“。
1. 分布式的架构和难点
图中是一个典型的数据库分布式架构,客户端的SQL经过coordinate节点(CN)的解析,在分布式的sharding节点(DN)上执行。
在这个架构中,有很多难点和现存的解决方案:
- 数据分片方式:Hash;Range
- 分布式SQL:使用sharding column单机SQL;跨sharding节点的SQL
- 分布式事务:全局一致性写(2PC);全局一致性读(HLC、TSO…)
- 数据倾斜:分片分裂;resharding
- 数据迁移:Online Schema Change;逻辑复制
而Citus是如何解决上述问题的呢?又是如何成为Azure商业化的首选呢?下面会详细讲解。
2. Citus架构设计
我们先来看下Citus整体的架构设计,值得说明的是:Citus架构中每个角色都是PG,没有其他中间件。
我们看下Citus架构中的组件构成:
- 整体架构分为:mCN+nDN
- CN:Coordinator节点,集群元数据、分布式计划
- 特点:轻计算,重网络IO
- Citus daemon:2PC事务的监测、恢复和清理
- DN:Datanode节点,存储实际的分片数据
- 特点:实际的计算和存储节点,可水平拓展
为了更深入地理解Citus的架构设计,我们会在接下来的章节中讨论Citus在各个方面的设计与取舍:
- Citus的分片方式、表和索引详见第3章;
- Citus的分布式SQL、执行计划、计算下推的原则,详见第4章;
- Citus的分布式事务实现及正确性保障、为何没有全局一致性读,详见第5章;
- Ciitus的集群管理,包括高可用和容灾、数据热点/数据倾斜、分布式的线性拓展、PITR,详见第6章。
3. Citus分片
3.1 Citus 分片方式
不同的分片方式直接影响了数据的分布方式,Citus从应用实践出发,提供了最常用的两种分片方式:
- hash分片:采用一致性哈希原理,[-2^32,2^32-1]平均分成n个区间,hash_int32(key)计算具体区间。
hash分片比较常见于多租户场景、查询多围绕某列等场景,在这样的分布式结构中,最重要的就是sharding column的选择。一个好的sharding column可以很大程度解决数据分布不均匀、访问热点等问题。
sharding column的一般选择:Where、JOIN出现频率高的列,分布相对平均的列,例如:订单表、用户表以user_id做分片;SaaS多租户。
- range分片:分片范围由人工指定。
Citus对range分片的支持比较简单,需要人工指定每个分片的范围。相比TiDB,Citus的range分片没有自动分裂,节约了很大的运维成本。Citus也指明了range分片适用于时序数据,支持人工分裂某个区间。
Citus仅支持了常用的数据分片,推荐hash分片方式,官方称解决绝大多数问题。
3.2 Citus 表和索引
Citus系统中存在三种表:
- Distributed tables:分片表,就是使用上述hash分片和range分片的表。
- Co-located table:分片方式相同的表称为亲和表。亲和表的集合成为组,通常一个分片的运维操作,指的都是一个组内的所有亲和表。
- Reference tables:参考表,每个DN上有表的全部数据,用于解决跨节点JOIN问题。
- Local tables:本地表,只存在于CN上(因为CN也是PG)。官方称Local table用来存储数据量不大的表,解决用户鉴权问题。笔者认为这是因为CN是PG,可以存储数据,本身意义不大。
Citus的索引:完全依赖PG的索引,没有全局的二级索引,没有全局的唯一、外键约束。
- Citus的唯一、外键约束,必须建立组合索引,组合索引的第一列必须是sharding column。
Citus主张把全局的约束下沉到各个DN上,这也与我们后面介绍的Citus倾向于做计算下推不谋而合。
Citus在分片、表、索引的设计都化繁为简,意在减轻用户在使用中的学习成本。随着我们逐步深入地介绍,你会更加深刻地理解这一点。
4. Citus 分布式 SQL
4.1 内核实现
Citus完全实现在PG的插件中,对内核代码0改动,既最大程度复用了PG原有的能力,又达到了代码的解耦,工程质量极高。具体实现见下图:
可以看出,Citus实现了planner_hook和executor_hook,使得所有的SQL都进入到Citus的执行逻辑中。
Citus借助PG提供的计划器和执行器的钩子函数,结合PG优化器,生成分布式执行计划:
- 计划器钩子(planner_hook)
- 输入:query tree、table sharding info
- 输出:plan with custom scan
- 在产生分布式计划的过程中,Citus会根据SQL的执行方式,判断属于以下哪种执行计划:
- local plan:计划只在CN或DN上执行
- distributed plan:计划在CN和DN上都会执行,依赖local plan
- 执行器钩子(excutor_hook)
- 根据planner_hook产生的分布式计划,执行SQL下发、结果聚合
具体而言,针对一条SQL,Citus会如下图所示,在CN进行解析,转发到DN上执行。
上图也说明了Citus的MPP能力,充分利用了DN的计算能力。
图中有几个细节:
- CN将SQL下发到DN
- select/update/delete/insert拆分成对应分片的SQL
- insert batch拆分为单个insert value,没有对同一个DN的insert聚合到一条SQL(缺点)
- CN与DN之间维护连接池
- session参数不转发:search_path、transaction_isolation
- prepare statement转换成普通SQL,因为无法保证prepare后的SQL都经过同一条连接(缺点)
4.2 分布式SQL执行
讲完了Citus分布式SQL的内核实现,那对于分布式的SQL,Citus具体是如何支持的呢?Citus将分布式的SQL从简单到复杂分为了以下4种:
- 前3种SQL本质上还是在同分片中,没有跨节点,但是涵盖了分布式下很多场景的SQL。
- 第4种是真正意义上的分布式JOIN,查询方式可以概括为 —— 重分片:
1) a表重新分片(DN互联、不经过CN),以any为sharding column,保存结果到临时表i,此时i和b是亲和表(分片方式相同);
2) CN重新生成SQL: SELECT … FROM i JOIN b on ( i.any = b.key );
3) 每个shard查询
本例只举例a.any = b.key的情况,a表要重分片;其实a.any1 = b.any2也是同理的,a、b两表都需要以各自的any字段进行一次重分片。
但第4种要经过大量的计算、中间结果,Citus虽然支持但并不鼓励这样做,默认关闭此类SQL的支持。
4.3 从SQL limitation深入理解分布式SQL执行计划
上文简单描述了分布式SQL的执行过程,下面我会从几个SQL limitation来深入讲述分布式执行计划,来看看Citus的取舍。
4.3.1 Correlated subqueries
t、t1:关联分片表;t2:非关联分片表;ref:参考表
1. select * from ref where not exists (select t.id from t); |
支持,distributed plan,子查询可以在DN执行 |
2. select * from ref where not exists (select t.id from t1 join ref on t.id = ref.id); |
支持,distributed plan,子查询可以在DN执行 |
3. select * from ref where not exists (select t.id from t where t.id = ref.id and t.shard_id=3); |
支持,local plan,子查询无需将结果返回CN |
4. select * from ref where not exists (select t.id from t where ref.id = t.id); |
不支持,distributed plan,子查询无法在DN直接执行 |
表中列举了几个子查询的例子,distributed plan涉及CN和DN的共同计算,local plan代表只需要CN或DN计算。
- 1、2、3支持的原因是子查询可以单独在DN执行,而不需要中间和CN产生交互。
- 4 不支持的原因是子查询select t.id from t where ref.id = t.id无法直接在DN上执行,即使可以很简单的改写为select t.id from t,ref where ref.id = t.id,但Citus是直接将子查询下推到DN上,所以不支持。
4.3.2 Outer joins
t、t1:关联分片表;t2:非关联分片表;ref:参考表
1. select * from t1 left join t on t1.shard_id = t.shard_id; |
支持,亲和表 join on sharding column |
2. select * from ref join t on ref.id = t.id; |
支持,参考表和分片表inner join |
3. select * from t join t2 on t.id = t2.id; |
支持,非亲和表重分布 |
4. select * from ref left join t on ref.id = t.id; |
不支持,理论上重分布也可以支持,但不推荐 |
表中列举了几个JOIN的例子,1为亲和表间的outer join,2为参考表和分片表间的inner join,3为非亲和表间的inner join,这些都在Citus的支持之列。4是参考表和分片表的outer join,不支持的原因是Citus对outer join不会推荐重分布,更倾向于让SQL下推到DN上。而1失败是因为这个SQL在单个DN上的结果是错误的,需要DN之间的数据全集做outer join,所以报错。
虽然不支持,但提供了改写方法:with a as (select id from t) select * from ref left join a on ref.id = a.id;
乍一看这条SQL与上面4不是一样嘛?不急,仔细看看,这条使用了with字句,先把t表的id全部拿出来放到CN上,已经拿到了DN之间的数据全集,那接下来的outer join就可以得到一个正确的结果。
4.3.3 Recursive CTEs
Recursive CTEs是PG的特色功能,简单例子1~100做累加,下一次的计算依赖上一次的计算结果。
t、t1:关联分片表;t2:非关联分片表;ref:参考表
1. WITH RECURSIVE a(n) AS ( VALUES (1) UNION ALL SELECT n+1 FROM a WHERE n < 1) SELECT * FROM t where id in (select n from a) and shard_id = 3; |
支持 |
2. WITH RECURSIVE a(n) AS ( VALUES (1) UNION ALL SELECT n+1 FROM a WHERE n < 1) SELECT * FROM t where id in (select n from a); |
不支持 |
1 和 2 的唯一区别就是 1 指定了sharding column,而2没有。
2 不支持的理由:Citus对recursive CTE限制比较苛刻,只允许在单个分片,避免CN参与大量的计算。
4.3.4 Grouping sets
Grouping sets是PG的特色功能,指聚合时对每个grouing sets中的列单独聚合。
t、t1:关联分片表;t2:非关联分片表;ref:参考表
1. select id, name, avg(shard_id) from t group by id, name; |
支持,group by多列最多在CN产生一次聚合 |
2. select id, name, avg(shard_id) from t group by grouping sets ((id), (name), ()); |
不支持,grouping sets会在CN产生多次聚合 |
1和2都是group by,2比1多了grouping sets,grouping sets的语义是对sets中的每一列做一次聚合。2不支持的理由:grouping sets n列,要将DN返回的结果,在CN上重新聚合n次。按照我们上面对Citus的理解,涉及到多次在CN上计算的SQL,都是不支持的。
改写方法:group by多列、或者grouping sets里只有一列。
4.4 Citus 分布式 SQL 总结
- 通过PG内核的hook实现,0代码侵入,复用PG能力。
- 以PG优化器为基础,推荐将分布式SQL转换到单机上运行,避免在CN上计算,倾向于做计算下推。
- 对比业界其他方案(TiDB、CockroachDB等),支持全部优化器算子,成熟度高。
5. Citus 分布式事务
对于分布式数据库,绕不开的话题就是分布式事务。Citus对于分布式事务的实现,也是业界标配 —— 2PC。需要注意的是,Citus的分布式事务没有全局一致性读。Citus为什么要这样设计?具体实现又是怎样?且听我娓娓道来。
5.1 分布式事务的开始
对于分布式事务的开始,要经过以下步骤:
- 遇到第一条跨DN的SQL,标记为2PC
- SQL commit时发起2PC
5.2 分布式事务的实现——2PC
2PC基本是分布式事务的标准做法了,基本流程包括两个阶段:prepare+commit。prepare成功,就可以认为此2PC事务成功。2PC的详细算法不在这里赘述,我们只关心Citus的实现。
5.2.1 2PC事务中的关键角色和表
介绍2PC事务前,先来介绍下决2PC事务中的关键角色和表。
前提:PG支持prepare/commit transaction,类比mysql的xa prepare/commit,此为2PC在数据库的基础支持。
2PC的重要角色:
- CN节点:记录事务元信息,prepare/commit发起者
- CN daemon进程:2PC事务的恢复和回滚
2PC的重要表:
- CN上的表 pg_dist_transaction:记录DN和2PC事务ID
- DN上的表 pg_prepared_xacts:PG自带,记录已经prepare没有commit的事务,commit/rollback后会自动删除
有了以上两个表,CN daemon就能够在发生错误时,判断2PC事务到底应该提交还是回滚。
5.2.2 正常流程
先来看下正常流程,如下图所示流程。
5.2.3 异常流程
2PC事务的进行过程中,任何阶段都可能发生问题,导致2PC回滚或恢复。下图详细列举了任何阶段可能出现的问题,以及Citus是如何确保2PC事务的正确性。
在解释图中的每个阶段之前,先重申下两个表的作用:
- CN上的表 pg_dist_transaction:记录DN和2PC事务ID。2PC开始时记录,但未提交对外不可见。prepare阶段成功后提交,对外可见。
- DN上的表 pg_prepared_xacts:PG自带,记录已经prepare没有commit的事务,commit/rollback后会自动删除。
我们按照图中的顺序,分别讲述每一个流程出现问题后,Citus如何处理。
- 1和2:事务没有commit,还未到2PC阶段,系统就崩溃了。
事务处理:2PC事务还未开始,系统恢复后需要回滚事务。
具体步骤:
- 若CN挂了,CN和DN的连接重置,CN与客户端的连接重置,未提交事务自动回滚,客户端收到connection reset。
- 若DN挂了,CN对存活DN发送rollback,对客户端返回ERROR
- 3和4:2PC事务正式开始,记录了2PC事务的元信息,包括2PC事务ID和参与2PC事务的DN节点。但还没对DN发起prepare,或者有一部分DN prepare了,系统就崩溃了。
事务处理:2PC事务没有全部prepare成功就崩溃了,系统恢复后需要回滚事务。CN daemon对比pg_dist_transaction和pg_prepared_xacts的记录差异,可以很容易得出在DN需要回滚的prepared transaction。
具体步骤:
- 若CN挂了,CN和DN的连接重置,CN与客户端的连接重置,未提交事务自动回滚,客户端收到connection reset。CN daemon清理DN上的prepared transaction。
- 若DN挂了,CN对存活DN发送rollback,对客户端返回ERROR。CN daemon清理DN上的prepared transaction。
- 5和6:2PC事务的prepare成功,2PC事务元信息已经提交(pg_dist_transaction表对外可见)。
事务处理:系统恢复后提交2PC事务。
具体步骤:
- 若CN挂了,CN与客户端连接重置,客户端收到connection reset,CN daemon提交DN上的prepared transaction。
- 若DN挂了,CN对客户端返回WARN,CN daemon提交DN上的prepared transaction。
(在上述实现中,pg_dist_tranaction表的访问很容易成为读写热点,为了不对其上锁,采用2次检测的方法。)
5.3 全局死锁检测
- 每个事务都有CNID、TXID的标识
- CN daemon周期性轮询所有DN的事务信息,构建全局的wait-for有向图,成环即死锁
5.4 小结
Citus在分布式事务上只支持一致性写,没有一致性读,这也与第4章不鼓励分布式JOIN交相呼应。Citus的分布式实践一直在化繁为简,减少用户的学习成本,将分布式的SQL、事务都规约到同一个shard,简单且实用。
6. Citus 集群管理:高可用和容灾、数据热点/数据倾斜、分布式的线性拓展、PITR
6.1 Citus的高可用和容灾
PG原生支持多副本复制,没有自选主能力。Citus选择复用PG原生的能力,Azure商业化最佳实践为:CN和DN节点使用主备强同步复制,保证高可用,主备部署在不同的AZ。
6.2 数据热点/数据倾斜、分布式线性拓展
分布式数据库解决了单机数据库的数据量、吞吐、并发、计算效率等问题,但数据分布方式的变化不可避免会带来新的问题,包括以下问题:数据热点问题(数据的访问集中在某些分片上)、数据倾斜问题(数据分布不均匀集中在某些分片上)、分布式线性扩展能力(CN、DN的线性拓展)等等。
接下来我们看看Citus如何解决这些问题。
6.2.1 分片迁移
解决数据热点/数据倾斜、分布式线性拓展的问题,最基础、也是最重要的能力就是数据迁移的能力。有了数据迁移,分布式数据库里的上述问题才能够解决。
数据迁移的单位是一个分片组,即所有sharding方式相同的表。我们先讲下分片迁移的实现原理,然后再介绍上述这些问题该如何解决。
分片迁移的原理其实就是迁移一个分片里的所有表+修改元数据,Citus提供了两个方案:
- 方案一:COPY table + 修改集群元信息表,这种方案将源表的数据以二进制流batch写入目标表,速度很快,但是全程禁写,只适合离线使用。
- 方案二:逻辑订阅 + 修改集群元信息表,迁移时先全量再增量,割接时迁移的表只读。这种方案是online的,可以在线上使用。
(修改集群元数据涉及到数据和缓存的一致性,Citus使用trigger+signal invalidate机制,实现很优雅,这里就不展开讲了)
上述方案在执行前会记录元信息,即使在运行中集群崩溃,重启后是可以继续的。
有了分片迁移的基础,下面我们再来讨论数据热点/数据倾斜、分布式线性拓展的问题解决方案。
6.2.2 数据热点/数据倾斜的解决方案
1. 分片分裂
某个分片的数据存在热点/倾斜,最简单也最实用的办法就是把热点/倾斜的数据拆出来,然后迁移到资源相对充裕的节点上。不同的分片方式(hash和range),会有不同的方案:
- hash分片:hash分片使用一致性哈希,分片分裂需要将原有的一个分片拆成三个分片(具体做法如下图)。这种情况适用于某个sharding column的数据过多,需要单独拿出来,例如多租户数据以user_id分片,某个user的数据量特别大,需要单独分裂出来。
- range分片:range分片的区间是人工指定,分片分裂也是将原有区间一分为二。例如数据以月份归档,1月1日~1月31日的数据量太大,拆分为1月1日~1月15日、1月16日~1月31日。这个case比较简单就不画图说明了。
hash分片的分裂方式如上图所示,sharding column=135的数据量大,hash_int32(135) = -1111111111,所以要把group1分裂为group5、group6、group7。group为sharding column相同的所有亲和表构成的集合。最后使用分片迁移,将group6迁移到另一个资源充裕的节点。
2. schema change
对于一个分布式的表,sharding column的选择可谓是十分重要,直接决定了未来数据是否分布均匀。Citus也提供了修改sharding column的方式,例如用户表(user)当前以country_id(国家)分片,数据倾斜严重,改为以user_id分片。但是这种方式要阻塞在线的读写,不建议线上使用。
6.2.3 分布式线性拓展
随着业务的增长,数据量越来越大,TPS越来越高,往往需要添加更多的计算节点(CN)和存储节点(DN)。
1. CN的线性拓展
Citus提供了多CN的方式,包含一个master CN和多个普通CN节点,只有master CN才可以修改集群元信息,包括集群拓扑等。每个CN节点都可以接受客户端的DML SQL。
但Azure基于Citus的商业化实践里,只有单CN方式。一是这样管理比较简单,无论从用户使用还是后端运维。二是Azure认为CN不会成为计算瓶颈,CN节点的计算很轻量,Citus更倾向于将计算下推到DN上进行,这个从上述SQL Limitation也可以看出。
2. DN的线性拓展
新加一个DN节点,或者删除一个DN节点,都面临着数据的迁移。数据迁移的方式就是6.2.1介绍的分片迁移,强调下,数据迁移的单位是一个分片组,即所有sharding方式相同的表。
决定数据迁移的源端和目标端,是要有一定算法的。Citus提供了两种算法支持:
- by_shard_count:尽可能让每个DN上的分片数量相同。针对图中例子,会在group1/2/3/4中选择一个迁移到Datanode3,默认选择第一个也就是group 1。
- by_disk_size:尽可能让每个DN上的数据量相同。每个表的数据量直接使用了PG原生的统计信息。针对图中例子,会计算出哪些group迁移到Datanode 3后,每个datanode数据量相对平均。
总的流程如下:
- 根据设定的算法,计算哪些分片需要迁移和迁移的目标节点,写入集群元信息表。
- 根据步骤1的迁移元信息,进行数据迁移。迁移的单位是一个分片,多个分片任务可以并行。
6.4 Citus集群PITR
分布式数据库中的按时间点还原,需要将集群中每个节点的状态还原到同一时刻,不然容易造成状态不一致导致的失败。
Citus对此的解决方案是抢整个集群的Exclusive锁,该锁禁止了所有对集群的更改,包括集群元数据和用户数据,抢到锁后对所有节点创建restore point,此步骤与PG原生无异。创建restore point时指定超时时间,超过此时间整个过程失败,避免对在线写的阻塞。
所以我们可以看出,Citus创建还原点不能特别频繁,Azure商业化实践是每小时创建还原点。
7. 分布式产品对比
7.1 服务端
Citus |
PG-XC/TDSQL/GoldenDB |
YugabyteDB |
CockroachDB/TiDB |
|
实现方式 |
PG插件 |
PG内核 |
PG内核 |
KV on RocksDB + SQL |
全局一致性读 |
不支持 |
GTM(类TSO) |
HLC |
HLC/TSO |
副本同步 |
PG原生主备复制 |
PG原生主备复制 |
raft |
raft |
数据分片 |
hash/range |
hash |
hash/range |
range |
Schema change |
只读 |
只读 |
只读 |
online |
列存 |
支持列存(append-only) |
不支持 |
不支持 |
支持 |
热点问题 |
分片分裂、修改分片column |
分片分裂 |
分片分裂 |
分片分裂 |
计算下推 |
支持 |
支持 |
支持 |
部分支持 |
7.2 客户端和中间件
Citus |
Sharding JDBC |
MyCat |
|
实现方式 |
服务端PG插件 |
Java客户端 |
服务端proxy |
分布式事务 |
MVCC(无全局MVCC) |
MVCC(无全局MVCC) |
MVCC(无全局MVCC) |
计算下推 |
全部支持 |
部分支持 |
部分支持 |
数据分片管理 |
hash/range |
每个客户端都要维护hash/range规则 |
hash/range |
分布式JOIN及优化 |
支持所有类型的JOIN |
不支持跨分片的JOIN |
只支持两表跨分片JOIN |
节点互联 |
支持 |
不支持 |
不支持 |
扩容 |
shard migration |
每个客户端都要修改分片规则 |
shard migration |
广播表 |
支持 |
支持 |
支持 |
DDL |
基本全部支持 |
支持常用DDL |
支持常用DDL |
集群管理指令 |
丰富,通过SQL函数对客户透明 |
无 |
丰富,需要二次封装 |
8. 总结
纵观Citus在分布式上的实现与取舍,可以看出并没有完成分布式数据库的所有功能,而是以用户场景为导向,致力于解决某些场景的问题。其在实现上有很多思路值得借鉴。总结如下:
- 插件化实现:PG能力复用和代码耦合达到了很高的平衡
- 最大程度复用PG已有能力
- 代码耦合度很低,可以快速适配新的PG大版本(对比GreenPlum)
- 以业务场景为导向,做分布式功能的取舍
- 舍弃了range的自动分裂
- 舍弃了全局一致性读
- Azure商业化舍弃了多CN,分布式SQL都倾向于做计算下推
9. Reference
https://en.wikipedia.org/wiki/Two-phase_commit_protocol
https://docs.citusdata.com/en/v10.2/get_started/what_is_citus.html