1. 前言
通过 etcd 实现分布式锁,同样需要满足一致性
、互斥性
和可靠性
等要求。etcd 中的事务 txn
、lease 租约
以及 watch 监听
特性,能够使得基于 etcd 实现上述要求的分布式锁
。
2. 思路分析
2.1 正常获取锁(etcd的事务IF-Then-Else)
通过 etcd 的事务特性可以帮助我们实现一致性和互斥性。etcd 的事务特性,使用的 IF-Then-Else 语句,IF 语言判断 etcd 服务端是否存在指定的 key,即该 key 创建版本号 create_revision 是否为 0 来检查 key 是否已存在,因为该 key 已存在的话,它的 create_revision 版本号就不是 0。满足 IF 条件的情况下则使用 then 执行 put 操作,否则 else 语句返回抢锁失败的结果。当然,除了使用 key 是否创建成功作为 IF 的判断依据,还可以创建前缀相同的 key,比较这些 key 的 revision 来判断分布式锁应该属于哪个请求。
2.2 获取锁异常
客户端请求在获取到分布式锁之后,如果发生异常,需要及时将锁给释放掉。因此需要租约,当我们申请分布式锁的时候需要指定租约时间。超过 lease 租期时间将会自动释放锁,保证了业务的可用性。是不是这样就够了呢?在执行业务逻辑时,如果客户端发起的是一个耗时的操作,操作未完成的请情况下,租约时间过期,导致其他请求获取到分布式锁,造成不一致。这种情况下则需要续租,即刷新租约,使得客户端能够和 etcd 服务端保持心跳。
3. 实现分布式锁的流程图
我们基于如上分析的思路,绘制出实现 etcd 分布式锁的流程图,如下所示:
4. 代码实现
package main import ( "context" "fmt" "github.com/coreos/etcd/clientv3" "time" ) func main() { // 客户端配置 config := clientv3.Config{ Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second, } var client *clientv3.Client var err error // 建立连接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // 1. 上锁并创建租约 lease := clientv3.NewLease(client) var leaseGrantResp *clientv3.LeaseGrantResponse if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil { panic(err) } leaseId := leaseGrantResp.ID // 2 自动续约 // 创建一个可取消的租约,主要是为了退出的时候能够释放 ctx, cancelFunc := context.WithCancel(context.TODO()) // 3. 释放租约 defer cancelFunc() defer lease.Revoke(context.TODO(), leaseId) if keepRespChan, err := lease.KeepAlive(ctx, leaseId); err != nil { panic(err) } else { // 续约应答 go func() { for { select { case keepResp := <-keepRespChan: if keepRespChan == nil { fmt.Println("租约已经失效了") goto END } else { // 每秒会续租一次, 所以就会受到一次应答 fmt.Println("收到自动续租应答:", keepResp.ID) } } } END: }() } // 1.3 在租约时间内去抢锁(etcd 里面的锁就是一个 key) kv := clientv3.NewKV(client) // 创建事务 txn := kv.Txn(context.TODO()) //if 不存在 key,then 设置它,else 抢锁失败 txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)). Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))). Else(clientv3.OpGet("lock")) // 提交事务 if txnResp, err := txn.Commit(); err != nil { panic(err) } else { if !txnResp.Succeeded { fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } // 抢到锁后执行业务逻辑,没有抢到退出 fmt.Println("处理任务") time.Sleep(5 * time.Second) } }
预期的执行结果如下所示:
收到自动续租应答: 6825622810871743294 处理任务 收到自动续租应答: 6825622810871743294 收到自动续租应答: 6825622810871743294 Process finished with exit code 0
总得来说,如上关于 etcd 分布式锁的实现过程分为四个步骤:
- 客户端初始化与建立连接;
- 创建租约,自动续租;
- 创建事务,获取锁;
- 执行业务逻辑,最后释放锁。
创建租约的时候,需要创建一个可取消的租约,主要是为了退出的时候能够释放。释放锁对应的步骤,在上面的 defer 语句中。当 defer 租约关掉的时候,分布式锁对应的 key 就会被释放掉了。
5. 小结
本文主要介绍了基于 etcd 实现分布式锁的案例。首先介绍了分布式锁产生的背景以及必要性,分布式架构不同于单体架构,涉及到多服务之间多个实例的调用,跨进程的情况下使用编程语言自带的并发原语没有办法实现数据的一致性,因此分布式锁出现,用来解决分布式环境中的资源互斥操作。
接着本文重点介绍了基于 etcd 实现分布式锁的方案,根据 etcd 的特点,利用事务 txn、lease 租约以及 watch 监测实现分布式锁。
在我们上面的案例中,抢锁失败,客户端就直接返回了。那么当该锁被释放之后,或者持有锁的客户端出现了故障退出了,其他锁如何快速获取锁呢?所以上述代码可以基于 watch 监测特性进行改进,各位同学可以自行试试。(可以参考:https://github.com/zieckey/etcdsync)