在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的 ACID 保障面临着一些特殊难点。本系列文章介绍了 21 种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (6)
在不同业务场景下,可以有不同的解决方案,常见方法有:
- 阻塞重试(Blocking Retry)
- 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
- 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
- TCC 补偿(TCC Compensation Matters)
- 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
- MQ 事务(MQ Transaction)
- Saga 模式(Saga Pattern)
- 事件驱动(Event Sourcing)
- 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
- 原子提交(Atomic Commitment)
- 并行提交(Parallel Commits)
- 事务复制(Transactional Replication)
- 一致性算法(Consensus Algorithms)
- 时间戳排序(Timestamp Ordering)
- 乐观并发控制(Optimistic Concurrency Control)
- 拜占庭容错(Byzantine Fault Tolerance, BFT)
- 分布式锁(Distributed Locking)
- 分片(Sharding)
- 多版本并发控制(Multi-Version Concurrency Control, MVCC)
- 分布式快照(Distributed Snapshots)
- 主从复制(Leader-Follower Replication)
本文将介绍拜占庭容错、分布式锁以及分片三种模式。
16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
- 一种容错机制,允许分布式系统在存在故障节点的情况下正常运行。
- 节点之间相互通信从而就决策达成共识,即使存在某些恶意或错误节点,也可以工作。
- 涉及如下步骤:
- 系统中每个节点向所有其他节点发送消息,其中包含建议的决策或值的信息。
- 每个节点检查接收到的消息的有效性。如果接收到来自故障节点或攻击者的消息,则忽略该消息。
- 每个节点收集接收到的所有有效消息,并创建系统视图(view)。视图是一组特定节点的有效消息,每个节点与系统中的所有其他节点共享视图。
- 每个节点检查从其他节点接收到的视图的有效性。如果接收到无效视图,则忽略该视图。
- 每个节点创建包含其系统视图的证书(certificate),并与所有其他节点共享。证书是经过签名的声明,用于证明视图的有效性。
- 每个节点检查从其他节点收到的证书的有效性。如果收到无效证书,应该忽略该证书。
- 一旦节点验证了收到的所有证书,就可以对提议的决策或值达成共识。每个节点根据收到的证书对相同的值达成共识。
- 一旦达成共识,每个节点执行已达成一致的决策。
from typing import List, Dict, Tuple class Message: def __init__(self, sender: int, content: str): self.sender = sender self.content = content class ByzantineNode: def __init__(self, id: int, network: Dict[int, List[Message]], threshold: int): self.id = id self.network = network self.threshold = threshold self.decisions = {} def send_message(self, receiver: int, content: str): message = Message(self.id, content) self.network[receiver].append(message) def receive_messages(self) -> List[Message]: messages = self.network[self.id] self.network[self.id] = [] return messages def generate_vote(self, messages: List[Message]) -> bool: count = 0 for message in messages: if message.content == 'True': count += 1 elif message.content == 'False': count -= 1 return count >= self.threshold def run_bft(self, decision_content: str): # Phase 1: Broadcast proposal to all nodes proposal = Message(self.id, decision_content) for node_id in self.network: self.send_message(node_id, str(proposal)) # Phase 2: Receive messages and generate votes messages = self.receive_messages() vote = self.generate_vote(messages) # Phase 3: Broadcast decision to all nodes decision = Message(self.id, str(vote)) for node_id in self.network: self.send_message(node_id, str(decision)) # Phase 4: Receive decisions and count votes decisions = [m.content for m in self.receive_messages()] count_true = decisions.count('True') count_false = decisions.count('False') # Record decision if it meets threshold, else record failure if count_true >= self.threshold: self.decisions[decision_content] = True elif count_false >= self.threshold: self.decisions[decision_content] = False else: self.decisions[decision_content] = None
复制代码
示例代码
- 只要故障节点数量小于阈值,该算法就可以容忍故障。
- 由两个类组成:
- Message —— 表示网络中节点之间发送的消息,包含发送者 ID 和消息内容。
- ByzantineNode —— 表示网络节点,包含 ID、网络拓扑、容忍的故障数量阈值,以及存储节点决策的字典。
- ByzantineNode 类提供了几个方法:
- send_message()方法 —— 发送消息到网络中的另一个节点
- receive_messages()方法 —— 检索自上次调用 receive_messages()以来收到的所有消息
- generate_vote()方法 —— 将消息列表作为输入,并根据消息内容生成投票。如果"True"消息的数量大于或等于阈值,则该方法返回 True,否则返回 False。
- run_bft()方法 —— 实现 BFT 算法的 4 个阶段。
- 阶段 1 —— 用 send_message()方法向网络中所有节点广播提案。提案是个 Message 对象,其内容是作为参数传递给 run_bft()方法的 decision_content。
- 阶段 2 —— 用 receive_messages()方法接收来自网络中其他节点的消息。用 generate_vote()方法根据收到的消息生成投票,根据收到的"True"和"False"数量,投票"True"或"False"。
- 阶段 3 —— 用 send_message()方法将决策广播到网络中所有节点。决策是个 Message 对象,其内容为上一阶段生成的投票。
- 阶段 4 —— 计算收到的票数,如果票数达到阈值,则记录决策。使用 receive_messages()方法检索自上次调用 receive_messages()以来收到的所有消息。检查每条消息内容,并计算"True"和"False"消息的数量。如果"True"的数量大于或等于阈值,则将该决策记录为 True。如果"False"的数量大于或等于阈值,则该决策被记录为 False。如果两个条件都不满足,则该决策记录为 None。
优点
- 在分布式系统中容忍一定数量的错误或失败
- 即使存在故障节点或恶意攻击,也能确保分布式系统中所有节点达成一致的决策
- 为用于加密货币和其他应用的区块链网络提供高水平的安全性和弹性
缺点
- 可能需要昂贵的计算,并且需要节点之间有高质量网络通信,否则可能会增加延迟并降低系统性能
- 因为可能需要节点之间的高级协调和通信,因此可能不适合所有类型的分布式系统
- 不能为分布式系统中所有类型的故障或攻击提供完整解决方案
适用场景
- 金融系统 —— 股票交易
- 基础设施系统 —— 电网或运输系统
- 区块链网络 —— 加密货币和其他应用
挑战
- 设计和实现 BFT 系统可能很复杂,并且需要在分布式系统、密码学和安全性方面具有高水平专业知识。
- 确保所有节点都是可信、没有恶意的。
- 在 BFT 系统中实现高性能和低延迟具有挑战性。
17. 分布式锁(Distributed Locking)
- 管理分布式系统中共享资源的访问。
- 保证系统中多个节点不能同时访问或修改相同的资源,避免可能的不一致和数据损坏。
- 涉及以下步骤:
- 节点请求对共享资源加锁。请求包含资源的唯一标识符以及所请求的锁类型(例如,读或写)。
- 锁管理器管理锁,接收请求,并检查资源是否已经锁定。如果资源未被锁定,锁管理器将锁授予请求节点并发送确认。
- 如果资源已经被锁定,锁管理器检查请求节点是否被授权访问该资源。如果该节点已获得授权,锁管理器将该请求添加到资源的挂起请求队列中,并向请求节点发送确认信息。如果该节点未被授权,则锁管理器拒绝该请求并发送拒绝消息。
- 在等待授予锁时,请求节点定期轮询锁管理器以获取锁状态。
- 当节点访问完资源后,通过向锁管理器发送释放请求来释放锁。锁管理器从资源中删除锁,并将锁授予队列中的下一个节点(如果有的话)。
- 如果持有锁的节点发生故障或崩溃,锁管理器将检测到该故障,并代表发生故障的节点释放锁,然后将锁授予队列中的下一个节点(如果有的话)。
- 如果节点请求锁,但没有收到锁管理器的响应,那么假定锁管理器已经失败,并通过选举新的领导节点来接管锁管理器的角色。
from kazoo.client import KazooClient from kazoo.exceptions import LockTimeout import time class DistributedLock: def __init__(self, zk_address, lock_path): self.zk = KazooClient(hosts=zk_address) self.lock_path = lock_path self.lock = None def __enter__(self): self.zk.start() self.lock = self.zk.Lock(self.lock_path) try: self.lock.acquire(timeout=10) except LockTimeout: self.zk.stop() raise Exception("Timeout while waiting for lock") def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release() self.zk.stop() if __name__ == '__main__': zk_address = 'localhost:2181' lock_path = '/my_lock' with DistributedLock(zk_address, lock_path): print("Acquired lock!") time.sleep(10) print("Released lock!")
复制代码
示例代码
- 基于 Apache ZooKeeper 分布式协调服务
- 导入所需的库 —— KazooClient 和 LockTimeout
- KazooClient —— ZooKeeper 的 Python 客户端
- LockTimeout —— 当不能在指定超时内获得锁时引发异常。
- 定义 DistributedLock 类 —— 接受 2 个参数:
zk_address
和lock_path
zk_address
—— ZooKeeper 服务器地址lock_path
—— 锁节点在 ZooKeeper 中的路径- 在
init
方法中初始化 ZooKeeper 客户端,并存储lock_path
,并且将锁变量初始化为 None。 - 在
enter
方法中启动 ZooKeeper 客户端,创建锁对象,并尝试获取锁。如果锁不能在 10 秒内获得,则引发 LockTimeout 异常。 - 在
exit
方法中释放锁并停止 ZooKeeper 客户端。 - 在代码主体部分,用指定的 ZooKeeper 地址和锁路径创建 DistributedLock 类实例。
- 用
with
语句获取锁。 - 当获得锁时,打印一条消息,表示已获得锁,然后 sleep 10 秒,以模拟持有锁时正在完成的一些工作。
- sleep 后,锁会被 with 语句自动释放,打印一条消息,表明锁已被释放。
优点
- 在分布式系统中,通过确保一次只有一个进程可以修改共享资源来维护数据一致性
- 防止多个进程同时访问共享资源,以确保该资源在需要时始终可用
- 允许多个进程跨多个节点访问共享资源
缺点
- 需要在分布式系统中的多个节点之间进行协调
- 在分布式系统中引入延迟并降低性能
- 如果分布式锁定机制失败,可能会导致整个分布式系统失败
适用场景
- 分布式数据库
- 电子商务系统 —— 管理对购物车的访问或防止多个用户同时购买相同的商品
挑战
- 需要公平分配资源,以确保所有进程都能平等访问共享资源
- 不正确实现的分布式锁定可能导致死锁,多个进程等待彼此释放锁
18. 分片(Sharding)
- 用于在多个服务器之间对数据进行水平分区,称为分片。
- 每个分片包含数据的一个子集,所有分片组合起来就构成了完整的数据集。
- 用于提高分布式数据库的可伸缩性、性能和可用性。
- 涉及如下步骤:
- 根据分片键将数据划分为更小的子集。分片键的选择使得数据可以均匀分布在各个分片上,并且可以将查询路由到正确的分片。
- 数据分区后,分片分布在多个服务器上。每个分片一个特定服务器,多个分片可以分配给同一个服务器。
- 当客户端向数据库发送查询时,该查询首先路由到协调器节点。协调器节点负责确定哪个分片包含执行查询所需的数据。
- 一旦协调节点确定了正确的分片,查询将被发送到包含该分片的服务器。服务器执行查询并将结果返回给协调器节点。
- 如果需要来自多个分片的数据完成查询,协调节点将每个分片的结果聚合并将最终结果返回给客户端。
import mysql.connector # Connect to MySQL database mydb = mysql.connector.connect( host="localhost", user="yourusername", password="yourpassword", database="mydatabase" ) # Define sharding rules shard_key = "user_id" num_shards = 4 # Create sharded tables for i in range(num_shards): cursor = mydb.cursor() cursor.execute(f"CREATE TABLE users_{i} (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255), email VARCHAR(255))") # Insert data into sharded tables users = [ {"id": 1, "name": "John", "email": "john@example.com"}, {"id": 2, "name": "Jane", "email": "jane@example.com"}, {"id": 3, "name": "Bob", "email": "bob@example.com"}, # ... ] for user in users: shard_id = user[shard_key] % num_shards cursor = mydb.cursor() cursor.execute(f"INSERT INTO users_{shard_id} (id, name, email) VALUES (%s, %s, %s)", (user["id"], user["name"], user["email"])) # Query data from sharded tables cursor = mydb.cursor() cursor.execute("SELECT * FROM users_0 UNION SELECT * FROM users_1 UNION SELECT * FROM users_2 UNION SELECT * FROM users_3") users = cursor.fetchall() print(users)
复制代码
示例代码
- 使用 MySQL
- 可能因用例和所使用的数据库系统而异。
- 连接 MySQL 数据库,定义分片规则。在本例中,我们用
user_id
作为分片键,并创建 4 个分片表来存储数据。 - 通过计算基于用户 ID 的分片 ID,并用该 ID 将数据插入到相应的分片表中,从而将数据插入到分片表。
- 使用 UNION 语句查询所有分片表中的数据并打印结果。
优点
- 允许数据库水平扩容,因此随着数据增长,可以向系统添加额外的服务器来处理增加的负载。
- 因为每个服务器只需要搜索较小的数据集,因此可以更快执行查询。
- 如果一台服务器出现故障,只会影响部分数据,系统的其余部分可以继续正常运行。
缺点
- 需要确保对数据进行了正确的分区,并且分片均匀分布在各个服务器上。
- 维护所有分片之间的数据一致性可能具有挑战
- 实现分片需要额外的硬件、软件和维护成本
适用场景
- 适用于高读写负载的大型数据库,可以横向扩展以处理增加的流量。当存在地理或法规限制,需要将数据存储在不同位置时,会非常有用。
- 生成大量数据并需要快速存储和检索的社交媒体平台
- 处理大量交易,需要快速存储和检索数据的电商平台
- 需要快速安全的存储和检索大量患者数据的医疗保健应用
参考文献
Byzantine Fault Tolerance (BFT) | River Glossary
What Is Byzantine Fault Tolerance?
Byzantine Fault Tolerance (BFT) Explained
Understanding Database Sharding
你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。微信公众号:DeepNoMind