理解和掌握sync同步包的功能

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
性能测试 PTS,5000VUM额度
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: 【7月更文挑战第6天】本文介绍`sync`包在Go中提供了并发控制工具,如Mutex、RWMutex、Cond和Once。文中例子的并发测试显示,Mutex在小并发下表现良好,读写锁在高并发读操作时更有优势。

简介

在Go语言中,每一个并发的执行单元叫作一个goroutine。设想这里的一个程序有两个函数,
一个函数做计算,另一个输出结果,假设两个函数没有相互之间的调用关系。一个线性的程序会先调用其中的一个函数,然后再调用另一个。如果程序中包含多个goroutine,对两个函
数的调用则可能发生在同一时刻。

如果你使用过操作系统或者其它语言提供的线程,那么你可以简单地把goroutine类比作一个线程,尽管两者的区别实际上只是一个量的区别,但量变会引起质变的道理同样适用于goroutine和线程。

其中的明显而微妙区别在于,OS中的线程通常分配2MB的内存块作为栈,而一个goroutine使用动态栈的概念,开始后初始大小只有2KB,这比OS的线程小得多。

并且与操作系统的栈一样,将保存程序中活跃或挂起的函数调用本地变量。不同的是goroutine是动态,初始化2KB,最大能达到1GB,比固定大小的线程大得多。不过一般情况下是不会达到最大值的。

Toroid托瑞德符号.png

在go程序中同时创建成百上千个goroutine是非常普遍的,最多能达到百万级,比如在sync包中,经常使用WaitGroup,这一个只是用来等待并关闭channel的goroutine,几乎不占内存空间。

2 sync 同步包互斥锁

sync同步包 在 src/sync/ 路径,在其中有这样的提示:

    不应该复制哪些包含了此包中类型的值。

    禁止复制首次使用后的Mutex
    禁止复制使用后的RWMutex
    禁止复制使用后的Cond

互斥在某些独占的数据操作中很实用,而且被sync包里的Mutex类型直接支持。
它的Lock方法能够获取到token(这里叫锁),并且Unlock方法会释放这个token。

例子1: 复制互斥锁

      type mutn struct {
        n int
        sync.Mutex
      }

      func DoMutn() {
        f := mutn{n: 19}

        go func(f mutn) {
          for {
            log.Println("goto: try to lock mutn")
            f.Lock()
            log.Println("goto: lock mutn ok.")
            time.Sleep(3 * time.Second)
            f.Unlock()
            log.Println("goto:unlock ok")

          }
        }(f)

        f.Lock()
        log.Println("goto:lock mutn ok.")

Mutex 互斥锁首次使用后复制其值

        go func(f mutn) {
          for {
            log.Println("gr3: try to lock...")
            f.Lock()
            log.Println("gr3: lock mutn ok.")
            time.Sleep(5 * time.Second)
            f.Unlock()
            log.Println("gr3:unlock ok")

          }
        }(f)

        time.Sleep(20 * time.Second)
        f.Unlock()
        log.Println("g1:unlock ok")

      }
      func main() {
        DoMutn()
      }

我们在示例中创建两个goroutine,g2和g3,示例运行的结果显示: gr3阻塞在加锁操作。

而g2按预期运行,g2和g3的区别在于g2是互斥锁首次使用之前创建的。

而g3则是在互斥锁执行完加锁操作并处于锁定状态时创建的。

并且程序在创建g3时复制了 mutn实例。 该实例包括了sync.Mutex实例。

在标准库互斥锁Mutex的定义非常简单,它有两个字段 state,sema组成

    type Mutex struct {
      state int32
      sema  uint32
    }

这两个字段表示

  state 表示当前互斥锁状态。
  sema  用于控制锁状态信号量。

对mutex实例的复制即是对两个整型字段的复制。 在初始状态,Mutex实例处于 Unlocked状态,state和sema都为0.

g2复制了处于初始化的Mutex实例,副本的state和sema为0, 这与g2自定义一个新的Mutex实例无异,这决定了g2后续可以按预期运行。

后续主程序调用Lock方法,Mutex 实例变成 Locked状态,state字段值为 sync.mutex-Locked
此后g3创建时恰恰复制了处于 Locked状态的实例。

