安全的双检锁
有了对sync.Once的理解,我们可以改造之前写的双检锁逻辑,让它也能安全起来。
type Conn struct { Addr string State int } var c *Conn var mu sync.Mutex var done uint32 func getInstance() *Conn { if atomic.LoadUint32(&done) == 0 { mu.Lock() defer mu.Unlock() if done == 0 { defer atomic.StoreUint32(&done, 1) c = &Conn{"127.0.0.1:8080", 1} } } return c }
改变的地方就是sync.Once做的两个增强;原子读写和defer中更改完成标识。
当然如果要做的工作仅限于此,还不如直接使用sync.Once。
有时候我们需要的单例不是一成不变的,比如在ylog中需要每小时创建一个日志文件的实例,再比如需要为每一个用户创建不同的单例;再比如创建实例的过程中发生了错误,可能我们还会期望再执行实例的创建过程,直到成功。这两个需求是sync.Once无法做到的。
处理panic
这里在创建Conn的时候模拟一个panic。
i:=0 func newConn() *Conn { fmt.Println("newConn") div := i i++ k := 10 / div return &Conn{"127.0.0.1:8080", k} }
第1次执行newConn时会发生一个除零错误,并引发 panic。再执行时则可以正常创建。
panic可以通过recover进行处理,因此可以在捕捉到panic时不更改完成标识,之前的getInstance方法可以修改为:
func getInstance() *Conn { if atomic.LoadUint32(&done) == 0 { mu.Lock() defer mu.Unlock() if done == 0 { defer func() { if r := recover(); r == nil { defer atomic.StoreUint32(&done, 1) } }() c = newConn() } } return c }
可以看到这里只是改了下defer函数,捕捉不到panic时才去更改完成标识。注意此时c并没有创建成功,会返回零值,或许你还需要增加其它的错误处理。
处理error
如果业务代码不是抛出panic,而是返回error,这时候怎么处理?
可以将error转为panic,比如newConn是这样实现的:
func newConn() (*Conn, error) { fmt.Println("newConn") div := i i++ if div == 0 { return nil, errors.New("the divisor is zero") } k := 1 / div return &Conn{"127.0.0.1:8080", k}, nil }
我们可以再把它包装一层:
func mustNewConn() *Conn { conn, err := newConn() if err != nil { panic(err) } return conn }
如果不使用panic,还可以再引入一个变量,有error时对它赋值,在defer函数中增加对这个变量的判断,如果有错误值,则不更新完成标识位。代码也比较容易实现,不过还要增加变量,感觉复杂了,这里就不测试这种方法了。
有范围的单例
前文提到过有时单例不是一成不变的,我这里将这种单例称为有范围的单例。
这里还是复用前文的Conn结构体,不过需求修改为要为每个用户创建一个Conn实例。
看一下User的定义:
type User struct { done uint32 Id int64 mu sync.Mutex c *Conn }
其中包括一个用户Id,其它三个字段还是用于获取当前用户的Conn单例的。
再看看getInstance函数怎么改:
func getInstance(user *User) *Conn { if atomic.LoadUint32(&user.done) == 0 { user.mu.Lock() defer user.mu.Unlock() if user.done == 0 { defer func() { if r := recover(); r == nil { defer atomic.StoreUint32(&user.done, 1) } }() user.c = newConn() } } return user.c }
这里增加了一个参数 user,方法内的逻辑基本没变,只不过操作的东西都变成user的字段。这样就可以为每个用户创建一个Conn单例。
这个方法有点泛型的意思了,当然不是泛型。
**有范围单例的另一个示例:**在ylog中需要每小时创建一个日志文件用于记录当前小时的日志,在每个小时只需创建并打开这个文件一次。
先看看Logger的定义(这里省略和创建单例无关的内容。):
type FileLogger struct { lastHour int64 file *os.File mu sync.Mutex ... }
这里增加了一个参数 user,方法内的逻辑基本没变,只不过操作的东西都变成user的字段。这样就可以为每个用户创建一个Conn单例。
这个方法有点泛型的意思了,当然不是泛型。
**有范围单例的另一个示例:**在ylog中需要每小时创建一个日志文件用于记录当前小时的日志,在每个小时只需创建并打开这个文件一次。
先看看Logger的定义(这里省略和创建单例无关的内容。):
type FileLogger struct { lastHour int64 file *os.File mu sync.Mutex ... }
lastHour是记录的小时数,如果当前小时数不等于记录的小时数,则说明应该创建新的文件,这个变量类似于sync.Once中的done字段。
file是打开的文件实例。
mu是创建文件实例时需要加的锁。
下边看一下打开文件的方法:
func (l *FileLogger) ensureFile() (err error) { curTime := time.Now() curHour := getTimeHour(curTime) if atomic.LoadInt64(&l.lastHour) != curHour { return l.ensureFileSlow(curTime, curHour) } return } func (l *FileLogger) ensureFileSlow(curTime time.Time, curHour int64) (err error) { l.mu.Lock() defer l.mu.Unlock() if l.lastHour != curHour { defer func() { if r := recover(); r == nil { atomic.StoreInt64(&l.lastHour, curHour) } }() l.createFile(curTime, curHour) } return }
这里模仿sync.Once中的处理方法,有两点主要的不同:数值比较不再是0和1,而是每个小时都会变化的数字;增加了对panic的处理。如果打开文件失败,则还会再次尝试打开文件。
要查看完整的代码请访问Github:github.com/bosima/ylog…
双检锁的性能
从原理上分析,双检锁的性能要好过互斥锁,因为互斥锁每次都要加锁;不使用原子操作的双检锁要比使用原子操作的双检锁好一些,毕竟原子操作也是有些成本的。那么实际差距是多少呢?
这里做一个Benchmark Test,还是处理上文的Conn结构体,为了方便测试,定义一个上下文:
func ensure_unsafe_dcl(context *Context) { if context.done == 0 { context.mu.Lock() defer context.mu.Unlock() if context.done == 0 { defer func() { context.done = 1 }() context.c = newConn() } } } func ensure_dcl(context *Context) { if atomic.LoadUint32(&context.done) == 0 { context.mu.Lock() defer context.mu.Unlock() if context.done == 0 { defer atomic.StoreUint32(&context.done, 1) context.c = newConn() } } } func ensure_mutex(context *Context) { context.mu.Lock() defer context.mu.Unlock() if context.done == 0 { defer func() { context.done = 1 }() context.c = newConn() } }
这三个方法分别对应不安全的双检锁、使用原子操作的安全双检锁和每次都加互斥锁。它们的作用都是确保Conn结构体的实例存在,如果不存在则创建。
使用的测试方法都是下面这种写法,按照计算机逻辑处理器的数量并行运行测试方法:
func BenchmarkInfo_DCL(b *testing.B) { context := &Context{} b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { ensure_dcl(context) processConn(context.c) } }) }
可以看到使用双检锁相比每次加锁的提升是两个数量级,这是正常的。
而不安全的双检锁和使用原子操作的安全双检锁时间消耗相差无几,为什么呢?
主要原因是这里写只有1次,剩下的全是读。即使使用了原子操作,绝大部分情况下CPU读数据的时候也不用在多个核心之间同步(锁总线、锁缓存等),只需要读缓存就可以了。这也从一个方面证明了双检锁模式的意义。
另外上文提到过Go读写超过一个机器字的变量时是非原子的,那如果读写只有1个机器字呢?在64位机器上读写int64本身就是原子操作,也就是说读写应该都只需1次操作,不管用不用atomic方法。这可以在编译器文档或者CPU手册中验证。(Reference:preshing.com/20130618/at…
不过这两个分析不是说我们使用原子操作没有意义,不安全双检锁的执行结果是没有Go语言规范保证的,上边的结果只是在特定编译器、特定平台下的基准测试结果,不同的编译器、CPU,甚至不同版本的Go都不知道会出什么幺蛾子,运行的效果也就无法保证。我们不得不考虑程序的可移植性。
以上就是本文主要内容,如有问题欢迎反馈。完整代码已经上传到Github,欢迎访问:github.com/bosima/go-d…