Troubleshooting and Fixing a Go Service That Keeps Restarting in a Docker Pod

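Summary: This article walks through how we diagnosed and fixed a Go service that kept restarting in a Docker pod. The root cause was a fatal error triggered by concurrent map writes. It also covers sync.Map and the concurrent-map library as concurrency-safe alternatives.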

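Symptom

The Go service in the pod restarted automatically every few hours.

Initial analysis

My first guess was that the Go service hit an unhandled failure (a panic with no recover), which killed the process, after which Docker restarted the container.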

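Detailed analysis

I started with a quick review of the code and found no obvious source of panics, since recover was already in place everywhere a panic was expected. The next step was to work out which failures cannot be recovered:

  • Data races (for example, concurrent reads and writes on a map). Go's -race build flag can check whether the code contains data races.
  • Out of memory
  • Deadlock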
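To make the distinction concrete, here is a minimal sketch of what recover can catch. The function name writeNilMap is hypothetical and chosen only for illustration. An ordinary runtime panic, such as writing to a nil map, is turned into an error by a deferred recover. A concurrent map write does not go through this path: the runtime prints a fatal error and exits the process, so the deferred function never gets a chance to run. To look for races ahead of time, the program can be run with go run -race or tested with go test -race.

```go
package main

import "fmt"

// writeNilMap (hypothetical name) triggers an ordinary runtime panic
// and converts it into an error through a deferred recover.
func writeNilMap() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()

	var m map[string]int // nil map: reads are fine, writes panic
	m["a"] = 1           // panics: assignment to entry in nil map
	return nil
}

func main() {
	// The panic above is recoverable, so the program keeps running.
	fmt.Println(writeNilMap())

	// By contrast, "fatal error: concurrent map writes" is raised by the
	// runtime as a fatal error, not a panic; recover cannot intercept it.
}
```

This is why adding recover everywhere did not stop the restarts: the failure was in a class that recover never sees.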

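After rolling out several pods, I finally captured a complete stack trace for the fatal error. Checking it against the code showed that concurrent map writes had caused the fatal error, which cannot be recovered. I then guarded the map with a sync.RWMutex, which solved the problem; the service has not restarted since that release.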
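The fix described above can be sketched roughly as follows. This is not the service's actual code: SafeMap, NewSafeMap, and fillConcurrently are hypothetical names, and the key and value types are assumptions. The idea is simply that every write takes the write lock and every read takes the read lock.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeMap (hypothetical) wraps a plain map with a sync.RWMutex.
type SafeMap struct {
	mu    sync.RWMutex
	items map[string]int
}

func NewSafeMap() *SafeMap {
	return &SafeMap{items: make(map[string]int)}
}

// Set takes the write lock, so only one writer touches the map at a time.
func (s *SafeMap) Set(key string, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[key] = value
}

// Get takes the read lock, so multiple readers may proceed together.
func (s *SafeMap) Get(key string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.items[key]
	return v, ok
}

// Len returns the number of entries under the read lock.
func (s *SafeMap) Len() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return len(s.items)
}

// fillConcurrently starts n goroutines that each write one distinct key
// and read another, then returns the final number of entries.
func fillConcurrently(n int) int {
	m := NewSafeMap()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Set(fmt.Sprintf("key-%d", i), i)
			m.Get("key-0")
		}(i)
	}
	wg.Wait()
	return m.Len()
}

func main() {
	fmt.Println(fillConcurrently(100)) // prints 100
}
```

Keeping the map unexported behind methods makes it harder for a later change to touch the map without holding the lock.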

Alternatively, consider concurrent-map, whose source is reproduced below:

package cmap

import (
    "encoding/json"
    "fmt"
    "sync"
)

var SHARD_COUNT = 32

type Stringer interface {
    fmt.Stringer
    comparable
}

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is divided into several (SHARD_COUNT) map shards.
type ConcurrentMap[K comparable, V any] struct {
    shards   []*ConcurrentMapShared[K, V]
    sharding func(key K) uint32
}

// A "thread" safe string to anything map.
type ConcurrentMapShared[K comparable, V any] struct {
    items        map[K]V
    sync.RWMutex // Read Write mutex, guards access to internal map.
}

func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
    m := ConcurrentMap[K, V]{
        sharding: sharding,
        shards:   make([]*ConcurrentMapShared[K, V], SHARD_COUNT),
    }
    for i := 0; i < SHARD_COUNT; i++ {
        m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
    }
    return m
}

// Creates a new concurrent map.
func New[V any]() ConcurrentMap[string, V] {
    return create[string, V](fnv32)
}

// Creates a new concurrent map.
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
    return create[K, V](strfnv32[K])
}

// Creates a new concurrent map.
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
    return create[K, V](sharding)
}

// GetShard returns shard under given key
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
    return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}

func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
    for key, value := range data {
        shard := m.GetShard(key)
        shard.Lock()
        shard.items[key] = value
        shard.Unlock()
    }
}

// Sets the given value under the specified key.
func (m ConcurrentMap[K, V]) Set(key K, value V) {
    // Get map shard.
    shard := m.GetShard(key)
    shard.Lock()
    shard.items[key] = value
    shard.Unlock()
}

// Callback to return new element to be inserted into the map
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWMutex is not reentrant
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

// Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    res = cb(ok, v, value)
    shard.items[key] = res
    shard.Unlock()
    return res
}

// Sets the given value under the specified key if no value was associated with it.
func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
    // Get map shard.
    shard := m.GetShard(key)
    shard.Lock()
    _, ok := shard.items[key]
    if !ok {
        shard.items[key] = value
    }
    shard.Unlock()
    return !ok
}

// Get retrieves an element from map under given key.
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
    // Get shard
    shard := m.GetShard(key)
    shard.RLock()
    // Get item from shard.
    val, ok := shard.items[key]
    shard.RUnlock()
    return val, ok
}

// Count returns the number of elements within the map.
func (m ConcurrentMap[K, V]) Count() int {
    count := 0
    for i := 0; i < SHARD_COUNT; i++ {
        shard := m.shards[i]
        shard.RLock()
        count += len(shard.items)
        shard.RUnlock()
    }
    return count
}

// Looks up an item under specified key
func (m ConcurrentMap[K, V]) Has(key K) bool {
    // Get shard
    shard := m.GetShard(key)
    shard.RLock()
    // See if element is within shard.
    _, ok := shard.items[key]
    shard.RUnlock()
    return ok
}

// Remove removes an element from the map.
func (m ConcurrentMap[K, V]) Remove(key K) {
    // Try to get shard.
    shard := m.GetShard(key)
    shard.Lock()
    delete(shard.items, key)
    shard.Unlock()
}

// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
type RemoveCb[K any, V any] func(key K, v V, exists bool) bool

// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
// Returns the value returned by the callback (even if element was not present in the map)
func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool {
    // Try to get shard.
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    remove := cb(key, v, ok)
    if remove && ok {
        delete(shard.items, key)
    }
    shard.Unlock()
    return remove
}

// Pop removes an element from the map and returns it
func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) {
    // Try to get shard.
    shard := m.GetShard(key)
    shard.Lock()
    v, exists = shard.items[key]
    delete(shard.items, key)
    shard.Unlock()
    return v, exists
}

// IsEmpty checks if map is empty.
func (m ConcurrentMap[K, V]) IsEmpty() bool {
    return m.Count() == 0
}

// Used by the Iter & IterBuffered functions to wrap two variables together over a channel.
type Tuple[K comparable, V any] struct {
    Key K
    Val V
}

// Iter returns an iterator which could be used in a for range loop.
//
// Deprecated: using IterBuffered() will get better performance
func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V] {
    chans := snapshot(m)
    ch := make(chan Tuple[K, V])
    go fanIn(chans, ch)
    return ch
}

// IterBuffered returns a buffered iterator which could be used in a for range loop.
func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] {
    chans := snapshot(m)
    total := 0
    for _, c := range chans {
        total += cap(c)
    }
    ch := make(chan Tuple[K, V], total)
    go fanIn(chans, ch)
    return ch
}

// Clear removes all items from map.
func (m ConcurrentMap[K, V]) Clear() {
    for item := range m.IterBuffered() {
        m.Remove(item.Key)
    }
}

// Returns an array of channels that contains elements in each shard,
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
    //When you access map items before initializing.
    if len(m.shards) == 0 {
        panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
    }
    chans = make([]chan Tuple[K, V], SHARD_COUNT)
    wg := sync.WaitGroup{}
    wg.Add(SHARD_COUNT)
    // Foreach shard.
    for index, shard := range m.shards {
        go func(index int, shard *ConcurrentMapShared[K, V]) {
            // Foreach key, value pair.
            shard.RLock()
            chans[index] = make(chan Tuple[K, V], len(shard.items))
            wg.Done()
            for key, val := range shard.items {
                chans[index] <- Tuple[K, V]{key, val}
            }
            shard.RUnlock()
            close(chans[index])
        }(index, shard)
    }
    wg.Wait()
    return chans
}

// fanIn reads elements from channels `chans` into channel `out`
func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
    wg := sync.WaitGroup{}
    wg.Add(len(chans))
    for _, ch := range chans {
        go func(ch chan Tuple[K, V]) {
            for t := range ch {
                out <- t
            }
            wg.Done()
        }(ch)
    }
    wg.Wait()
    close(out)
}

// Items returns all items as map[K]V
func (m ConcurrentMap[K, V]) Items() map[K]V {
    tmp := make(map[K]V)

    // Insert items to temporary map.
    for item := range m.IterBuffered() {
        tmp[item.Key] = item.Val
    }

    return tmp
}

// Iterator callback, called for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sees consistent view of a shard,
// but not across the shards
type IterCb[K comparable, V any] func(key K, v V)

// Callback based iterator, cheapest way to read
// all elements in a map.
func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) {
    for idx := range m.shards {
        shard := (m.shards)[idx]
        shard.RLock()
        for key, value := range shard.items {
            fn(key, value)
        }
        shard.RUnlock()
    }
}

// Keys returns all keys as []K
func (m ConcurrentMap[K, V]) Keys() []K {
    count := m.Count()
    ch := make(chan K, count)
    go func() {
        // Foreach shard.
        wg := sync.WaitGroup{}
        wg.Add(SHARD_COUNT)
        for _, shard := range m.shards {
            go func(shard *ConcurrentMapShared[K, V]) {
                // Foreach key, value pair.
                shard.RLock()
                for key := range shard.items {
                    ch <- key
                }
                shard.RUnlock()
                wg.Done()
            }(shard)
        }
        wg.Wait()
        close(ch)
    }()

    // Generate keys
    keys := make([]K, 0, count)
    for k := range ch {
        keys = append(keys, k)
    }
    return keys
}

// Reveals ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) {
    // Create a temporary map, which will hold all item spread across shards.
    tmp := make(map[K]V)

    // Insert items to temporary map.
    for item := range m.IterBuffered() {
        tmp[item.Key] = item.Val
    }
    return json.Marshal(tmp)
}

func strfnv32[K fmt.Stringer](key K) uint32 {
    return fnv32(key.String())
}

func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    const prime32 = uint32(16777619)
    keyLength := len(key)
    for i := 0; i < keyLength; i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}

// Reverse process of Marshal.
func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) {
    tmp := make(map[K]V)

    // Unmarshal into a single map.
    if err := json.Unmarshal(b, &tmp); err != nil {
        return err
    }

    // foreach key,value pair in temporary map insert into our concurrent map.
    for key, val := range tmp {
        m.Set(key, val)
    }
    return nil
}

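Solution

In Go, a map is safe for concurrent use only when every goroutine is reading. Reading and writing at the same time is not safe.

The following code shows what goes wrong when a map is read and written concurrently: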

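package main

func main() {
    // Create a map from int to int.
    m := make(map[int]int)

    // Start a goroutine that keeps writing to the map.
    go func() {
        for {
            m[1] = 1
        }
    }()

    // Start a goroutine that keeps reading from the map.
    go func() {
        for {
            _ = m[1]
        }
    }()

    // Block forever so the goroutines keep running in the background.
    select {}
}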

Running the code fails with the following output:

fatal error: concurrent map read and map write

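The message reports a concurrent map read and map write: two goroutines kept reading and writing the same map, which is a race. The map implementation checks for this kind of concurrent access at runtime and aborts the program when it detects it.

When concurrent reads and writes are needed, the usual approach is to guard the map with a lock, although a single lock can become a bottleneck under heavy contention.

Go 1.9 added sync.Map, a concurrency-safe map that performs well for certain access patterns, such as keys that are written once and read many times. Unlike map, it is not a language built-in but a type in the sync package.

  • It needs no initialization; declaring it is enough.
  • sync.Map does not support the built-in map syntax for getting and setting values. Use its methods instead: Store to save a value, Load to fetch one, and Delete to remove one.
  • Iterate with Range and a callback, which receives each key and value. The callback returns true to continue iterating and false to stop.

sync.Map does not provide a method that returns the number of entries; the workaround is to iterate with Range and count them yourself. sync.Map gives up some performance to stay concurrency-safe, so in non-concurrent code a plain map performs better.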
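The methods listed above can be tried out with a short sketch like the one below. The helper names countEntries and demo are hypothetical and exist only for this example; Store, Load, Delete, and Range are the sync.Map methods named in the text.

```go
package main

import (
	"fmt"
	"sync"
)

// countEntries (hypothetical helper) counts entries by walking the map
// with Range, since sync.Map has no length method.
func countEntries(m *sync.Map) int {
	n := 0
	m.Range(func(key, value any) bool {
		n++
		return true // keep iterating; returning false would stop early
	})
	return n
}

// demo stores three keys, loads one, deletes one, and reports
// the loaded value, whether it was found, and the remaining count.
func demo() (int, bool, int) {
	var m sync.Map // the zero value is ready to use

	m.Store("a", 1)
	m.Store("b", 2)
	m.Store("c", 3)

	v, ok := m.Load("b")
	val, _ := v.(int) // values are stored as any, so assert the type

	m.Delete("a")

	return val, ok, countEntries(&m)
}

func main() {
	val, ok, n := demo()
	fmt.Println(val, ok, n) // prints: 2 true 2
}
```

Because keys and values are typed as any, each Load needs a type assertion, which is one of the ergonomic costs compared with a plain map behind a mutex.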

References

https://juejin.cn/post/6844903895227957262
