目录
开发raft时用到的比较主流的两个库是Etcd Raft 和hashicorp Raft,网上也有一些关于这两个库的讨论。之前分析过etcd Raft,发现该库相对hashicorp Raft比较难以理解,其最大的问题是没有实现网络层,实现难度比较大,因此本文在实现时使用了hashicorp Raft。
下文中会参考consul的一致性协议来讲解如何实现Raft协议。
Raft概述
术语
- Log entry:Raft的主要单元。Raft将一致性问题分解为日志复制。日志是一个有序的表项,其包含了Raft的集群变更信息(如添加/移除节点)以及对应用数据的操作等。
- FSM:Finite State Machine。FSM是有限状态的集合。当一条日志被Raft apply后,可以对FSM进行状态转换。相同顺序的日志在apply之后必须产生相同的结果,即行为必须是确定性的。
- Peer set:指所有参与日志复制的成员。
- Quorum:仲裁指peer set中的大部分成员:对于包含N个成员的peer set,仲裁要求有(N/2)+1个成员。如果出于某些原因导致仲裁节点不可用,则集群会变为unavailable状态,且新的日志也不会被commit。
- Committed Entry:当一个Log entry持久化到仲裁数量的节点后,该认为该Log entry是Committed的。只有当Log entry 被Committed之后,它才会被FSM apply。
- Leader:任何时间,peer set会选举一个节点作为leader。leader负责处理新的Log entry,并将其复制给follower,以及决定何时将Log entry判定为committed状态。
Raft机制简介
Raft节点总是处于三种状态之一: follower, candidate, leader。一开始,所有的节点都是follower,该状态下,节点可以从leader接收log,并参与选举,如果一段时间内没有接收到任何Log entry,则节点会自提升到candidate状态。在candidate状态下,节点会请求其他节点的选举,如果一个candidate接收到大部分节点(仲裁数目)的认同,就会被提升为leader。leader必须接收新的Log entry,并复制到所有其他follower。
如果用户无法接受旧的数据,则所有的请求必须由leader执行。
一旦一个集群有了leader,就可以接收新的Log entry。客户端可以请求leader追加一个新的Log entry。Leader会将Log entry写入持久化存储,并尝试将其复制给仲裁数目的follower。一旦Log entry被commit,就可以将该Log entry apply到FSM。FSM是应用特定的存储,在Consul中,使用 MemDB来维护集群状态。
无限量复制log的方式是不可取的。Raft 提供了一种机制,可以对当前状态进行快照并压缩log。由于 FSM 的抽象,FSM 的状态恢复必须与replay log的状态相同。Raft 可以捕获某个时刻的 FSM 状态,然后移除用于达到该状态的所有log。这些操作可以在没有用户干预的情况下自动执行,防止无限使用磁盘,同时最小化replay log所花费的时间。
Raft Consensus中所有的操作都必须经过Leader,因此需要保证所有的请求都能够发送到Leader节点,然后由Leader将请求发送给所有Follower,并等待大部分(仲裁数目的)节点处理完成该命令,leader通过选举机制产生。之后每个follower会执行如下操作:
- 在接收到命令之后,使用WAL方式将数据保存为Log entry
- 在成功写入log entry之后,将数据发给FSM进行处理
- 在FSM成功处理完数据之后,返回数据。之后Leader会注意到该节点已经成功完成数据处理。
如下表述来自Raft Protocol Overview,它的意思是,如果是查询类的请求,直接从FSM返回结果即可,如果是修改类的请求,则需要通过raft.Apply
来保证变更的一致性
所有raft集群中的成员都知道当前的leader,当一个RPC请求到达一个非leader的成员时,它会将该请求转发给当前的leader。如果是查询类型的RPC,意味着它是只读的,leader会基于FSM的当前状态生成结果;如果是事务类型的RPC,意味着它需要修改状态,Leader会生成一条新的log entry,并执行
Raft.Apply
,当log entry被提交并apply到FSM之后,事务才算执行完成。
接口和原理描述
Apply是外部数据进入Raft的接口,Raft的主要作用是维护数据操作的一致性。在上图中,由两个apply:一个是Raft.Apply
(内部会通过applyCh
传递log
)。另一个是FSM.Apply,其数据源头是Raft.Apply
。FSM基于Raft实现了读写一致性。在上图中可以看到,leader的FSM.Apply
是在数据commit成功(仲裁成功)之后才执行的,这样就能以Raft的方式保证分布式场景下的数据一致性,可以将FSM.Apply
理解为应用数据的写入操作。
Raft中的一条log
表示一个操作。使用hashicorp/raft
时应该将实现分为两层:一层是底层的Raft,支持Raft数据的存储、快照等,集群的选举和恢复等,这一部分由Raft模块自实现;另一层是应用层,需要由用户实现FSM接口,FSM的接口并不对外,在Raft的处理过程中会自动调用FSM的接口来实现应用数据的存储、备份和恢复等操作。这两层都有数据的读写和快照实现,因此在理解上需要进行区分。
另外需要注意的是传入raft.Apply
接口(func (r *Raft) Apply(cmd []byte, timeout time.Duration)
)的数据需要是byte格式的,因此需要对外部数据编码,而在fsm.Apply
中则需要使用对应的格式进行解码。
Raft节点的初始化
如果是新建的Raft节点,可以使用BootstrapCluster
方法初始化该节点。为避免非新的节点被初始化,在调用BootstrapCluster
前可以使用raft.HasExistingState
来判断实例中是否包含相关状态(logs,当前term或snapshot):
if (s.config.Bootstrap) && !s.config.ReadReplica { hasState, err := raft.HasExistingState(log, stable, snap) if err != nil { return err } if !hasState { configuration := raft.Configuration{ Servers: []raft.Server{ { ID: s.config.RaftConfig.LocalID, Address: trans.LocalAddr(), }, }, } if err := raft.BootstrapCluster(s.config.RaftConfig, log, stable, snap, trans, configuration); err != nil { return err } } }
raft的集群信息保存在Configuration数据结构中,在对raft节点进行修改(如AddVoter
、RemoveServer
等)时,也会变更Configuration
中保存的节点信息,并将Configuration
中的节点信息保存到磁盘中。在raft启动时会使用保存的Configuration
信息来寻找peers。
可以通过raft.GetConfiguration()
获取集群节点信息。
Raft节点的创建
Raft节点的创建方法如下,如果存储非空,则Raft会尝试恢复该节点:
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
包括:
fsm
:由应用实现,用于处理应用数据。FSM中的数据来自底层的Raftlog
。logs
,stable
和snaps
:logs
(存储Raftlog
) ,stable
(保存Raft选举信息,如角色、term等信息) 可以使用raftboltdb.New
进行初始化,snaps
用于Leader和follower之间的批量数据同步以及(手动或自动)集群恢复,可以使用raft.NewFileSnapshotStore
或raft.NewFileSnapshotStoreWithLogger
进行初始化。trans
:Transport是raft集群内部节点之间的信息通道,节点之间需要通过该通道来同步log、选举leader等。下面接口中的AppendEntriesPipeline
和AppendEntries
方法用于log
同步,RequestVote
用于leader选举,InstallSnapshot
用于在follower 的log
落后过多的情况下,给follower发送snapshot(批量log
)。
可以使用raft.NewTCPTransport
、raft.NewTCPTransportWithLogger
或raft.NewNetworkTransportWithConfig
方法来初始化trans。
type Transport interface { ... AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error // RequestVote sends the appropriate RPC to the target node. RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error ... }
NewRaft
方法中会运行如下后台任务:
r.goFunc(r.run) //处理角色变更和RPC请求 r.goFunc(r.runFSM) //负责将logs apply到FSM r.goFunc(r.runSnapshots) //管理FSM的snapshot
监控Leader变化
为保证数据的一致性,只能通过leader写入数据,因此需要及时了解leader的变更信息,在Raft的配置中有一个变量NotifyCh chan<- bool
,当Raft变为leader时会将true
写入该chan,通过读取该chan来判断本节点是否是leader。在初始化Raft配置的时候传入即可:
leaderNotifyCh := make(chan bool, 10) raftConfig.NotifyCh = leaderNotifyCh
还有其他方式可以获取leader变更状态:
如下方法可以生成一个chan,当本节点变为Leader时会发送
true
,当本节点丢失Leader角色时发送false
,该方法的用途与上述方式相同,但由于该方法没有缓存,可能导致丢失变更信号,因此推荐使用上面的方式。
func (r *Raft) LeaderCh() <-chan bool
实现FSM
至此已经完成了Raft的初始化。下面就是要实现初始化函数中要求实现的内容,主要就是实现FSM
接口。其中logs
、stable
、snaps
和trans
已经提到,使用现成的方法初始化即可。对于存储来说,也可以根据需要采用其他方式,如S3。
下面是LogStore
、SnapshotStore
和StableStore
的接口定义。
type LogStore interface {//用于存储Raft log // FirstIndex returns the first index written. 0 for no entries. FirstIndex() (uint64, error) // LastIndex returns the last index written. 0 for no entries. LastIndex() (uint64, error) // GetLog gets a log entry at a given index. GetLog(index uint64, log *Log) error // StoreLog stores a log entry. StoreLog(log *Log) error // StoreLogs stores multiple log entries. StoreLogs(logs []*Log) error // DeleteRange deletes a range of log entries. The range is inclusive. DeleteRange(min, max uint64) error }
type SnapshotStore interface {//用于快照的生成和恢复 // Create is used to begin a snapshot at a given index and term, and with // the given committed configuration. The version parameter controls // which snapshot version to create. Create(version SnapshotVersion, index, term uint64, configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) // List is used to list the available snapshots in the store. // It should return then in descending order, with the highest index first. List() ([]*SnapshotMeta, error) // Open takes a snapshot ID and provides a ReadCloser. Once close is // called it is assumed the snapshot is no longer needed. Open(id string) (*SnapshotMeta, io.ReadCloser, error) }
type StableStore interface { //用于存储集群元数据 Set(key []byte, val []byte) error // Get returns the value for key, or an empty byte slice if key was not found. Get(key []byte) ([]byte, error) SetUint64(key []byte, val uint64) error // GetUint64 returns the uint64 value for key, or 0 if key was not found. GetUint64(key []byte) (uint64, error) }
FSM基于Raft来实现,包含三个方法:
Apply
:在Raft完成commit索引之后,保存应用数据。Snapshot
:用于支持log压缩,可以保存某个时间点的FSM快照。需要注意的是,由于Apply
和Snapshot
运行在同一个线程中(如run
和runFSM
线程),因此要求函数能够快速返回,否则会阻塞Apply
的执行。在实现中,该函数只需捕获指向当前状态的指针,而对于IO开销较大的操作,则放到FSMSnapshot.Persist
中执行。执行逻辑如下:
snapshot, _ := fsm.Snapshot() //从FSM获取snapshot实例 snapshot.Persist(sink) //将获取到的snapshot持久化
Restore
:用于从snapshot恢复FSM
type FSM interface { // Apply is called once a log entry is committed by a majority of the cluster. // // Apply should apply the log to the FSM. Apply must be deterministic and // produce the same result on all peers in the cluster. // // The returned value is returned to the client as the ApplyFuture.Response. Apply(*Log) interface{} // Snapshot returns an FSMSnapshot used to: support log compaction, to // restore the FSM to a previous state, or to bring out-of-date followers up // to a recent log index. // // The Snapshot implementation should return quickly, because Apply can not // be called while Snapshot is running. Generally this means Snapshot should // only capture a pointer to the state, and any expensive IO should happen // as part of FSMSnapshot.Persist. // // Apply and Snapshot are always called from the same thread, but Apply will // be called concurrently with FSMSnapshot.Persist. This means the FSM should // be implemented to allow for concurrent updates while a snapshot is happening. Snapshot() (FSMSnapshot, error) // Restore is used to restore an FSM from a snapshot. It is not called // concurrently with any other command. The FSM must discard all previous // state before restoring the snapshot. Restore(snapshot io.ReadCloser) error }
FSMSnapshot
是实现快照需要实现的另一个接口,用于保存持久化FSM状态,后续可以通过FSM.Restore
方法恢复FSM。该接口不会阻塞Raft.Apply
,但在持久化FSM的数据时需要保证不影响Raft.Apply
的并发访问。
FSMSnapshot.Persist
的入参sink
是调用SnapshotStore.Creates
时的返回值。如果是通过raft.NewFileSnapshotStore
初始化了SnapshotStore
,则入参sink
的类型就是FileSnapshotStore。
FSMSnapshot.Persist
执行结束之后需要执行SnapshotSink.Close()
,如果出现错误,则执行SnapshotSink.Cancel()
。
// FSMSnapshot is returned by an FSM in response to a Snapshot // It must be safe to invoke FSMSnapshot methods with concurrent // calls to Apply. type FSMSnapshot interface { // Persist should dump all necessary state to the WriteCloser 'sink', // and call sink.Close() when finished or call sink.Cancel() on error. Persist(sink SnapshotSink) error // Release is invoked when we are finished with the snapshot. Release() }
FSM的备份和恢复
FSM的备份和恢复的逻辑比较难理解,一方面备份的数据存储在Raft中,FSM接口是由Raft主动调用的,另一方面又需要由用户实现FSM的备份和恢复逻辑,因此需要了解Raft是如何与FSM交互的。
FSM依赖snapshot来实现备份和恢复,snapshot中保存的也都是FSM信息。
何时备份和恢复
备份的时机
- 当用户执行
RecoverCluster
接口时会调用FSM.Snapshot
触发创建一个新的FSM snapshot - 手动调用如下接口也会触发创建FSM snapshot:
func (r *Raft) Snapshot() SnapshotFuture
- Raft自动备份也会触发创建FSM snapshot,默认时间为[120s, 240s]之间的随机时间。
恢复的时机
- 当用户执行
RecoverCluster
接口时会调用FSM.Restore
,用于手动恢复集群 - 当用户执行
Raft.Restore
接口时会调用FSM.Restore
,用于手动恢复集群 - 通过NewRaft创建Raft节点时会尝试恢复snapshot(
Raft.restoreSnapshot
-->Raft.tryRestoreSingleSnapshot
-->fsmRestoreAndMeasure
-->fsm.Restore
)
因此在正常情况下,Raft会不定期创建snapshot,且在创建Raft节点(新建或重启)的时候也会尝试通过snapshot来恢复FSM。
备份和恢复的内部逻辑
FSM的备份和恢复与SnapshotStore接口息息相关。
在备份FSM时的逻辑如下,首先通过SnapshotStore.Create
创建一个snapshot,然后初始化一个FSMSnapshot
实例,并通过FSMSnapshot.Persist
将FSM保存到创建出的snapshot中:
sink, err := snaps.Create(version, lastIndex, lastTerm, configuration, 1, trans) //创建一个snapshot snapshot, err := fsm.Snapshot() //获取FSMSnapshot实例 snapshot.Persist(sink) //调用FSMSnapshot.Persist将FSM保存到上面的snapshot中
恢复FSM的逻辑如下,首先通过SnapshotStore.List
获取snapshots,然后通过SnapshotStore.Open
逐个打开获取到的snapshot,最后调用FSM.Restore
恢复FSM,其入参可以看做是snapshot的文件描述符:
snapshots, err = snaps.List() for _, snapshot := range snapshots { _, source, err = snaps.Open(snapshot.ID) crc := newCountingReadCloser(source) err = fsm.Restore(crc) // Close the source after the restore has completed source.Close() }
下面以consul的实现为例看下它是如何进行FSM的备份和恢复的。
备份
FSM.Snapshot()
的作用就是返回一个SnapshotSink
接口对象,进而调用SnapshotSink.Persist
来持久化FSM。
下面是consul的SnapshotSink
实现,逻辑比较简单,它将FSM持久化到了一个snapshot中,注意它在写入snapshot前做了编码(编码类型为ChunkingStateType
):
// Persist saves the FSM snapshot out to the given sink. func (s *snapshot) Persist(sink raft.SnapshotSink) error { ... // Write the header header := SnapshotHeader{ LastIndex: s.state.LastIndex(), } encoder := codec.NewEncoder(sink, structs.MsgpackHandle) if err := encoder.Encode(&header); err != nil { sink.Cancel() return err } ... if _, err := sink.Write([]byte{byte(structs.ChunkingStateType)}); err != nil { return err } if err := encoder.Encode(s.chunkState); err != nil { return err } return nil } func (s *snapshot) Release() { s.state.Close() }
恢复
备份时将FSM保存在了snapshot中,恢复时读取并解码对应类型的snapshot即可:
// Restore streams in the snapshot and replaces the current state store with a // new one based on the snapshot if all goes OK during the restore. func (c *FSM) Restore(old io.ReadCloser) error { defer old.Close() ... handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error { switch { case msg == structs.ChunkingStateType: //解码数据 chunkState := &raftchunking.State{ ChunkMap: make(raftchunking.ChunkMap), } if err := dec.Decode(chunkState); err != nil { return err } if err := c.chunker.State(chunkState); err != nil { return err } ... default: if msg >= 64 { return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul OSS cannot restore it", msg) } else { return fmt.Errorf("Unrecognized msg type %d", msg) } } return nil } if err := ReadSnapshot(old, handler); err != nil { return err } ... return nil }
// ReadSnapshot decodes each message type and utilizes the handler function to // process each message type individually func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error { // Create a decoder dec := codec.NewDecoder(r, structs.MsgpackHandle) // Read in the header var header SnapshotHeader if err := dec.Decode(&header); err != nil { return err } // Populate the new state msgType := make([]byte, 1) for { // Read the message type _, err := r.Read(msgType) if err == io.EOF { return nil } else if err != nil { return err } // Decode msg := structs.MessageType(msgType[0]) if err := handler(&header, msg, dec); err != nil { return err } } }
至此已经完成了Raft的开发介绍。需要注意的是,FSM接口都是Raft内部调用的,用户并不会直接与之交互。
更多参见:Raft Developer Documentation
Raft关键对外接口
Raft节点管理
将节点添加到集群中,节点刚添加到集群中时状态是staging,当其ready之后就会被提升为voter,参与选举。如果节点已经是voter,则该操作会更新服务地址。该方法必须在leader上调用:
func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture
如下方法用于添加一个只接收log entry、但不参与投票或commit log的节点:该方法必须在leader上调用
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture
将节点从集群中移除,如果移除的节点是leader,则会触发leader选举。该方法必须在leader上调用:
func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture
取消节点的投票权,节点不再参与投票或commit log。该方法必须在leader上调用:
func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture
重新加载节点配置:
func (r *Raft) ReloadConfig(rc ReloadableConfig) error
Raft数据的存储和读取
用于阻塞等待FSM apply所有操作。该方法必须在leader上调用:
func (r *Raft) Barrier(timeout time.Duration) Future
apply一个命令到FSM,该方法必须在leader上调用:
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture
从上面接口可以看到,在Raft协议中,必须通过leader才能写入(apply)数据,在非leader的节点上执行
Apply()
会返回ErrNotLeader
的错误。
Apply
方法会调用LogStore
接口的StoreLogs
方法存储log(cmd
)。Raft.applyCh负责将log发送给FSM进行处理,最后通过dispatchLogs
将log分发给其他节点(dispatchLogs
会调用Transport.AppendEntries
来将log分发给对端)。在分布式环境中,外部请求可能通过LB转发到非leader节点上,此时非leader节点需要将请求转发到leader节点上进行处理,在consul中会通过ForwardRPC将请求转发给leader,再由leader执行
Apply
操作。
集群恢复
当集群中的节点少于仲裁数目时,集群将无法正常运作,此时可以手动调用如下接口尝试恢复集群,但这样会可能会导致原本正在复制的日志被commit。
最佳方式是停止所有节点,并在所有节点上运行RecoverCluster
,当集群重启之后,会发生选举,Raft也会恢复运作。
func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport, configuration Configuration) error
通过如下方式可以让集群使用外部snapshot(如备份的snapshot)。注意该操作只适用于DR,且只能在Leader上运行。
func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error
状态获取
获取节点的状态信息:
func (r *Raft) Stats() map[string]string
返回当前leader的地址和集群ID。如果当前没有leader则返回空:
func (r *Raft) LeaderWithID() (ServerAddress, ServerID)
节点数据交互
各个节点之间主要通过RPC来交互log和选举信息,可以分为RPC客户端和RPC服务端。
RPC客户端通过调用Transport接口方法来传递数据(如Leader执行Raft.Apply
log之后会调用Transport.AppendEntries
来分发log)。
RPC服务端的实现如下,其处理了不同类型的RPC请求,如AppendEntriesRequest
就是Leader执行Transport.AppendEntries
传递的请求内容:
func (r *Raft) processRPC(rpc RPC) { if err := r.checkRPCHeader(rpc); err != nil { rpc.Respond(nil, err) return } switch cmd := rpc.Command.(type) { case *AppendEntriesRequest: r.appendEntries(rpc, cmd) case *RequestVoteRequest: r.requestVote(rpc, cmd) case *InstallSnapshotRequest: r.installSnapshot(rpc, cmd) case *TimeoutNowRequest: r.timeoutNow(rpc, cmd) default: r.logger.Error("got unexpected command", "command", hclog.Fmt("%#v", rpc.Command)) rpc.Respond(nil, fmt.Errorf("unexpected command")) } }
实现描述
实现Raft时需要考虑如下几点:
- 实现FSM接口,包含
FSM
和FSMSnapshot
这两个接口 - 如何实现Raft节点的自动发现,包含节点的加入和退出
- 客户端和应用的交互接口,主要用于应用数据的增删改等查等操作,对FSM的修改必须通过
Raft.Apply
接口实现,以保证FSM的数据一致性,而在读取应用数据时,如果要求数据强一致,则需要从leader的FSM读取,否则也可以从follower的FSM读取 - 在非Leader节点接收到客户端的修改类请求后,如何将请求转发给Leader节点
在此次实现Raft的过程中,主要参考了stcache和consul的源代码,其中FSM的实现参考了前者,而Raft的初始化和节点发现参考了后者。
源代码结构如下:
- src discovery #节点发现代码 http #主服务代码 raft #raft代码 rpc #请求转发代码
- discovery:采用serf来实现节点发现,它底层采用的还是memberlist,通过gossip来管理节点。
serf主要用于管理集群节点的状态,它支持如下事件(event),其中
EventUser
为自定义事件,自定义事件会被广播到所有节点,用于处理一些用户场景,如部署或重启等,通过UserEvent接口可以发送自定义事件,需要注意的是无法通过返回值来确认事件是否被传递,但serf会通过TCP和消息重放尽量传递事件。此外还有一个EventQuery用于处理更复杂的场景,如执行系统命令等,但它仅用于serf 命令行场景,serf 库本身没有提供该功能。
const ( EventMemberJoin EventType = iota EventMemberLeave EventMemberFailed EventMemberUpdate EventMemberReap EventUser EventQuery )
serf官方给出了一些应用场景,如管理load balancer或刷新DNS等。
- rpc:实现了非Leader节点向Leader节点转发请求的功能,本demo仅实现了
/api/v1/set
接口转发,对于/api/v1/get
接口,则直接从本节点的FSM中获取数据,因此get
接口不是强一致性的。
使用如下命令可以生成rpc模块的pb.go文件:
$ protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./forward.proto
启动demo
下面启动3个节点来组成Raft集群:
入参描述如下:
- http-address:与用户交互的服务
- raft-address:Raft服务
- rpc-address:请求转发的gRpc服务
- data-dir:Raft存储路径,创建Raft节点时会用到
- bootstrap:该节点是否需要使用bootstrap方式启动
- serf-address:serf节点发现服务
- join-address:加入Raft集群的地址,为
serfAddress
,可以添加多个,如add1,add2
- 第一个节点启动时并没有需要加入的集群,因此第一个节点以bootstrap方式启动,启动后成为leader。
$ raft --http-address 0.0.0.0:5000 --raft-address 192.168.1.42:6000 --rpc-address=0.0.0.0:7000 --serf-address 192.168.1.42:8000 --data-dir /Users/charlie.liu/home/raftDatadir/node0 --bootstrap
注意:
raftTCPAddress
不能为0.0.0.0
,否则raft会报错误:"local bind address is not advertisable",serfAddress
的地址最好也不要使用0.0.0.0
。
- 启动第2、3个节点,后续的节点启动的时候需要加入集群,启动的时候指定第一个节点的地址:
$ raft --http-address 0.0.0.0:5001 --raft-address 192.168.1.42:6001 --rpc-address=0.0.0.0:7001 --serf-address 192.168.1.42:8001 --data-dir /Users/charlie.liu/home/raftDatadir/node1 --joinAddress 192.168.1.42:8000
$ raft --http-address 0.0.0.0:5002 --raft-address 192.168.1.42:6002 --rpc-address=0.0.0.0:7002 --serf-address 192.168.1.42:8002 --data-dir /Users/charlie.liu/home/raftDatadir/node2 --joinAddress 192.168.1.42:8000
在节点启动之后,就可以在Leader的标准输出中可以看到Raft集群中的成员信息:
[INFO] raft: updating configuration: command=AddVoter server-id=192.168.1.42:6002 server-addr=192.168.1.42:6002 servers="[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]"
使用/opera/stats
接口可以查看各个节点的状态,num_peers
展示了对端节点数目,state
展示了当前节点的角色。
$ curl 0.0.0.0:5000/api/opera/stats|jq //node0为Leader { "applied_index": "6", "commit_index": "6", "fsm_pending": "0", "last_contact": "0", "last_log_index": "6", "last_log_term": "2", "last_snapshot_index": "0", "last_snapshot_term": "0", "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]", "latest_configuration_index": "0", "num_peers": "2", "protocol_version": "3", "protocol_version_max": "3", "protocol_version_min": "0", "snapshot_version_max": "1", "snapshot_version_min": "0", "state": "Leader", "term": "2" } $ curl 0.0.0.0:5001/opera/stats|jq //node2为Follower { "applied_index": "6", "commit_index": "6", "fsm_pending": "0", "last_contact": "15.996792ms", "last_log_index": "6", "last_log_term": "2", "last_snapshot_index": "0", "last_snapshot_term": "0", "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]", "latest_configuration_index": "0", "num_peers": "2", "protocol_version": "3", "protocol_version_max": "3", "protocol_version_min": "0", "snapshot_version_max": "1", "snapshot_version_min": "0", "state": "Follower", "term": "2" } $ curl 0.0.0.0:5002/opera/stats|jq //node2为Follower { "applied_index": "6", "commit_index": "6", "fsm_pending": "0", "last_contact": "76.764584ms", "last_log_index": "6", "last_log_term": "2", "last_snapshot_index": "0", "last_snapshot_term": "0", "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]", "latest_configuration_index": "0", "num_peers": "2", "protocol_version": "3", "protocol_version_max": "3", "protocol_version_min": "0", "snapshot_version_max": "1", "snapshot_version_min": "0", "state": "Follower", "term": "2" }
Leader切换
停掉上述Demo中的Leader节点(node0),可以看到node1成为新的leader,且term
变为4:
$ curl 0.0.0.0:5001/opera/stats|jq //新的Leader { "applied_index": "15", "commit_index": "15", "fsm_pending": "0", "last_contact": "0", "last_log_index": "15", "last_log_term": "4", "last_snapshot_index": "0", "last_snapshot_term": "0", "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]", "latest_configuration_index": "0", "num_peers": "2", "protocol_version": "3", "protocol_version_max": "3", "protocol_version_min": "0", "snapshot_version_max": "1", "snapshot_version_min": "0", "state": "Leader", "term": "4" } $ curl 0.0.0.0:5002/opera/stats|jq { "applied_index": "15", "commit_index": "15", "fsm_pending": "0", "last_contact": "42.735ms", "last_log_index": "15", "last_log_term": "4", "last_snapshot_index": "0", "last_snapshot_term": "0", "latest_configuration": "[{Suffrage:Voter ID:192.168.1.42:6000 Address:192.168.1.42:6000} {Suffrage:Voter ID:192.168.1.42:6001 Address:192.168.1.42:6001} {Suffrage:Voter ID:192.168.1.42:6002 Address:192.168.1.42:6002}]", "latest_configuration_index": "0", "num_peers": "2", "protocol_version": "3", "protocol_version_max": "3", "protocol_version_min": "0", "snapshot_version_max": "1", "snapshot_version_min": "0", "state": "Follower", "term": "4" }
本实现中没有主动移除Raft节点的接口,也可以添加一个接口来调用
Raft.RemoveServer
,进而移除预期的节点,注意只能在Leader节点上执行Raft.RemoveServer
。
应用数据的读写
下面我们验证应用数据的写入和读取。
- 向非Leader节点写入数据,其会将写入请求转发给leader,由leader执行数据写入。下面展示向非Leader节写入数据的场景:
$ curl 0.0.0.0:5001/opera/stats|jq { "applied_index": "64", "commit_index": "64", "fsm_pending": "0", "last_contact": "4.312667ms", "last_log_index": "64", "last_log_term": "137", "last_snapshot_index": "0", "last_snapshot_term": "0", "latest_configuration": "[{Suffrage:Voter ID:0.0.0.0:7000 Address:192.168.1.42:6000} {Suffrage:Voter ID:0.0.0.0:7001 Address:192.168.1.42:6001} {Suffrage:Voter ID:0.0.0.0:7002 Address:192.168.1.42:6002}]", "latest_configuration_index": "0", "num_peers": "2", "protocol_version": "3", "protocol_version_max": "3", "protocol_version_min": "0", "snapshot_version_max": "1", "snapshot_version_min": "0", "state": "Follower", #非Leader节点 "term": "137" } $ curl -XPOST localhost:5001/api/v1/set --header 'Content-Type: application/json' -d ' { "key" : "testKey", "value" : "testValue" }'
- 向所有节点查询写入的数据,可以看到所有节点都可以查询到该数据:
$ curl -XGET localhost:5000/api/v1/get --header 'Content-Type: application/json' -d ' { "key" : "testKey" }' testValue $ curl -XGET localhost:5001/api/v1/get --header 'Content-Type: application/json' -d ' { "key" : "testKey" }' testValue $curl -XGET localhost:5002/api/v1/get --header 'Content-Type: application/json' -d ' { "key" : "testKey" }' testValue
Kubernetes中的实现方式
在kubernetes环境中,pod会因为创建、删除等操作而发生变化。因此可以通过Controller监控endpoints的变化。在endpoints发生变化时遍历endpoints,如果有新增的endpoint,则执行raft.AddVoter
,如果有被删除的endpoint,则执行raft.RemoveServer
。
注意事项
hashicorp的raft并没有实现joint Consensus,因此一次只能添加或移除一个节点。
TIPS
- 验证场景下,如果节点IP发生变动,可以通过删除
--dataDir
目录来清除集群元数据 - 如果集群中的节点不足仲裁数目,则节点可能处理
candidate
状态,无法变为Leader
,因此要保证集群中有足够的节点,避免一次停掉过多节点。 - Voter权限的节点过多会影响Leader的选举性能,如果集群需要扩展,可以考虑通过
AddNonvoter
添加非Voter节点 - 在管理Raft节点时,有时会遇到重启或升级节点等情况,这时候需要通过了解节点的状态来判断节点是否恢复正常。Consul中主要依赖RaftStats来判断节点的健康情况,其三个成员对应
/opera/stats
返回的last_contact
、last_log_term
和last_log_index
:
// RaftStats holds miscellaneous Raft metrics for a server. type RaftStats struct { // LastContact is the time since this node's last contact with the leader. LastContact string // LastTerm is the highest leader term this server has a record of in its Raft log. LastTerm uint64 // LastIndex is the last log index this server has a record of in its Raft log. LastIndex uint64 }
- consul中判断节点健康的实现如下,其中函数入参
lastTerm
和leaderLastIndex
取自当前Leader的状态,可以通过我们的/opera/stats
接口获取Leader的状态,分别对应last_log_term
和last_log_index
。主要判断逻辑为:
- 通过
leaderLastIndex
或lastTerm
是否为0来判断节点是否已经启动并加入节点 - 判断节点状态是不是alive,其节点状态来自serf
- 通过
RaftStats.LastContact
判断与Leader的交互时长是否大于预期的值(conf.LastContactThreshold
),交互时长过长说明可能发生了网络或节点等异常情况 - 通过
RaftStats.LastTerm
判断当前节点的term是不是等于Leader的term,term不同说明选举结果不一致 - 通过
RaftStats.LastIndex
判断当前节点复制的log是不是能跟上leader的进度。由于日志复制不是及时的,因此设置了一个容忍值conf.MaxTrailingLogs
// isHealthy determines whether this ServerState is considered healthy // based on the given Autopilot config func (s *ServerState) isHealthy(lastTerm uint64, leaderLastIndex uint64, conf *Config) bool { // Raft hasn't been bootstrapped yet so nothing is healthy if leaderLastIndex == 0 || lastTerm == 0 { return false } // Check that the application still thinks the server is alive and well. if s.Server.NodeStatus != NodeAlive { return false } // Check to ensure that the server was contacted recently enough. if s.Stats.LastContact > conf.LastContactThreshold || s.Stats.LastContact < 0 { return false } // Check if the server has a different Raft term from the leader if s.Stats.LastTerm != lastTerm { return false } // Check if the server has fallen behind more than the configured max trailing logs value if s.Stats.LastIndex+conf.MaxTrailingLogs < leaderLastIndex { return false } return true }