etcd 实战基础篇(二)

简介: etcd 实战基础篇(二)

上一篇etcd 实战基础篇(一)我们主要介绍了 etcd 使用场景以及最基础性的一些操作(put、get、watch)。 这一篇我们接着实战etcd其他业务场景。

基于 etcd 的分布式锁


基于 etcd 实现一个分布式锁特别简单。etcd 提供了开箱即用的包 concurrency,几行代码就实现一个分布式锁。

package src
import (
  "context"
  "flag"
  "fmt"
  "github.com/coreos/etcd/clientv3"
  "github.com/coreos/etcd/clientv3/concurrency"
  "log"
  "strings"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客户端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创建一个 etcd 的客户端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    fmt.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
  return client
}
func Lock(id int, lockName string) {
  client := initEtcdClient()
  defer client.Close()
  // 创建一个 session,如果程序宕机奔溃,etcd可以知道
  s, err := concurrency.NewSession(client)
  if err != nil {
    log.Fatal(err)
  }
  defer s.Close()
  // 创建一个etcd locker
  locker := concurrency.NewLocker(s, lockName)
  log.Printf("id:%v 尝试获取锁%v", id, lockName)
  locker.Lock()
  log.Printf("id:%v取得锁%v", id, lockName)
  // 模拟业务耗时
  time.Sleep(time.Millisecond * 300)
  locker.Unlock()
  log.Printf("id:%v释放锁%v", id, lockName)
}


我们再写个脚本运行,看看结果。


package main
import (
  "etcd-test/src"
  "sync"
)
func main() {
  var lockName = "locker-test"
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      src.Lock(item, lockName)
    }(i)
  }
  wg.Wait()
}


我们发起了10个并发抢同一个 key 锁的命令。运行结果如下,

1668504025210.jpg

从图片可以看到,同一时刻一定只有一个 G 得到锁,一个 G 获取到一个锁的前提一定是当前 key 未被锁。

有人要问了,当一个锁解开时,之前未获取到锁而发生等待的客户端谁先获取到这把锁? 这个问题,我们后续分析原理的时候再揭晓。

说到分布式锁,不得不提起 redis。它有一个看似安全实际一点都不安全的分布式锁。它的命令模式是,

set key value [EX seconds] [PX milliseconds] [NX|XX]


这其中,介绍两个关键的属性:

  • EX 标示设置过期时间,单位是秒。
  • NX 表示 当对应的 key 不存在时,才创建。

我们在使用 redis 做分布式锁的时候会这么写。(代码用了包https://github.com/go-redis/redis)

func RedisLock(item int) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  fmt.Printf("item:%v 尝试获取锁,时间:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 2*time.Second).Result()
  if !res {
    fmt.Printf("item:%v 尝试获取锁失败\\n", item)
    return
  }
  fmt.Printf("item:%v 获取到锁,时间:%v\\n", item, time.Now().String())
  time.Sleep(1 * time.Second) //模拟业务耗时
  fmt.Printf("item:%v 释放锁,时间:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}


rdb.SetNX(ctx, "key", "value", 2*time.Second)

我们规定锁的过期时间是2秒,下面有一句time.Sleep(1 * time.Second)用来模拟处理业务的耗时。业务处理结束,我们删除 keyrdb.Del(ctx, "key")

我们写个简单的脚本,

func main() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      RedisLock(item)
    }(i)
  }
  wg.Wait()
}

我们开启十个 G 并发的调用RedisLock函数。每次调用,函数内部都会新建一个 redis 客户端,本质上是10个客户端。

运行这段程序,

1668504069687.jpg

从图中看出,同一时刻只有一个客户端获取到锁,并且在一秒的任务处理后,释放了锁,好像没太大的问题。

那么,我再写一个简单的例子。


import (
  "context"
  "fmt"
  "github.com/go-redis/redis/v8"
  "sync"
  "time"
)
var ctx = context.Background()
var rdb *redis.Client
func main() {
  var wg sync.WaitGroup
  wg.Add(2)
  go func() {
    defer wg.Done()
    ExampleLock(1, 0)
  }()
  go func() {
    defer wg.Done()
    ExampleLock(2, 5)
  }()
  wg.Wait()
}
func ExampleLock(item int, timeSleep time.Duration) {
  rdb = redis.NewClient(&redis.Options{
    Addr: "127.0.0.1:6379",
    Password: "",
    DB: 0,
  })
  if timeSleep > 0 {
    time.Sleep(time.Second * timeSleep)
  }
  fmt.Printf("item:%v 尝试获取锁,时间:%v\\n", item, time.Now().String())
  res, _ := rdb.SetNX(ctx, "key", "value", 3*time.Second).Result()
  if !res {
    fmt.Printf("item:尝试获取锁失败:%v\\n", item)
    return
  }
  fmt.Printf("item:%v 获取到锁,时间:%v\\n", item, time.Now().String())
  time.Sleep(7 * time.Second)
  fmt.Printf("item:%v 释放锁,时间:%v\\n", item, time.Now().String())
  rdb.Del(ctx, "key")
}


