本次记录[利用etcd选主sdk实践master/slave故障转移], 并利用etcdctl客户端验证选主sdk的工作原理。
master/slave高可用集群
本文目标
在异地多机房部署节点,slave作为备用实例启动,但不接受业务流量, 监测到master宕机,slave节点自动提升为master并接管业务流量。
基本思路
各节点向etcd注册带租约的节点信息, 并各自维持心跳保活,选主sdk根据目前存活的、最早创建的节点信息键值对 来确立leader, 并通过watch机制通知业务代码leader变更。
讲道理,每个节点只需要知道两个信息就能各司其职
- • 谁是leader ==> 当前节点是什么角色===> 当前节点该做什么事情
- • 感知集群leader变更的能力 ===》当前节点现在要不要改变行为
除了官方etcd客户端go.etcd.io/etcd/client/v3, 还依赖go.etcd.io/etcd/client/v3/concurrency
package:实现了基于etcd的分布式锁、屏障、选举
选主过程 | 实质 | api |
竞选前先查询leader了解现场 | 查询当前存活的,最早创建的kv值 | *concurrency.Election.Leader() |
初始化时,各节点向etcd阻塞式竞选 | 各节点向etcd注册带租约的键值对 | *concurrency.Election.compaign |
建立master/slave集群,还能及时收到变更通知 | 通过chan传递最新的leader value | *concurrency.Election.Observe() |
重点解读
1.初始化etcd go客户端
注意:etcd客户端和服务端是通过grpc来通信,目前新版本的etcd客户端默认使用非阻塞式连接, 也就是说v3.New
函数仅表示从指定配置创建etcd客户端。
为快速确定etcd选举的可用性,本实践使用阻塞式创建客户端:
cli, err := v3.New(v3.Config{ Endpoints: addr, DialTimeout: time.Second * 5, DialOptions: []grpc.DialOption{grpc.WithBlock()}, }) if err != nil { log.WithField("instance", Id).Errorln(err) return nil, err }
2. 竞选
使用阻塞式命令compaign
竞选之前,应先查询当前leader:
// 将id:ip:port作为竞选时写入etcd的value func (c *Client) Election(id string, notify chan<- bool) error { //竞选前先试图去了解情况 ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := c.election.Leader(ctx) if err != nil { if err != concurrency.ErrElectionNoLeader { return err } } else { // 已经有leader了 c.Leader = string(resp.Kvs[0].Value) notify <- (c.Leader == id) } if err = c.election.Campaign(context.TODO(), id); err != nil { log.WithError(err).WithField("id", id).Error("Campaign error") return err } else { log.Infoln("Campaign success!!!") c.Leader = id notify <- true } c.election.Key() return nil }
参选:将持续刷新的leaseID
作为key,将特定的客户端标记(这里使用ip:port)作为value,写到etcd.
当选:当前存活的、最早创建的key是leader , 也就是说leader故障转移并不是随机的。
3. watch leader变更
golang使用信道完成goroutine通信,
本例声明信道: notify = make(chan bool, 1)
一石二鸟:监听该信道能够知道集群leader是否发生变化;信道内传的值表示当前节点是否是leader
func (c *Client) Watchloop(id string, notify chan<- bool) error { ch := c.election.Observe(context.TODO()) // 观察leader变更 tick := time.NewTicker(c.askTime) defer tick.Stop() for { var leader string select { case _ = <-c.sessionCh: log.Warning("Recv session event") return fmt.Errorf("session Done") // 一次续约不稳,立马退出程序 case e := <-ch: log.WithField("event", e).Info("watch leader event") leader = string(e.Kvs[0].Value) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() resp, err := c.election.Leader(ctx) if err != nil { if err != concurrency.ErrElectionNoLeader { return err } else { // 目前没leader,开始竞选了 if err = c.election.Campaign(context.TODO(), id); err != nil { log.WithError(err).WithField("id", id).Error("Campaign error") return err } else { // 竞选成功 leader = id } } } else { leader = string(resp.Kvs[0].Value) } } if leader != c.Leader { log.WithField("before", c.Leader).WithField("after", leader == id).Info("leader changed") notify <- (leader == id) } c.Leader = leader } }
c.election.Observe(context.TODO()) 返回最新的leader信息,配合select case控制结构
能够及时拿到leader变更信息。
如题:通过Leader字段和chan <- bool信道, 掌控了整个选举集群的状态, 可根据这两个信息去完成业务上的master/slave故障转移。
使用etcdctl确定leader
election.Leader的源码证明了[当前存活的,最早创建的kv为leader]
// Leader returns the leader value for the current election. func (e *Election) Leader(ctx context.Context) (*v3.GetResponse, error) { client := e.session.Client() resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...) if err != nil { return nil, err } else if len(resp.Kvs) == 0 { // no leader currently elected return nil, ErrElectionNoLeader } return resp, nil }
等价于
./etcdctl get /merc --prefix --sort-by=CREATE --order=ASCEND --limit=1
-- sort-by :以某标准(创建时间)检索数据
-- order :以升/降序给已检出的数据排序
-- limit:从以检出的数据中取x条数据显示