如何将 PolarDB-X 与大数据等系统互通|学习笔记

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 快速学习如何将 PolarDB-X 与大数据等系统互通

开发者学堂课程【PolarDB-X 动手实践如何将  PolarDB-X  与大数据等系统互通】学习笔记,与课程紧密联系,让用户快速学习知识。

地址https://developer.aliyun.com/learning/course/944/detail/14752


如何将 PolarDB-X 与大数据等系统互通

 

容介绍

一、环境准备

二、演示内容

  

一、环境准备

本节内容会讲如果将 PolarDB-X 与下游的一些数据的系统比如说大数据等,进行一个互通,其实就是讲一下 PolarDB-X 提供一个全新变化的一个能力。

首先它还是围绕 PolarDB-X 的社区版,也就是开源的版本来进行一个讲解,那么它对应的是在阿里云上的 PolarDB-X 商业版的一个 2.0,面向群体就是主要应用开发者、架构师、 DBA、DevOps、SRE 和学生等等。

图片1.png


主要的内容就是围绕在使用 PolarDB-X 的整个生命周期的过程当中所有的场景,这样一个场景化的介绍会有一些动手的实践的一个过程。

那么这个课程大概会覆盖这些 topic , topic 在后面也可能会有所增加或者是调整,比如说第一期讲了如何一键部署 PolarDB-X ,那么第二期讲了如果使用 PolarDB-X ,那今天按照这个顺序,应该是讲数据导入或者是水平或缩容,但是目前这两个功能依赖于最近即将发的一个新的版本,所以那就把它调整到这个画板之后再进行讲解,所以这一讲需要有这样一些基础的准备。

第一个就是系统,首先需要有一台 CentOS7、CentOS8、macOS 等等。 Windows 没测过,但希望它可以完成。配置就稍微好一点的配置,那么这个环境里面需要装上像 Docker、MySQL ,还有一个可使用的 PolarDB-X ,还有今天演示所依赖的一个阿里开源的工具。

图片2.png


那么按照惯例,首先回来看一下 PolarDB-X 的一个架构,那理解这个架构会有助于理解后面的很多实践动手过程当中的一些细节,所以首先还是来看一下 PolarDB-X 的一个的基础架构,它是一个分布式的一个水库,那么在系统组建实际上它也是个分布式的一种方式。最前面也就是跟应用的交互这边有吸引的一个组件,也是分布式计算的节点,那么后面紧跟的就是 DN ,它是数据存储的一个节点, DN 是一个 MySQL 的一个本质,所以为了方便理解可以简单的把它理解成是一个定制化的 MySQL ,那么最上面的是元数据中心 Gms,那它其实也是一种 DN ,只不过扮演了特殊的角色,它会保存整个系统的元数据,同时提供一个全局式的授时服务,那最后也是跟今天内容相关的 CDC 这样的一个全局日志生成的一个组件,这是整个系统的架构

 

二、演示内容

图片3.png


第一部分来演示一下 PolarDB-X 作为一个圆,中间用上开源的 Canal,作为增量的一个数据的订阅,也就是消费全级 Binlog ,那最后把 Canal 的数据本来计划是投递到 kafka 当中,因为分析的系统比较热门,所以把数据就都投递到 Kinkhouse  里面,第一个要演示的场景就是 PolarDB-X +Canal+Kafka ,最终的目标是跑起一个 PolarDB-X 或者搭建一条同步列路,然后在 PolarDB-X 里面写一些数据,实际上是在做一个数据库当中常用的一个测试场景,它叫转账测试。希望这个同链路能够把这些数据正常的同步到下游的 PolarDB-X  里面。

第二个场景是 PolarDB-X 提供了与 MySQL 完全兼容的一个能力。那么作为一个验证也会把 MySQL 作为  PolarDB-X 的一个备库,然后直接用 MySQL 的 change master 指令来连到 PolarDB-X 里面,同时在 PolarDB-X 这边再次跑一个转账测试,希望转账测试里面的数据能够正常的同步到 MySQL 这个备库里面。首先来进行第一个演示,也就是 PolarDB-X +Canal+Kafka。用了阿里云上的一台 ecs ,把这些链路搭建起来,同时因为中间有一段需要写代码的,也就是 Kinkhouse 消费到 PolarDB-X 中的 Binlog 之后,它需要用 Canal 的 Kink  去消费增量。

同时再把它写到 Kinkhouse 里面。这段代码直接把它放在那个 idea 本地建了一个工程,所以这次演示首先 PolarDB-X +Canal+Kafka,更精确的来说以及 Kinkhouse 。