实例副本state字段值也为 sync.mutexLocked ,因此g3在对其实例副本调用Lock将导致进入阻塞
--- 也就是死锁 因为没有任何其他计划调用该副本的Unlock方法,Go不支持递归锁---

那些sync包中类型的实例在首次使用后被复制得到的副本,一旦再被使用将导致不可预期结果,为此在使用sync包的类型时,

推荐通过闭包方式或传递类型实例(或包裹该类型的类型实例)的地址或指针进行,这是sync包最需要注意的。

互斥锁 sync.Mutex 临界区同步原语首选,常被用来对结构体对象内部状态,缓存进行保护。 使用最为广泛。

它通常被用以保护结构体内部状态,缓存,是广泛的临界区同步原语。

3 sync同步包读写锁

读写锁 RWMutex 有大并发需求的创建,使用读写锁。 RWMutex

读写锁适合具有一定并发量,并且读取操作明显大于写操作的场景。

例子2:

    package main

    import (
      "sync"
      "testing"
    )

    var (

临界区需要保护的数据

      dataOne  = 0
      dataTwo  = 1
      mutexOne sync.Mutex
      mutexTwo sync.RWMutex
    )

互斥锁性能 读取

    func BenchmarkReadSyncByMutex(b *testing.B) {
      b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
          mutexOne.Lock()
          _ = dataOne
          mutexOne.Unlock()
        }
      })
    }

互斥锁性能 写入

    func BenchmarkWriteSyncByMutex(b *testing.B) {
      b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
          mutexOne.Lock()
          dataOne += 1
          mutexOne.Unlock()
        }
      })
    }

读取性能评估

    func BenchmarkReadSyncByRWMutex(b *testing.B) {
      b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
          mutexTwo.Lock()
          _ = dataTwo
          mutexTwo.Unlock()
        }
      })
    }

写性能评估

    func BenchmarkWriteSyncByRWMutex(b *testing.B) {
      b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
          mutexTwo.Lock()
          dataTwo += 1
          mutexTwo.Unlock()
        }
      })
    }

执行:

      go test -v -count 2 -bench .  mutex_rw_bench_test.go   -cpu 2,4,8,32,128 >bm.txt

      goarch: amd64
      cpu: AMD Ryzen 5 3500U with Radeon Vega Mobile Gfx  
      BenchmarkReadSyncByMutex
      BenchmarkReadSyncByMutex-2            30831019          40.23 ns/op
      BenchmarkReadSyncByMutex-2            32428663          42.62 ns/op
      BenchmarkReadSyncByMutex-4            10713606         114.1 ns/op
      BenchmarkReadSyncByMutex-4            10344114          98.16 ns/op
      BenchmarkReadSyncByMutex-8            10293854         116.5 ns/op
      BenchmarkReadSyncByMutex-8            10168749         116.9 ns/op
      BenchmarkReadSyncByMutex-32           11110328         111.3 ns/op
      BenchmarkReadSyncByMutex-32           10753728         108.3 ns/op
      BenchmarkReadSyncByMutex-128          12562038          98.00 ns/op
      BenchmarkReadSyncByMutex-128          12499010          96.89 ns/op
      BenchmarkWriteSyncByMutex
      BenchmarkWriteSyncByMutex-2           17350693          67.81 ns/op
      BenchmarkWriteSyncByMutex-2           15188412          66.77 ns/op
      BenchmarkWriteSyncByMutex-4            9374296         125.0 ns/op
      BenchmarkWriteSyncByMutex-4           10168714         126.8 ns/op
      BenchmarkWriteSyncByMutex-8            9916609         119.1 ns/op
      BenchmarkWriteSyncByMutex-8            9755517         121.1 ns/op
      BenchmarkWriteSyncByMutex-32          10713538         113.9 ns/op
      BenchmarkWriteSyncByMutex-32          10568701         113.5 ns/op
      BenchmarkWriteSyncByMutex-128         11649591         102.3 ns/op
      BenchmarkWriteSyncByMutex-128         11973096         102.5 ns/op
      BenchmarkReadSyncByRWMutex
      BenchmarkReadSyncByRWMutex-2          13524128         102.7 ns/op
      BenchmarkReadSyncByRWMutex-2          11999124         101.4 ns/op
      BenchmarkReadSyncByRWMutex-4           8391038         145.8 ns/op
      BenchmarkReadSyncByRWMutex-4          14412699         126.1 ns/op
      BenchmarkReadSyncByRWMutex-8          10525567         116.3 ns/op
      BenchmarkReadSyncByRWMutex-8          10255752         116.4 ns/op
      BenchmarkReadSyncByRWMutex-32         10255778         117.3 ns/op
      BenchmarkReadSyncByRWMutex-32         10208638         117.9 ns/op
      BenchmarkReadSyncByRWMutex-128        10810089         111.0 ns/op
      BenchmarkReadSyncByRWMutex-128        11110348         108.1 ns/op
      BenchmarkWriteSyncByRWMutex
      BenchmarkWriteSyncByRWMutex-2         12499010          91.11 ns/op
      BenchmarkWriteSyncByRWMutex-2         11999124          99.52 ns/op
      BenchmarkWriteSyncByRWMutex-4          7842598         147.7 ns/op
      BenchmarkWriteSyncByRWMutex-4          7946450         151.0 ns/op
      BenchmarkWriteSyncByRWMutex-8         10210080         118.1 ns/op
      BenchmarkWriteSyncByRWMutex-8         10168724         115.7 ns/op
      BenchmarkWriteSyncByRWMutex-32         9835380         119.9 ns/op
      BenchmarkWriteSyncByRWMutex-32        10339772         117.5 ns/op
      BenchmarkWriteSyncByRWMutex-128       10908296         109.5 ns/op
      BenchmarkWriteSyncByRWMutex-128       10810030         109.9 ns/op
      PASS

