etcd 实战基础篇(一)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: etcd 实战基础篇(一)

Etcd 是什么


Etcd 是由 Go 编写的。它是一个强一致性的分布式键值存储,提供一种可靠的方式来存储需要由分布式系统或者机器集群访问的数据。 同时 Etcd 各节点中的通信是通过 Raft 一致性算法来处理的。 有很多大型开源项目的底层都基于 Etcd,举几个比较有名的工业级项目:kubernetes、 CoreDNS、ROOK......


Etcd 的场景


  • 服务发现。(可以把服务存储到某个 prefix 开头的 key中,然后消费端或者服务信息以调用, 同时消费者也可以通过 watch 获得 key 的变化)
  • 消息分布和订阅
  • 分布式锁
  • Leader 选举
  • 分布式队列
  • 负载均衡
  • ......


和 redis 的区别


面试的时候可能有面试官喜欢问:


  • redis 的数据类型更丰富(string, hash, set ,zset, list),etcd 仅仅就是 key-val。
  • etcd 的底层是 Raft 算法,可以保证数据的强一致性。而 redis 数据复制上是主备异步复制,只能最终一致性。
  • 读写性能上,因为 etcd 保证强一致性,所以会比 redis 差。
  • 存储方面,etcd 使用的是持久化存储boltdb,而 redis 的方案是可持久化的 aof/rdb 。


环境与说明


直接下载编译好的二进制文件也好,还是自己下载源码编译运行,先开启一个单节点服务就行。我本地使用 goreman 搭建了三个实例。 

1668503656102.jpg

这里稍微说明一下:PEER ADDRS 指的是向其他 etcd server 暴露的通信地址,比如上图 name=infra1 要调用 infra2调用的就是 http://127.0.0.1:22380 CLIENT ADDRS 是对客户端暴露的地址。比如接下来我们的客户端连接的是 infra1使用的就是 http://127.0.0.1:2379

目前网上的教程大多使用编译好的 etcdctl 这样的二进制文件,通过命令行来进行操作,简单直观。比如:

1668503673684.jpg

但是也会导致一个问题,你并不知道客户端底层是如何运行的,这中间又涉及了哪些接口,对应的数据结构是什么样的。 所以为了一步步深入 etcd,我们从代码层面操作 etcd 客户端。


初始化 etcd 客户端


我们先初始化一个 etcd 客户端。

var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
var cli *clientv3.Client
// 初始化etcd 客户端
func init() {
  flag.Parse()
  var err error
  // 解析etcd的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创建一个 etcd 的客户端
  cli, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    fmt.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
}


put 操作


命令行 etcdctl put key val对应操作。

// 设置key
func PutKey(key string, value string) {
  var err error
  var resp *clientv3.PutResponse
  resp, err = cli.Put(context.Background(), key, value)
  if err != nil {
    fmt.Printf("设置 key 失败:%v\\n", err)
    return
  }
  fmt.Printf("操作结果:%v\\n", resp)
}


除了简单的设置,我们还有一种租约模式,也就是设置一个key的有效期,在有效期之内可以进行续租,如果没续租到期就过期。 对应的命令行是分两段:

etcdctl lease grant 200
// lease 326978bac638650a granted with TTL(200s)
etcdctl put hello world --lease=326978bac638650a


对应操作,

// 设置会过期的key
func PutKeyLease(key string, value string, ttl int64) {
  var err error
  var resp *clientv3.PutResponse
  // 创建一个租约对象
  var lease clientv3.Lease
  lease = clientv3.NewLease(cli)
  var leaseResp *clientv3.LeaseGrantResponse
  // 根据时间,生成一个租约
  leaseResp, err = lease.Grant(context.Background(), ttl)
  if err != nil {
    fmt.Printf("设置 租约 失败:%v\\n", err)
  }
  resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
  if err != nil {
    fmt.Printf("设置 key 失败:%v\\n", err)
    return
  }
  fmt.Printf("操作结果:%v\\n", resp)
}


etcd 的租约模式,简单的说, 当 Lease server 收到 client 请求,比如上面创建一个有效期200秒的请求,会通过 Raft 模块完成日志同步, 随后 Apply 模块的 Grant 接口执行日志条目内容。这是后续我们要研究的,这里略微提一下。


