浅谈Golang并发控制WaitGroup

简介: 浅谈Golang并发控制WaitGroup

WaitGroup

前言


  1. 理解WaitGroup的实现 - 核心是CAS的使用
  2. Add与Done应该放在哪? - Add放在Goroutine外,Done放在Goroutine中,逻辑复杂时建议用defer保证调用
  3. WaitGroup适合什么样的场景? - 并发的Goroutine执行的逻辑相同时,否则代码并不简洁,可以采用其它方式


正确与错误用法

package main
import (
  "fmt"
  "sync"
)
func wg() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1) //Add放在Goroutine外
    go func(i int) {
      //TODO
      defer wg.Done() //建议用defer保证调用
    }(i)
  }
  wg.Wait()
  fmt.Println("finished")
}
// Tip: waitGroup 不要进行copy
func errWg1() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(i int, wg sync.WaitGroup) {
      fmt.Println(i)
      defer wg.Done()
    }(i, wg)
    /*
      type WaitGroup struct {
        noCopy noCopy
        state1 [3]uint32
      }
      sync.WaitGroup是一个结构体类型,当参数传递是值拷贝
    */
  }
  wg.Wait()
  fmt.Println("finished")
}
// Tip: waitGroup 的 Add 要在goroutine前执行
func errWg2() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    go func(i int) {
      wg.Add(1)
      fmt.Println(i)
      defer wg.Done()
    }(i)
  }
  wg.Wait()
  /*
    极端情况,wg.Add放在Goroutine里
    可能for循环完了,还没执行到wg.Add(1)
    就到了wg.Wait()这一步,此时直接wait掉结束了
  */
  fmt.Println("finished")
}
// Tip: waitGroup 的 Add 很大会有影响吗?
func errWg3() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(100)
    go func(i int) {
      fmt.Println(i)
      defer wg.Done()
      wg.Add(-100)
      /*
        使用是没问题的,查阅wg.Done()源码,其对应的是wg.Add(-1),查阅wg.Add源码,发现其内部
        state := atomic.AddUint64(statep, uint64(delta)<<32)
      */
    }(i)
  }
  fmt.Println("finished")
}

源码剖析

package sync
import (
  "internal/race"
  "sync/atomic"
  "unsafe"
)
type WaitGroup struct {
  noCopy noCopy
  //64位值:高32位为计数器,低32位为waiter计数,另外32bit是用作信号量
  //64位原子操作需要64位对齐
  //但需要32位对齐编译器并不能保证这一点
  //所以我们分配12个字节,然后使用其中对齐的8个字节作为状态,其他4个字节作为存储
  //state1:具有复合意义的字段,包含WaitGroup计数值,waiter计数和信号量
  state1 [3]uint32
}
// state返回指向存储在wg.state1中的state和sema字段的指针。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
  if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
    //如果地址是64bit对齐的,数组前两个元素做state,后一个元素做信号量
    return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
  } else {
    //如果地址是32bit对齐的,数组后两个元素用来做state,它可以用来做64bit的原子操作,第一个元素32bit用来做信号量
    return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
  }
}
// Add方法主要操作的state1字段中计数值部分
// 当Add方法被调用时,首先会将delta参数值左移32位(计数值在高32位)
// 然后内部通过原子操作将这个值加到计数值上
// 需要注意的是,delta的取值范围可正可负,因为调用Done()方法时,内部通过Add(-1)方法实现的
func (wg *WaitGroup) Add(delta int) {
  // statep表示wait数和计数值
  // 低32位表示wait数,高32位表示计数值
  statep, semap := wg.state()
  //下面的代码去掉了源码里的race判断,race判断是竞争检测相关的
  //只有当运行程序时带-race参数才起作用,不是WaitGroup的核心功能,去掉后方便看清程序主体
  // uint64(delta)<<32 将delta左移32位
  // 因为高32位表示计数值,所以将delta左移32,v = v + delta
  state := atomic.AddUint64(statep, uint64(delta)<<32)
  //state右移32位得到v的值
  v := int32(state >> 32)
  //state的低32位是w的值
  w := uint32(state)
  //v变成负数时就panic。比如第一次调用Add就传负数,就会出现这种panic
  if v < 0 {
    panic("sync: negative WaitGroup counter")
  }
  //正常Add在Wait之前调用,w是0才对
  if w != 0 && delta > 0 && v == int32(delta) {
    panic("sync: WaitGroup misuse: Add called concurrently with Wait")
  }
  //正常Add之后程序return
  if v > 0 || w == 0 {
    return
  }
  //执行到这,此时计数器V=0;那么等待计数器肯定和整个state的值相等,
  //不然只有一个情况:有人调了Add(),并且是并发调用的。
  if *statep != state {
    panic("sync: WaitGroup misuse: Add called concurrently with Wait")
  }
  //把计数器v和w都置0
  //如果计数值v为0并且waiter的数量w不为0,那么state的值就是waiter的数量
  //将waiter的数量设置为0,因为计数值v也是0,所以它们俩的组合*statep直接设置为0即可。此时需要并唤醒所有的waiter
  *statep = 0
  //发送w个信号量,也就是有多少goroutine调用了Wait()就发送多少信号量,让他们都解除阻塞
  for ; w != 0; w-- {
    runtime_Semrelease(semap, false, 0)
  }
}
//内部就是调用Add(-1)方法
func (wg *WaitGroup) Done() {
  wg.Add(-1)
}
//不断检查state值。如果其中的计数值为零,则说明所有的子goroutine已全部执行完毕,调用者不必等待,直接返回。
//如果计数值大于零,说明此时还有任务没有完成,那么调用者变成等待者,需要加入wait队列,并且阻塞自己。
func (wg *WaitGroup) Wait() {
  //statep表示wait数和计数值
  //低32位表示wait数,高32位表示计数值
  statep, semap := wg.state()
  //循环检查计数器V啥时候等于0
  for {
    //原子操作的方式获取state的值
    state := atomic.LoadUint64(statep)
    //将state右移32位,表示当前计数值
    v := int32(state >> 32)
    //w表示waiter等待值
    w := uint32(state)
    if v == 0 {
      //如果当前计数值为零,表示当前子goroutine已全部执行完毕,则直接返回
      return
    }
    //否则使用原子操作将state值加一
    //调用Wait()的goroutine的数目加1,Add()发送信号量的时候知道多少goroutine在等待
    if atomic.CompareAndSwapUint64(statep, state, state+1) {
      //获取信号量,如果拿不到就阻塞,如果拿到了就继续执行
      runtime_Semacquire(semap)
      //程序运行到这里说明Add()已经发送了信号量,Wait()解除阻塞了,这个时间state的值是0才正常。非0是个异常情况,这里panic
      if *statep != 0 {
        panic("sync: WaitGroup is reused before previous Wait has returned")
      }
      return
    }
  }
}