简单分析:

    1 在小并发量时,互斥锁性能更好,并发量增大,互斥锁竞争激烈,导致加锁和解锁性能下降,
      但是最后也恒定在最好记录的2倍左右。
    2 读写锁的读锁性能并未随着并发量增大而性能下降,始终在恒定值.
    3 并发量较大时,读写锁的写锁性能比互斥锁,读写锁的读锁都差,并且随着并发量增大,写锁性能有继续下降趋势。

多个例程goroutine可以同时持有读锁,从而减少在锁竞争等待的时间。

而互斥锁即便为读请求,同一时刻也只能有一个例程持有锁,其他goroutine被阻塞在加锁操作等待被调度。

由于处于for循环测试中,需要注意的是,不能在 unlock时使用 defer,

      b.RunParallel(func(pb *testing.PB) {
          for pb.Next() {
            mutexTwo.Lock()
            dataTwo += 1
            defer mutexTwo.Unlock()
          }
        })

如此在并发执行时,函数不会退出,defer得不到执行,将导致全部死锁。

4 条件变量 sync.Cond

  同步原语之一,用于避免轮询。
  本质是一个容器,存放了一个或一组等待某条件的一些 协程 goroutine
  当条件成立时,这些协程 goroutine将被唤醒去执行。

例子3: 条件变量的实现

    import (
      "fmt"
      "sync"
      "time"
    )

使用sync.Cond 实例初始化

    type signal struct{}

    var ready bool

    func worker(i int) {
      fmt.Printf("worker %d: is working...\n", i)
      time.Sleep(1 * time.Second)
      fmt.Printf("worker %d:worker done\n", i)
    }

    func spawnGroup(f func(i int), num int, mu *sync.Mutex) <-chan signal {
      c := make(chan signal)
      var wg sync.WaitGroup

      for i := 0; i < num; i++ {
        wg.Add(1)
        go func(i int) {
          for {
            mu.Lock()
            if !ready {
              mu.Unlock()
              time.Sleep(100 * time.Microsecond)
              continue
            }
            mu.Unlock()
            fmt.Printf("worker %d: start to work...\n", i)
            f(i)
            wg.Done()
            return
          }
        }(i + 1)
      }
      go func() {
        wg.Wait()
        c <- signal(struct{}{})
      }()
      return c
    }