首先你得创建一个 Lease(租约),获取到一个 Lease 唯一id,然后 put 的时候带上这个 id。当一个 key 指定一个 Lease 的时候, 底层最终是会把这个 key 关联到 Lease 的内存集合中。所以本质上,一个 Lease 可以 关联 n 个 key。而我们平常使用的缓存 key 设置过期时间,一般是把 key 和过期时间一对一绑定。可能有人还要问,Lease 到期了是如何删除掉关联的 key?


其实原理解释起来也很简单。Lease 在底层存储的结构是堆。由一个异步的 G 专门负责的去淘汰过期的 Lease。定时从最小堆中取出已经到期的 Lease。 然后删除 Lease 以及 删除通过 LeaseId 关联上此 Lease 的 key 列表。后面我们分析源码的时候专门讨论这块。


这里我还要说一点,你可以看到,不管是 put 一个普通的 key,还是一个带有租约的 key,调用的都是同一个方法。

// 普通的
  resp, err = cli.Put(context.Background(), key, value)
// 租约
  resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
// 源码里面
type OpOption func(*Op)
func WithLease(leaseID LeaseID) OpOption {
  return func(op *Op) { op.leaseID = leaseID }
}
func (op *Op) applyOpts(opts []OpOption) {
  for _, opt := range opts {
    opt(op)
  }
}

看出来了吗?一个很常见的设计模式,装饰器。


Get 操作


命令行etcdctl get key对应操作,

func GetKey(key string) {
  var err error
  var res *clientv3.GetResponse
  res, err = cli.Get(context.Background(), key)
  if err != nil {
    fmt.Printf("获取 key 失败 :%v\\n", err)
    return
  }
  fmt.Printf("key %v 的值是:%+v\\n", key, res)
}


我们都知道,etcd从 v3 开始,底层实现了 MVCC 机制。所以在 etcd 中的 key 是存在多个历史版本的。

我们会在命令行中操作etcdctl get hello --rev=?比如

 1668503758707.jpg

可以看到,不同版本的 key("hello")的值是不一样的。