核心

//wg.Add
state := atomic.AddUint64(statep, uint64(delta)<<32)
//wg.Wait
state := atomic.LoadUint64(statep)
atomic.CompareAndSwapUint64(statep, state, state+1)
//CAS一般情况下会比锁的性能更高

后记

CAS

CAS存在ABA问题要注意

目录
相关文章
|
存储 安全 编译器
Golang 语言中 map 的键值类型选择,它是并发安全的吗?
Golang 语言中 map 的键值类型选择,它是并发安全的吗?
79 0
|
3月前
|
安全 Go
Golang语言goroutine协程并发安全及锁机制
这篇文章是关于Go语言中多协程操作同一数据问题、互斥锁Mutex和读写互斥锁RWMutex的详细介绍及使用案例,涵盖了如何使用这些同步原语来解决并发访问共享资源时的数据安全问题。
100 4
|
7月前
|
Go
深度探讨 Golang 中并发发送 HTTP 请求的最佳技术
深度探讨 Golang 中并发发送 HTTP 请求的最佳技术
142 4
|
7月前
|
存储 缓存 安全
Golang深入浅出之-Go语言中的并发安全容器:sync.Map与sync.Pool
Go语言中的`sync.Map`和`sync.Pool`是并发安全的容器。`sync.Map`提供并发安全的键值对存储,适合快速读取和少写入的情况。注意不要直接遍历Map,应使用`Range`方法。`sync.Pool`是对象池,用于缓存可重用对象,减少内存分配。使用时需注意对象生命周期管理和容量控制。在多goroutine环境下,这两个容器能提高性能和稳定性,但需根据场景谨慎使用,避免不当操作导致的问题。
208 7
|
7月前
|
安全 Go 开发者
Golang深入浅出之-Go语言中的CSP模型:深入理解并发哲学
【5月更文挑战第2天】Go语言的并发编程基于CSP模型,强调通过通信共享内存。核心概念是goroutines(轻量级线程)和channels(用于goroutines间安全数据传输)。常见问题包括数据竞争、死锁和goroutine管理。避免策略包括使用同步原语、复用channel和控制并发。示例展示了如何使用channel和`sync.WaitGroup`避免死锁。理解并发原则和正确应用CSP模型是编写高效安全并发程序的关键。
179 7
|
7月前
|
安全 Go
Golang深入浅出之-Go语言中的并发安全队列:实现与应用
【5月更文挑战第3天】本文探讨了Go语言中的并发安全队列,它是构建高性能并发系统的基础。文章介绍了两种实现方法:1) 使用`sync.Mutex`保护的简单队列,通过加锁解锁确保数据一致性;2) 使用通道(Channel)实现无锁队列,天生并发安全。同时,文中列举了并发编程中常见的死锁、数据竞争和通道阻塞问题,并给出了避免这些问题的策略,如明确锁边界、使用带缓冲通道、优雅处理关闭以及利用Go标准库。
470 5
|
7月前
|
安全 Go 开发者
Golang深入浅出之-Go语言中的CSP模型:深入理解并发哲学
【5月更文挑战第1天】Go语言基于CSP理论,借助goroutines和channels实现独特的并发模型。Goroutine是轻量级线程,通过`go`关键字启动,而channels提供安全的通信机制。文章讨论了数据竞争、死锁和goroutine泄漏等问题及其避免方法,并提供了一个生产者消费者模型的代码示例。理解CSP和妥善处理并发问题对于编写高效、可靠的Go程序至关重要。
168 2
|
7月前
|
设计模式 Go 调度
Golang深入浅出之-Go语言中的并发模式:Pipeline、Worker Pool等
【5月更文挑战第1天】Go语言并发模拟能力强大,Pipeline和Worker Pool是常用设计模式。Pipeline通过多阶段处理实现高效并行,常见问题包括数据竞争和死锁,可借助通道和`select`避免。Worker Pool控制并发数,防止资源消耗,需注意任务分配不均和goroutine泄露,使用缓冲通道和`sync.WaitGroup`解决。理解和实践这些模式是提升Go并发性能的关键。
87 2
|
Go
Golang 语言怎么控制并发 goroutine?
Golang 语言怎么控制并发 goroutine?
46 0
|
Go 调度 数据安全/隐私保护
Golang 并发&同步的详细原理和使用技巧
Golang 并发&同步的详细原理和使用技巧