这三个都是以一个 docker 容器跑在了一个远程的 ecs 的远程服务器上,然后 Kinkhouse 是跑在个人电脑。那么接下来首先登录远程演示的服务器,这是一直以来用来演示的服务器,那么接下来先来看一下,可以看到远程的服务器上,目前一个容器都没有所以全新来建几个容器,第一个需要把 PolarDB-X 建立好,这是之前已经测试过的,所以这个电脑上会有之前敲过的一些命令的记录,有的用户可能看得比较清楚,发现之前执行的命令直接补全掉,那如果不知道,给大家推荐一个比较好用的 shell 叫 fish ,然后它还是比较好用的,大家可以去用一下,这样就把 PolarDB-X 给解决掉,这个容器是用我最近打包的一个新的 PolarDB-X 的一个 one 的一个形象 iPod,那它里面启动 PolarDB-X 需要一点点的时间,那用这点时间来讲一下把那个我的 client,也就是我消费 clinicnk server 里面增量数据写到 Kinkhouse 那段代码,它的基本都是怎么样的?

昨天我在 Idea 里面建了一个工程,然后这个工程很简单,它就一个类,这个类就是为了演示,从 clinicnk server 里面拉到增量数据,我是把它投递到 Kinkhouse 里面,首先是从给它们 Kink 的那个官方的例子里面直接把那段代码给拷贝过来的,也就是那个 example 拷贝下来的,然后做了稍微的一个修改。

那前面就是连接 clinicnk server  一个过程,尝试连接去获取消息。如果没有就等待一秒之后再去尝试获取的这么一个循环,然后一旦获取到数据之后,它就是在控制台。

这个方法就把这些获取到的增量的数据打印出来,那么在 example 里面,这个代码就算完成。

那我做的修改的部分就是在它打印的时候,我同时去建了一个到一个 Kinkhouse  连接,可以看到这里是用 Kinkhouse  提供的一个标准的 gbc 连接的一个方式,首先搞了一个 Kinkhouse ,然后再写一个 connection,然后写一 statement ,之后会在这些里面的记录的时候,在这里获取达到每一行变更,如果发现里面是 insert 或者是 update 的实践,那我会尝试把它投递到 Kinkhouse 。

原来的那个 client example 里面有个 print column 这样的一个方法,它只是把它打到控制台上,到这里就完了。那我这边就追加了一段同时生成一个 insert into accounts,然后往里面加增量的数据,写到那 Kinkhouse 就是这段投递的一个代码。那么现在回到刚才那个上面来看一下,现在 PolarDB-X  有没有正常的运行,那如果它正常运行,我现在通过本地的 8527 这个端口,用  root 这个账号,密码也是 123456 就可以连上它了,可以看到已经正常的点到了 PolarDB-X 里面,来看一下系统当前的一个状态,来建一个本次测试时候需要的一个库,就转账测试所需要的一个库,建这样的一个库,我来看一下当前 Binlog 的一个状态,可以看到刚才这个 Binlog 已经建库并记录到全体 Binlog 里面,而且格式是跟 SQL 的那个格式是一样的。抛弃一个  PolarDB-X  ,那么接下来其中一个 server,那个 server 直接连接到这个 PolarDB-X 上,server 使用官方提供一个 Run ,它做了一个简单的封装,可以看到在这里会传进去一堆的参数,这些参数本身都是所需要的。这里面关键的一个点,第一个是要指定 PolarDB-X ,这里就是 8527 端口,那就账号 PolarDB-X ,密码是 123456,不过正常再过一会这个 clinicnk server 就会正常的连接到 PolarDB-X ,开始消费它的一个增量的日志,可以登到这个容器里面来看一下,这个里面应该是好了,那就再建立一个 Kinkhouse ,回去那个页面上就是给 Copy 过来了,唯一的增加的一个地方就是做了一个端口的映射,它容器里面 8123 这个端口映射到主机上,这样做的原因是一会要将这个远程的 ecs 端口在映射到本地以方便就是本地的那投递的代码能够正常的把这些变更或者投递到 Kinkhouse 里面。这个时候就在远程的服务器上的三个容器就准备完成了,第一个是 PolarDB-X  ,第二个是 clinicnk server第三个就是 Kinkhouse 现在需要再做另外一件事情就是这三个服务们的端口映射到我的本地的电脑上看这个端口的映射其实也比较简单,可以用一个 ssh,ssh 提供一个命令,它在正常登录之后加一个参数就是大写的 l 变量参数,那它前面一段的意思就是 listen 本机的一个 IP 和端口,然后后面这段的意思就是把收到的来自于本机的连接到 8527 端口的所有流量全部无脑的转发到远程那个机器 827 单子上面,这样也就达到了端口转发的两个目的,那现在需要转发两个端口,第一个是 canal server ,第二个是 pink house ,本地那个 client 这个例子就可以跑起来了,那 clever server 默认的端口是 11111,那第二个需要把 Kinkhouse  映射过来, Kinkhouse  默认的是 8123,那也来做一下。这样我就将远程的两个服务就是 clever server 和 Kinkhouse 两个端口,一个是 11111,一个是 8123,映射到了本机的端口,可以简单来看一下,那现在这个里面连接的是本地的那个点,来看一下本地所建立的端口,先看一下 11111 有没有存在,很明显是存在的,然后看一下第二个 8123,它也是存在的。接下来尝试跑一下 Canal client 的代码,然后在“52”这个位置打一个断看能不能正常的去发消息。因为现在 PolarDB-X 里边,也就是最远端,它其实是没有变更的,或者说只有一些系统心跳的消息,所以它其实不应该走到这一步,但是它居然完成了,那这就是心跳消息。再进一步到里面去看一下,也就说它走到这里,如果它是一些 dl 的变更 insert、delete、update 的时候,它才会走到所需要的这个代码,如果不是它应该不会走到这边,反之就会结束了。

