分布式事务的 21 种武器 - 5

简介: 分布式事务的 21 种武器 - 5

在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的 ACID 保障面临着一些特殊难点。本系列文章介绍了 21 种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (5)


Duncan Meyer @Unsplash


在不同业务场景下,可以有不同的解决方案,常见方法有:


  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC 补偿(TCC Compensation Matters)
  5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ 事务(MQ Transaction)
  7. Saga 模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时间戳排序(Timestamp Ordering)
  15. 乐观并发控制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发控制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)




本文将介绍一致性算法、时间戳排序以及乐观并发控制三种模式。

13. 一致性算法(Consensus Algorithms)

图片来源: https://www.baeldung.com/cs/consensus-algorithms-distributed-systems


  • 系统中的所有节点都同意最终结果或决策。
  • 涉及如下步骤:
  • 提议(Proposal) —— 一个节点向系统中的其他节点提出一个值
  • 广播(Broadcast) —— 将建议的值广播到系统中所有节点
  • 确认(Acknowledgment) —— 每个节点确认提议并向提议者返回确认消息
  • 决定(Decision) —— 一旦提议者收到大多数节点的确认,就可以对提议的值做出决定
  • 承诺(Commitment) —— 系统中所有节点提交决定
  • Paxos 算法还包括以下几个步骤:
  • 准备阶段(Prepare phase) —— 提议者选定提案,并向系统中的大多数节点发送准备请求
  • 承诺阶段(Promise phase) —— 如果某个节点收到的准备消息序列号高于之前看到的序列号,将以承诺响应,并不再接受任何低于该序列号的提案
  • 接受阶段(Accept phase) —— 如果提议者收到大多数节点的承诺,j 就向大多数节点发送带有提案序列号的"接受"请求
  • 已接受阶段(Accepted phase) —— 如果某个节点收到的接受消息序列号高于以前见过的任何接受请求序列号,就接受该提案,并向提议者和所有其他节点发送接受消息
  • 学习阶段(Learn phase) —— 一旦提议者从大多数节点接收到被接受的消息,就可以提交提议。


from typing import List, Tuple
class PaxosNode:    def __init__(self, node_id: int, nodes: List[int]):        self.node_id = node_id        self.nodes = nodes        self.state = "proposed"        self.proposed_value = None        self.accepted_value = None        self.accepted_round = -1        def run_paxos(self, value: int) -> int:        while True:            if self.state == "proposed":                self.proposed_value = value                self.state = "prepare"                        if self.state == "prepare":                max_round, max_val = self.prepare()                if max_val is None:                    self.state = "accept"                else:                    self.state = "proposed"                        if self.state == "accept":                self.accepted_value = self.proposed_value                self.accepted_round = max_round                self.send_accept()                self.state = "decided"                        if self.state == "decided":                return self.accepted_value        def prepare(self) -> Tuple[int, int]:        max_round = -1        max_val = None        for node in self.nodes:            round, val = node.receive_prepare()            if round > max_round:                max_round = round                max_val = val                return max_round, max_val        def send_prepare(self, round: int):        for node in self.nodes:            node.receive_prepare_request(round, self.node_id)        def receive_prepare_request(self, round: int, sender_id: int):        if round > self.accepted_round:            self.accepted_round = round            self.send_prepare(round)        def receive_prepare(self) -> Tuple[int, int]:        return self.accepted_round, self.accepted_value        def send_accept(self):        for node in self.nodes:            node.receive_accept_request(self.accepted_round, self.accepted_value)        def receive_accept_request(self, round: int, value: int):        if round >= self.accepted_round:            self.accepted_round = round            self.accepted_value = value            self.send_accepted()        def send_accepted(self):        for node in self.nodes:            node.receive_accepted(self.accepted_round, self.accepted_value)        def receive_accepted(self, round: int, value: int):        if round == self.accepted_round:            self.proposed_value = value

复制代码


