Go服务Docker Pod不断重启排查和解决

简介: 该文章分享了Go服务在Docker Pod中不断重启的问题排查过程和解决方案,识别出并发写map导致fatal error的问题,并提供了使用sync.Map或concurrent-map库作为并发安全的替代方案。

现象

pod实例的go服务每个几个小时就自动重启

初步分析

初步判断go服务遇到未捕捉的异常(造成panic且没有recover),才导致程序挂掉,docker服务给重启了。

具体分析

首先对代码进行初步的review,发现没有明显的问题导致的panic,因为预计的地方都已经加了recover。下一步就是定位哪些不能recover

  • 数据竞争(比如:对map进行并发读写),可以通过golang的编译标记race对代码进行检测是否存在数据竞争(比如:并发读写map)
  • 内存不足
  • 死锁

上线了几个pod,终于捕获到了完整的stack fatal error , 对照代码看了是并发写map 导致了fatal error , 这种是不能revovey的,后来在map的地方加了sync.RWMutex 解决了这个问题,上线后没有重启了

此外,也可以考虑concurrent-map

package cmap

import (
    "encoding/json"
    "fmt"
    "sync"
)

var SHARD_COUNT = 32

type Stringer interface {
    fmt.Stringer
    comparable
}

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap[K comparable, V any] struct {
    shards   []*ConcurrentMapShared[K, V]
    sharding func(key K) uint32
}

// A "thread" safe string to anything map.
type ConcurrentMapShared[K comparable, V any] struct {
    items        map[K]V
    sync.RWMutex // Read Write mutex, guards access to internal map.
}

func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
    m := ConcurrentMap[K, V]{
        sharding: sharding,
        shards:   make([]*ConcurrentMapShared[K, V], SHARD_COUNT),
    }
    for i := 0; i < SHARD_COUNT; i++ {
        m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
    }
    return m
}

// Creates a new concurrent map.
func New[V any]() ConcurrentMap[string, V] {
    return create[string, V](fnv32)
}

// Creates a new concurrent map.
func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] {
    return create[K, V](strfnv32[K])
}

// Creates a new concurrent map.
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
    return create[K, V](sharding)
}

// GetShard returns shard under given key
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
    return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}

func (m ConcurrentMap[K, V]) MSet(data map[K]V) {
    for key, value := range data {
        shard := m.GetShard(key)
        shard.Lock()
        shard.items[key] = value
        shard.Unlock()
    }
}

// Sets the given value under the specified key.
func (m ConcurrentMap[K, V]) Set(key K, value V) {
    // Get map shard.
    shard := m.GetShard(key)
    shard.Lock()
    shard.items[key] = value
    shard.Unlock()
}

// Callback to return new element to be inserted into the map
// It is called while lock is held, therefore it MUST NOT
// try to access other keys in same map, as it can lead to deadlock since
// Go sync.RWLock is not reentrant
type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V

// Insert or Update - updates existing element or inserts a new one using UpsertCb
func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) {
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    res = cb(ok, v, value)
    shard.items[key] = res
    shard.Unlock()
    return res
}

// Sets the given value under the specified key if no value was associated with it.
func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
    // Get map shard.
    shard := m.GetShard(key)
    shard.Lock()
    _, ok := shard.items[key]
    if !ok {
        shard.items[key] = value
    }
    shard.Unlock()
    return !ok
}

// Get retrieves an element from map under given key.
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
    // Get shard
    shard := m.GetShard(key)
    shard.RLock()
    // Get item from shard.
    val, ok := shard.items[key]
    shard.RUnlock()
    return val, ok
}

// Count returns the number of elements within the map.
func (m ConcurrentMap[K, V]) Count() int {
    count := 0
    for i := 0; i < SHARD_COUNT; i++ {
        shard := m.shards[i]
        shard.RLock()
        count += len(shard.items)
        shard.RUnlock()
    }
    return count
}

// Looks up an item under specified key
func (m ConcurrentMap[K, V]) Has(key K) bool {
    // Get shard
    shard := m.GetShard(key)
    shard.RLock()
    // See if element is within shard.
    _, ok := shard.items[key]
    shard.RUnlock()
    return ok
}

// Remove removes an element from the map.
func (m ConcurrentMap[K, V]) Remove(key K) {
    // Try to get shard.
    shard := m.GetShard(key)
    shard.Lock()
    delete(shard.items, key)
    shard.Unlock()
}

// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
// If returns true, the element will be removed from the map
type RemoveCb[K any, V any] func(key K, v V, exists bool) bool

// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params
// If callback returns true and element exists, it will remove it from the map
// Returns the value returned by the callback (even if element was not present in the map)
func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool {
    // Try to get shard.
    shard := m.GetShard(key)
    shard.Lock()
    v, ok := shard.items[key]
    remove := cb(key, v, ok)
    if remove && ok {
        delete(shard.items, key)
    }
    shard.Unlock()
    return remove
}

// Pop removes an element from the map and returns it
func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) {
    // Try to get shard.
    shard := m.GetShard(key)
    shard.Lock()
    v, exists = shard.items[key]
    delete(shard.items, key)
    shard.Unlock()
    return v, exists
}

// IsEmpty checks if map is empty.
func (m ConcurrentMap[K, V]) IsEmpty() bool {
    return m.Count() == 0
}

// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type Tuple[K comparable, V any] struct {
    Key K
    Val V
}

// Iter returns an iterator which could be used in a for range loop.
//
// Deprecated: using IterBuffered() will get a better performence
func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V] {
    chans := snapshot(m)
    ch := make(chan Tuple[K, V])
    go fanIn(chans, ch)
    return ch
}

// IterBuffered returns a buffered iterator which could be used in a for range loop.
func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] {
    chans := snapshot(m)
    total := 0
    for _, c := range chans {
        total += cap(c)
    }
    ch := make(chan Tuple[K, V], total)
    go fanIn(chans, ch)
    return ch
}

// Clear removes all items from map.
func (m ConcurrentMap[K, V]) Clear() {
    for item := range m.IterBuffered() {
        m.Remove(item.Key)
    }
}

// Returns a array of channels that contains elements in each shard,
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) {
    //When you access map items before initializing.
    if len(m.shards) == 0 {
        panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
    }
    chans = make([]chan Tuple[K, V], SHARD_COUNT)
    wg := sync.WaitGroup{}
    wg.Add(SHARD_COUNT)
    // Foreach shard.
    for index, shard := range m.shards {
        go func(index int, shard *ConcurrentMapShared[K, V]) {
            // Foreach key, value pair.
            shard.RLock()
            chans[index] = make(chan Tuple[K, V], len(shard.items))
            wg.Done()
            for key, val := range shard.items {
                chans[index] <- Tuple[K, V]{key, val}
            }
            shard.RUnlock()
            close(chans[index])
        }(index, shard)
    }
    wg.Wait()
    return chans
}

// fanIn reads elements from channels `chans` into channel `out`
func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) {
    wg := sync.WaitGroup{}
    wg.Add(len(chans))
    for _, ch := range chans {
        go func(ch chan Tuple[K, V]) {
            for t := range ch {
                out <- t
            }
            wg.Done()
        }(ch)
    }
    wg.Wait()
    close(out)
}

// Items returns all items as map[string]V
func (m ConcurrentMap[K, V]) Items() map[K]V {
    tmp := make(map[K]V)

    // Insert items to temporary map.
    for item := range m.IterBuffered() {
        tmp[item.Key] = item.Val
    }

    return tmp
}

// Iterator callbacalled for every key,value found in
// maps. RLock is held for all calls for a given shard
// therefore callback sess consistent view of a shard,
// but not across the shards
type IterCb[K comparable, V any] func(key K, v V)

// Callback based iterator, cheapest way to read
// all elements in a map.
func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) {
    for idx := range m.shards {
        shard := (m.shards)[idx]
        shard.RLock()
        for key, value := range shard.items {
            fn(key, value)
        }
        shard.RUnlock()
    }
}

// Keys returns all keys as []string
func (m ConcurrentMap[K, V]) Keys() []K {
    count := m.Count()
    ch := make(chan K, count)
    go func() {
        // Foreach shard.
        wg := sync.WaitGroup{}
        wg.Add(SHARD_COUNT)
        for _, shard := range m.shards {
            go func(shard *ConcurrentMapShared[K, V]) {
                // Foreach key, value pair.
                shard.RLock()
                for key := range shard.items {
                    ch <- key
                }
                shard.RUnlock()
                wg.Done()
            }(shard)
        }
        wg.Wait()
        close(ch)
    }()

    // Generate keys
    keys := make([]K, 0, count)
    for k := range ch {
        keys = append(keys, k)
    }
    return keys
}

// Reviles ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) {
    // Create a temporary map, which will hold all item spread across shards.
    tmp := make(map[K]V)

    // Insert items to temporary map.
    for item := range m.IterBuffered() {
        tmp[item.Key] = item.Val
    }
    return json.Marshal(tmp)
}
func strfnv32[K fmt.Stringer](key K) uint32 {
    return fnv32(key.String())
}

func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    const prime32 = uint32(16777619)
    keyLength := len(key)
    for i := 0; i < keyLength; i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}

