来源自etcd 笔记(09)— 基于 etcd 实现微服务的注册与发现_serviceregistry": { "type": "etcd", "endpoints":-CSDN博客
服务注册
go
复制代码
package main import ( "context" "fmt" "time" "github.com/coreos/etcd/clientv3" ) // 服务注册对象 type ServiceRegister struct { client *clientv3.Client kv clientv3.KV lease clientv3.Lease canclefunc func() key string leaseResp *clientv3.LeaseGrantResponse keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse } // 初始化注册服务 func InitService(host []string, timeSeconds int64) (*ServiceRegister, error) { config := clientv3.Config{ Endpoints: host, DialTimeout: 5 * time.Second, } client, err := clientv3.New(config) if err != nil { fmt.Printf("create connection etcd failed %s\n", err) return nil, err } // 得到KV和Lease的API子集 kv := clientv3.NewKV(client) lease := clientv3.NewLease(client) service := &ServiceRegister{ client: client, kv: kv, lease: lease, } return service, nil } // 设置租约 func (s *ServiceRegister) setLease(timeSeconds int64) error { leaseResp, err := s.lease.Grant(context.TODO(), timeSeconds) if err != nil { fmt.Printf("create lease failed %s\n", err) return err } // 设置续租 ctx, cancelFunc := context.WithCancel(context.TODO()) leaseRespChan, err := s.lease.KeepAlive(ctx, leaseResp.ID) if err != nil { fmt.Printf("KeepAlive failed %s\n", err) return err } s.leaseResp = leaseResp s.canclefunc = cancelFunc s.keepAliveChan = leaseRespChan return nil } // 监听续租情况 func (s *ServiceRegister) ListenLeaseRespChan() { for { select { case leaseKeepResp := <-s.keepAliveChan: if leaseKeepResp == nil { fmt.Println("续租功能已经关闭") return } else { fmt.Println("续租成功") } } } } // 通过租约注册服务 func (s *ServiceRegister) PutService(key, val string) error { fmt.Printf("PutService key <%s> val <%s>\n", key, val) _, err := s.kv.Put(context.TODO(), key, val, clientv3.WithLease(s.leaseResp.ID)) return err } // 撤销租约 func (s *ServiceRegister) RevokeLease() error { s.canclefunc() time.Sleep(2 * time.Second) _, err := s.lease.Revoke(context.TODO(), s.leaseResp.ID) return err } func main() { service, _ := InitService([]string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}, 5) service.setLease(10) defer service.RevokeLease() go service.ListenLeaseRespChan() err := service.PutService("/wohu", "http://localhost:8080") if err != nil { fmt.Printf("PutService failed %s\n", err) } // 使得程序阻塞运行,便于观察输出结果 select {} }
服务发现
go
复制代码
package main import ( "context" "fmt" "sync" "time" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" ) // 客户端对象 type Client struct { client *clientv3.Client kv clientv3.KV lease clientv3.Lease watch clientv3.Watcher serverList map[string]string lock sync.Mutex } // 初始化客户端对象 func InitClient(addr []string) (*Client, error) { conf := clientv3.Config{ Endpoints: addr, DialTimeout: 5 * time.Second, } client, err := clientv3.New(conf) if err != nil { fmt.Printf("create connection etcd failed %s\n", err) return nil, err } // 得到 KV 、Lease、 Watcher 的API子集 kv := clientv3.NewKV(client) lease := clientv3.NewLease(client) watch := clientv3.NewWatcher(client) // 给客户端对象赋值 c := &Client{ client: client, kv: kv, lease: lease, watch: watch, serverList: make(map[string]string), } return c, nil } // 根据注册的服务名,获取服务实例的信息 func (c *Client) getServiceByName(prefix string) ([]string, error) { // 读取的时候带有 WithPrefix 选项,所以会读取该前缀所有的字段值 resp, err := c.kv.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { fmt.Printf("getServiceByName failed %s\n", err) return nil, err } // 返回的 resp 是多个字段值。需要遍历提取对应的 key value addrs := c.extractAddrs(resp) return addrs, nil } // 根据 etcd 的响应,提取服务实例的数组 func (c *Client) extractAddrs(resp *clientv3.GetResponse) []string { addrs := make([]string, 0) if resp == nil || resp.Kvs == nil { return addrs } for i := range resp.Kvs { if v := resp.Kvs[i].Value; v != nil { // 将 key value 值保存在 ServiceList 表中 c.SetServiceList(string(resp.Kvs[i].Key), string(resp.Kvs[i].Value)) addrs = append(addrs, string(v)) } } return addrs } // 设置 serverList func (c *Client) SetServiceList(key, val string) { c.lock.Lock() defer c.lock.Unlock() // serverList 为初始化设置的本地 map 对象,由于考虑到多个 client 运行,所以需要加锁控制 c.serverList[key] = string(val) fmt.Println("set data key :", key, "val:", val) } // 删除本地缓存的服务实例信息 func (c *Client) DelServiceList(key string) { c.lock.Lock() defer c.lock.Unlock() delete(c.serverList, key) fmt.Println("del data key:", key) newRes, err := c.getServiceByName(key) if err != nil { fmt.Printf("getServiceByName failed %s\n", err) } else { fmt.Printf("get key %s", key, " current val is: %v\n", newRes) } } // 获取服务实例信息 func (c *Client) GetService(prefix string) ([]string, error) { if addrs, err := c.getServiceByName(prefix); err != nil { panic(err) } else { fmt.Println("get service ", prefix, " for instance list: ", addrs) go c.watcher(prefix) return addrs, nil } } // 监控指定键值对的变更 func (c *Client) watcher(prefix string) { watchRespChan := c.watch.Watch(context.Background(), prefix, clientv3.WithPrefix()) for watchResp := range watchRespChan { for _, event := range watchResp.Events { switch event.Type { case mvccpb.PUT: // 写入的事件 c.SetServiceList(string(event.Kv.Key), string(event.Kv.Value)) case mvccpb.DELETE: // 删除的事件 c.DelServiceList(string(event.Kv.Key)) } } } } func main() { /* 先创建 etcd 连接,构建 Client 对象,随后获取指定的服务 /wohu 实例信息; 最后监测 wohu 服务实例的变更事件,根据不同的事件产生不同的行为。 */ c, _ := InitClient([]string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}) c.GetService("/wohu") // 使得程序阻塞运行,模拟服务的持续运行 select {} }