示例代码


  • 示例代码为 Paxos 算法的一种实现,即使存在网络故障或其他可能发生的故障,也能在一组节点之间就某个值达成共识。
  • PaxosNode类表示分布式系统中参与 Paxos 算法的节点。
  • 构造函数接受 2 个参数,node_id是节点的 ID,nodes是系统中所有节点的 ID 列表。
  • run_paxos方法运行 Paxos 算法,接受一个值作为输入,并返回系统中所有节点都同意的值。该方法将无限循环执行,直到商定一个值为止。
  • prepare方法向系统中所有节点发送"prepare"消息,并等待响应。该方法返回系统中其他节点可以接受的最大整数值,如果不接受任何值,则返回None
  • send_prepare方法,当节点想要向系统中的其他节点发送"prepare"消息时,调用该方法。
  • receive_prepare_request方法,当一个节点从另一个节点接收到"prepare"消息时,调用该方法。如果"prepare"消息携带的整数大于该节点接受的整数,则该节点更新其接受的整数,并向系统中其他节点发送"prepare"消息。
  • receive_prepare方法,当节点接收到对"prepare"消息的响应时,调用该方法,返回可接受的整数值。
  • send_accept方法,当节点想要向系统中的其他节点发送"accept"消息时,调用该方法。
  • receive_accept_request方法,当节点接收到来自另一个节点的"accept"消息时,调用该方法。如果"accept"消息携带的整数大于或等于该节点已接受的整数,则该节点更新其接受的整数值,并向系统中其他节点发送"accepted"消息。
  • send_accepted方法,当节点想要向系统中其他节点发送"accepted”消息时,调用该方法。
  • receive_accepted方法,当节点从另一个节点接收到"accepted"消息时,调用该方法。如果"accepted"消息的整数与节点已接受的整数相同,则节点使用"accepted"消息中的值更新其建议值。


优点


  • 所有节点都同意系统的状态
  • 可以容忍某些节点的故障,即使某些节点发生故障,系统也可以继续运行


缺点


  • 算法比较复杂,难以实现
  • 算法执行较慢,可能导致系统延迟增加
  • 算法需要在节点之间通信,增加了网络带宽和处理能力方面的开销


适用场景


  • 确保交易一致和准确的金融系统
  • 确保所有节点具有相同供应链视图的供应链管理系统



14. 时间戳排序(Timestamp Ordering)

图片来源: https://www.geeksforgeeks.org/multiversion-timestamp-ordering


  • 一种用于在分布式系统中对事务排序的共识算法。
  • 每个事务被分配一个时间戳,并且事务按照其时间戳的顺序执行。
  • 涉及如下步骤:
  • 每个节点为接收到的事务生成唯一的时间戳,可以用全局时钟生成时间戳,也可以使用带有某种同步机制的本地时钟生成时间戳。
  • 时间戳由(T, N)组成,其中 T 为时间戳值,N 为生成时间戳的节点标识符。
  • 当节点接收到新事务时,会根据之前接收到的所有事务的时间戳检查该事务的时间戳。如果新事务的时间戳比之前接收到的事务的时间戳都早,则立即执行。如果新事务的时间戳比以前接收到的事务的时间戳都要新,那么将被延迟,直到所有旧事务都执行完为止。
  • 如果两个事务具有相同的时间戳,则使用 tie-breaking 机制来解决冲突。一种可能的 tie-breaking 机制是使用节点标识符作为判断标准,首先执行具有较低节点标识符的事务。
  • 一旦事务被执行,结果就会传播到所有其他节点。
  • 如果某个节点发生故障或断开网络连接,则一旦它重新加入网络,它的事务可以由另一个节点重新执行。事务可以按照时间戳顺序执行,以确保系统状态保持一致。


from typing import List, Tuple
class Timestamp:
    def __init__(self, node_id: int):
        self.node_id = node_id
        self.counter = 0
    def increment(self):
        self.counter += 1
    def __str__(self):
        return f"{self.node_id}:{self.counter}"
class Event:
    def __init__(self, node_id: int, timestamp: Timestamp, data: str):
        self.node_id = node_id
        self.timestamp = timestamp
        self.data = data
    def __str__(self):
        return f"{self.node_id} {self.timestamp} {self.data}"
class Network:
    def __init__(self, nodes: List[int]):
        self.nodes = nodes
        self.message_queues = {node: [] for node in nodes}
    def send(self, sender: int, receiver: int, message: str):
        self.message_queues[receiver].append((sender, message))
    def receive(self, node_id: int) -> Tuple[int, str]:
        if len(self.message_queues[node_id]) > 0:
            return self.message_queues[node_id].pop(0)
        else:
            return None