最终是需要它在 print 的时候把它写到一边,就是目前理论上它能从 messenger 拿到消息,意味着已经成功的拿到了 clever server  那边的增量的消息,也就意味着 clever server 已经成功的跟上游的 PolarDB-X 进行了一个增量的订阅,然后把这两个断点取消掉,那这个时候数据已经从 PolarDB-X 到了 canal server ,然后再到了 canal client ,那接下来需要在原端,也就是 PolarDB-X 那端来开始转账测试,产生一些 dl 的一些数据,那期望这段代码能够把这些变更投递到下游的 Kinkhouse 里面,在投递之前需要在 Kinkhouse 里面去建对应的一张表,因为这个这个容器是新建的,所以这里面既没有库,也没有表,所以如果现在直接投递会产生报错的结果,在建立的时候也是用了官方的一个一键连接到已有的容器里面的命令,连接到 Kinkhouse 容器里面的命令,这个就相当于执行一下 MySQL ,直接连接到了一个对应的 MySQL ,那这里就相当于获取到了一个连接到 Kinkhouse 一个终端,那这个连的是刚才启动 Kinkhouse 的容器,来看一下它里面目前库的情况,这里面是没有想要叫做  test  这样的一个库,所以先来建立一下,之后再建立一下需要的表,反正测试的表结构还是比较简单的,所以它名字叫 accounts ,它里面有两列,一个叫 ID 也就是用户的,第二个是它账户余额 balance ,也就是说现在系统里面有一批人,他们现在账户余额是多少钱,就会记录这么个信息,之后系统会开启这些人之间的相互转账,你这边扣 100,那他那边加 100,这样一个过程。那可以看到 Kinkhouse 里面建表与 MySQL 基本一致,就是在最后制定一些额外的东西可能不太一样。

现在这张表已经完成了,看一下那显然是空的,这个符合预期,再来看一下远端也就是 PolarDB-X ,现在 test 这个库里面是没有表的,接下来启动转账测试,我把它放在另外一个工程里面,这个是平时用来测试的一个代码,可以看到它连的确实是电脑上的 127 上面的一个 827 端口,因为用 ssh 做了一个端口映射,所以才会连接到 ecs 的那边,并且账号和密码都是正确的,现在来启动,那个转账测试启动之后会在 PolarDB-X  或者 ecs 里面建一张表。我这边端口映射还没做,我需要把远程的 PolarDB-X ,也就是它 8527 端口也映射到本地的电脑上,那再来跑一下转账测试,可以看到转账测试已经正常的在运行,然后来看一下 PolarDB-X,这张表已经有了,那里面也有 100 个账户,这账户余额都是以前的,但是现在已经有些变化了,说明它们的转账已经在进行了,那同时刚才  Canal client 的那这个代码,它的断点已经成功运行到了即将写的 Kinkhouse 的那段代码,也就是说已经成功的获取到了这些 dl 的变更。执行第一条的已经成功,也就是说此应该在 Kinkhouse 里面看到了一条数据,现在打开的终端是那个 Kinkhouse 一个命名窗口,这时候再来执行一下看一下有一条数据。

下面将这里的断点取消,让它是这些增量实施的一个同步,可以看到它在不停的 insert ,这个时候再来看一下最下游的 Kinkhouse ,那它里面已经有 385 条的,为什么会比 PolarDB-X 那边多?因为可以看到这里面有 ID 重复的情况,因为那边在代码处理 update 的时候把它转换成了 insert ,所以这边会往下游插入重复的数据,同步的数据因为没有走去重,所以其实是个追加的过程。

