Hello朋友们,在之前参加云原生活动的时候曾写过一篇文章《浅谈云原生技术组件—etcd》,在其中我主要说明了etcd在基于Kubernetes云原生微服务框架中的定位,主要是用来做服务的远程配置、KV存储等等,那么今天就来简要的补充讲解下etcd的另一个重要的作用——服务注册和发现,没错,正是和Zookeeper、Eureka、Consul等拥有一样角色的开源微服务组件,且毫不逊色于这些,那么我们就开始进行讲解。
1 基于etcd的服务注册与发现逻辑架构
1.1 服务注册中心抽象
(图片来自网络)
- Service Registry(服务注册表,通常也成为服务注册中心):内部拥有一个数据结构,用于存储已发布服务的配置信息。注册中心的作用一句话概括就是存放和调度服务的配置,实现服务和注册中心,服务和服务之间的相互通信,可以说是微服务中的”通讯录“,它记录了服务和服务地址的映射关系。
- Service Requestor(服务调用者):根据服务注册中心调用已有服务。
- Service Provider(服务提供者):提供服务到服务注册中心。
1.2 etcd服务注册发现简易版
2 代码实现
2.1 总体流程
服务提供者:
(1)监听网络
(2)创建gRPC服务端,并将具体的服务进行注册
(3)利用服务地址、服务名等注册etcd服务配置
(4)gRPC监听服务
服务消费者:
(1)注册etcd解析器
(2)连接etcd服务
(3)获取gRPC客户端
(4)调用gRPC服务
2.2 代码
2.2.1 服务提供方
var ( cli *clientv3.Client Schema = "ns" Host = "127.0.0.1" Port = 3000 //端口 ServiceName = "api_log_service" //服务名称 EtcdAddr = "127.0.0.1:2379" //etcd地址 ) type ApiLogServer struct{} func (api *ApiLogServer) GetApiLogByUid(ctx context.Context, req *proto.ApiLogRequest) (*proto.ApiLogResponse, error) { resp := &proto.ApiLogResponse{ Msg: "ok", Data: "Hello", } return resp, nil } //将服务地址注册到etcd中 func register(etcdAddr, serviceName, serverAddr string, ttl int64) error { var err error if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(etcdAddr, ";"), DialTimeout: 50 * time.Second, }) if err != nil { fmt.Printf("connection server err : %s\n", err) return err } } //与etcd建立长连接,并保证连接不断(心跳检测) ticker := time.NewTicker(time.Second * time.Duration(ttl)) go func() { key := "/" + Schema + "/" + serviceName + "/" + serverAddr for { resp, err := cli.Get(context.Background(), key) if err != nil { fmt.Printf("get server address err : %s", err) } else if resp.Count == 0 { //尚未注册 err = keepAlive(serviceName, serverAddr, ttl) if err != nil { fmt.Printf("keepAlive err : %s", err) } } <-ticker.C } }() return nil } //保持服务器与etcd的长连接 func keepAlive(serviceName, serverAddr string, ttl int64) error { //创建租约 leaseResp, err := cli.Grant(context.Background(), ttl) if err != nil { fmt.Printf("create grant err : %s\n", err) return err } //将服务地址注册到etcd中 key := "/" + Schema + "/" + serviceName + "/" + serverAddr _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID)) if err != nil { fmt.Printf("register service err : %s", err) return err } //建立长连接 ch, err := cli.KeepAlive(context.Background(), leaseResp.ID) if err != nil { fmt.Printf("KeepAlive err : %s\n", err) return err } //清空keepAlive返回的channel go func() { for { <-ch } }() return nil } //取消注册 func unRegister(serviceName, serverAddr string) { if cli != nil { key := "/" + Schema + "/" + serviceName + "/" + serverAddr cli.Delete(context.Background(), key) } } func RunApiLog() { //监听网络 listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", Port)) if err != nil { fmt.Println("Listen network err :", err) return } defer listener.Close() //创建grpc srv := grpc.NewServer() defer srv.GracefulStop() //注册到grpc服务中 proto.RegisterApiLogServiceServer(srv, &ApiLogServer{}) //将服务地址注册到etcd中 serverAddr := fmt.Sprintf("%s:%d", Host, Port) fmt.Printf("rpc server address: %s\n", serverAddr) register(EtcdAddr, ServiceName, serverAddr, 10) //关闭信号处理 ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT) go func() { s := <-ch unRegister(ServiceName, serverAddr) if i, ok := s.(syscall.Signal); ok { os.Exit(int(i)) } else { os.Exit(0) } }() //监听服务 err = srv.Serve(listener) if err != nil { fmt.Println("rpc server err : ", err) return } }
2.2.2 服务消费方
var ( cli *clientv3.Client Schema = "ns" ServiceName = "api_log_service" //服务名称 EtcdAddr = "127.0.0.1:2379" //etcd地址 ) type EtcdResolver struct { etcdAddr string clientConn resolver.ClientConn } func NewEtcdResolver(etcdAddr string) resolver.Builder { return &EtcdResolver{etcdAddr: etcdAddr} } func (r *EtcdResolver) Scheme() string { return Schema } //ResolveNow watch有变化调用 func (r *EtcdResolver) ResolveNow(rn resolver.ResolveNowOptions) { fmt.Println(rn) } //Close 解析器关闭时调用 func (r *EtcdResolver) Close() { fmt.Println("Close") } //Build 构建解析器 grpc.Dial()时调用 func (r *EtcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { var err error //构建etcd client if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(r.etcdAddr, ";"), DialTimeout: 15 * time.Second, }) if err != nil { fmt.Printf("connect etcd err : %s\n", err) return nil, err } } r.clientConn = clientConn go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/") return r, nil } //watch机制:监听etcd中某个key前缀的服务地址列表的变化 func (r *EtcdResolver) watch(keyPrefix string) { //初始化服务地址列表 var addrList []resolver.Address resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("get service list err : ", err) } else { for i := range resp.Kvs { addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)}) } } r.clientConn.NewAddress(addrList) //监听服务地址列表的变化 rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for n := range rch { for _, ev := range n.Events { addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case mvccpb.PUT: if !exists(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) r.clientConn.NewAddress(addrList) } case mvccpb.DELETE: if s, ok := remove(addrList, addr); ok { addrList = s r.clientConn.NewAddress(addrList) } } } } } func exists(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false } func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false } func RunClient() { //注册etcd解析器 r := NewEtcdResolver(EtcdAddr) resolver.Register(r) //连接服务器,同步调用r.Build() conn, err := grpc.Dial(r.Scheme()+"://author/"+ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure()) if err != nil { fmt.Printf("connect err : %s", err) } defer conn.Close() //获得gRPC客户端 c := proto.NewApiLogServiceClient(conn) //调用服务 resp, err := c.GetApiLogByUid( context.Background(), &proto.ApiLogRequest{UId: 0}, ) if err != nil { fmt.Printf("call service err : %s", err) return } fmt.Printf("resp : %s , data : %s", resp.Msg, resp.Data) }
2.2.3 公共组件
syntax = "proto3"; package proto; option go_package = "../api_log"; service ApiLogService { rpc GetApiLogByUid(ApiLogRequest) returns (ApiLogResponse){} } message ApiLogRequest{ int32 u_id = 1; } message ApiLogResponse{ int64 code = 1; string msg = 2; int64 count = 3; string data = 4; }
注意要在编译后进行使用哈
2.3 注意事项
在我编写代码进行实现的过程中遇到过种种问题,但是最让人记忆深刻的就是etcd与gRPC版本不兼容的问题,用了很长时间才搞定,在这里记录下吧:
原因是etcd3.x版本不支持grpc1.27版本以上,但是grpc1.27以下编译成的中间代码又不支持新版本的proto buffer,这就陷入了一个两难的处境,最后通过Stack Overflow才查到:
https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc
解决,在go.mod中加入这几行代码:
replace ( github.com/coreos/etcd => github.com/ozonru/etcd v3.3.20-grpc1.27-origmodule+incompatible google.golang.org/grpc => google.golang.org/grpc v1.27.0 )
3 细节剖析
3.1 服务生产端keepAlive
keepAlive是一个老生常谈的问题了,下到TCP/IP、HTTP连接,上到Redis集群、MySQL集群,都会有该机制,那么etcd的keepAlive是怎么搞的呢?
下面我们来看下:
etcd使用LeaseKeepAlive API调用创建的双向流来刷新租约。当客户端希望刷新租约时,它通过流发送一个leasekeepaliverrequest:
message LeaseKeepAliveRequest { int64 ID = 1; }
- ID :keepAlive有效的租约ID。
LeaseKeepAliveResponse
作为keepAlive的响应:
message LeaseKeepAliveResponse { ResponseHeader header = 1; int64 ID = 2; int64 TTL = 3; }
- ID :用新的TTL刷新的租约。
- TTL :新的生存时间,以秒为单位,租约剩余的时间。
3.2 服务消费端watch机制
Watch API提供了一个基于事件的接口,用于异步监视服务key的更改。etcd3 watch通过持续观察给定的修订(当前的或历史的)来等待键的更改,并将键更新流回客户端。
对每个键的每次更改都用“Event”消息表示。Event消息提供了更新的数据和更新的类型:
message Event { enum EventType { PUT = 0; DELETE = 1; } EventType type = 1; KeyValue kv = 2; KeyValue prev_kv = 3; }
- type:PUT类型表示新数据的更新,DELETE表示key的删除。
- kv:与事件相关的键值PUT事件包含kv。
- prev_kv:事件发生前修改版本的密钥的键值对。为了节省带宽,它只在watch显式启用的情况下填写。
watch流:
watch是长时间运行的请求,并使用gRPC流来流化事件数据。watch流是双向的;客户端写入流来建立监视,读取流来接收监视事件。通过使用每个watch标识符来标记事件,单个watch流可以将多个不同的手表组合在一起。这种多路复用有助于减少核心etcd集群上的内存占用和连接开销。
4 总结
微服务是当今互联网领域的广泛概念,也是一种架构演进的结果,微服务的存在让架构设计更加的解耦合,让人员的分工更加明确,当然他的落地实现也并不止步与某一两种方式,在云原生领域的Kubernetes+etcd,互联网领域常用的Spring Cloud全家桶以及Dubbo等都是微服务的具体实现,而etcd也仅仅是微服务中服务注册中心组件角色的一个代表而已。
参考:
https://etcd.io/docs/v3.5/dev-guide/grpc_naming/
https://www.jianshu.com/p/217d0e3a8d0f
https://www.cnblogs.com/wujuntian/p/12838041.html
https://stackoverflow.com/questions/64815927/undefined-grpc-clientconninterface-when-compiling-grpc