class Node:
    def __init__(self, node_id: int, network: Network, initial_data: List[str]):
        self.node_id = node_id
        self.network = network
        self.clock = Timestamp(node_id)
        self.queue = []
        for data in initial_data:
            self.queue.append(Event(node_id, self.clock, data))
            self.clock.increment()
    def run(self):
        while True:
            event = self.queue.pop(0)
            print(f"Node {self.node_id} executing event {event}")
            self.clock = max(self.clock, event.timestamp)  # Update local clock

复制代码


示例代码


  • 实现分布式系统的时间戳排序算法
  • 定义了几个类和方法来实现时间戳排序算法的模拟
  • Timestamp类表示时间戳,由node_idcounter组成
  • Event类表示分布式系统中的事件。事件由node_id(生成事件的节点 ID)、timestamp(事件的时间戳)和data(事件的有效负载)组成。
  • Network类表示连接分布式系统中节点的网络
  • Node类代表节点,由node_id(节点 ID)、network(所连接的网络)、clock(本地时间戳)和queue(节点已经生成并等待执行的事件列表)组成。
  • increment()方法增加时间戳的计数器值
  • __str__()方法以"node_id:counter"的格式返回时间戳的字符串表示形式。
  • __str__()方法以"node_id:timestamp data"的格式返回事件的字符串表示形式。
  • send()方法将消息从一个节点发送到另一个节点
  • receive()方法接收给定节点队列中的下一条消息,如果队列中没有消息,则返回None
  • run()方法是节点的主循环,实现为无限循环,按时间戳顺序执行队列中的事件。当某个事件被执行时,节点将其本地时钟更新为其当前时间戳和被执行事件时间戳的最大值。
  • nodes属性是系统中节点的 id 列表
  • message_queues属性将每个节点 ID 映射到字典


优点


  • 所有事件都是有序的,并且在系统中所有节点上以一致的顺序发生
  • 解决并发事件之间的冲突


缺点


  • 过程复杂且资源密集
  • 时钟同步、一致性等问题


适用场景


  • 金融系统: 交易按照正确顺序进行处理,而不管来自哪个节点
  • 供应链管理系统: 跟踪货物流动,确保所有事件都按照正确的顺序处理



15. 乐观并发控制(Optimistic Concurrency Control)
  • 一种在数据库管理系统(DBMS)中用于处理并发访问数据的技术
  • 主要没有冲突,可以允许多个事务并发修改相同的数据项
  • 涉及如下步骤:
  • 开始事务(T1) —— 当事务开始时,读取需要的数据库记录,并将读取的版本记录在其私有工作区中。
  • 修改数据 —— 当事务修改数据时,将修改记录在其私有工作区中,而不更新实际的数据库记录。
  • 事务结束(T1) —— 当事务准备提交时,检查是否有其他事务在读取数据后修改了相同的数据。另外,将其私有工作区中的数据版本与数据库中数据的当前版本进行比较。
  • 验证检查 —— 如果数据库中数据的当前版本与 T1 读取的版本相同,则 T1 可以将其更改提交到数据库。但是,如果数据的当前版本与 T1 读取的版本不同,则意味着 T1 的更改与另一个事务的更改发生了冲突。
  • 回滚 —— 如果 T1 的更改与另一个事务的更改发生了冲突,那么 T1 必须中止并回滚其更改。T1 可以在延迟后重试事务或采取其他适当操作。
  • 提交 —— 如果 T1 的更改不与任何其他事务的更改冲突,那么 T1 可以将其更改提交到数据库,用其私有工作区中所做的修改来更新数据库记录。


from typing import List
class Account:
    def __init__(self, id: int, balance: float):
        self.id = id
        self.balance = balance
        self.version = 0
    def withdraw(self, amount: float):
        self.balance -= amount
        self.version += 1
    def deposit(self, amount: float):
        self.balance += amount
        self.version += 1