这样就会有这样一个现象,但没关系,这次的重点是已经成功的搭建起了一条同步的链路,这个链路最上游是 PolarDB-X ,那它的中间层是 clinicnk server ,再往下有一个 Canal client 消费的 clinicnk server 里面的数据,最后 Canal client  这个代码的数据投递到了最下游的 Kinkhouse ,现在已经成功的搭建这样的一个的链路,那这里想强调几个点,第一个 PolarDB-X 提供了一个全局的一个 Binlog ,那这个功能是跟 MySQL 保持了一个完全的兼容,像 Canal 这样的一个工具,因为 Canal 是用来订阅 SQL Binlog 的,那把它拿过来无脑的接到了  PolarDB-X 上,然后成功地取到了增量,说明 PolarDB-X  的 Binlog 跟 MySQL 的 Binlog 确实有一个很好的兼容性。

那在接下来获取到了增量之后,能够更了解其他的按照格式去解析,接着把增长数据也能投给下游,所以也就是说当原有的一些系统切换 PolarDB-X  之后,你原有的是通过那个 Binlog 获取增量的,所有的下游的系统不需要做修改,还是可以正常的在运行,这样可以极大的降低去做系统切换时候的一些成本,这是第一个例子。

那现在就把这些东西全部停掉,一个是先把转账测试给剪掉。可以看到此时这个代码也停掉了,这个就是第一个例子。回到 PPT,那在演示第二个例子之前,也就是将 MySQL 作为 PolarDB-X 的一个这样的一个例子。

图片4.png

 

先把环境清理一下,PolarDB-X 还用刚才那个,可以看到现在就剩一个容器,这个容器就是 PolarDB-X ,接下来将容器里面的数据清一下,那回到初始的状态,就是这个时候系统里面有一个库,里面的没有表的。

接下来建一个 MySQL 的容器,它也是直接用了官方提供的一个形象,我跑了 MySQL 容器,它的名字叫 MySQL ,然后 root 密码设了 123456,然后做端口映射,这样容器里面 3306 映射到了我的那个 ecs 上面,这样就是在远程的 ecs 上面,我就跑了两个容器,一个容器是 PolarDB-X ,第二个容器是 MySQL , PolarDB-X 的是 8527 这个端口, MySQL 是 3306,那么接下来就是将这个 MySQL 通过指令连接到 PolarDB-X 上,昨天连接到 MySQL 上面的,可以看到连的是 330 端口,账号用的是 root,密码用 123456。首先来看一下里面是不是空的?需要建一个数据库,这样就上游 PolarDB-X 保持对齐了,那表就不建了,直接把它同步过来。接下来用这个自带的指令,就是打起一个同步链路的指令,它会有几个参数,第一个是指定 master host ,第二个参数是 master port,第三个参数是 master user,用来做同步的账号,最后才会让你指定说你从哪个 Binlog 文件开始,来看一下目前 PolarDB-X 上有哪些 Binlog 文件,目前还是一个 Binlog 文件,直接无脑复制过来。最后就是要指定从这个 Binlog 文件的那个位置开始消费。然后来看一下,可以看到这个因为 PolarDB-X 里面目前没变更,但是这个位点却一直在增加,为什么呢?

因为这里边有一个系统内置的一个心跳。在 PolarDB-X 上面来看一下目前有哪些连接,可以看到当前有四个连接,两个 cdc 可以忽略,这是系统内置的两个连接,一个是 test,其实就是现在在之前面连接的。另外一个也就是刚才的一个 MySQL 的一个连接。接下来就可以开启刚才所说的转账测试,在开启之前检查一下端口映射是否正常,现在再次跑起转账测试,这次转账测试是在远端的 PolarDB-X 执行,然后下游的一个 MySQL 打了一个同步的链路,那么希望通过同步的机制,可以将 PolarDB-X 里面的增量变更正常给同步过去。在此启动转账测试,这边可以看到这张表已经成功的建出来了,里面有 100 个账户,并且已经开始了转账,那么来下路这边来看一下,这边来看一下下游好像挂了,那先把上游的停掉,检查一下哪里出问题了。可以看到上游已经没什么变化了,看着像是 my SQL 挂了,那就重新跑一个,现在 PolarDB-X 这个容器是很好的, MySQL 的容器不知道什么原因挂了,那就重新启动一个,等到 MySQL 初始化完成之后,在重新尝试做一遍,先把之前的表再次删除,回到初始状态

下游的 MySQL 因为是刚建的,肯定里面也是空的,再次执行 master 的指令,这次把它的那个位点设置的更往后一些,因为刚才已经执行了一堆那个他们从这个位置开始跑步,再次开启如果再崩就是什么地方真的有问题了,真的有问题,就跟昨天看到的结果不太一样,我去查一下原因,那么这个例子就便是了后面我找到原因之后再在下一次的那个演讲当中跟大家来讲一下这种问题,排查可能也是一个非常有意思的过程。

