Etcd的v3版本官方client里有一个concurrency的包,里面实现了分布式锁和选主。本文分析一下它是如何实现的。
先贴一下锁的code (https://github.com/coreos/etcd/blob/master/clientv3/concurrency/mutex.go#L26)。
在code中注释介绍了具体的实现。
//m.pfx是前缀,比如"service/lock/"
//s.Lease()是一个64位的整数值,etcd v3引入了lease(租约)的概念,concurrency包基于lease封装了session,每一个客户端都有自己的lease,也就是说每个客户端都有一个唯一的64位整形值
//m.myKey类似于"service/lock/12345"
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
//etcdv3新引入的多键条件事务,替代了v2中Compare-And-put操作。etcdv3的多键条件事务的语意是先做一个比较(compare)操作,如果比较成立则执行一系列操作,如果比较不成立则执行另外一系列操作。有类似于C语言中的条件表达式。
//接下来的这部分实现了如果不存在这个key,则将这个key写入到etcd,如果存在则读取这个key的值这样的功能。
//下面这一句,是构建了一个compare的条件,比较的是key的createRevision,如果revision是0,则存入一个key,如果revision不为0,则读取这个key。
//revision是etcd一个全局的序列号,每一个对etcd存储进行改动都会分配一个这个序号,在v2中叫index,createRevision是表示这个key创建时被分配的这个序号。当key不存在时,createRivision是0。
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
get := v3.OpGet(m.myKey)
resp, err := client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
//如果上面的code操作成功了,则myRev是当前客户端创建的key的revision值。
//waitDeletes等待匹配m.pfx ("service/lock/")这个前缀(可类比在这个目录下的)并且createRivision小于m.myRev-1所有key被删除
//如果没有比当前客户端创建的key的revision小的key,则当前客户端者获得锁
//如果有比它小的key则等待,比它小的被删除
err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
总结一下,上面的锁的实现,所有的客户端都在service/lock下创建一个自己的key,createrevision最小的那个客户端获得锁,也就是最早建立key的客户端获得锁,之后按照创建的时间先后依次获得锁。
选主(https://github.com/coreos/etcd/blob/master/clientv3/concurrency/election.go#L31)的实现与锁的实现非常类似,这里就不做详述了。