我们设置锁的过期时间是 3 秒,而获取锁之后的任务处理时间为 7 秒。

然后我们开启两个 G。


ExampleLock(1, 0)
ExampleLock(2, 5)


其中第二行数字5,从代码中可以看出,是指启动 G 后过5秒去获取锁。

这段代码整体流程是这样的:G(1) 获取到锁后,设置的锁持有时间是3秒,由于任务执行需要7秒的时间,因此在3秒过后锁会自动释放。G(2) 可以在第5秒的时候获取到锁,然后它执行任务也得7秒。


最后,G(1)在获取锁后7秒执行释放锁的操作,G(2)同理。

1668504086227.jpg

发现问题了吗?

G(1) 的锁在3秒后已经自动释放了。但是在任务处理结束后又执行了解锁的操作,可此时这个锁是 G(2) 的呀。

那么接下来由于 G(1) 误解了G(2) 的锁,如果此时有其他的 G,那么就可以获取到锁。

等 G(2) 任务执行结束,同理又会误解其他 G 的锁,这是一个恶性循环。 这也是掘金一篇由 redis 分布式锁造成茅台超卖重大事故的原因之一。

至于其他的,可以自行查看这篇文章[1]


基于 etcd 的分布式队列


对队列更多的理论知识就不加以介绍了。我们都知道,队列是一种先进先出的数据结构,一般也只有入队和出队两种操作。 我们常常在单机的应用中使用到队列。

那么,如何实现一个分布式的队列呢?。

我们可以使用 etcd 开箱即用的工具,在etcd底层recipe包里结构Queue实现了一个多读多写的分布式队列。


type Queue struct {
  client *v3.Client
  ctx context.Context
  keyPrefix string
}
func NewQueue(client *v3.Client, keyPrefix string) *Queue
func (q *Queue) Dequeue() (string, error)
func (q *Queue) Enqueue(val string)


我们基于此包可以很方便的实现。


package src
import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客户端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创建一个 etcd 的客户端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    log.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
  return client
}
func Push(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  var wg sync.WaitGroup
  for i := 0; i < 3; i++ {
    for j := 0; j < 10; j++ {
      wg.Add(1)
      go func(item int) {
        defer wg.Done()
        err := q.Enqueue(strconv.Itoa(item))
        if err != nil {
          log.Printf("push err:%v\\n", err)
        }
      }(j)
    }
    time.Sleep(2 * time.Second)
  }
  wg.Wait()
}
func Pop(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewQueue(client, keyName)
  for {
    res, err := q.Dequeue()
    if err != nil {
      log.Fatal(err)
      return
    }
    log.Printf("接收值:%v\\n", res)
  }
}

push中,我们开启3轮发送值入队,每次发送10个发送一轮休息2秒。 在pop中,通过死循环获取队列中的值。

运行脚本程序如下。


package main
import (
  "etcd-test/src"
  "time"
)
func main() {
  key := "test-queue"
  go src.Pop(key)
  time.Sleep(1 * time.Second)
  go src.Push(key)
  time.Sleep(20 * time.Second)
}

我们使用两个G代表 分别运行 pushpop操作。 同时为了达到运行效果,我们先运行 pop 等待有入队的元素。 运行结果动画如下,

0.gif

etcd提供了优先级的分布式的队列。和上面的用法相似。只是在入队的时候,不仅仅需要提供一个值,还需要提供一个整数,来表示当前push值的优先级。数值越小,优先级越高。

我们改动一下上述的代码。


package src
import (
  "github.com/coreos/etcd/clientv3"
  recipe "github.com/coreos/etcd/contrib/recipes"
  "log"
  "strconv"
  "strings"
  "sync"
  "time"
)
var addr = flag.String("addr", "http://127.0.0.1:2379", "etcd address")
// 初始化etcd客户端
func initEtcdClient() *clientv3.Client {
  var client *clientv3.Client
  var err error
  // 解析etcd的地址,编程[]string
  endpoints := strings.Split(*addr, ",")
  // 创建一个 etcd 的客户端
  client, err = clientv3.New(clientv3.Config{Endpoints: endpoints,
    DialTimeout: 5 * time.Second})
  if err != nil {
    log.Printf("初始化客户端失败:%v\\n", err)
    log.Fatal(err)
  }
  return client
}
func PriorityPush(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  var wg sync.WaitGroup
  for j := 0; j < 10; j++ {
    wg.Add(1)
    go func(item int) {
      defer wg.Done()
      err := q.Enqueue(strconv.Itoa(item), uint16(item))
      if err != nil {
        log.Printf("push err:%v\\n", err)
      }
    }(j)
  }
  wg.Wait()
}
func PriorityPop(keyName string) {
  client := initEtcdClient()
  defer client.Close()
  q := recipe.NewPriorityQueue(client, keyName)
  for {
    res, err := q.Dequeue()
    if err != nil {
      log.Fatal(err)
      return
    }
    log.Printf("接收值:%v\\n", res)
  }
}


