你真的了解 sync.Once 吗

简介: 你真的了解 sync.Once 吗

是什么


引用官方描述的一段话,Once is a object that will perform exactly one action,即它是一个对象,它提供了保证某个动作只被执行一次的功能。最典型的场景当然就是单例对象的初始化操作。


咋么做


Once 的代码很简洁,从头到尾加注释不超过 70 行代码。对外暴露了一个唯一接口 Do(f func()) ,使用起来也是非常简单。

package main
import (
  "fmt"
  "sync"
)
func main() {
  var once sync.Once
  fun1 := func() {
    fmt.Println("第一次打印")
  }
  once.Do(fun1)
  fun2 := func() {
    fmt.Println("第二次打印")
  }
  once.Do(fun2)
}

在运行上面这段代码之后,从结果中你会发现只运行了 fun1。这样看好像没什么问题,但是这段代码并不是并发的调用 Do() ,那就稍微调整一下代码:

package main
import (
  "fmt"
  "sync"
  "time"
)
func main() {
  var once sync.Once
  for i := 0; i < 5; i++ {
    go func(i int) {
      fun1 := func() {
        fmt.Printf("i:=%d\n", i)
      }
      once.Do(fun1)
    }(i)
  }
  // 为了防止主goroutine直接运行完了,啥都看不到
  time.Sleep(50 * time.Millisecond)
}

我们开启了 5 个并发的 goroutine ,不管你咋么运行,始终只打印一次,至于 i 是多少,就看先执行的是哪个 g 了。Once 保证只有第一次调用 Do() 方法时,传递的 f (无参数无返回值的函数) 才会执行,并且之后不管调用的参数是否改变了,也不再执行。


咋么实现

在看一个功能的同时,其实我们本身也可以站在技术的角度上来思考,如果是你,你会咋么实现这个 Once。我觉得这是件很有意思的事情。


第一时间想到的就是 go 中开箱即用的 sync.Mutex 的 Lock() 方法的第一段:

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
  // Fast path: grab unlocked mutex.
  if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      ......
        return
  }
   ......
}

利用 atomic 的原子操作来实现这个需求。这确实可以保证只执行一次。但是也存在一个巨大的坑,我们来验证下:

package main
import (
  "fmt"
  "net"
  "sync/atomic"
  "time"
)
type OnceA struct {
  done uint32
}
func (o *OnceA) Do(f func()) {
  if !atomic.CompareAndSwapUint32(&o.done, 0, 1) {
    return
  }
  f()
}
func main() {
  var once OnceA 
  var conn net.Conn
  go func() {
    fun1 := func() {
      time.Sleep(5 * time.Second) //模拟初始化的速度很慢
      conn, _ = net.DialTimeout("tcp", "baidu.com:80", time.Second)
    }
    once.Do(fun1)
  }()
  time.Sleep(500 * time.Millisecond)
  fun2 := func() {
    fmt.Println("执行fun2")
    conn, _ = net.DialTimeout("tcp", "baidu.com:80", time.Second)
  }
  //再调用do已经检查到done为1了
  once.Do(fun2)
  _, err := conn.Write([]byte("\"GET / HTTP/1.1\\r\\nHost: baidu.com\\r\\n Accept: */*\\r\\n\\r\\n\""))
  if err != nil {
    fmt.Println("err:", err)
  }
}

conn 是一个 net.Conn 的接口类型变量,这里为了达到效果,通过 sleep 模拟了初始化资源的耗时 ,当 fun2() 想要进行初始化的时候,已然发现 done 的值是 1 了,但是 fun1 初始化速度很慢,导致接下来操作 conn.Write 的时候,因为此时 conn 还是一个空资源,最终运行时抛出空指针的 panic 了。


这个问题的原因在于真正使用资源的时候,资源初始化还没到位,真是尴尬😅。


那么 Go 是如何避免这种问题的呢?

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
  "sync/atomic"
)
// Once is an object that will perform exactly one action.
type Once struct {
  done uint32
  m    Mutex
}
func (o *Once) Do(f func()) {
  // Note: Here is an incorrect implementation of Do:
  //
  //  if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
  //    f()
  //  }
  //
  // Do guarantees that when it returns, f has finished.
  // This implementation would not implement that guarantee:
  // given two simultaneous calls, the winner of the cas would
  // call f, and the second would return immediately, without
  // waiting for the first's call to f to complete.
  // This is why the slow path falls back to a mutex, and why
  // the atomic.StoreUint32 must be delayed until after f returns.
  if atomic.LoadUint32(&o.done) == 0 {
    // Outlined slow-path to allow inlining of the fast-path.
    o.doSlow(f)
  }
}
func (o *Once) doSlow(f func()) {
  o.m.Lock()
  defer o.m.Unlock()
  if o.done == 0 {
    defer atomic.StoreUint32(&o.done, 1)
    f()
  }
}

你看大佬都直接注释贴心的告诉你 if atomic.CompareAndSwapUint32(&o.done, 0, 1) 这个不是正确的实现。并发的情况下,胜者获得调用 f ,但是第二个会直接返回,没有等待第一个初始化结束。


所以 Once 实现使用了一个互斥锁,互斥锁保证了只有一个 g 初始化,同时采取的是双检查的机制,再次判断 Once.done 是否为 0,如果为 0,代表第一次初始化,等到初始化结束之后,再释放锁。并发情况下,其他的 g 就会被阻塞在 o.m.Lock()。


如何避坑


说是避坑,但是绝大多数的坑都是由于程序员自身代码问题所导致的,虽然有点尴尬,但确实如此。 Once 的 “坑” 还算少的,不像 sync.Mutex 和 Channel 那样,稍微姿势不注意点就 panic 了。这一块后续再写文章介绍下。除了上面需要注意的使用资源的时候资源还未初始化完成的问题,在 Once 中还需要避免的是死锁问题。