那今天讲解的第三部分就是全局 Binlog 的一个原理,就是它到底是怎么实现的。可能用户并不清楚 MySQL Binlog 是什么样子的,我这边也简单做一个介绍。

图片5.png


首先可以把它想象成一个很长事件,比如这样一个队列,那么它是按照以基本的一个事物为单位,也是这个事务涉及到哪些变更,他会把它记录下来,这样以事物为单位,一个一个的排队,把他们给记录下来,这样很长的一个单一的队列,那么也就是说如果你的数据库在实例创建的一开始就记住了 Binlog ,并且一直到现在也没有删除这个 Binlog ,以及投放这个 Binlog ,它可以把数据原模原样的给还原出来,这就是 Binlog 这样的一种存在。

那么 MySQL 的 Binlog 有三种格式,一种格式叫做 statement ,就是也是基于你的 SQL 来记录的,它会把你历史以来所有的 CQ 顺序把它给记录下来;那第二种叫做 Row based 就是基于行变化的,也就是这个系统里面这行总 insert 一直到后面的 update,一直到后面的 delete 整个变化过程,这是每一行都这样去记录的,这是第二种格式;那第三个其实是将这两个混合起来,既记录了那个 CQ 又记录了 Row 的变化,在它认为合适的地方,它用 Row 这样的一种方式。

这几种方式各有优缺点,但是目前就是从 MySQL 5 点 7.7 以后官方默认的是 Row .因为它很简单,而且能够严格的保证数据的一个移植性,你这些像那个基于 SQL ,它可能会有些问题。你在下午再重发的时候,并不能严格保证数据的一致性,这样就是 MySQL 的 Binlog。

也就是说可以从现在开始简单的认为 MySQL 的 Binlog 是一个单一的队列,这个队列按照它的每行数据的时间变化严格地把他们给记录下来了,这就是 MySQL 的 Binlog。那 PolarDB-X 里面它的那个增量的日志是什么样子的呢?

首先前面讲了一下 PolarDB-X 的一个架构,那么它的那个存储层(DN),可以无脑的认为它是一个 MySQL ,那就意味着在 PolarDB-X 里面也有一份 MySQL 的 Binlog ,只不过是现在系统当中可能会有很多 DN,也就有很多的 MySQL ,也就意味着有很多的 Binlog 的队列,那么如果想生成一个 MySQL 的 PolarDB-X 的兼容 MySQL 的 Binlog  这样的一个增量日志,那是不是说只要比较简单的将这几条物理的 MySQL 的 Binlog ,也就是 DN 的一个 Binlog 进行一个合并,也是那个数据结构算法里面做了一个经典的题目,这样的一个过程那就可以了?

图片6.png

 

并不是这样的,为什么呢?因为 PolarDB-X 的这个分布式的系统为了实现数据库的一些基本功能,比如说 15,它引入了很多新的改变,那这些改变就会导致做的全局 Binlog 在实现的时候也有一定的复杂度。首先来看一下全局 Binlog 实现过程当中,因为和那个相关的英文分布式而已引入的一些协议、算法、设计等等。

 

那么讲数据库不得不讲这个话题就是事物,但在 PolarDB-X  里面也不例外,提供了一个分布式事物的能力,现在主流的分布式系统里面一般实现分布式事物,都是通过两阶段提交这样的一个方式进行的。那如果不清楚两阶段提交的,也就是这样的一个协议或者过程的,后面可以去网上搜索,这种讲解的资料是非常多的。简单来说,它要去达成一个目标,这个目标是什么呢?

就网络当中有对一个的,比如两个或者更多的这样的一些组建,它们之间要协商一本的数据,并且有一个一致的一个结果。这个时候就可以通过两阶段当中有两个角色,第一个叫协调者,第二个叫参与者,那么这两阶段大概的过程就是协调者首先向所有的参与者发起这样的东西叫 prepare ,就是准备好做这个变更,准备好接受新的一个数据,这就是第一个阶段。那么所有的参与者如果现在状态很好,它经过各种判断之后,可以接受这个新的数据,那么就会跟协调者之间返回一个 OK ,那么协调者在收到所有参与者返回的 OK 之后,它认为这个 Binlog 变更就可以做下去了,因为所有人都告诉他已经 OK 了。那么第二步,他就会再通知所有的参与者说现在把这个变更真实的把它给记录下来,那所有的参与者接收到了之后,他把这个变更再记录一下来,这样就完成了一个连阶段提交的过程,听上去是比较简单,也容易理解的,那么这里需要注意一些细节。第一个是如果有一些参与者在第一阶段的时候因为各种原因嗯回了个不 OK 或者因为网络的问题没有回,那这个时候协调者者会怎么办呢?那协调者只要满足说不是所有参与者告诉我 ok 了,那就把这件事情或者说在数据库里面这个事务回滚掉。

 