然后以下是我们的测试代码:


package main
import (
  "etcd-test/src"
  "sync"
  "time"
)
func main() {
  key := "test-queue"
  var wg sync.WaitGroup
  wg.Add(1)
  go func() {
    defer wg.Done()
    src.PriorityPush(key)
  }()
  wg.Wait()
  go src.PriorityPop(key)
  time.Sleep(20 * time.Second)
}

我们把0到9的数并发的push到队列中,对应的优先级整数值就是它本身,push完毕我们运行PriorityPop 函数,看最终结果显示就是从0到9。

0.gif


总结


这篇文章主要介绍了如何使用 etcd 实现分布式锁以及分布式队列。其他etcd的场景,可以自行实践。

相关文章
|
6月前
|
机器学习/深度学习 人工智能 自然语言处理
AI内容创作Agent架构解析:基于移动端原生框架的内容特工队AI (ReelsAgent)与传统短视频工具的技术差异
传统的AI视频工具链往往基于单点功能堆栈或PC/Web端的SaaS架构,难以承载短视频营销所需的高频、高并发、全流程自动化需求。本文将从AI Agent系统架构角度,对比内容特工队AI (ReelsAgent)的移动端原生设计与现有主流工具的实现路径,以评估其在工程实践中的优劣。
716 7
|
2月前
|
API Docker 容器
开源剪映小助手(capcut-mate)v3.0.26发布
CapCut Mate API 是开源免费的剪映草稿自动化工具,基于 FastAPI 构建,支持独立部署。赋能大模型实现智能视频剪辑,覆盖剪映核心功能全流程,可对接 Coze/n8n 或云渲染生成成品视频。Docker 三行命令极速部署,文档更清晰简洁。(239字)
|
4月前
|
人工智能 关系型数据库 API
AI Agent 工程师职业能力体系与进阶指南 —— 基于阿里云生态的落地视角
本文面向阿里云开发者,系统解析AI Agent时代工程师的角色转型、核心技术(认知架构/记忆系统/工具协同)与三阶成长路径(原型→系统→专家),结合通义千问、PolarDB、PAI等阿里云生态实践,助工程师构建自主决策系统的工程化能力。(239字)
395 1
|
缓存 5G 开发者
【提效】docker镜像构建优化-提速10倍
本文主要记录了自己通过查阅相关资料,一步步排查问题,最后通过优化Docerfile文件将docker镜像构建从十几分钟降低到1分钟左右,效率提高了10倍左右。
1272 122
|
9月前
|
Cloud Native 算法 Java
Java:历久弥新的企业级技术基石
Java:历久弥新的企业级技术基石
|
机器学习/深度学习 缓存 关系型数据库
用PyTorch从零开始编写DeepSeek-V2
DeepSeek-V2是一个强大的开源混合专家(MoE)语言模型,通过创新的Transformer架构实现了经济高效的训练和推理。该模型总共拥有2360亿参数,其中每个令牌激活21亿参数,支持最大128K令牌的上下文长度。
723 14
|
Ubuntu
Ubuntu20.04 编译安装FFmpeg,出错分析以及解决方案
通过上述步骤,可以在 Ubuntu 20.04 上成功编译和安装 FFmpeg。如果遇到问题,可以通过检查依赖包的安装情况以及 `config.log` 文件来解决。掌握这些技巧和解决方案,可以有效地解决编译过程中遇到的各种问题,提高安装成功率。
1595 13
|
Linux API 图形学
OpenGL生态中的GL, GLU, GLUT, GLX, GLEW, GLEE和GLEXT详解
该文介绍了OpenGL生态系统中的关键组件,包括GL(基础绘图API)、GLU(提供辅助函数)、GLUT(窗口管理和事件处理)、GLX(X Window系统上的OpenGL扩展)、GLEW和GLEE(动态查询和封装OpenGL扩展)以及GLEXT(OpenGL扩展集合)。这些库和工具协同工作,帮助开发者构建跨平台的图形应用程序,提升兼容性与功能。理解它们的作用对优化OpenGL编程至关重要。