// 由于嵌套调用 Do 里面的 lock导致死锁
func ErrOne() {
  var o sync.Once
  o.Do(func() {
    o.Do(func() {
      fmt.Println("初始化")
    })
  })
}

这里 Do 调用了 f,f 里面又调用了 Do,最终导致死锁。我把上面的代码简化成下面这样

package main
import "sync"
func main() {
  var mu sync.Mutex
  mu.Lock()
  mu.Lock()
}

避免这种错误也很简单,不要在 f 函数中再次调用当前的 Once 即可。


延伸


上面有提到过,Once.Do 由于某些原因导致初始化失败,但是原生的问题在于,后续再也没有机会执行同一个 Once.Do 了,发生这样的情况,理想的处理是,只有真正初始化成功,才设置 Done 的值,并且如果初始化失败,理应通知到上游服务,这样上游服务可以做一些重试机制或者异常处理等操作。

package main
import (
  "fmt"
  "io"
  "net"
  "os"
  "sync"
  "sync/atomic"
  "time"
)
type Once struct {
  done uint32
  m    sync.Mutex
}
// 传入的f 有返回值,如果初始化失败,返回对应error,
// Do方法再把这个err返回给上游服务
func (o *Once) Do(f func() error) error {
  if atomic.LoadUint32(&o.done) == 1 { //fast path
    return nil
  }
  return o.doSlow(f)
}
func (o *Once) doSlow(f func() error) error {
  o.m.Lock()
  defer o.m.Unlock()
  var err error
  if o.done == 0 { //双检查,还没有初始化
    err = f()
    if err == nil { // 只有真正初始化成功才把 done 的值改成1
      atomic.StoreUint32(&o.done, 1)
    }
  }
  return err
}

我们改变了 f 函数,增加了一个返回值,在初始化失败之后返回给 Do 函数,由 Do 函数再把错误返回给上游的调用方,把控制权交还给调用方做失败的处理。另外改动的一点是,只有真正初始化成功之后才把 Done 的值改成 1。那么我们可以简单的把上面的业务代码改造一下:

package main
import (
  "fmt"
  "io"
  "net"
  "os"
  "sync"
  "sync/atomic"
  "time"
)
type Once struct {
  done uint32
  m    sync.Mutex
}
// 传入的f 有返回值,如果初始化失败,返回对应error,
// Do方法再把这个err返回给上游服务
func (o *Once) Do(fn func() error) error {
  if atomic.LoadUint32(&o.done) == 1 {
    return nil
  }
  return o.doSlow(fn)
}
func (o *Once) doSlow(fn func() error) error {
  o.m.Lock()
  defer o.m.Unlock()
  var err error
  if o.done == 0 { /双检查,还没有初始化
    err = fn()
    if err == nil { // 只有真正初始化成功才把 done 的值改成1
      atomic.StoreUint32(&o.done, 1)
    }
  }
  return err
}
func main() {
  urls := []string{
    "127.0.0.1:3453",
    "127.0.0.1:9002",
    "127.0.0.1:9003",
    "baidu.com:80",
  }
  var conn net.Conn
  var o Once
  count := 0
  var err error
  for _, url := range urls {
    err := o.Do(func() error {
      count++
      fmt.Printf("初始化%d次\n", count)
      conn, err = net.DialTimeout("tcp", url, time.Second)
      fmt.Println(err)
      return err
    })
    if err == nil {
      break
    }
    if count == 3 {
      fmt.Println("初始化失败,不再重试")
      break
    }
  }
  if conn != nil {
    _, _ = conn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n Accept: */*\r\n\r\n"))
    _, _ = io.Copy(os.Stdout, conn)
  }
}
相关文章
|
安全 Java 调度
多线程锁sync+lock使用,@Async使用
多线程锁sync+lock使用,@Async使用
多线程锁sync+lock使用,@Async使用
|
JavaScript
彻底理解sync的用法
彻底理解sync的用法
171 0
|
存储 缓存 Go
原来sync.Once还能这么用
原来sync.Once还能这么用
159 0
原来sync.Once还能这么用
|
缓存 安全 算法
Go基础:channel、定时器、select、锁、sync、atomic
Go基础:channel、定时器、select、锁、sync、atomic
310 0
Go基础:channel、定时器、select、锁、sync、atomic
|
存储 缓存 数据处理
完全揭秘log file sync等待事件
什么是log file sync等待事件呢?在一个提交(commit)十分频繁的数据库中,一般会出现log file sync等待事件,当这个等待事件出现在top5中,这个时侯我们需要针对log file sync等待事件进行优化,一定要尽快分析并解决问题,否则当log file sync等待时间从几毫秒直接到20几毫秒可能导致系统性能急剧下降,甚至会导致短暂的挂起。
完全揭秘log file sync等待事件
|
SQL 监控 关系型数据库
mysql主从复制出现Waiting for Slave Worker to release partition
本文分享mysql主从复制出现Waiting for Slave Worker to release partition
mysql主从复制出现Waiting for Slave Worker to release partition
|
安全 Java Go
sync
sync包有以下几个内容: (1)sync.Pool 临时对象池 (2)sync.Mutex 互斥锁 (3)sync.RWMutex 读写互斥锁 (4)sync.WaitGroup 组等待 (5)sync.Cond 条件等待 (6)sync.Once 单次执行 一、临时对象池 Pool可以用来存储临时对象,其实原理就是这个对象池指向对象变量,以防没有变量指向对象时,被GC所回收。
1423 0