// Reverse process of Marshal.
func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) {
    tmp := make(map[K]V)

    // Unmarshal into a single map.
    if err := json.Unmarshal(b, &tmp); err != nil {
        return err
    }

    // foreach key,value pair in temporary map insert into our concurrent map.
    for key, val := range tmp {
        m.Set(key, val)
    }
    return nil
}

解决方案

Go语言中的 map 在并发情况下,只读是线程安全的,同时读写是线程不安全的。

下面来看下并发情况下读写 map 时会出现的问题,代码如下:

// 创建一个int到int的映射
m := make(map[int]int)

// 开启一段并发代码
go func() {

    // 不停地对map进行写入
    for {
        m[1] = 1
    }

}()

// 开启一段并发代码
go func() {

    // 不停地对map进行读取
    for {
        _ = m[1]
    }

}()

// 无限循环, 让并发程序在后台执行
for {

}

运行代码会报错,输出如下:

fatal error: concurrent map read and map write

错误信息显示,并发的 map 读和 map 写,也就是说使用了两个并发函数不断地对 map 进行读和写而发生了竞态问题,map 内部会对这种并发操作进行检查并提前发现。

需要并发读写时,一般的做法是加锁,但这样性能并不高。

Go语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Map,sync.Map 和 map 不同,不是以语言原生形态提供,而是在 sync 包。

  • 无须初始化,直接声明即可。
  • sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用,Store 表示存储,Load 表示获取,Delete 表示删除。
  • 使用 Range 配合一个回调函数进行遍历操作,通过回调函数返回内部遍历出来的值,Range 参数中回调函数的返回值在需要继续迭代遍历时,返回 true,终止迭代遍历时,返回 false。

sync.Map 没有提供获取 map 数量的方法,替代方法是在获取 sync.Map 时遍历自行计算数量,sync.Map 为了保证并发安全有一些性能损失,因此在非并发情况下,使用 map 相比使用 sync.Map 会有更好的性能。

参考

https://juejin.cn/post/6844903895227957262

相关文章
|
18天前
|
缓存 弹性计算 API
用 Go 快速开发一个 RESTful API 服务
用 Go 快速开发一个 RESTful API 服务
|
12天前
|
存储 Linux Docker
CentOS 7.6安装Docker实战案例及存储引擎和服务进程简介
关于如何在CentOS 7.6上安装Docker、介绍Docker存储引擎以及服务进程关系的实战案例。
49 3
CentOS 7.6安装Docker实战案例及存储引擎和服务进程简介
|
2天前
|
Go API 开发者
深入探讨:使用Go语言构建高性能RESTful API服务
在本文中,我们将探索Go语言在构建高效、可靠的RESTful API服务中的独特优势。通过实际案例分析,我们将展示Go如何通过其并发模型、简洁的语法和内置的http包,成为现代后端服务开发的有力工具。
|
18天前
|
Java Docker Python
启动docker服务需要的三个重要文件
这篇文章介绍了启动Docker服务所需的三个重要文件:Dockerfile、build_image.sh和run.sh。文章提供了Java和Python两个版本的Dockerfile示例,并解释了每个阶段的作用,如基础镜像的选择、构建环境的设置、以及如何通过参数传递环境变量。build_image.sh脚本用于执行Docker镜像的构建、标记和推送过程,而run.sh脚本则用于执行具体的运行命令,包括设置Java参数和执行jar文件。 文章还强调了这些文件应由项目负责人维护,并根据项目需求自行修改启动命令参数。
15 2
|
25天前
|
Docker 容器
Docker启动的容器如何做到自动重启?
【8月更文挑战第19天】Docker启动的容器如何做到自动重启?
28 1
|
6天前
|
关系型数据库 数据库 网络虚拟化
Docker环境下重启PostgreSQL数据库服务的全面指南与代码示例
由于时间和空间限制,我将在后续的回答中分别涉及到“Python中采用lasso、SCAD、LARS技术分析棒球运动员薪资的案例集锦”以及“Docker环境下重启PostgreSQL数据库服务的全面指南与代码示例”。如果你有任何一个问题的优先顺序或需要立即回答的,请告知。
16 0
|
18天前
|
运维 监控 程序员
Go 服务自动收集线上问题现场
Go 服务自动收集线上问题现场
|
18天前
|
SQL JavaScript Go
Go Web 服务框架实现详解
Go Web 服务框架实现详解
|
18天前
|
运维 Shell Go
构建 Go 应用 docker 镜像的十八种姿势
构建 Go 应用 docker 镜像的十八种姿势
|
19天前
|
前端开发 Go 开发者
用 Go + WebSocket 快速实现一个 chat 服务
用 Go + WebSocket 快速实现一个 chat 服务