图片7.png

 

这是协调者要处理的一个问题,如果因为网络的原因,所以协调者这边同时会有一个超时的机制,那跟所有的参与者之间通信的时候,有一个网络超时,那如果在超时时间之内没有回复我,也认为它是失败了,这样就保证这件事情一定会达到一个最终状态,这就是简单来说所谓的两阶段提交。

那么在  PolarDB-X  里面同样也用了这样的一种提交的方式来进行分布式事物,因为 PolarDB-X 的 DN 是 MySQL 的一个分支,那么 MySQL 在实现跟分布事物相关的能力的时候,它就实现了 XA 两阶段提交的这样一个规范,那么也就是说在 XA 这样的一个规范里面,它规定了连接提交里面的各种角色,应该在什么情况下应该怎么做,它是一个规范。

MySQL 是遵循这样一个规范的,所以 PolarDB-X 就直接去借用了 MySQL 这样的一个规范,把 CN 层作为规范里面的事物的管理者,那么把 MySQL(DN)作为我的一个 resource management,也就是两阶段当中的一个参与者。

通过这样的一个机制,就可以实现我的一个分布式事物,那这个分布事物最终在 PolarDB-X 的里面是什么样一种结果呢?就是如果你的应用做的一个事物,它的变更的数据涉及到了多个 DN ,也就及到了多个 MySQL ,那么能够保证就是它在多个 DN 中有一个事物的约束,这就是第一步,也就是说 PolarDB-X 用两阶段提交和实现了分布式事物,那么实现了这样一个事物其实用 XA 的方式也有一些问题,这个问题是什么呢?这个问题是当比如说这些事物真的发生冲突的时候,比如说两个事物涉及到了同一个 DN 上面的同一条数据的时候,在这里面就会有一个冲突,在这些场景下面,这个事物的方案实现的效率就会非常低,所以为了进一步提高效率需要继续往下做优化,通常业界所做的优化就是引入 micc 这样的一种多版本控制,那引入多版本之后会引入一些新的问题,因为在读版本机制里面有一个基本的概念叫做全局的快照,那在分布式的场景下面,也就是说需要在新这一层能够获取若干个 DN 里面数据的一个一致性的快照,那么这件事情是比较麻烦的,为什么这么说呢?

因为在分布式的场景下面,大家最基本的一个点所有的 DN 的机制时间是不一样的。第二个 CN 到所有的 DN 之间通信网络的延迟是不一样的,所以没有办法用一个某一个机制上的时间来做为一个衡量的标准,来说这个小于这个时间之前的数据,就认为就可以拿来做一个快照了。这边做一个简单的比喻,比如说这张图的右边的最上面这一部分,这边有第一个 CN ,它现在在处理一个事物,那么它现在想获取这边有两个 DN 里面数据的一个全局的,进而进行一个拍照,那它如果先用自己机器的时间,拿这样的一个时间去往下游的两个 DN 里面去查询,如果我查询的时候是这么个时间点,就把早于这个时间的那些数据快照给我就好了。

那这样就有问题,比如说下游的某个 DN 的机器时间,我就是比你走的快,或者我就是走的比你慢,这个时候就可能会有问题,所以就是最终需要提供一个全局性的大家达成一致的一个授时服务,也就是前面所说的那个组件,它所提供的一个服务叫做 TSO ,简单来说它是系统的一个单点,它能够为系统里面所有的组件来提供一个能够达成一致这样一个时间服务。那引入这一堆概念之后,你会发现这些东西会跟 Binlog 有什么关系呢?

因为在 MySQL 里面所有的变更,最终它会记录在 Binlog 里面。因为分不知事物用了 my SQL 的 xa 这样一个协议的实现。那 xa 这样的一个协议,Binlog 的格式又跟前面所说的那个稍微有点不一样,就是在单机 MySQL 的事物下面,它记录的 Binlog 就是一个事物一个事物一个事物,按照它们事务处理的顺序记住在一个单一的队列里面,但是一旦用了 XA 之后,一个 XA 这样的一个事物,它记录的格式是分成了两阶段,那这样会导致什么问题呢?

