现象
pod实例的go服务每隔几个小时就自动重启
初步分析
初步判断go服务遇到未捕捉的异常(造成panic且没有recover),才导致程序挂掉,docker服务给重启了。
具体分析
首先对代码进行初步的review,发现没有明显的问题导致的panic,因为预计的地方都已经加了recover。下一步就是定位哪些不能recover
- 数据竞争(比如:对map进行并发读写),可以在编译或测试时加上 -race 标记(例如 go build -race、go test -race)来检测代码中是否存在数据竞争
- 内存不足
- 死锁
上线了几个pod,终于捕获到了完整的 fatal error 堆栈,对照代码发现是并发写map导致了 fatal error,这种错误是不能 recover 的。后来在访问该map的地方加了 sync.RWMutex 解决了这个问题,上线后没有再重启了
此外,也可以考虑concurrent-map
package cmap
import (
"encoding/json"
"fmt"
"sync"
)
// SHARD_COUNT is the number of shards a map is split into.
// It is read when a map is created, so change it only before constructing maps.
var SHARD_COUNT = 32
// Stringer is the key constraint used by NewStringer: the key type must be
// comparable and must implement fmt.Stringer.
type Stringer interface {
fmt.Stringer
comparable
}
// ConcurrentMap is a "thread" safe map from K to V.
// To avoid lock bottlenecks the map is divided into several (SHARD_COUNT) map
// shards, each guarded by its own lock.
type ConcurrentMap[K comparable, V any] struct {
// shards holds the individual map shards; it is filled in by create.
shards []*ConcurrentMapShared[K, V]
// sharding hashes a key; the result selects the shard that stores the key.
sharding func(key K) uint32
}
// ConcurrentMapShared is one shard of a ConcurrentMap: a plain map together
// with the read/write mutex that guards it.
type ConcurrentMapShared[K comparable, V any] struct {
items map[K]V
sync.RWMutex // Read Write mutex, guards access to internal map.
}
// create builds a ConcurrentMap that routes keys with the given sharding
// function and allocates SHARD_COUNT empty shards up front.
func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
	shards := make([]*ConcurrentMapShared[K, V], SHARD_COUNT)
	for idx := range shards {
		shards[idx] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
	}
	return ConcurrentMap[K, V]{
		shards:   shards,
		sharding: sharding,
	}
}
// New returns an empty concurrent map with string keys. Keys are assigned to
// shards using the package's fnv32 hash.
func New[V any]() ConcurrentMap[string, V] {
	cm := create[string, V](fnv32)
	return cm
}
// NewStringer returns an empty concurrent map whose keys satisfy Stringer.
// Keys are assigned to shards by hashing their String() form (see strfnv32).
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
	cm := create[K, V](strfnv32[K])
	return cm
}
// NewWithCustomShardingFunction returns an empty concurrent map that uses the
// caller-supplied sharding function to assign keys to shards.
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
	cm := create[K, V](sharding)
	return cm
}
// GetShard returns the shard responsible for the given key.
//
// The shard index is the key's hash modulo the number of shards this map
// actually owns. Using len(m.shards) rather than the package-level SHARD_COUNT
// keeps lookups correct even if SHARD_COUNT is changed after the map was
// created; with the global, such a change would index out of range or route
// keys to a different shard than the one they were stored in.
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
	return m.shards[uint(m.sharding(key))%uint(len(m.shards))]
}
// MSet stores every key/value pair from data in the map. Each pair is written
// under the write lock of the shard that owns its key, one pair at a time.
func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
	for k, v := range data {
		m.Set(k, v)
	}
}
// Set stores value under key, replacing any previous value. The write happens
// under the write lock of the shard that owns the key.
func (m ConcurrentMap[K, V]) Set(key K, value V) {
	s := m.GetShard(key)
	s.Lock()
	defer s.Unlock()
	s.items[key] = value
}
// UpsertCb is the callback used by Upsert to compute the element to store.
// It receives whether the key already exists, the value currently in the map
// and the value passed to Upsert, and returns the value to be stored.
// It is called while the shard lock is held, therefore it MUST NOT
// try to access other keys in the same map, as that can lead to deadlock since
// Go's sync.RWMutex is not reentrant.
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V
// Upsert inserts or updates the element under key using cb.
//
// The shard owning key is write-locked, cb is called with the existence flag,
// the current value and the supplied value, and whatever cb returns is stored
// under key and returned to the caller.
//
// The lock is released with defer so that a panic inside cb cannot leave the
// shard locked forever, which would block every later access to that shard.
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
	shard := m.GetShard(key)
	shard.Lock()
	defer shard.Unlock()

	current, exists := shard.items[key]
	res = cb(exists, current, value)
	shard.items[key] = res
	return res
}
// SetIfAbsent stores value under key only when the key has no value yet.
// It reports whether the value was stored.
func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
	s := m.GetShard(key)
	s.Lock()
	defer s.Unlock()

	if _, present := s.items[key]; present {
		return false
	}
	s.items[key] = value
	return true
}
// Get looks up key under the owning shard's read lock. It returns the stored
// value and true, or the zero value of V and false when the key is absent.
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
	s := m.GetShard(key)
	s.RLock()
	defer s.RUnlock()

	v, found := s.items[key]
	return v, found
}
// Count returns the number of elements within the map.
//
// Each shard is read-locked in turn, so the total is not an atomic snapshot
// across shards. The loop ranges over the shards this map actually owns
// instead of indexing up to the package-level SHARD_COUNT, which would panic
// or undercount if SHARD_COUNT was changed after the map was created.
func (m ConcurrentMap[K, V]) Count() int {
	count := 0
	for _, shard := range m.shards {
		shard.RLock()
		count += len(shard.items)
		shard.RUnlock()
	}
	return count
}
// Has reports whether key is present in the map. The lookup runs under the
// owning shard's read lock.
func (m ConcurrentMap[K, V]) Has(key K) bool {
	s := m.GetShard(key)
	s.RLock()
	defer s.RUnlock()

	_, present := s.items[key]
	return present
}
// Remove deletes the element stored under key, if any, while holding the
// owning shard's write lock.
func (m ConcurrentMap[K, V]) Remove(key K) {
	s := m.GetShard(key)
	s.Lock()
	defer s.Unlock()
	delete(s.items, key)
}
// RemoveCb is a callback executed in a map.RemoveCb() call, while the shard
// Lock is held. It receives the key, the value currently stored under it and
// whether the key exists.
// If it returns true, the element will be removed from the map.
type RemoveCb[K any, V any] func(key K, v V, exists bool) bool
// RemoveCb locks the shard containing the key, retrieves its current value and
// calls the callback with those params.
// If the callback returns true and the element exists, it is removed from the map.
// Returns the value returned by the callback (even if the element was not
// present in the map).
//
// The lock is released with defer so that a panic inside cb cannot leave the
// shard locked forever.
func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool {
	shard := m.GetShard(key)
	shard.Lock()
	defer shard.Unlock()

	v, ok := shard.items[key]
	remove := cb(key, v, ok)
	if remove && ok {
		delete(shard.items, key)
	}
	return remove
}
// Pop deletes the element stored under key and returns it. exists is false
// (and v is the zero value of V) when the key was not in the map.
func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) {
	s := m.GetShard(key)
	s.Lock()
	defer s.Unlock()

	v, exists = s.items[key]
	delete(s.items, key)
	return v, exists
}
// IsEmpty reports whether the map holds no elements, based on Count.
func (m ConcurrentMap[K, V]) IsEmpty() bool {
	if m.Count() > 0 {
		return false
	}
	return true
}
// Tuple is used by the Iter & IterBuffered functions to wrap a key and its
// value together so that both can be sent over a single channel.
type Tuple[K comparable, V any] struct {
Key K
Val V
}
// Iter returns an unbuffered channel that yields every key/value pair captured
// by a snapshot of the map; it can be consumed with a for range loop.
//
// Deprecated: using IterBuffered() will get a better performance
func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V] {
	sources := snapshot(m)
	out := make(chan Tuple[K, V])
	go fanIn(sources, out)
	return out
}
// IterBuffered returns a buffered channel that yields every key/value pair
// captured by a snapshot of the map; it can be consumed with a for range loop.
// The buffer is as large as the combined capacity of the per-shard channels.
func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] {
	sources := snapshot(m)

	var capacity int
	for i := range sources {
		capacity += cap(sources[i])
	}

	out := make(chan Tuple[K, V], capacity)
	go fanIn(sources, out)
	return out
}
// Clear removes all items from map.
//
// Each shard is emptied under its own write lock by swapping in a fresh map.
// This avoids copying every entry through the iterator channels (and the
// goroutines behind them) only to delete the entries one by one afterwards.
// Shards are cleared one after another, so the operation is atomic per shard
// but not across the whole map.
func (m ConcurrentMap[K, V]) Clear() {
	for _, shard := range m.shards {
		shard.Lock()
		shard.items = make(map[K]V)
		shard.Unlock()
	}
}
// snapshot returns one buffered channel per shard, each carrying the elements
// that shard held when it was read, which amounts to a snapshot of `m` taken
// shard by shard.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated by the goroutines.
//
// It panics if the map has no shards, i.e. it was never initialized.
//
// The channel slice and the WaitGroup are sized from len(m.shards), the same
// collection the loop ranges over. Sizing them from the package-level
// SHARD_COUNT would deadlock in wg.Wait or panic if that variable was changed
// after the map was created.
func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
	// Guard against use of a map that was never initialized.
	if len(m.shards) == 0 {
		panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
	}
	chans = make([]chan Tuple[K, V], len(m.shards))
	var wg sync.WaitGroup
	wg.Add(len(m.shards))
	for index, shard := range m.shards {
		go func(index int, shard *ConcurrentMapShared[K, V]) {
			shard.RLock()
			// The buffer fits the whole shard, so the sends below never block
			// while the read lock is held.
			ch := make(chan Tuple[K, V], len(shard.items))
			chans[index] = ch
			// Signal as soon as the channel exists; the caller only needs the
			// channels, not their contents, before returning.
			wg.Done()
			for key, val := range shard.items {
				ch <- Tuple[K, V]{key, val}
			}
			shard.RUnlock()
			close(ch)
		}(index, shard)
	}
	wg.Wait()
	return chans
}
// fanIn forwards every element received from the channels in `chans` to `out`,
// using one goroutine per input channel, and closes `out` once all inputs have
// been drained.
func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
	var pending sync.WaitGroup
	for i := range chans {
		pending.Add(1)
		go func(src <-chan Tuple[K, V]) {
			defer pending.Done()
			for item := range src {
				out <- item
			}
		}(chans[i])
	}
	pending.Wait()
	close(out)
}
// Items returns all items as a plain map[K]V, collected from IterBuffered.
func (m ConcurrentMap[K, V]) Items() map[K]V {
	result := map[K]V{}
	stream := m.IterBuffered()
	for entry := range stream {
		result[entry.Key] = entry.Val
	}
	return result
}
// IterCb is the iterator callback called for every key,value found in
// the map. RLock is held for all calls for a given shard,
// therefore the callback sees a consistent view of a shard,
// but not across the shards.
type IterCb[K comparable, V any] func(key K, v V)
// IterCb is a callback based iterator, the cheapest way to read
// all elements in a map.
//
// Shards are visited one at a time; fn is called for every key/value pair of a
// shard while that shard's read lock is held. The lock is released with a
// per-shard defer so that a panic inside fn cannot leave the shard read-locked
// forever, which would block all later writers to it.
func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) {
	for _, shard := range m.shards {
		// The closure scopes the defer to one shard, so the lock is released
		// before the next shard is visited.
		func() {
			shard.RLock()
			defer shard.RUnlock()
			for key, value := range shard.items {
				fn(key, value)
			}
		}()
	}
}
// Keys returns all keys as a []K. The order of the keys is unspecified.
//
// One goroutine per shard sends that shard's keys to a shared channel while
// holding the shard's read lock; the caller collects them until the channel is
// closed. The WaitGroup is sized from len(m.shards), the same collection the
// loop ranges over; sizing it from the package-level SHARD_COUNT would hang or
// panic if that variable was changed after the map was created.
func (m ConcurrentMap[K, V]) Keys() []K {
	count := m.Count()
	// Buffered with the current element count so producers rarely block.
	ch := make(chan K, count)
	go func() {
		var wg sync.WaitGroup
		wg.Add(len(m.shards))
		for _, shard := range m.shards {
			go func(shard *ConcurrentMapShared[K, V]) {
				// Deferred calls run last-in first-out: the read lock is
				// released first, then the WaitGroup is signalled.
				defer wg.Done()
				shard.RLock()
				defer shard.RUnlock()
				for key := range shard.items {
					ch <- key
				}
			}(shard)
		}
		wg.Wait()
		close(ch)
	}()

	// Collect keys until every shard has been drained.
	keys := make([]K, 0, count)
	for k := range ch {
		keys = append(keys, k)
	}
	return keys
}
// MarshalJSON reveals the ConcurrentMap's "private" contents to the JSON
// encoder: the items spread across the shards are gathered into one plain map,
// which is then marshalled.
func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) {
	return json.Marshal(m.Items())
}
// strfnv32 hashes a key by applying fnv32 to its String() representation.
func strfnv32[K fmt.Stringer](key K) uint32 {
	text := key.String()
	return fnv32(text)
}
// fnv32 returns a 32-bit hash of key: starting from the offset basis, each
// byte of key is folded in by multiplying the hash by the prime and then
// XOR-ing the byte. Arithmetic wraps around at 32 bits.
func fnv32(key string) uint32 {
	const (
		offset32 uint32 = 2166136261
		prime32  uint32 = 16777619
	)
	h := offset32
	for _, b := range []byte(key) {
		h = (h * prime32) ^ uint32(b)
	}
	return h
}
// UnmarshalJSON is the reverse of MarshalJSON: the JSON object is decoded into
// a temporary plain map, and every key/value pair of it is then stored in the
// concurrent map. A decoding error is returned unchanged and leaves the map
// untouched.
func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) {
	decoded := make(map[K]V)
	if err = json.Unmarshal(b, &decoded); err != nil {
		return err
	}
	m.MSet(decoded)
	return nil
}
解决方案
Go语言中的 map 在并发情况下,只读是线程安全的,同时读写是线程不安全的。
下面来看下并发情况下读写 map 时会出现的问题,代码如下:
// Demo: one goroutine writes to a plain map while another reads from it,
// with no synchronization between them.
// Create a map from int to int shared by both goroutines below.
m := make(map[int]int)
// Start the writer goroutine.
go func() {
// Write to the map in an endless loop.
for {
m[1] = 1
}
}()
// Start the reader goroutine.
go func() {
// Read from the map in an endless loop.
for {
_ = m[1]
}
}()
// Loop forever so the goroutines keep running in the background.
for {
}
运行代码会报错,输出如下:
fatal error: concurrent map read and map write
错误信息显示,并发的 map 读和 map 写,也就是说使用了两个并发函数不断地对 map 进行读和写而发生了竞态问题,map 内部会对这种并发操作进行检查并提前发现。
需要并发读写时,一般的做法是加锁,但这样性能并不高。
Go语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Map,sync.Map 和 map 不同,不是以语言原生形态提供,而是在 sync 包。
- 无须初始化,直接声明即可。
- sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用,Store 表示存储,Load 表示获取,Delete 表示删除。
- 使用 Range 配合一个回调函数进行遍历操作,通过回调函数返回内部遍历出来的值,Range 参数中回调函数的返回值在需要继续迭代遍历时,返回 true,终止迭代遍历时,返回 false。
sync.Map 没有提供获取 map 数量的方法,替代方法是使用 Range 遍历 sync.Map 并自行累加计数。sync.Map 为了保证并发安全有一些性能损失,因此在非并发情况下,使用 map 相比使用 sync.Map 会有更好的性能。