模拟ready准备工作

    func DoWorkers() {
      fmt.Println("start a group of workers...")
      mu := &sync.Mutex{}
      c := spawnGroup(worker, 5, mu)
      time.Sleep(5 * time.Second) 
      fmt.Println("the group of workers start to work...")

      mu.Lock()
      ready = true
      mu.Unlock()
      <-c
      fmt.Println("the group of workers work done!")
    }

    func main() {
      DoWorkers()
    }

执行例子3:

    go run .\cond_case.go
    start a group of workers...
    the group of workers start to work...
    worker 5: start to work...
    worker 5: is working...   
    worker 1: start to work...
    worker 1: is working...   
    worker 2: start to work...
    worker 2: is working...   
    worker 4: start to work...
    worker 4: is working...   
    worker 3: start to work...
    worker 3: is working...   
    worker 3:worker done
    worker 5:worker done
    worker 2:worker done
    worker 1:worker done
    worker 4:worker done
    the group of workers work done!

我们看到sync.Cond 实例的初始化需要一个满足实现了sync.Locker 接口的类型实例,
通常我们使用sync.Mutex.

条件变量需要这个互斥锁同步临界区,保护用作条件的数据。

各个等待条件成立的例程 goroutine 在加锁后判断条件是否成立,如果不成立,则调用 sync.Cond 的Wait方法进入等待。

Wait方法在goroutine挂起前进行Unlock操作。

在执行函数,例程将ready置为true并调用sync.Cond 的Broadcast方法,各个阻塞的例程将被唤醒并从Wait方法返回。

在Wait方法返回前,Wait方法再次加锁让goroutine进入临界区。 接下来例程将再次对条件数据进行判定。
如果条件成立,则解锁进入下一个阶段,如果不成立,那么再次调用Wait方法挂起等待。

5 单例支持 sync.Once

  在go原生只被执行一次且goroutine安全的函数只有每个的包的init函数。

  sync.Once 提供了另一种更为灵活的机制,可以保证任意一个函数在程序运行期间只运行一次。
  这经常被用于初始化和资源清理过程,以避免重复执行初始化或资源清理操作。

例子4: 单例的实现

    package main

    import (
      "log"
      "os"
      "sync"
      "time"
    )

    type Foo struct{}

    var (
      once     sync.Once
      instance *Foo
      logger   = log.New(os.Stdout, "info-", 18)
    )

    func GetInstance(id int) *Foo {
      defer func() {
        if e := recover(); e != nil {
          logger.Printf("goroutine-%d:caught a panic:%s", id, e)
        }
      }()

      logger.Printf("goroutine-%d:enter GetInstance\n", id)
      once.Do(func() {
        instance = &Foo{}
        time.Sleep(3 * time.Second)
        logger.Printf("goroutine-%d:the addr of instance is %p \n", id, instance)
        panic("once.Do func panic")
      })
      return instance
    }

    func DoMains() {
      var wg sync.WaitGroup
      for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(i int) {
          inst := GetInstance(i)
          logger.Printf("goroutine-%d, the addr of instance returned is %p\n", i, inst)
          wg.Done()
        }(i + 1)
      }
      time.Sleep(5 * time.Second)
      inst := GetInstance(0)
      logger.Printf("goroutine-0:the addr of instance is %p\n", inst)
      wg.Wait()
      logger.Printf("all goroutines exit.\n")
    }
    func main() {
      DoMains()
    }

执行结果:

  info-10:48:39 once_pkg.go:25: goroutine-2:enter GetInstance
  info-10:48:39 once_pkg.go:25: goroutine-5:enter GetInstance
  info-10:48:39 once_pkg.go:25: goroutine-4:enter GetInstance
  info-10:48:39 once_pkg.go:25: goroutine-3:enter GetInstance
  info-10:48:39 once_pkg.go:25: goroutine-1:enter GetInstance
  info-10:48:42 once_pkg.go:29: goroutine-2:the addr of instance is 0x1033f38 
  info-10:48:42 once_pkg.go:21: goroutine-2:caught a panic:once.Do func panic      
  info-10:48:42 once_pkg.go:41: goroutine-2, the addr of instance returned is 0x0  
  info-10:48:42 once_pkg.go:41: goroutine-5, the addr of instance returned is 0x1033f38
  info-10:48:42 once_pkg.go:41: goroutine-4, the addr of instance returned is 0x1033f38
  info-10:48:42 once_pkg.go:41: goroutine-3, the addr of instance returned is 0x1033f38
  info-10:48:42 once_pkg.go:41: goroutine-1, the addr of instance returned is 0x1033f38
  info-10:48:44 once_pkg.go:25: goroutine-0:enter GetInstance
  info-10:48:44 once_pkg.go:47: goroutine-0:the addr of instance is 0x1033f38
  info-10:48:44 once_pkg.go:49: all goroutines exit.

