本节书摘来自华章出版社《云数据管理:挑战与机遇》一书中的第二章,第2节,作者[美] 迪卫艾肯特·阿格拉沃尔(Divyakant Agrawal) 苏迪皮托·达斯(Sudipto Das)阿姆鲁·埃尔·阿巴迪(Amr El Abbadi) 更多章节内容可以访问云栖社区“华章计算机”公众号查看。
‖第2章
分布式数据管理
云计算建立在过去几十年计算机科学领域,尤其是在分布式计算和分布式数据管理领域积累的重要概念、协议和模型的基础上。本章主要讨论分布式系统和数据管理的基本背景,其构成了云数据库系统的基础。我们的主要目标是为读者提供足够的背景知识,以帮助读者理解后面章节的内容。对这些内容比较熟悉的读者可以直接跳过这些部分。我们同时也为读者提供了一些关于分布式数据库系统的参考资料[Gray and Reuter,1992,2.1 分布式系统
我们首先介绍分布式系统的一些重要基本概念,这些基本概念也是与云计算和数据中心有关的相关概念和协议的重要基础。简单来说,分布式系统就是一些独立的计算进程或处理器(常称作节点)的集合,这些节点基于消息传递机制,通过通信网络相互通信。这意味着节点上的进程没有共享内存,拥有独立的故障模型,不共享相同的时钟。节点可能会因系统崩溃、停止运行、甚至人为恶意破坏而失效。网络可能会出现连接故障。一般情况下,系统也可能出现分区失效,也就是说,系统被划分成若干个子分区,单个子分区内部的节点之间可以相互通信,但是不同分区之间的节点之间无法通信。分区失效的原因可能包括由于网关故障而引起的连接故障和节点故障。
分布式系统也可以分为同步系统和异步系统。在异步分布式系统中,消息传递的时间、处理器处理时间和本地时钟漂移时间的界限是未知的。在同步系统中,这些界限都是已知的,因此,可以利用超时来进行故障检测,在必要的情况下,也可以执行相应的操作。
2.1.1 逻辑时间和Lamport时钟
Lamport于1978年在他的一篇代表性论文里提出了一个简单的分布式系统模型[Lamport, 1978]。该模型中,进程被建模成一个全序事件的序列。事件分为本地(local)事件、发送(send)事件和接收(receive)事件。发送事件负责发送消息,该消息由相应的接收事件接收。本地事件是非通信事件,如,内存读写、矩阵相乘等。图2-1展示了一个包括4个进程(p1、p2、p3和p4)的分布式系统示例。事件e2和e4在进程p1上执行,事件e1、e3和e9在进程p2执行,等等。事件e3是进程p2上的本地事件,而事件e1是一个发送事件,e2是相应的接收事件。
若两个事件e和f满足下列任一条件,则事件e发生在事件f之间,记作e→f:
1. 如果e和f是发生在同一进程内的两个事件,并且e发生在f之前,那么e→f;
2. 如果e代表了某个进程的消息发送事件send(m),f代表另一进程中针对这同一个消息的接收事件receive(m),那么e→f;
3. 如果存在一个事件g,满足e→g并且g→f,那么e→f。
“发生在前”(happens-before)关系可以很好地反映任意两个事件之间的潜在因果依赖关系。并且,如果两个事件e和f既不存在e→f关系,也不存在f→e关系,那么e和f是并发的。在图2-1中,事件e4发生在事件e6之前,而事件e3与事件e2和e4都是并发的。
时间概念以及时间与事件之间的关系对很多分布式系统协议来说都是至关重要的。一般情况下,不一定需要实时时钟或近似实时时钟,只要有一个时间概念能够捕获潜在的因果关系就足够了。Lamport引入了一种可以捕获事件之间的潜在因果关系的逻辑时钟概念。逻辑时钟为每一个事件e赋一个值clock(e),因此,对任意两个事件e和f,存在如下关系:
如果e→f,那么clock(e)<clock(f)。
为了能够实现这种逻辑时钟,Lamport为每一个进程设置了一个时钟计数器。该计数器在同一进程中的任意两个事件之间都必须是递增的,并且,每一个消息都携带了发送者的时钟值。当消息到达目的地之后,本地时钟计数器被设置为本地值的最大值,同时消息的时间戳加1。这种实现方式可以满足上述逻辑时钟的条件。
在图2-2中,使用与图2-1相同的例子,为系统中的所有事件都赋一个逻辑时间。
图2-2 Lamport时钟
因为“发生在前”关系是一个偏序,因此,多个事件可能被赋值相同的逻辑时钟。但是,在很多协议中,为每一个事件赋一个唯一的时间值更为方便。这种情况下,为了打破这种关系,时间值可以设置为<t, p>,其中,t是本地时钟计数器设置的逻辑时间,p是事件执行所在进程的进程标识。一般情况下,每一个进程都被赋值一个唯一的全序的进程标识,这些进程标识可以打破具有相同逻辑时间的事件之间的关系。
2.1.2 向量时钟
逻辑时钟可以捕获潜在的因果关系,但是,这并不意味着一定有因果关系,逻辑时钟条件只是一个必要条件,并不是充分条件。分布式系统中的所有事件可能需要一个更强的时钟条件:
e→f当且仅当clock(e)<clock(f)。
该条件可按如下方式实现:为每一进程i赋一个长度为n的向量Vi,n是系统中所有进程的数量。每一个执行的事件都被赋一个本地向量。
每个向量都初始化为0,即:Vi[j] = 0,其中i, j = 1, …, N。进程i在每一个事件之前增加本地向量元素的值,Vi[j] = Vi[j] +1。当进程i发送消息的时候,会将本地向量Vi和消息一起发送。当进程j接收消息时,会将接收向量和本地向量的元素逐个进行比较,并将本地向量设置为两者之中较大的值,Vj[i] = max(Vi[i], Vj[i]), i = 1, …, N。
给定两个向量V和V',V=V'当且仅当V[i] = V'[i], i = 1, …, N,并且V≤V'当且仅当V[i]≤V'[i], i = 1, …, N。如果至少存在一个j(1≤j≤N),使得V[j]<V'[j],并且,对所有的i≠j,其中,1≤i≤N,V[i]≤V'[i],则V<V'。对任意两个事件e和f,e→f当且仅当V(e)<V(f)。如果既不满足V(e)<V(f),又不满足V(f)<V(e),那么两个事件是并发的。
虽然向量时间可以准确地捕获因果关系,但是向量的大小是网络大小的函数,可能非常大,并且每一个消息都需要携带额外的向量。
2.1.3 互斥和仲裁集
互斥是并发进程访问共享资源时涉及的一个基本概念。互斥是操作系统中的一个重要操作,后来也被扩展到数据库中。互斥可以按照如下方式进行定义:给定一个进程集合和一个单独的资源,开发一种协议,该协议可以确保在同一时间,一个资源只能被一个进程进行排他性访问。针对集中式系统和分布式系统都已经提出了多种解决方案。针对分布式互斥问题的一种简单的集中式解决方案可以设计如下:指定一个进程为协调者,当进程需要访问资源时,发送一个请求消息给协调者。协调者维护一个等待请求队列。当协调者接收一个请求消息时,检查该队列是否为空,如果队列为空,协调者就为请求客户端发送一个回复消息,请求客户端就可以访问共享资源。否则,请求消息就被添加到等待请求队列中。进程在共享资源上执行完成以后,向协调者发送一个释放消息。接收到释放消息以后,协调者从队列中移除请求,然后为其他等待的请求检查队列。该协议已经被Lamport[1978]扩展成分布式协议,很多其他研究人员对该协议进行了优化。
该基本协议的普遍应用需要系统中所有进程的参与。为了克服障碍,Gifford提出了仲裁集的概念。比较重要的发现是任意两个请求都应该有一个共同的进程来充当仲裁者。假定进程pi(pj)从集合qi(qj)中请求许可,其中qi和qi是仲裁集,也可以是系统中所有进程的子集。qi和qj的交集不能为空。例如,包括系统中大部分进程的集合就可以构成一个仲裁集。使用仲裁集,而非系统中的所有进程,基本协议仍然有效,但是有可能出现死锁[Maekawa, 1985]。图2-4a展示了一个包含7个进程的系统,任意一个大于等于4的集合和另外一个大于等于4的集合一定相交,即对于任意两个仲裁集, 每一个仲裁集都包含大部分站点,它们的交集一定是非空的。
在数据库中,仲裁集的概念可以理解成基本的读、写操作,读操作不需要互斥。然而,多个读操作虽然可以并发执行,但是,针对数据的写操作仍需要互斥访问。因此,设计了两种仲裁集:读仲裁集和写仲裁集,其中,两个写仲裁集之间的交集不能为空,一个读仲裁集和一个写仲裁集之间的交集也不能为空,针对两个读仲裁集的交集没有强制性要求。图2-4b展示了一个包含6个进程的系统,写仲裁集是大小为4的任意集合,读仲裁集是大小为3的任意集合。需要注意的是,任意读仲裁集和写仲裁集必须相交,任意两个写仲裁集也必须相交。但是,读仲裁集之间不一定相交,因此,多个读操作可以并行执行。
a)互斥仲裁集 b)读写仲裁集
2.1.4 领导者选举
很多分布式算法都需要一个进程来充当协调者,然而,实际当中选择哪个进程作为协调者通常并不重要。该问题通常被称为领导者选举(leader election),其关键在于要确保一个唯一的协调者被选中。该协议非常简单,通常要求每个进程有一个进程编号,所有的进程编号都是唯一并且完全排序的。我们使用具有代表性的Bully算法(Bully Algorithm [Garcia-Molina, 1982])来对该协议进行举例,该算法假设通信是可靠的。其核心思想是努力选择具有最大进程编号的进程。任何一个进程,如果该进程刚从故障中恢复,或者该进程怀疑当前的协调者失效了,都可以发起新的选举。有三类消息可以使用:election、ok和I won。
进程可以同时发起选举。发起进程p向所有拥有较高ID的进程发送一个election消息,并等待ok消息。如果没有收到任何ok消息,则p成为协调者,并向所有拥有较低ID的进程发送I won消息。如果该进程收到ok消息,则退出并等待I won消息。如果一个进程收到了election消息,可以返回。一个ok消息,并发起一个新的选举。如果进程收到了一个I won消息,则发送者就是协调者。很容易证明Bully算法的正确性。选举协议也可以利用逻辑通信结构或者覆盖网络(如环)来实现。Chang and Roberts [1979]设计了这种协议,该协议把所有的节点组织成一个逻辑环,其中每一个进程都知道它的近邻,目的也是选择具有最大ID的进程作为协调者。一个进程如果刚刚恢复或者检测到协调者失效,可以发起新的选举。该进程按顺序对后继节点进行询问,直到发现活动节点,就把election消息发送给下游最近的活动节点。每一个接收到election消息的进程把自己的ID添加到该消息中并顺着环继续传递。一旦消息返回到发起者,就选择具有最大ID的节点作为领导者并顺着环发布一个特殊的coordinator消息。注意,多个选举可以并发执行。
2.1.5 基于广播和多播的组通信
如果数据被复制到多个节点上进行存储,数据更新操作需要发送给所有的副本。广播或多播操作是一种简单的通信原语。一般来说,广播方式把同一条消息发送给系统中的所有站点,而多播只发送给部分站点。不失一般性,我们用多播来表示发送信息到特定的节点集合。下面将介绍已经提出的多种不同的原语,这些原语已经在分布式系统和数据中心等不同场景中得到了应用。
FIFO多播或发送者有序的多播:消息按照被发送的顺序传输(单个发送者)。
因果序多播:如果发送m1和m2两个消息,并且m1的发送事件在m2的发送事件之前发生,那么在所有相同的目的地上,m1都必须先于m2传输。
全序(或原子)多播:所有消息都以相同的顺序发送给接收者。
实现不同多播协议的关键在于如何设计一种方法从而保证顺序一致性约束。假设底层网络只支持点对点通信,不支持任何多播原语。另外,需要把网络中消息的接收者和应用层中消息的实际传输者进行区分。接收到一条消息之后,该消息被插入到队列中,当序列条件满足时,消息才能开始传输。下面将对实现这些原语的协议进行描述。图2-5展示了一个包含3个因果相关多播e1、e2和e3的示例。如果这些多播都是因果相关多播,那么,部分消息的传输就需要推迟,直到因果序条件得到满足以后才能继续传输。例如,虽然进程r接收到e2的时间比e1的接收者时间早,但是因为e1发生在e2之前,所以,必须等到r对e1完成接收和传输之后才能对e2开始传输。同样,e3必须等到e1和e2传输完成之后才能开始传输。再看另外一个例子,图2-6也包含了3个多播e1、e2和e3。尽管e1和e2不是因果相关,并且是从不同的进程p和q发出的,如果它们是全序多播的话,那么所有的站点都要按照相同的顺序进行传输,而与它们的接收顺序无关。例如,虽然进程r接收e2的时间比接收e1的时间早,而在进程s中该顺序刚好相反,但是,所有的站点都必须按照相同的顺序来传输这两个多播,比如先传输e2,再传输e1。需要说明的是,即使发送操作是因果相关的,全序也不需要一定要满足因果序。例如,e2和e3是因果相关的,并且e2发生在e3之前,但是所有的进程仍可能是先传输e3,再传输e2。
FIFO多播可以用一种类TCP传输协议来简单地实现,即消息发送者可以设置一个有序的消息标识符,任意一条消息在其之前的消息完成接收和传输之前都需要等待。如果有消息丢失了,接受者可以向发送者请求丢失的消息。
因果多播可以通过如下方式来实现:要求每一个广播消息都携带所有因果前置消息。在传输之前,接受者必须通过传输任何丢失的因果前置消息来确保因果关系。但是,这种协议的开销非常大。还有另外一种可供选择的协议(ISIS [Birman, 1985]),该协议使用向量时钟来延迟消息的传输,直到所有的因果前置消息都被传输完成。每一个进程负责维护一个长度为n的向量时钟V,n是系统中节点的数量。V的元素被初始化为0。当节点i发送一个新的消息m时,对应节点i的元素值就加1。每一个消息都与发送者的本地向量组合在一起。当节点发送消息时,该节点需要利用如下方式对其向量进行更新:选择本地向量和随消息到达的向量之间的元素的较大值来更新。节点i利用向量VT发送消息m,如果向量VT中与发送者相对应的元素刚好比接收端本地向量中的发送者元素大1(即是下一条消息),并且,本地向量的所有元素都大于等于VT中的对应元素,那么接收者就接收到了所有的因果前置消息。
全序多播可以通过集中式方法来实现,例如固定的协调者(使用在Amoeba [Kaashoek et al., 1989]中),或者移动令牌等[Défago et al., 2004]。另外,ISIS [Birman, 1985]也提出了分布式协议。在ISIS分布式协议中,所有进程通过三轮来对序号(或优先级)达成一致意见。发送者将具有唯一标识符的消息m发送给所有接收者。接受者会建议一个优先级(序号),并把建议的优先级反馈给发送者。发送者收集完所有的优先级建议,并确定一个最终的优先级(通过进程编号打破关系),然后针对消息的重新发送最终达成一致意见的优先级。最后,接收者再按照最终的优先级来传输消息m。
2.1.6 一致性问题
一致性是一个基本的分布式系统问题,在出现故障的情况下,需要多个步骤来达成一致[Pease et al., 1980]。该问题经常出现在如下场景中:通信是可靠的,但是由于系统崩溃或认为恶意破坏等原因(即未按照指定的协议或代码进行响应),站点可能会失效。一般而言,该问题可以使用一个单独的协调者,或称general,协调者给n-1个参与者发送一个二进制值,并满足下列条件:
一致:所有参与者都认同一个值。
正确:如果general是正确的,那么每一个参与者都认同general发送的值。
接下来介绍两个不可能出现的结果。在异步系统中,如果进程由于崩溃而失效,并且进程是通过消息传递来进行通信的,Fischer et al. [1983, 1985]证明一致性是不可能解决的。另一方面,在一个存在恶意故障的同步系统中,Dolev [1982] 证明了如果一个系统的进程数量小于3f+1,其中,f是故障(恶意)进程的最大值,那么该系统也无法解决一致性问题。
已经有多种协议可以用来解决同步系统和异步系统中的一致性问题。同步系统需要指定恶意故障站点的最大数量的上界,如三分之一。另一方面,异步系统可能无法确保系统能够终止。近来,Lamport [1998, 2001]为异步系统开发的Paxos协议广受欢迎。抽象地讲,Paxos是一个以领导者为基础的(leader-based)的协议,每一个进程都可以估计当前的领导者是谁。当一个进程希望在某个值上达成一致时,进程就把该值发送给领导者。领导者对操作进行排序并通过一致性算法来实现一致。通常情况下,该协议经历两个阶段。在每一个阶段,领导者会与大部分站点进行联系,往往会有多个并发的领导者。用投票来区分不同领导者提供的值。两个阶段的具体过程可以总结如下:第一阶段,又称为准备阶段,认为自己是领导者的节点可以选择一个新的唯一的投票号码,并把该号码发送给所有的站点,然后等待来自大部分站点的较小的投票号码的结果。第二阶段,又称接受阶段,领导者根据自己的投票号码建议一个值。如果领导者能够获得大多数支持,那么该值就会被接受,其他站点也会用对应的投票号码对该值进行判断。图2-7展示了基于Paxos协议的不同进程之间的通信模式。
2.1.7 CAP理论
Brewer[2000]提出了下列理论,后来由Gilbert and Lynch[2002]加以证明:一个分布式共享数据系统最多同时满足下列三个属性中的两种:
一致性(C)
可用性(A)
网络分区容忍性(P)
该理论就是著名的CAP理论。一般情况下,大规模云数据中心的分布式系统需要支持分区,以便能够处理大规模操作。此时,在进行网络划分的过程中,根据CAP理论的要求,就需要在一致性和可用性之间做出选择。传统的数据库系统选择一致性,而一些最新出现的数据存储系统,如键-值存储系统,比较偏爱可用性。Brewer[2012]对CAP理论的其他分支进行了评估,并对该理论中的任意两个方面的细微差别进行了详细描述。在分区故障不经常出现的情况下,可以设计一种大部分时间内兼顾一致性和可用性的系统。但是,当分区故障发生时,就需要采取一定的策略去检测分区,并开发最合适的策略对这种情况加以处理。另一个需着重强调的重要方面是延迟与分区之间的重要关系,分区归因于超时,因此,从使用的观点来看,分区故障是有时间限制的。Gilbert and Lynch [2012]对该问题进行了进一步的详细描述,CAP理论被认为是对不可靠分布式系统中安全性和活跃性之间进行均衡的一种描述,这与出现故障的异步系统中不可能存在分布式一致性有密切关系[Fischer et al., 1983]。