# How to Implement a Paxos

Paxos is the classic distributed consensus algorithm, taught as the canonical example in textbooks. Yet because of its abstractness, few consensus libraries are built on plain Paxos; Raft is the algorithm most often implemented in industry. Raft's paper ("In Search of an Understandable Consensus Algorithm", listed in the references below) introduces a strong leader role, which removes many of the engineering difficulties of implementing Paxos, and its log + state machine abstraction cleanly models multi-node replication, solving many problems at once. I chose to go the other way and implement Paxos, mainly because:

• Few open-source Paxos implementations exist; the algorithm is classic, its definitions are highly abstract (well suited to a general-purpose library), and implementing it is a real challenge
• Correctness does not depend on leader election, which suits fast switching of the writing node (leadership takeover); in this implementation, a single Paxos group of 3 nodes with local-loopback networking and in-memory storage reaches 16k/s with 3 nodes writing concurrently, and 43k/s with a 10 ms leader-lease optimization (measured on a 13" MBP, 2018)
• Few implementation constraints and strong extensibility

# What Can You Do with a Paxos

Paxos is famous enough — so what cool things can this library actually do?

• Strong consistency: every node holds the same data, even when writes are issued concurrently on multiple nodes
• High availability: a 3-node Paxos system, for example, tolerates the failure of any one node while continuing to serve

# Code

Talk is cheap, show me the code.

The zpaxos GitHub repository

# Paxos Algorithm Basics

## Goal of the algorithm

A consensus algorithm ensures that a single one among the proposed values is chosen

• Only a value that has been proposed may be chosen,
• Only a single value is chosen, and
• A process never learns that a value has been chosen unless it actually has been.

## The algorithm

Phase 1. (prepare)

• A proposer selects a proposal number n and sends a prepare request with number n to a majority of acceptors.
• If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered proposal (if any) that it has accepted.

Phase 2. (accept)

• If the proposer receives a response to its prepare requests (numbered n) from a majority of acceptors, then it sends an accept request to each of those acceptors for a proposal numbered n with a value v, where v is the value of the highest-numbered proposal among the responses, or is any value if the responses reported no proposals.
• If an acceptor receives an accept request for a proposal numbered n, it accepts the proposal unless it has already responded to a prepare request having a number greater than n.
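The subtle point of phase 2 is the value-selection rule: the proposer may pick its own value only if no promise reported a previously accepted proposal; otherwise it must adopt the value of the highest-numbered one. A minimal sketch, with simplified stand-ins for the library's `ballot_number_t` and `value_t` (the names here are illustrative, not zpaxos's API):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Simplified ballot: ordered by proposal id, then by node id (tie-break).
struct Ballot {
    uint64_t proposal_id = 0;
    uint64_t node_id = 0;
    bool operator<(const Ballot &o) const {
        return proposal_id != o.proposal_id ? proposal_id < o.proposal_id
                                            : node_id < o.node_id;
    }
};

// One acceptor's promise: the highest-numbered proposal it has accepted, if any.
struct Promise {
    std::optional<Ballot> accepted_ballot;
    int accepted_value = 0;
};

// Phase-2 rule: take the value of the highest-numbered proposal among the
// promises; fall back to our own value only when no proposal was reported.
int choose_value(const std::vector<Promise> &promises, int own_value) {
    std::optional<Ballot> best;
    int value = own_value;
    for (const auto &p : promises) {
        if (p.accepted_ballot && (!best || *best < *p.accepted_ballot)) {
            best = p.accepted_ballot;
            value = p.accepted_value;
        }
    }
    return value;
}
```

This rule is what makes a chosen value stable: any later proposer that reaches a quorum must intersect the quorum that accepted it, and will therefore adopt it.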

### The base class: Cbase

base.h defines the entities the algorithm needs, mainly: the ballot ballot_number_t, the value value_t, the acceptor state state_t, and the message message_t passed between roles.

```cpp
struct ballot_number_t final {
    proposal_id_t proposal_id;
    node_id_t node_id;
};

struct value_t final {
    state_machine_id_t state_machine_id;
    utility::Cbuffer buffer;
};

struct state_t final {
    ballot_number_t promised, accepted;
    value_t value;
};

struct message_t final {
    enum type_e {
        noop = 0,
        prepare,
        prepare_promise,
        prepare_reject,
        accept,
        accept_accept,
        accept_reject,
        value_chosen,
        learn_ping,
        learn_pong,
        learn_request,
        learn_response
    } type;

    // Sender info.
    group_id_t group_id;
    instance_id_t instance_id;
    node_id_t node_id;

    /**
     * The following fields are optional.
     */

    // As sequence number for reply.
    proposal_id_t proposal_id;

    ballot_number_t ballot;
    value_t value;

    // For learner data transmission.
    bool overload; // Used in ping & pong. Should be considered when sending a learn request.
    instance_id_t min_stored_instance_id; // Used in ping and pong.
    std::vector<learn_t> learn_batch;
    std::vector<Csnapshot::shared_ptr> snapshot_batch;
};
```

```cpp
template<class T>
class Cbase {
    // Const info of instance.
    const node_id_t node_id_;
    const group_id_t group_id_;
    const write_options_t default_write_options_;

    std::mutex update_lock_;
    std::atomic<instance_id_t> instance_id_;

    Cstorage &storage_;
    Ccommunication &communication_;
    CpeerManager &peer_manager_;

    bool is_voter(const instance_id_t &instance_id);
    bool is_all_peer(const instance_id_t &instance_id, const std::set<node_id_t> &node_set);
    bool is_all_voter(const instance_id_t &instance_id, const std::set<node_id_t> &node_set);
    bool is_quorum(const instance_id_t &instance_id, const std::set<node_id_t> &node_set);

    int get_min_instance_id(instance_id_t &instance_id);
    int get_max_instance_id(instance_id_t &instance_id);
    void set_instance_id(instance_id_t &instance_id);

    bool get(const instance_id_t &instance_id, state_t &state);
    bool get(const instance_id_t &instance_id, std::vector<state_t> &states);
    bool put(const instance_id_t &instance_id, const state_t &state, bool &lag);
    bool next_instance(const instance_id_t &instance_id, const value_t &chosen_value);
    bool put_and_next_instance(const instance_id_t &instance_id, const state_t &state, bool &lag);
    bool put_and_next_instance(const instance_id_t &instance_id, const std::vector<state_t> &states, bool &lag);

    bool reset_min_instance(const instance_id_t &instance_id, const state_t &state);

    bool broadcast(const message_t &msg, broadcast_range_e range, broadcast_type_e type);
    bool send(const node_id_t &target_id, const message_t &msg);
};
```

### The proposer role: Cproposer

proposer.h implements the proposer's behavior in the Paxos algorithm, including issuing proposals and handling the acceptors' replies.

on_prepare_reply handles an acceptor's response in the prepare phase. Compared with the paper's description, the message must be checked carefully here: only after confirming it belongs to the current context is it added to the response tally; then, by the majority rule, the proposer decides whether to give up or move on to the accept phase.

```cpp
response_set_.insert(msg.node_id);

if (message_t::prepare_promise == msg.type) {
    // Promise.
    promise_or_accept_set_.insert(msg.node_id);
    // Caution: This will move value to local variable, and never touch it again.
    update_ballot_and_value(std::forward<message_t>(msg));
} else {
    // Reject.
    reject_set_.insert(msg.node_id);
    has_rejected_ = true;
    record_other_proposal_id(msg);
}

if (base_.is_quorum(working_instance_id_, promise_or_accept_set_)) {
    // Prepare success.
    can_skip_prepare_ = true;
    accept(accept_msg);
} else if (base_.is_quorum(working_instance_id_, reject_set_) ||
           base_.is_all_voter(working_instance_id_, response_set_)) {
    // Prepare fail.
    state_ = proposer_idle;
    last_error_ = error_prepare_rejected;
    notify_idle = true;
}
```

on_accept_reply handles an acceptor's response in the accept phase. Following the paper, the majority rule decides whether the proposal has finally passed; if so, the proposer enters the chosen phase and broadcasts the decided value.

```cpp
response_set_.insert(msg.node_id);

if (message_t::accept_accept == msg.type) {
    // Accept.
    promise_or_accept_set_.insert(msg.node_id);
} else {
    // Reject.
    reject_set_.insert(msg.node_id);
    has_rejected_ = true;
    record_other_proposal_id(msg);
}

if (base_.is_quorum(working_instance_id_, promise_or_accept_set_)) {
    // Accept success.
    chosen(chosen_msg);
    chosen_value = value_;
} else if (base_.is_quorum(working_instance_id_, reject_set_) ||
           base_.is_all_voter(working_instance_id_, response_set_)) {
    // Accept fail.
    state_ = proposer_idle;
    last_error_ = error_accept_rejected;
    notify_idle = true;
}
```

### The acceptor role: Cacceptor

acceptor.h implements the acceptor's behavior in the Paxos algorithm: handling proposers' requests, persisting state, and advancing the log instance id. Cacceptor also has an important duty at initialization: loading previously persisted state, so that promises made and values accepted survive restarts.

on_prepare handles an incoming prepare request: based on the ballot number of the proposal, it decides which message to return and persists the promised state.

```cpp
if (msg.ballot >= state_.promised) {
    // Promise.
    response.type = message_t::prepare_promise;
    if (state_.accepted) {
        response.ballot = state_.accepted;
        response.value = state_.value;
    }

    state_.promised = msg.ballot;

    auto lag = false;
    if (!persist(lag)) {
        if (lag)
            return Cbase<T>::routine_error_lag;

        return Cbase<T>::routine_write_fail;
    }
} else {
    // Reject.
    response.type = message_t::prepare_reject;
    response.ballot = state_.promised;
}
```

on_accept handles an incoming accept request: based on its own state and the ballot number, the acceptor decides whether to update its current state or to reject, finally persisting the appropriate accepted state and value.

```cpp
if (msg.ballot >= state_.promised) {
    // Accept.
    response.type = message_t::accept_accept;

    state_.promised = msg.ballot;
    state_.accepted = msg.ballot;
    state_.value = std::move(msg.value); // Move value to local variable.

    auto lag = false;
    if (!persist(lag)) {
        if (lag)
            return Cbase<T>::routine_error_lag;
        return Cbase<T>::routine_write_fail;
    }
} else {
    // Reject.
    response.type = message_t::accept_reject;
    response.ballot = state_.promised;
}
```

on_chosen handles the proposer's broadcast announcing the decided value. After validation it advances the log instance id, moving this node on to deciding the next value (the multi-paxos logic).

```cpp
if (base_.next_instance(working_instance_id_, state_.value)) {
    chosen_instance_id = working_instance_id_;
    chosen_value = state_.value;
} else
    return Cbase<T>::routine_error_lag;
```

# Advanced Paxos

## Multi-Paxos

```cpp
typedef uint64_t instance_id_t; // [0, inf)
```

```
instance_id_t  0        1        2        3        ... inf
value_t        a=1      b=2      b=a+1    a=b+1    ...
Paxos          prepare  prepare  prepare  prepare
               accept   accept   accept   accept   ...
```

Key to the efficiency of this approach is that, in the Paxos consensus algorithm, the value to be proposed is not chosen until phase 2. Recall that, after completing phase 1 of the proposer’s algorithm, either the value to be proposed is determined or else the proposer is free to propose any value.

• The value need not be a single value but can be a sequence of values (treat the sequence as one big value that takes several network round trips to transmit); by reusing the proposal id, phase 2 (accept) can run repeatedly to commit the whole sequence
• This optimization breaks none of Paxos's assumptions or requirements, so a leader is not a prerequisite for multi-paxos
• The continuous flow can be interrupted at any time by a higher proposal id (a new value preempting and cutting the previous transmission short, again without violating the constraints on earlier values, which are merely truncated)

```
instance_id_t  0        1       2       3       ... inf
value_t        a=1      b=2     b=a+1   a=b+1   ...
Paxos          prepare
               accept   accept  accept  accept  ...
```
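The diagram above — one prepare, then a run of accepts — can be modeled by counting message rounds per committed value. A toy sketch whose flags mirror proposer.h's can_skip_prepare / has_rejected, though the logic here is an illustration, not the library's code:

```cpp
#include <cassert>

// Toy model of the skip-prepare optimization: after one successful prepare,
// the same proposal id is reused so later values need only an accept round,
// until a higher proposal id from another node preempts us.
struct ProposerFlow {
    bool can_skip_prepare = false;

    // Submit one value; returns the number of message rounds it cost.
    int submit(bool preempted_by_higher_id) {
        int rounds = 0;
        if (!can_skip_prepare) {
            rounds += 1;              // Phase 1: prepare round.
            can_skip_prepare = true;  // Reuse this proposal id afterwards.
        }
        rounds += 1;                  // Phase 2: accept round.
        if (preempted_by_higher_id) {
            // Another node preempted with a higher proposal id:
            // fall back to the full two-phase flow next time.
            can_skip_prepare = false;
        }
        return rounds;
    }
};
```

In steady state each value costs a single accept round, which is where the continuous-accept throughput gain comes from.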

• proposer.h uses can_skip_prepare and has_rejected to decide whether the prepare phase can be skipped, and to fall back to the full two-phase flow after a rejection (i.e. a proposer on any other node preempting with a higher proposal id)
• Although contending writers on multiple nodes cause no correctness problem, repeated preemption can leave no node able to sustain a run of consecutive accepts. leader_manager.h addresses this: after a successful accept, the acceptor unconditionally rejects prepares from all other nodes for a while, so the node that succeeded monopolizes the acceptors for a time window and, even under heavy contention, can finish its consecutive-accept commits within that window
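The lease idea can be sketched as a timestamp check on the acceptor side. This is an assumed simplification in the spirit of leader_manager.h, not its actual interface:

```cpp
#include <cassert>
#include <cstdint>

// After a successful accept from `holder`, prepares from other nodes are
// rejected outright until the lease expires. Times are plain milliseconds
// for illustration.
struct LeaderLease {
    uint64_t holder = 0;
    uint64_t expire_ms = 0;  // Absolute time at which the lease ends.

    void on_accept(uint64_t node, uint64_t now_ms, uint64_t lease_ms) {
        holder = node;
        expire_ms = now_ms + lease_ms;
    }

    // Should the acceptor reject this prepare without considering its ballot?
    bool reject_prepare(uint64_t node, uint64_t now_ms) const {
        return now_ms < expire_ms && node != holder;
    }
};
```

Safety is unaffected: rejecting a prepare is always legal in Paxos; the lease only biases liveness toward the current writer.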

## The learner role

To learn that a value has been chosen, a learner must find out that a proposal has been accepted by a majority of acceptors. The obvious algorithm is to have each acceptor, whenever it accepts a proposal, respond to all learners, sending them the proposal. This allows learners to find out about a chosen value as soon as possible, but it requires each acceptor to respond to each learner—a number of responses equal to the product of the number of acceptors and the number of learners.
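In code, the "obvious algorithm" Lamport describes boils down to counting distinct acceptors per ballot until a majority is reached. A minimal illustration (not the library's learner implementation):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>

// A value is learnt once a majority of acceptors report having accepted
// the same ballot. A std::set per ballot deduplicates repeated reports
// from the same acceptor.
struct Learner {
    size_t total_acceptors;
    std::map<uint64_t /*ballot*/, std::set<uint64_t /*acceptor*/>> accepted;

    // Returns true when this report completes a majority for its ballot.
    bool on_accepted(uint64_t ballot, uint64_t acceptor_id) {
        auto &voters = accepted[ballot];
        voters.insert(acceptor_id);
        return voters.size() >= total_acceptors / 2 + 1;
    }
};
```

As the quote notes, having every acceptor notify every learner costs acceptors × learners messages, which is why practical systems (including the value_chosen broadcast here) route the notification through the proposer instead.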

## Fusing the roles of a single Paxos group: Cinstance

With the proposer, acceptor, and learner roles complete, a managing object is needed to fuse them into one Paxos group: the class Cinstance in instance.h. Templates give zero-overhead interfaces, avoiding the cost of virtual calls, and fully connect the three roles with Cbase, which handles log instance advancement, communication, storage, and the state machine. For callers' convenience, Cinstance also provides a blocking, flow-controlled interface: given various timeout parameters, it submits a value to the Paxos group and returns on success or timeout. To keep the roles fully decoupled, every interface involved in role state transitions is exposed and handled as callbacks inside Cinstance, so all of these interactions can be followed in a single code context, minimizing logic bugs.

```cpp
void synced_value_done(const instance_id_t &instance_id, const value_t &value);

void synced_reset_instance(const instance_id_t &from, const instance_id_t &to);

Cbase::routine_status_e self_prepare(const message_t &msg);

Cbase::routine_status_e self_chosen(const message_t &msg);

void on_proposer_idle();

void on_proposer_final(const instance_id_t &instance_id, value_t &&value);

void value_chosen(const instance_id_t &instance_id, value_t &&value);

void value_learnt(const instance_id_t &instance_id, value_t &&value);

void learn_done();

Cbase::routine_status_e take_snapshots(const instance_id_t &peer_instance_id, std::vector<Csnapshot::shared_ptr> &snapshots);

Cbase::routine_status_e load_snapshots(const std::vector<Csnapshot::shared_ptr> &snapshots);
```

## Multithreading

• A fully event-driven model (including timeouts and state changes)
• Timeout and task queue: timer_work_queue.h
• Resettable timeout mechanism: resetable_timeout_notify.h
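A resettable timeout of the kind resetable_timeout_notify.h provides can be built from a condition variable whose deadline may be extended while a waiter sleeps. A sketch under assumed semantics (wait() returns once the latest deadline passes), not the library's actual class:

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// A deadline that other threads may push forward while someone is waiting
// on it; wait() only returns when the most recent deadline has passed.
class ResettableTimeout {
    std::mutex lock_;
    std::condition_variable cv_;
    std::chrono::steady_clock::time_point deadline_;

public:
    explicit ResettableTimeout(std::chrono::milliseconds d)
        : deadline_(std::chrono::steady_clock::now() + d) {}

    // Extend (or shorten) the deadline and wake waiters so they re-check it.
    void reset(std::chrono::milliseconds d) {
        std::lock_guard<std::mutex> lck(lock_);
        deadline_ = std::chrono::steady_clock::now() + d;
        cv_.notify_all();
    }

    // Block until the (possibly extended) deadline passes.
    void wait() {
        std::unique_lock<std::mutex> lck(lock_);
        while (std::chrono::steady_clock::now() < deadline_)
            cv_.wait_until(lck, deadline_);
    }
};
```

The re-check loop is essential: both spurious wakeups and reset() notifications land in the same place, and the loop decides whether the deadline has truly expired.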

## Log + state machine + snapshot (log compaction)

```cpp
class Csnapshot {
public:
    // Get the global state machine id identifying this snapshot's SM; it should be **unique**.
    virtual state_machine_id_t get_id() const = 0;

    // The snapshot represents the state machine run up to this id (not included).
    virtual const instance_id_t &get_next_instance_id() const = 0;
};

class CstateMachine {
public:
    // Get the global state machine id identifying this SM; it should be **unique**.
    virtual state_machine_id_t get_id() const = 0;

    // This will be invoked sequentially by instance id,
    // and the callback must be able to handle duplicate ids caused by replay.
    // It should throw an exception on an unexpected instance id.
    // If the instance's chosen value is not for this SM, an empty value is given.
    virtual void consume_sequentially(const instance_id_t &instance_id, const utility::Cslice &value) = 0;

    // Supplies a buffer which can be moved.
    virtual void consume_sequentially(const instance_id_t &instance_id, utility::Cbuffer &&value) {
        consume_sequentially(instance_id, value.slice());
    }

    // Return the instance id that will execute on the next round.
    // May be smaller than the actual value when racing with consumption, but **never** larger.
    virtual instance_id_t get_next_execute_id() = 0;

    // Return the smallest instance id which has not been persisted.
    // May be smaller than the actual value when racing with consumption, but **never** larger.
    virtual instance_id_t get_next_persist_id() = 0;

    // The get_next_instance_id of the returned snapshot should be >= get_next_persist_id().
    virtual int take_snapshot(Csnapshot::shared_ptr &snapshot) = 0;

    // next_persist_id should be larger than or equal to the snapshot's after a successful load.
    virtual int load_snapshot(const Csnapshot::shared_ptr &snapshot) = 0;
};
```
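The replay contract in the comments above — sequential consumption, tolerance of duplicate ids, an exception on gaps — can be made concrete with a standalone toy state machine. Here std::string stands in for utility::Cslice; this is an illustration, not zpaxos code:

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

// A counter SM that sums integers fed to it in instance-id order.
// Duplicates replayed after a restart are ignored; gaps are an error.
class CounterSM {
    uint64_t next_execute_ = 0;
    int64_t sum_ = 0;

public:
    void consume_sequentially(uint64_t instance_id, const std::string &value) {
        if (instance_id < next_execute_)
            return;  // Duplicate caused by replay: already applied.
        if (instance_id > next_execute_)
            throw std::runtime_error("gap in instance ids");
        if (!value.empty())  // Empty value: chosen for another SM.
            sum_ += std::stoll(value);
        next_execute_ = instance_id + 1;
    }

    uint64_t get_next_execute_id() const { return next_execute_; }
    int64_t sum() const { return sum_; }
};
```

Because duplicates are silently skipped, the library can replay the log from the last snapshot without the SM double-applying values, which is exactly what the "may be smaller, never larger" rule on get_next_execute_id permits.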

## Membership change

• Stop the cluster and edit the configuration files manually
• Raft's approach: joint consensus
• Single-step membership change, turning membership management into a consensus problem that Paxos itself solves (the approach this library uses)

joint consensus (two-phase approach)

• Log entries are replicated to all servers in both configurations.
• Any server from either configuration may serve as leader.
• Agreement (for elections and entry commitment) requires separate majorities from both the old and new configurations.

```cpp
class Ccommunication {
public:
    virtual int send(const node_id_t &target_id, const message_t &message) = 0;

    virtual int broadcast(const message_t &message, broadcast_range_e range, broadcast_type_e type) = 0;
};
```

```cpp
class CpeerManager {
public:
    virtual bool is_voter(const group_id_t &group_id, const instance_id_t &instance_id,
                          const node_id_t &node_id) = 0;

    virtual bool is_all_peer(const group_id_t &group_id, const instance_id_t &instance_id,
                             const std::set<node_id_t> &node_set) = 0;

    virtual bool is_all_voter(const group_id_t &group_id, const instance_id_t &instance_id,
                              const std::set<node_id_t> &node_set) = 0;

    virtual bool is_quorum(const group_id_t &group_id, const instance_id_t &instance_id,
                           const std::set<node_id_t> &node_set) = 0;
};
```

# References

• In Search of an Understandable Consensus Algorithm (Diego Ongaro and John Ousterhout, the Raft paper)
• Paxos Made Simple (Leslie Lamport)