once.Do 将等待f执行完毕后返回,这期间其他执行once.Do函数的例程 将会阻塞等待。
如上 goroutine 2~5 的 Do函数返回后,后续的goroutine在执行Do函数将不再执行f并立即返回。

如上面的 gouroutine-0 即便在函数f中出现 panic,sync.Once 原语也认为once.Do 执行完毕,后续对once.Do调用将不再执行f。

5 sync.Pool 降低垃圾回收成本

官方介绍:

一个同步池是一组临时对象,可能是单独保存和检索的。
在池中存储的任何项目,都可以随时被删除,并且不会有删除通知。

如果池只有唯一的引用,那么该项目可能被剥离。

池可以被多个例程同时使用,并且是安全的。

池的目标是那些已经分配缓存但暂时没有使用的项目,以供再次使用。这将减轻GC压力。
这个特点使得它容易被用于建立高效,线程安全的列表。

但是它并不适合全部场景的自由大小的列表。 free lists

适当使用池管理一组临时项目,并务必在并发独立的包的独立客户端使用。
池提供了一种方式,支持跨多个客户端的摊销分配

使用池子的一个良好例子是 fmt 包,它用以维护一个动态大小的临时输出缓冲区。
当有许多goroutine例程在打印时,池子将在有负载时放大,在静止下缩小。

另一方面,一个不限制大小的列表作为 短周期对象不适合被用作池子。
该场景的开销不会被分摊。

短周期对象使用自己的自由列表将更高效。

池在使用后,不能被复制。

在Go内存模型术语中, Put(x) 被调用 synchronizes before
一个调用为获取返回同样的值 x。

类似地,一个调用到新的返回值 x synchronizes before
另一个调用为获取相同的值x。

      type Pool struct {
        noCopy noCopy

        local     unsafe.Pointer  
        localSize uintptr         

        victim     unsafe.Pointer  
        victimSize uintptr         

        New func() any
      }

属性 local 分配固定尺寸的 池子,实际类型为 [P]poolLocal

属性 localSize 本地数组大小

属性 victim 从上一个循环来的local

属性 victimSize victims数组的大小
func() 可选择指定何时将返回 nil 时,生成值功能,它可能不会随时更改。

性能评估代码:

      var (
        p sync.Pool
      )

基础功能 依次 放入,获取 一个

      func BenchmarkPool(b *testing.B) {

        b.RunParallel(func(pb *testing.PB) {
          for pb.Next() {
            p.Put(1)
            p.Get()
          }
        })
      }

连续放入100个,再连续拿取100个

      func BenchmarkPoolOverflow(b *testing.B) {

        b.RunParallel(func(pb *testing.PB) {
          for pb.Next() {
            for b := 0; b < 100; b++ {
              p.Put(1)
            }
            for b := 0; b < 100; b++ {
              p.Get()
            }
          }
        })
      }

模拟饥饿对象,以强制从其他池子拿取对象

      func BenchmarkPoolStarvation(b *testing.B) {
        var p sync.Pool
        count := 100

将放入的对象数量减少33%

这将创建pool对象饥饿

并且强制 P-local 存储器 从其他池子Ps拿取对象。

        countStarved := count - int(float32(count)*0.33)
        b.RunParallel(func(pb *testing.PB) {
          for pb.Next() {
            for b := 0; b < countStarved; b++ {
              p.Put(1)
            }
            for b := 0; b < count; b++ {
              p.Get()
            }
          }
        })
      }