如果没有多个事物记录在这个 Binlog 的时候,你会发现他们的每个事物的 p 和 c 可能会交叉,也就是我的 Binlog 里面可能会出现这样的一种场景, p1p2Cc2 甚至有可能会出现 P1p2C1C2 这样的一个状态,这样就会增加生成 Binlog 的一个复杂度,这是第一点;第二点是什么?第二点就是因为在分布式的场景下面一个事物会涉及到多个 DN ,也就是说一个事物的变更被记录在了多个的物理的 Binlog 的流里面,但是最终要对下游提供一个单一的对全局的这样的一个 Binlog 的一个流,那也就意味着需要将同一个分布式事物分布在不同的 DN 上面,那些变更再次把它们收集起来放到一个事物的时间里面,那需要有一个依据来知道这个 DN 物理 Binlog 里面的这一段变更,它和另一个 DN 的另一段 Binlog 里面的物理变更,他们其实是属于同一个分布事物的。

所以这个系统去处理的另外一个复杂的一个事情;第三个事情是什么呢?就是现在这个图里面的最右下角的这张图,这张图是个什么东西呢?它就是现代的分布式系统,分布式数据库通常也会支持一个 online 这样的一个能力,就是可以做一个 dl 的表,比如说结果的一个变更,那目前一个组织的方案就是 Google 的 fone 的方案,也就是所谓的 online change ,这个方案简单来说它有这样的一个特点,它允许你的系统当中同一时间存在两个版本的信息,比如说你现在有一张表,就这张图里面它有 ab 这两列,那过一段时间你想要加个 E ,那它说的事情就是说我允许在我的系统当中,比如说我如果有一个应用的连接到了其中的一个 CN ,它看到了一张表有两列,那另外一个应用然后通过连接到另外一个 cn ,它看到了结果里面有三列,那这样的那事情是允许的,所以那个方案里面讲的就是这种事情,并且不引起任何一致性的一个问题。那么这样就会引入一个新的问题,也就是他如果允许这样的一种方式,那么 DDL 在实现的时候,它会给讲一堆的逻辑,那确实是按照那种方式实现的,但是最终会导致什么样的一种结果呢?

这个结果就是比如说最下面只有两个物理 Binlog 的流,最左边的时候大家那个表结构是一致的,那过一段时间肯定要做 DDL ,因为不同的 DN 的时间不一样, DN 跟 CN 之间的网络交互延迟也不一样,所以收到那个表结构变更这条指令肯定时间是不一样的,也就是开始做这个 ddl 的时间是不一样的,这样就会存在一个同样一张表的关于同一行他在一个 Binlog 里面他说他有两列,在另外一个是不同的行,但是同一张表的数据里面,他说他是有两列,在另外一个他有三列,那我全局 Binlog 最终是要合到一起的,那在这个阶段应该用两列还是三列?所以这就是个问题,这会导致实现还是非常复杂的。那怎么解这些问题呢?

这些问题最终都是可解的,所以今天简单来说一下,把这个问题给简化一下,来看一下站在 Binlog 的角度来看一下整个系统的数据的流。这张图是从左往右看,最主要是 CN ,它接受应用的一些 SQL,然后它先将这些 SQL 经过计算之后,下发到了下游的一个 DN ,那这些 DN 处理完之后就会产生他们的一个物理的 Binlog 。

假设系统当中现在有四个 CN 四个 DN ,那么四个 DN 就会产生四条 Binlog,也就是这张图里面的这个目前的一个状态,又因为前面所说的两阶段提交了那些乱序等等这些问题,所以这四个物理的 Binlog 流里面可能是现在图里面那种状态。首先每一格代表的是一个 XA 事物里面那个变更,那里面的数字代表的是事物的一个 ID,那标成橙色的部分代表了它们的顺序已经乱掉了,对于四条物理的 Binlog 的流,那要怎么去处理它们呢?

那首先第一点是要把它们单个的这四条物理的 Binlog 流里面的顺序给理顺。

 图片8.png

 

比如最上面那条,它是 1233548,那首先我要把“4”的位置给它放对,所以要做的第一件事情就是要把每一个物理 Binlog 流的顺序理顺。这就是到了第二阶段,那每一个流理顺之后,还要做件事情因为刚才说了在单机的 MySQL 里面每一个变更,它是同一个事物的变更,它们是在一起的。在差异里面,它是 p 和 c 这两个阶段,那所以这个时候还要将 xa 里面的事物合并了,之后 but 合成把它转换成一个单机的形态,比如把 PC 变成原来的 MySQL 这个单机的那种形态,这是我做的第二点事情。

