GO 中 ETCD 的编码案例分享
我们来回顾一下上次我们说到的 服务注册和发现
- 分享了服务注册和发现是什么
- CAP 定理是什么
- ETCD 是什么,以及ETCD 和 Zookeeper的对比
- ETCD 的分布式锁实现的简单原理
要是对 服务注册与发现,ETCD 还有点兴趣的话,欢迎查看文章 服务注册与发现之ETCD
今天我们来看看 GO 如何去操作 ETCD ,这个开源的、高可用的分布式key-value存储系统
感兴趣的小伙伴可以看看GO 的 ETCD 官方文档
根据官方文档,我们本次分享几个点
- ETCD 如何安装
- ETCD 里面对于 KEY 的PUT 和GET操作
- WATCH操作
- Lease 租约
- KeepAlive 保活
- ETCD 分布式锁的实现
ETCD 如何安装
ETCD 的安装和部署
这里我们就做一个简单的单机部署
- 到
github
上 下载最新的 etcd 包,github.com/etcd-io/etc… - 解压后,将 etcd 和 etcdctl 拷贝到我们的
$GOBIN
目录下 , 或者加入我们系统的环境变量即可(目的是 可以直接键入etcd 系统能够运行该可执行文件) - 可以使用
etcd --version
查看版本
关于 ETCD 的命令就不在此做过的分享了,今天主要是分享 GO 如何 使用 ETCD
包的安装
本次我们使用的是 ETCD 的 clientv3
包 ,我们执行如下命令即可正确安装 ETCD
go get go.etcd.io/etcd/clientv3
无论你是直接执行上面的命令, 还是通过 go mod 的方式,去下载 ETCD 的 clientv3
包, 可能会出现如下问题:
/root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions # github.com/coreos/etcd/clientv3/balancer/resolver/endpoint /root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption /root/go/pkg/mod/github.com/coreos/etcd@v3.3.25+incompatible/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption
如上问题,是因为包冲突了 ,我们只需要将如下替换包的命令放到 我们 go.mod
下面即可
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
例如我的 go.mod 是这样的
module my_etcd go 1.15 require ( github.com/coreos/etcd v3.3.25+incompatible // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.2.0 // indirect go.etcd.io/etcd v3.3.25+incompatible go.uber.org/zap v1.17.0 // indirect google.golang.org/grpc v1.38.0 // indirect ) replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
这里顺便插一句, go mod进行包管理的方式从 GO 1.14之后就开始有了,go mod管理包非常方便,这里简单分享一下如何使用
- 在和 main.go 的同级目录下,初始化一个go mod,执行如下命令
shell
复制代码
go mod init xxx
- 写好我们的代码在
main.go
文件中 , 即可在main.go
的同级目录下执行 go build 进行编译 go程序 - 若编译出现上述问题,那么就可以在 生成的go.mod 文件中 加入上述替换包的语句即可
包安装好了,我们可以开始进行编码了
ETCD 的 设置 KEY 和获取 KEY 操作
ETCD 的默认端口是这样的:
2379
端口
提供 HTTP API 服务
2380
端口
用来与 peer 通信
我们开始写一个 GET 和 PUT KEY 的DEMO
package main import ( "context" "log" "time" "go.etcd.io/etcd/clientv3" ) func main() { // 设置 log 参数 ,打印当前时间 和 当前行数 log.SetFlags(log.Ltime | log.Llongfile) // ETCD 默认端口号是 2379 // 使用 ETCD 的 clientv3 包 client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, //超时时间 10 秒 DialTimeout: 10 * time.Second, }) if err != nil { log.Printf("connect to etcd error : %v\n", err) return } log.Printf("connect to etcd successfully ...") // defer 最后关闭 连接 defer client.Close() // PUT KEY 为 name , value 为 xiaomotong ctx, cancel := context.WithTimeout(context.Background(), time.Second) _, err = client.Put(ctx, "name", "xiaomotong") cancel() if err != nil { log.Printf("PUT key to etcd error : %v\n", err) return } // 获取ETCD 的KEY ctx, cancel = context.WithTimeout(context.Background(), time.Second) resp, err := client.Get(ctx, "name") cancel() if err != nil { log.Printf("GET key-value from etcd error : %v\n", err) return } // 遍历读出 KEY 和对应的 value for _, ev := range resp.Kvs { log.Printf("%s : %s\n", ev.Key, ev.Value) } }
感兴趣的小伙伴可以将上述代码拷贝到你的环境中进行运行,即可看到你想要的答案
ETCD 的 WATCH操 作
WATCH操作就是拍一个哨兵监控某一个key对应值的变化,包括新增,删除,修改
func main() { // 设置 log 参数 ,打印当前时间 和 当前行数 log.SetFlags(log.Ltime | log.Llongfile) // ETCD 默认端口号是 2379 // 使用 ETCD 的 clientv3 包 client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 10 * time.Second, }) if err != nil { log.Printf("connect to etcd error : %v\n", err) return } log.Printf("connect to etcd successfully ...") defer client.Close() // 派一个哨兵 一直监视 name 的变化 // respCh 是一个通道 respCh := client.Watch(context.Background(), "name") // 若 respCh 为空,会阻塞在这里 for watchResp := range respCh { for _, v := range watchResp.Events { log.Printf("type = %s , Key = %s , Value = %s\n", v.Type, v.Kv.Key, v.Kv.Value) } } }
上述代码因为 respCh
是一个通道,若里面没有数据的话,下面的 for 循环,会阻塞的等,因此需要我们自己在终端上面模拟 新增,删除,修改 name 对应的值,那么,我们的程序就会做出对应的相应
例如,我在终端命令中敲入:etcdctl --endpoints=http://127.0.0.1:2379 put name "xiaomotong"
那么,我们上述代码运行的程序就会输出如下语句
./my_etcd 22:18:39 /home/xiaomotong/my_etcd/main.go:23: connect to etcd successfully ... 22:18:43 /home/xiaomotong/my_etcd/main.go:31:type = PUT , Key = name , Value = xiaomotong
ETCD 的 LEASE 操作
LEASE ,租约,就是将自己的某一个 key 设置一个有效时间 / 过期时间,类似于 REDIS
里面的 SETNX
func main() { // 设置 log 参数 ,打印当前时间 和 当前行数 log.SetFlags(log.Ltime | log.Llongfile) // ETCD 默认端口号是 2379 // 使用 ETCD 的 clientv3 包 client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 10 * time.Second, }) if err != nil { log.Printf("connect to etcd error : %v\n", err) return } log.Printf("connect to etcd successfully ...") defer client.Close() // 我们创建一个 20秒钟的租约 resp, err := client.Grant(context.TODO(), 20) if err != nil { log.Printf("client.Grant error : %v\n", err) return } // 20秒钟之后, /name 这个key就会被移除 _, err = client.Put(context.TODO(), "/name", "xiaomotong", clientv3.WithLease(resp.ID)) if err != nil { log.Printf("client.Put error : %v\n", err) return } }
上述 name , 20 秒钟之后 就会自动失效
ETCD 的保活操作
顺便说一下,keepalived
也是一个开源的组件,用作高可用,感兴趣的可以深入了解一下
此处的 keepalived
是 保活, 这里是 ETCD 的保活, 可以在上述代码中做一个调整,上述的 name ,不失效
func main() { // 设置 log 参数 ,打印当前时间 和 当前行数 log.SetFlags(log.Ltime | log.Llongfile) // ETCD 默认端口号是 2379 // 使用 ETCD 的 clientv3 包 client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 10 * time.Second, }) if err != nil { log.Printf("connect to etcd error : %v\n", err) return } log.Printf("connect to etcd successfully ...") defer client.Close() // 我们创建一个 20秒钟的租约 resp, err := client.Grant(context.TODO(), 20) if err != nil { log.Printf("client.Grant error : %v\n", err) return } // 20秒钟之后, /name 这个key就会被移除 _, err = client.Put(context.TODO(), "/name", "xiaomotong", clientv3.WithLease(resp.ID)) if err != nil { log.Printf("client.Put error : %v\n", err) return } // 这个key name ,将永久被保存 ch, kaerr := client.KeepAlive(context.TODO(), resp.ID) if kaerr != nil { log.Fatal(kaerr) } for { ka := <-ch log.Println("ttl:", ka.TTL) } }
咱可以看看 keepalived
的官方说明 ,
KeepAlive
使给定的租约永远存活。如果发送到通道的 keepalive
响应没有立即被使用,租期客户端将至少每秒钟继续向 etcd
服务器发送keepalive
请求,直到使用最新的响应。
// KeepAlive keeps the given lease alive forever. If the keepalive response // posted to the channel is not consumed immediately, the lease client will // continue sending keep alive requests to the etcd server at least every // second until latest response is consumed. // // The returned "LeaseKeepAliveResponse" channel closes if underlying keep // alive stream is interrupted in some way the client cannot handle itself; // given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse" // from this closed channel is nil. // // If client keep alive loop halts with an unexpected error (e.g. "etcdserver: // no leader") or canceled by the caller (e.g. context.Canceled), the error // is returned. Otherwise, it retries. // // TODO(v4.0): post errors to last keep alive message before closing // (see https://github.com/coreos/etcd/pull/7866) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
来看看 ETCD 的分布式锁实现
这里需要引入一个新的包,"github.com/coreos/etcd/clientv3/concurrency"
不过使用go mod
管理方式的小伙伴就不用操心了, 写完代码,直接 go build
,GO 工具会直接帮我们下载相关包,并编译好
Go 这一点真的相当不戳
package main import ( "context" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" "log" ) func main (){ // 设置 log 参数 ,打印当前时间 和 当前行数 log.SetFlags(log.Ltime | log.Llongfile) // ETCD 默认端口号是 2379 // 使用 ETCD 的 clientv3 包 // Endpoints 需填入 url 列表 client, err := clientv3.New(clientv3.Config{Endpoints: []string{"/name"}}) if err != nil { log.Printf("connect to etcd error : %v\n", err) return } defer client.Close() // 创建第一个 会话 session1, err := concurrency.NewSession(client) if err != nil { log.Printf("concurrency.NewSession 1 error : %v\n", err) return } defer session1.Close() // 设置锁 myMu1 := concurrency.NewMutex(session1, "/lock") // 创建第二个 会话 session2, err := concurrency.NewSession(client) if err != nil { log.Printf("concurrency.NewSession 2 error : %v\n", err) return } defer session2.Close() // 设置锁 myMu2 := concurrency.NewMutex(session2, "/lock") // 会话s1获取锁 if err := myMu1.Lock(context.TODO()); err != nil { log.Printf("myMu1.Lock error : %v\n", err) return } log.Println("Get session1 lock ") m2Chan := make(chan struct{}) go func() { defer close(m2Chan) // 如果加锁不成功会阻塞,知道加锁成功为止 // 这里是使用一个通道的方式来通信 // 当 myMu2 能加锁成功,说明myMu1 解锁成功 // 当 myMu2 加锁成功的时候,会关闭 通道 // 关闭通道,从通道中读出来的就是nil if err := myMu2.Lock(context.TODO()); err != nil { log.Printf("myMu2.Lock error : %v\n", err) return } }() // 解锁 if err := myMu1.Unlock(context.TODO()); err != nil { log.Printf("myMu1.Unlock error : %v\n", err) return } log.Println("Release session1 lock ") // 读取到nil <-m2Chan log.Println("Get session2 lock") }
在上述代码中,我们创建 2 个会话来模拟分布式锁
我们先让第 1 个会话拿到锁, 并且第 2 个会话会去尝试加锁
当 第 2个会话,正确加锁成功的时候, 会关闭一个通道,来确认自己真的加到锁了
上述第 2 个会话加锁的逻辑如下:
- 如果加锁不成功会阻塞,知道加锁成功为止
- 这里是使用一个通道的方式来通信
- 当
myMu2
能加锁成功,说明myMu1
解锁成功 - 当
myMu2
加锁成功的时候,会关闭m2Chan
通道 - 关闭通道,从
m2Chan
通道中读出来的就是nil , 确认会话 2 加锁成功
总结
- 分享了ETCD的简单单点部署,ETCD 使用到的包安装,以及会遇到的问题
- ETCD 的设置 和 获取KEY
- ETCD 的WATCH 监控 KEY的简化
- ETCD 的租约 和保活机制
- ETCD 的分布式锁的简单实现
如上的编码案例,大家可以拿下来自己运行看看效果,一起学习,一起进步
若想更多的深入了解和学习,可以看文章最开始说到的官方文档,官方文档中的案例更加详尽
具体的源码也是非常详细的,就怕你学不会
欢迎点赞,关注,收藏
朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力
好了,本次就到这里
技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。
我是阿兵云原生,欢迎点赞关注收藏,下次见~