性能执行:

    go test -v -count 2 -bench .  .\pool_sync_test.go   -cpu 2,4,8,32,128 >bmpool.txt

    goarch: amd64
    cpu: AMD Ryzen 5 3500U with Radeon Vega Mobile Gfx  
    BenchmarkPool
    BenchmarkPool-2                 92275749          12.69 ns/op
    BenchmarkPool-2                 100000000         12.57 ns/op
    BenchmarkPool-4                 205023169          5.613 ns/op
    BenchmarkPool-4                 228077668          6.074 ns/op
    BenchmarkPool-8                 321266732          3.493 ns/op
    BenchmarkPool-8                 345006420          3.483 ns/op
    BenchmarkPool-32                349300358          3.690 ns/op
    BenchmarkPool-32                310861339          3.449 ns/op
    BenchmarkPool-128               339489630          3.491 ns/op
    BenchmarkPool-128               360954796          3.460 ns/op
    BenchmarkPoolOverflow
    BenchmarkPoolOverflow-2           659786        1543 ns/op
    BenchmarkPoolOverflow-2           749961        1728 ns/op
    BenchmarkPoolOverflow-4          1295523         919.3 ns/op
    BenchmarkPoolOverflow-4          1296963         933.8 ns/op
    BenchmarkPoolOverflow-8          2085678         575.9 ns/op
    BenchmarkPoolOverflow-8          2098604         583.7 ns/op
    BenchmarkPoolOverflow-32         2099030         570.7 ns/op
    BenchmarkPoolOverflow-32         2083220         571.3 ns/op
    BenchmarkPoolOverflow-128        1942476         605.2 ns/op
    BenchmarkPoolOverflow-128        2034183         604.0 ns/op
    PASS
    ok    command-line-arguments  33.053s

小结

本文介绍sync包几个关键功能。

Mytex互斥锁,复制已锁定的Mutex会导致死锁。推荐通过指针使用这些类型。Mutex适用于保护临界区。
RWMutex适合读多写少的情况。
sync.Cond用于在条件满足时自动唤醒然后执行。
sync.Once确保函数只执行一次,常用于初始化。
sync.Pool用于缓存对象,减少GC压力,但使用需谨慎。

目录
相关文章
|
4月前
|
编译器 数据库连接 Go
Go Sync 包:并发的 6 个关键概念
Go Sync 包:并发的 6 个关键概念
|
6月前
spdlog 日志库部分源码说明——让你可以自定义的指定自动切换日志时间
spdlog 日志库部分源码说明——让你可以自定义的指定自动切换日志时间
150 7
|
6月前
|
Linux
仅同步最近5分钟的文件如何操作
【6月更文挑战第28天】仅同步最近5分钟的文件如何操作
28 0
|
安全 Java Go
GO通道和 sync 包的分享
GO通道和 sync 包的分享
|
算法 小程序 调度
同步
同步
72 0
|
存储 开发工具 数据安全/隐私保护
乾坤大挪移,如何将同步阻塞(sync)三方库包转换为异步非阻塞(async)模式?Python3.10实现。
众所周知,异步并发编程可以帮助程序更好地处理阻塞操作,比如网络 IO 操作或文件 IO 操作,避免因等待这些操作完成而导致程序卡住的情况。云存储文件传输场景正好包含网络 IO 操作和文件 IO 操作,比如业内相对著名的七牛云存储,官方sdk的默认阻塞传输模式虽然差强人意,但未免有些循规蹈矩,不够锐意创新。在全球同性交友网站Github上找了一圈,也没有找到异步版本,那么本次我们来自己动手将同步阻塞版本改造为异步非阻塞版本,并上传至Python官方库。
乾坤大挪移,如何将同步阻塞(sync)三方库包转换为异步非阻塞(async)模式?Python3.10实现。
|
存储 安全 Go
Go语言,sync包如何控制并发?
除了 channel 通道,还有 sync.Mutex、sync.WaitGroup 这些原始的同步机制来,更加灵活的实现数据同步和控制并发。
91 0
Go语言,sync包如何控制并发?
|
Kubernetes 安全 Go
GO并发之好用的sync包
GO并发之好用的sync包
|
存储 监控 安全
lsyncd 配合 rsync 实时差异同步节点文件
lsyncd 配合 rsync 实时差异同步节点文件
460 0
|
Linux
Mosquitto-1.5.0开始源码新增了epoll机制,如何编译实现?
Mosquitto-1.5.0开始源码新增了epoll机制,如何编译实现?
262 0