class OptimisticConcurrencyControl:
    def __init__(self, accounts: List[Account]):
        self.accounts = accounts
    def transfer(self, sender_id: int, receiver_id: int, amount: float):
        # Find sender and receiver accounts
        sender = next(acc for acc in self.accounts if acc.id == sender_id)
        receiver = next(acc for acc in self.accounts if acc.id == receiver_id)
        # Create copies of the accounts to modify
        sender_copy = Account(sender.id, sender.balance)
        receiver_copy = Account(receiver.id, receiver.balance)
        # Withdraw from sender and deposit to receiver
        sender_copy.withdraw(amount)
        receiver_copy.deposit(amount)
        # Update the global accounts list if there are no conflicts
        for i, acc in enumerate(self.accounts):
            if acc.id == sender_id:
                if acc.version != sender.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = sender_copy
            elif acc.id == receiver_id:
                if acc.version != receiver.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = receiver_copy

复制代码


示例代码


  • Account类表示包含 ID、余额和版本号的银行帐户,版本号用于跟踪帐户更新的次数。
  • OptimisticConcurrencyControl类将帐户列表作为输入。
  • transfer方法的输入为发送方和接收方账户 id 以及要转账的金额。
  • withdrawdeposit方法,修改帐户余额并增加版本号。
  • 将全局帐号列表中帐号的版本号与发送方、接收方帐号的版本号进行比较,检查更新帐号是否存在冲突,如果存在冲突就会引发异常。如果没有冲突,用修改后的帐户更新全局帐户列表。在发生冲突的情况下,可以重试事务,或者通知用户手动解决冲突。


优点


  • 不需要锁或复杂的数据结构
  • 事务可以同时修改相同的数据,因此可以支持高并发性
  • 事务不需要等待锁被释放


缺点


  • 如果多个事务频繁修改相同的数据项,将造成大量修改失败并回滚
  • 需要额外的处理开销来比较数据版本并检查冲突
  • 不提供任何关于事务提交顺序的保证


适用场景


  • 电子商务应用 —— 大量用户同时访问同一个数据库
  • 银行和金融应用 —— 大量交易同时发生


挑战


  • 不正确的实现将导致数据不一致
  • 需要处理冲突和回滚的机制,会增加应用程序的复杂性



参考文献

What is a consensus algorithm?


Consensus Algorithms in Blockchain


How Many Consensus Algorithms Are There? An Overview


Analysis of the Blockchain Consensus Algorithms


Consensus Algorithms Distributed Systems


Multiversion Timestamp Ordering


DBMS Timestamp Ordering Protocol


Timestamp-based Concurrency Control


Timestamp Ordering Protocol in DBMS


Timestamp-based Ordering Protocol in DBMS


What is an optimistic concurrency control in DBMS


Optimistic vs Pessimistic Concurrency: What Every Developer Should Know


Dealing with Optimistic Concurrency Control Collisions




你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。

微信公众号:DeepNoMind

目录
相关文章
|
6月前
|
Java 数据库连接 数据库
数据安全之舞:Spring事务处理的实用指南与技术要点
数据安全之舞:Spring事务处理的实用指南与技术要点
49 0
数据安全之舞:Spring事务处理的实用指南与技术要点
|
6月前
|
存储 Java 应用服务中间件
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
【分布式技术专题】「架构实践于案例分析」盘点互联网应用服务中常用分布式事务(刚性事务和柔性事务)的原理和方案
183 0
|
存储 算法 关系型数据库
分布式事务的 21 种武器 - 6
分布式事务的 21 种武器 - 6
45 0
|
消息中间件 存储 供应链
分布式事务的 21 种武器 - 2
分布式事务的 21 种武器 - 2
36 0
|
存储 消息中间件 设计模式
分布式事务的 21 种武器 - 3
分布式事务的 21 种武器 - 3
59 0
|
SQL 存储 监控
分布式事务的 21 种武器 - 4
分布式事务的 21 种武器 - 4
41 0
|
存储 消息中间件 算法
分布式事务的 21 种武器 - 1
分布式事务的 21 种武器 - 1
42 0
|
存储 设计模式 人工智能
分布式事务的 21 种武器 - 7
分布式事务的 21 种武器 - 7
52 0
分布式事务,阿里为什么钟爱TCC
分布式事务,阿里为什么钟爱TCC
522 1
分布式事务,阿里为什么钟爱TCC
|
存储 调度
【攻破技术盲点】一起学习和巩固TCC分布式事务模型
【攻破技术盲点】一起学习和巩固TCC分布式事务模型
166 0
【攻破技术盲点】一起学习和巩固TCC分布式事务模型