然后第三个就是刚才说的 DDO ,因为可能会有所变化,这里面会做一些其他的一些处理,然后最后一个还需要做一些过滤,因为 PolarDB-X 会有一些系统的表,那这个是没必要对下游进行暴露的,另外一个它会有一些内部实现的一些类似于广播表或者是全局索引等等这样的一种功能,这些东西在 DN 里面也都会体现,比如广播表实际上站在我记得视角来看,它就是一张表,但是在实现上它其实分布在了所有的 DN 里面,这样也就会导致广播表里面的一条数据,那会出现在四个点里面,那也就是会出现在这四个的流里面,那其实它是一条数据,那在处理的时候就需要识别出来这样的一种场景,然后把这四条里面的三条给去掉留一条,留着四条其实是一样的。会把内部的一些逻辑给屏蔽掉,那经过这个处理之后,你会发现我得到了四条干净的有序的物理 Binlog 流。

那接下来就是经典的一个规定这样的一个过程,也就是第三阶段,会把这四个物理 Binlog 流按照顺序做一个规定,规定完之后,你会发现他们都会同一个事物就会放到一起,比如说现在这个处理好的流就变成了前三个就是 111,接下来就是 222,然后五个三等等这样的一个顺序,三个 111 其实就是同一个分布式事物分布在了三个物理 Binlog 流,如果它们被归到了一起,那么这个时候其实就是可以进一步的将这三个事物的变更整合成一个,所以接下来会将这个单一的流再做进一步处理

第一个会把分布式事物进行合并,第二个会按照 MySQL 的不是,进行一个录盘那到此终于到了第四阶段就是我终于生成了跟 SQL 编了个格式一样的一个文件,就是在前面的弹幕当中大家看到的那个 log 50 一个文件,他其实跟买这款的文件完全一模一样的,还需要进一步的将这个冰冻文件,也就是单一的时间流分发给下游,也就是在这次的两个弹幕里面,一个是 server 一个是 bicycle 那么,在给下游提供这种增量文件订阅的时候呢,就是 cn 层同样的实现了 mexico 协议同时在很多的一些行为上会跟进行一个兼容,比如说它会去查一下,回去看这个有没有设置好,还有一些其他的一些等等一些跟相关的一些参数甚至不相关的一些参数,然后很多工具都会为它们一个标准的过程,在这些边过订阅的行为当中把这个接着这样就实现了一个给下游提供一个完全兼容的这样的一个过程。

相关文章
|
5月前
|
数据采集 存储 数据处理
数据平台问题之知识管理系统的效果如何评估
数据平台问题之知识管理系统的效果如何评估
|
5月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
164 0
|
5月前
|
存储 SQL 分布式计算
Hadoop生态系统概述:构建大数据处理与分析的基石
【8月更文挑战第25天】Hadoop生态系统为大数据处理和分析提供了强大的基础设施和工具集。通过不断扩展和优化其组件和功能,Hadoop将继续在大数据时代发挥重要作用。
|
5月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
5月前
|
人工智能 分布式计算 架构师
大数据及AI典型场景实践问题之基于MaxCompute构建Noxmobi全球化精准营销系统如何解决
大数据及AI典型场景实践问题之基于MaxCompute构建Noxmobi全球化精准营销系统如何解决
|
5月前
|
存储 监控 安全
大数据架构设计原则:构建高效、可扩展与安全的数据生态系统
【8月更文挑战第23天】大数据架构设计是一个复杂而系统的工程,需要综合考虑业务需求、技术选型、安全合规等多个方面。遵循上述设计原则,可以帮助企业构建出既高效又安全的大数据生态系统,为业务创新和决策支持提供强有力的支撑。随着技术的不断发展和业务需求的不断变化,持续优化和调整大数据架构也将成为一项持续的工作。
|
5月前
|
存储 数据可视化 大数据
基于Python Django的大数据招聘数据分析系统,包括数据大屏和后台管理
本文介绍了一个基于Python Django框架开发的大数据招聘数据分析系统,该系统具备后台管理功能和数据大屏展示,利用大数据技术收集和分析招聘市场趋势,帮助企业和招聘机构提高招聘效率和质量。
177 3
|
8月前
|
安全 druid Java
Seata 1.8.0 正式发布,支持达梦和 PolarDB-X 数据库
Seata 1.8.0 正式发布,支持达梦和 PolarDB-X 数据库
642 17
Seata 1.8.0 正式发布,支持达梦和 PolarDB-X 数据库
|
8月前
|
存储 DataWorks 监控
DataWorks,一个 polar db 有上万个数据库,解决方案
DataWorks,一个 polar db 有上万个数据库,解决方案
|
SQL 存储 Web App开发
PolarDB-X 分布式数据库中的外键
外键是关系型数据库中非常便利的一种功能,它通过一个或多个列为两张表建立连接,从而允许跨表交叉引用相关数据。外键通过约束来保持数据的一致性,通过级联来同步数据在多表间的更新和删除。在关系数据库系统中,大多数表都遵循外键的概念。