// 获取指定版本的key
func GetKeyByVersion(key string, version int64) {
  var err error
  var res *clientv3.GetResponse
  res, err = cli.Get(context.Background(), key, clientv3.WithRev(version))
  if err != nil {
    fmt.Printf("删除 key:%v 失败:%v", key, err)
    return
  }
  fmt.Printf("请求key:%v,请求版本:%v,获取结果:%+v\\n", key, version, res)

一样的套路。我们也可以运行这段代码演示一下

src.GetKeyByVersion("hello", 20)
src.GetKeyByVersion("hello", 21)

其他参数暂时忽略,主要看 Kvs 里面的结果


Watch 操作


命令行./etcdctl watch hello

为了避免客户端的反复轮询, etcd 提供了 event 机制。客户端可以订阅一系列的 event ,用于 watch 某些 key 。 当这些被 watch 的 key 更新时, etcd 就会通知客户端。

// 监听key 变动
func WatchKey(key string) {
  var watch clientv3.WatchChan
  watch = cli.Watch(context.Background(), key)
  for {
    res := <-watch
    fmt.Printf("key:%v变动通知:%+v\\n", key, res)
    fmt.Printf("值:%+v\\n", *res.Events[0])
  }
}


从上面这段代码看出,watch 最终是通过 channel 的方式来进行通知的。

// 开启一个 G
// go src.WatchKey("hello")


然后我们运行这段程序,在命令行上操作 hello 这个 key。

./etcdctl lease grant 30
lease 326978bac638651e granted with TTL(30s)
./etcdctl put hello world-age --lease=326978bac638651e

1668503797928.jpg

可以看到接收到两个事件,一个是 put,一个是租约到期 delete



总结


以下是这篇文章全部代码。

package src
import (
  "context"
  "flag"
  "fmt"
  "github.com/coreos/etcd/clientv3"
  "log"
  "strings"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
var cli *clientv3.Client
// 初始化etcd 客户端
func init() {
  flag.Parse()
  var err error
  // 解析etcd的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创建一个 etcd 的客户端
  cli, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    fmt.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
}
// 设置key
func PutKey(key string, value string) {
  var err error
  var resp *clientv3.PutResponse
  resp, err = cli.Put(context.Background(), key, value)
  if err != nil {
    fmt.Printf("设置 key 失败:%v\\n", err)
    return
  }
  fmt.Printf("操作结果:%v\\n", resp)
}
// 设置会过期的key
func PutKeyLease(key string, value string, ttl int64) {
  var err error
  var resp *clientv3.PutResponse
  // 创建一个租约对象
  var lease clientv3.Lease
  lease = clientv3.NewLease(cli)
  var leaseResp *clientv3.LeaseGrantResponse
  // 根据时间,生成一个租约
  leaseResp, err = lease.Grant(context.Background(), ttl)
  if err != nil {
    fmt.Printf("设置 租约 失败:%v\\n", err)
  }
  resp, err = cli.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
  if err != nil {
    fmt.Printf("设置 key 失败:%v\\n", err)
    return
  }
  fmt.Printf("操作结果:%v\\n", resp)
}
// 获取key
func GetKey(key string) {
  var err error
  var res *clientv3.GetResponse
  res, err = cli.Get(context.Background(), key)
  if err != nil {
    fmt.Printf("获取 key 失败 :%v\\n", err)
    return
  }
  fmt.Printf("key %v 的值是:%+v\\n", key, res)
}
// 获取指定版本的key
func GetKeyByVersion(key string, version int64) {
  var err error
  var res *clientv3.GetResponse
  res, err = cli.Get(context.Background(), key, clientv3.WithRev(version))
  if err != nil {
    fmt.Printf("删除 key:%v 失败:%v", key, err)
    return
  }
  fmt.Printf("请求key:%v,请求版本:%v,获取结果:%+v\\n", key, version, res)
}
// 删除key
func DeleteKey(key string) {
  var err error
  var res *clientv3.DeleteResponse
  res, err = cli.Delete(context.Background(), key)
  if err != nil {
    fmt.Printf("删除 key:%v 失败:%v", key, err)
    return
  }
  fmt.Printf("操作结果:%+v\\n", res)
}
// 监听key 变动
func WatchKey(key string) {
  var watch clientv3.WatchChan
  watch = cli.Watch(context.Background(), key, clientv3.WithRev(21))
  for {
    res := <-watch
    fmt.Printf("key:%v变动通知:%+v\\n", key, res)
    fmt.Printf("值:%+v\\n", *res.Events[0])
  }
}

这篇文章主要介绍了etcd这个分布式存储工具,包括它的应用场景以及实战基本的操作。 上面其实还有很多的实例没有写出来,一个是因为懒,没必要一个个演示一遍,另一个原因是留给你们自行实现。 我们以这个为开始,一步步敲开 etcd 的大门。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
Kubernetes Linux Shell
【K8S 系列】k8s 学习二,kubernetes 核心概念及初步了解安装部署方式
【K8S 系列】k8s 学习二,kubernetes 核心概念及初步了解安装部署方式
147 0
|
存储 域名解析 缓存
|
运维
ETCD系列之一:简介
本文介绍etcd使用场景,工作原理。
76737 3
|
3月前
|
存储 监控 算法
[etcd]简介与安装
[etcd]简介与安装
|
6月前
|
存储 算法 开发者
etcd入门指南
etcd入门指南
380 4
etcd入门指南
|
6月前
|
存储 消息中间件 算法
ETCD(一)简介
ETCD(一)简介
96 0
|
数据可视化 Ubuntu Go
etcd源码分析 - 0.搭建学习etcd的环境
如果要更深入地研究etcd,就需要我们涉及到源码、并结合实践进行学习。那么,接下来,我将基于`v3.4`这个版本,做一期深入的环境搭建。
96 0
|
Kubernetes 监控 容器
etcd源码分析 - 1.【打通核心流程】etcd server的启动流程
在第一阶段,我将从主流程出发,讲述一个`PUT`指令是怎么将数据更新到`etcd server`中的。今天,我们先来看看server是怎么启动的。
173 0
|
存储 负载均衡 监控
Etcd 认识第二篇
etcd的整体架构 - 客户端层 - Clientv3/etcdctl客户端。用户通过命令行或者客户端调用api方式调用使用 - 客户端层提供负载均衡和节点间故障转移等特性
199 0
分布式学习十四:etcd实现服务注册/发现
分布式学习十四:etcd实现服务注册/发现
96 0
分布式学习十四:etcd实现服务注册/发现