10.1 内容前导
上一章我们学到,互斥锁是一个很有用的同步工具,它可以保证每一时刻进入临界区的goroutine只有一个。通过对互斥锁的合理使用,我们可以使一个goroutine在执行临界区中的代码时,不被其他的goroutine打扰,但是它仍然可能会被中断(interruption)。
那什么是原子操作呢?我们已经知道,原子操作即是进行过程中不能被中断的操作。也就是说,针对某个值的原子操作在被进行的过程当中,CPU绝不会再去进行其它的针对该值的操作。为了实现这样的严谨性,原子操由 CPU 提供芯片级别的支持,所以绝对有效,即使在拥有多 CPU 核心,或者多 CPU 的计算机系统中,原子操作的保证也是不可撼动的。这使得原子操作可以完全地消除竞态条件,并能够绝对地保证并发安全性,它的执行速度要比其他的同步工具快得多,通常会高出好几个数量级。
不过它的缺点也很明显,正因为原子操作不能被中断,所以它需要足够简单,并且要求快速。你可以想象一下,如果原子操作迟迟不能完成,而它又不会被中断,那么将会给计算机执行指令的效率带来多么大的影响,所以操作系统层面只对针对二进制位或整数的原子操作提供了支持。
因此,我们可以结合实际情况,来判断是否可以将锁替换成原子操作。
10.2 用法说明
10.2.1 增或减
atomic包中提供了如下以Add为前缀的增减操作:
func AddInt32(addr *int32, delta int32) (new int32)func AddInt64(addr *int64, delta int64) (new int64)func AddUint32(addr *uint32, delta uint32) (new uint32)func AddUint64(addr *uint64, delta uint64) (new uint64)func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
需要注意的是,第一个参数必须是指针类型的值,通过指针变量可以获取被操作数在内存中的地址,从而施加特殊的CPU指令,确保同一时间只有一个goroutine能够进行操作,看个简单的示例:
var opts int64 = 0for i := 0; i < 50; i++ { // 注意第一个参数必须是地址 atomic.AddInt64(&opts, 3) //加操作 //atomic.AddInt64(&opts, -1) 减操作 time.Sleep(time.Millisecond) }time.Sleep(time.Second)fmt.Println("opts: ", atomic.LoadInt64(&opts))
用于原子加法操作的函数可以做原子减法吗?比如atomic.AddInt32函数可以用于减小那个被操作的整数值吗?atomic.AddInt32函数的第二个参数代表差量,它的类型int32是有符号的,如果我们想做原子减法,那么把这个差量设置为负整数就可以了,对于atomic.AddInt64函数来说也是类似的。不过如果想用atomic.AddUint32和atomic.AddUint64函数做原子减法,因为它们的第二个参数的类型uint32和uint64都是无符号的,就不能同AddInt32进行相同处理,但是可以依据下面这个表达式来给定atomic.AddUint32函数的第二个参数值:
^uint32(-N-1))
其中的N代表由负整数表示的差量,我们先要把差量的绝对值减去1,然后再把得到的这个无类型的整数常量,转换为uint32类型的值,最后在该值之上做按位异或操作,就可以获得最终的参数值。简单来说,此表达式的结果值的补码,与使用前一种方法得到的值的补码相同,所以这两种方式是等价的。
10.2.2 比较并交换
该操作简称 CAS(Compare And Swap),第一个参数的值应该是指向被操作值的指针值,该值的类型即为*int32,后两个参数的类型都是int32类型,它们的值应该分别代表被操作值的旧值和新值,函数在被调用之后会先判断参数addr指向的被操作值与参数old的值是否相等。仅当此判断得到肯定的结果之后,该函数才会用参数new代表的新值替换掉原先的旧值。否则,后面的替换操作就会被忽略。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
当有大量的goroutine 对变量进行读写操作时,可能导致CAS操作无法成功,这时可以利用for循环多次尝试:
var value int64func atomicAddOp(tmp int64) { for { oldValue := value if atomic.CompareAndSwapInt64(&value, oldValue, oldValue+tmp) { return } }}
比较并交换操作与交换操作相比有什么不同,优势在哪里呢?比较并交换操作即 CAS 操作,是有条件的交换操作,只有在条件满足的情况下才会进行值的交换。CAS 操作并不是单一的操作,而是一种操作组合,这与其他的原子操作都不同。正因为如此,它的用途要更广泛一些,例如我们将它与for语句联用就可以实现一种简易的自旋锁(spinlock):
for { if atomic.CompareAndSwapInt32(&num2, 10, 0) { fmt.Println("The second number has gone to zero.") break } time.Sleep(time.Millisecond * 500)}
10.2.3 载入
atomic包中提供了如下以Load为前缀的增减操作:
func LoadInt32(addr *int32) (val int32)func LoadInt64(addr *int64) (val int64)func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)func LoadUint32(addr *uint32) (val uint32)func LoadUint64(addr *uint64) (val uint64)func LoadUintptr(addr *uintptr) (val uintptr)
载入操作能够保证原子的读变量的值,当读取的时候,任何其他CPU操作都无法对该变量进行读写,其实现机制受到底层硬件的支持。假设我已经保证了对一个变量的写操作都是原子操作,比如:加或减、存储、交换等等,那我对它进行读操作的时候,还有必要使用原子操作吗?答案是很有必要,你可以对照一下读写锁,为什么在读写锁保护下的写操作和读操作之间是互斥的?这是为了防止读操作读到没有被修改完的值,如果写操作还没有进行完,读操作就来读了,那么就只能读到仅修改了一部分的值,这显然破坏了值的完整性。因此一旦决定要对一个共享资源进行保护,那就要做到完全的保护,不完全的保护基本上与不保护没有什么区别。
10.2.4 存储
atomic包中提供了如下以Store为前缀的存储操作:
func StoreInt32(addr *int32, val int32)func StoreInt64(addr *int64, val int64)func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)func StoreUint32(addr *uint32, val uint32)func StoreUint64(addr *uint64, val uint64)func StoreUintptr(addr *uintptr, val uintptr)
此类操作确保了写变量的原子性,避免其他操作读到了修改变量过程中的脏数据。然后对于存储,需要掌握2条规则:
- 我们不能把nil作为参数值传入原子值的Store方法,否则就会引发一个panic。这里要注意,如果有一个接口类型的变量,它的动态值是nil,但动态类型却不是nil,那么它的值就不等于nil,这样一个变量的值是可以被存入原子值,这块知识可以在接口这一章中查看。
- 我们向原子值存储的第一个值,决定了它今后能且只能存储哪一个类型的值。例如,我第一次向一个原子值存储了一个string类型的值,那我在后面就只能用该原子值来存储字符串了。如果我又想用它存储结构体,那么在调用它的Store方法的时候就会引发一个panic,这个panic会告诉我,这次存储的值的类型与之前的不一致。
10.2.5 交换
atomic包中提供了如下以Swap为前缀的交换操作:
func SwapInt32(addr *int32, new int32) (old int32)func SwapInt64(addr *int64, new int64) (old int64)func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)func SwapUint32(addr *uint32, new uint32) (old uint32)func SwapUint64(addr *uint64, new uint64) (old uint64)func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
相对于CAS,明显此类操作更为暴力直接,并不管变量的旧值是否被改变,直接赋予新值然后返回背替换的值。
10.3 扩展知识
对于原子操作,还有几条具体的使用建议:
- 不要把内部使用的原子值暴露给外界。比如,声明一个全局的原子变量并不是一个正确的做法,这个变量的访问权限最起码也应该是包级私有的。
- 如果不得不让包外,或模块外的代码使用你的原子值,那么可以声明一个包级私有的原子变量,然后再通过一个或多个公开的函数,让外界间接地使用到它。注意,这种情况下不要把原子值传递到外界,不论是传递原子值本身还是它的指针值。
- 如果通过某个函数可以向内部的原子值存储值的话,那么就应该在这个函数中先判断被存储值类型的合法性。若不合法,则应该直接返回对应的错误值,从而避免 panic 的发生。
- 如果可能的话,我们可以把原子值封装到一个数据类型中,比如一个结构体类型。这样,我们既可以通过该类型的方法更加安全地存储值,又可以在该类型中包含可存储值的合法类型信息。
除了上述使用建议之外,我还要再特别强调一点:尽量不要向原子值中存储引用类型的值。因为这很容易造成安全漏洞。请看下面的代码:
var box6 atomic.Valuev6 := []int{1, 2, 3}box6.Store(v6)v6[1] = 4 // 注意,此处的操作不是并发安全的!
我把一个[]int类型的切片值v6存入了原子值box6,由于切片类型属于引用类型,我在外面改动这个切片值,就等于修改了box6中存储的那个值,这相当于绕过了原子值而进行了非并发安全的操作,那么应该怎样修补这个漏洞呢?可以这样做:
store := func(v []int) { replica := make([]int, len(v)) copy(replica, v) box6.Store(replica)}store(v6)v6[2] = 5 // 此处的操作是安全的。
我先为切片值v6创建了一个完全的副本,这个副本涉及的数据已经与原值毫不相干,然后再把这个副本存入box6,因此无论我再对v6的值做怎样的修改,都不会破坏box6提供的安全保护。
10.4 实战场景:环形队列
// 环形队列type RingBuffer struct { err error count int32 size int32 head int32 tail int32 buf []unsafe.Pointer}// Get方法从buf中取出对象func (r *RingBuffer) Get() interface{} { // 在高并发开始的时候,队列容易空,直接判断空性能最优 if atomic.LoadInt32(&r.count) <= 0 { return nil } // 当扣减数量后没有超,就从队列里取出对象 if atomic.AddInt32(&r.count, -1) >= 0 { idx := (atomic.AddInt32(&r.head, 1) - 1) % r.size if obj := atomic.LoadPointer(&r.buf[idx]); obj != unsafe.Pointer(nil) { o := *(*interface{})(obj) atomic.StorePointer(&r.buf[idx], nil) return o } } else { // 当减数量超了,再加回去 atomic.AddInt32(&r.count, 1) } return nil}// Put方法将对象放回到buf中。如果buf满了,返回falsefunc (r *RingBuffer) Put(obj interface{}) bool { // 在高并发结束的时候,队列容易满,直接判满性能最优 if atomic.LoadInt32(&r.count) >= r.size { return false } // 当增加数量后没有超,就将对象放到队列里 if atomic.AddInt32(&r.count, 1) <= r.size { idx := (atomic.AddInt32(&r.tail, 1) - 1) % r.size atomic.StorePointer(&r.buf[idx], unsafe.Pointer(&obj)) return true } // 当加的数量超了,再减回去 atomic.AddInt32(&r.count, -1) return false}
10.5 总结
原子值类型的优势很明显,但它的使用规则也更多一些。首先,在首次真正使用后,原子值就不应该再被复制了。其次,原子值的Store方法对其参数值(也就是被存储值)有两个强制的约束。一个约束是参数值不能为nil。另一个约束是,参数值的类型不能与首个被存储值的类型不同。也就是说,一旦一个原子值存储了某个类型的值,那它以后就只能存储这个类型的值了。最后在扩展知识中,提出了几条使用建议,包括:不要对外暴露原子变量、不要传递原子值及其指针值、尽量不要在原子值中存储引用类型的值等。