对于阿里开源混沌工程工具chaosblade-box-agent心跳报错问题的分析与解决

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 摘要:本文记录了一个由chaosblade-box平台后台发现的偶发的chaosblade-box-agent不发送心跳的问题,从报错日志入手,结合chaosblade-box-agent源码进行分析,最终解决问题并修复打包的过程。

背景

在chaos后台发现了agent不发心跳了的问题,大概启动agent后一个月左右这样,会发现agengt端上报心跳报错(dev和staging都发生过)。
目前解决方法是重启agent(devops环境)与chaos后台服务(dev/test/staging/prod环境)。单独重启还不行,必须都重启才能解决。

目标

解决chaosblade-box-agent不能持续稳定发射心跳的问题。

代码分析

●研读入口逻辑:cmd/chaos_agent.go,这个就是入口文件,通过go build后启动编译产物。项目构建通过Makefile进行,本质是go build cmd/chaos_agent.go。启动agent程序会先检查/opt/chaosblade/blade这个路径,如果这个路径不存在,则会启动失败,所以需要确保已安装了chaosblade工具,再启动chaosblade-box-agent
●研读接口逻辑:/AgentHeartBeat
●研读接口逻辑:/k8sPod

报错分析

从报错日志入手:"[heartbeat] send failed."出自conn/heartbeat/heartbeat.go中的sendHeartbeat方法,这个方法所在的对象heartbeatClient是在main函数中初始化并传入的:

heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)

而定时任务所依赖的心跳时间间隔就来自于HeartbeatConfig这个结构体,追根溯源,是在main函数一开始创建的:

options.NewOptions()
->
Opts.AddFlags()
->
o.Flags.DurationVar(&o.HeartbeatConfig.Period, "heartbeat.period", 5*time.Second, "the period of heartbeat")

在此设置了5秒发一次心跳。而心跳逻辑的启动逻辑入口也在main函数中:

newConn.Start()
->
func (c *Conn) Start()

该方法中遍历了c(Conn)对象中的clientHandlers,会分别调用每个ClientHandle对象的Start方法,这些handlers对象都是在main函数里注册到c对象中的:

// conn to server
connectClient := connect.NewClientConnectHandler(transportClient)
heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)
metricClient := metric.NewClientMetricHandler(transportClient, reportMetricConfigMap)
newConn := conn.NewConn()
newConn.Register(transport.API_REGISTRY, connectClient)
newConn.Register(transport.API_HEARTBEAT, heartbeatClient)
newConn.Register(transport.API_METRIC, metricClient)

最后再回到conn/heartbeat/heartbeat.go中的sendHeartbeat方法的报错日志处,根据报错日志中的关键字“Report kubernetes”继续向下搜索,找到collector/kubernetes/kubernetes.go中的报错所在函数:

func (collector *K8sBaseCollector) reportK8sMetric(namespace string, isExists bool, resource interface{
   }, size int)

其中根据报错日志,锁定报错所在逻辑:

// 开启压缩
uri.CompressVersion = fmt.Sprintf("%d", transport.AllCompress)
response, err := collector.transport.Invoke(uri, request, true) // todo 这里看下是否用这个struct
if err != nil {
   
    collector.resetIdentifierCache()
    logrus.Warningf("Report kubernetes %s infos err: %v", collector.ReportHandler, err)
    return
}

最后根据err后面的内容关键字“access token not found”搜索到日志出处所在文件transport/interceptor.go:

func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
   
        accessKey := tools.GetAccessKey()
        secureKey := tools.GetSecureKey()
        if accessKey == "" || secureKey == "" {
   
                return ReturnFail(TokenNotFound), false
        }
        request.AddHeader(AccessKey, accessKey)
        signData := request.Headers[SignData]
        if signData == "" {
   
                bytes, err := json.Marshal(request.Params)
                if err != nil {
   
                        return ReturnFail(EncodeError, err.Error()), false
                }
                signData = string(bytes)
        }
        sign := tools.Sign(signData)
        request.AddHeader(SignKey, sign)
        return nil, true
}

可以看出这个错误是由于accessKey为空或secureKey为空导致的。进一步看下tools里面的方法:

var localAccessKey = ""
var localSecureKey = ""
var mutex = sync.RWMutex{
   }

// GetAccessKey
func GetAccessKey() string {
   
        mutex.RLock()
        defer mutex.RUnlock()
        return localAccessKey
}

// GetSecureKey
func GetSecureKey() string {
   
        mutex.RLock()
        defer mutex.RUnlock()
        return localSecureKey
}

这两个函数使用了读写锁 mutex 来保证并发访问的安全性。它们通过读取 localAccessKey 和 localSecureKey 的值来返回访问密钥和安全密钥。而这两个Key的赋值逻辑如下:

// Record AK/SK to file
func RecordSecretKeyToFile(accessKey, secretKey string) error {
   
        if accessKey == "" || secretKey == "" {
   
                log.Warningln("key: ", accessKey, secretKey)
                return errors.New("accessKey or secretKey is empty")
        }

        keys := map[string]string{
   
                AccessKeyName: accessKey,
                SecretKeyName: secretKey,
        }
        err := RecordMapToFile(keys, path.Join(GetUserHome(), ".chaos.cert"), true)
        if err != nil {
   
                return err
        }
        localAccessKey = accessKey
        localSecureKey = secretKey
        return nil
}

func RecordMapToFile(data map[string]string, filePath string, truncate bool) error {
   
        if len(data) == 0 {
   
                return nil
        }
        mutex.Lock()
        defer mutex.Unlock()
        flag := os.O_WRONLY | os.O_CREATE
        if truncate {
   
                flag = flag | os.O_TRUNC
        }
        file, err := os.OpenFile(filePath, flag, 0666)
        defer file.Close()
        if err != nil {
   
                log.WithField("file", filePath).WithError(err).Errorf("record data to file failed")
                return err
        }
        for key, value := range data {
   
                _, err := file.WriteString(strings.Join([]string{
   key, value}, Delimiter) + "\n")
                if err != nil {
   
                        log.WithFields(log.Fields{
   
                                "file":  filePath,
                                "key":   key,
                                "value": value,
                        }).WithError(err).Errorf("write data to file failed")
                        return err
                }
        }
        return nil
}

RecordMapToFile 函数的作用是将键值对数据写入指定的文件中。所以这里有两个需要关注的点,第一个点是这两个Key的值,第二个点是这个写入的文件路径,我们往回溯,找到文件写入路径:

path.Join(GetUserHome(), ".chaos.cert")

其中:

// GetUserHome return user home.
func GetUserHome() string {
   
        user, err := user.Current()
        if err == nil {
   
                return user.HomeDir
        }
        return "/root"
}

当前用户的主目录路径或/root路径,拼上".chaos.cert",所以可以认为这个aksk是写到了这个.chaos.cert文件中,接下来我们再往回溯,看下这对aksk的赋值逻辑,从conn/conn.go的Start开始:

func (c *Conn) Start() {
   
        if len(c.clientHandlers) <= 0 {
   
                return
        }

        var errCh chan error
        for clientHandlerName, clientHandler := range c.clientHandlers {
   
                go func(clientHandlerName string, clientHandler ClientHandle) {
   
                        logrus.WithField("clientHandlerName", clientHandlerName).Infof("conn start")
                        if err := clientHandler.Start(); err != nil {
   
                                logrus.WithField("clientHandlerName", clientHandlerName).Warnf("conn start failed, err: %s", err.Error())
                                errCh <- err
                        }
                }(clientHandlerName, clientHandler)
        }

        go func() {
   
                for {
   
                        select {
   
                        case err := <-errCh:
                                if err != nil {
   
                                        logrus.Errorf("register conn failed, err: %s", err.Error())
                                        os.Exit(1)
                                }
                        }
                }
        }()

}

到下面这个conn/connect/connect.go中的ClientConnectHandler的Start方法

// Connect to remote
func (cc *ClientConnectHandler) Start() error {
   
        request := transport.NewRequest()
        request.AddParam("ip", options.Opts.Ip)
        request.AddParam("pid", options.Opts.Pid)
        request.AddParam("type", options.ProgramName)
        request.AddParam("uid", options.Opts.Uid)
        request.AddParam("instanceId", options.Opts.InstanceId)
        request.AddParam("namespace", options.Opts.Namespace)
        request.AddParam("deviceId", options.Opts.InstanceId)
        request.AddParam("deviceType", strconv.Itoa(options.Host))
        request.AddParam("ak", options.Opts.License)
        request.AddParam("uptime", tools.GetUptime())
        request.AddParam("startupMode", options.Opts.StartupMode)
        request.AddParam("v", options.Opts.Version)
        request.AddParam("agentMode", options.Opts.AgentMode)
        request.AddParam("osType", options.Opts.InstallOperator)
        request.AddParam("cpuNum", strconv.Itoa(runtime.NumCPU()))

        request.AddParam("clusterId", options.Opts.ClusterId).
                AddParam("clusterName", options.Opts.ClusterName)

        chaosBladeVersion := options.Opts.ChaosbladeVersion
        if chaosBladeVersion != "" {
   
                request.AddParam("cbv", chaosBladeVersion)
        }

        // todo windows cant be work
        if memInfo, err := linux.ReadMemInfo("/proc/meminfo"); err != nil {
   
                logrus.Warnln("read proc/meminfo err:", err.Error())
        } else {
   
                memTotalKB := float64(memInfo.MemTotal)
                request.AddParam("memSize", fmt.Sprintf("%f", memTotalKB))
        }

        // application only for host mode
        request.AddParam(options.AppInstanceKeyName, options.Opts.ApplicationInstance)
        request.AddParam(options.AppGroupKeyName, options.Opts.ApplicationGroup)

        if options.Opts.RestrictedVpc {
   
                request.AddParam("restrictedVpc", "true")
                request.AddParam("vpcId", options.Opts.License)
        } else {
   
                request.AddParam("vpcId", options.Opts.VpcId)
        }

        uri := transport.TransportUriMap[transport.API_REGISTRY]

        response, err := cc.transportClient.Invoke(uri, request, false)

        if err != nil {
   
                return err
        }

        // todo 这里要换成http
        return handleDirectHttpConnectResponse(*response)
}

再到下面方法

// handler direct http response
func handleDirectHttpConnectResponse(response transport.Response) error {
   
        if !response.Success {
   
                return errors.New(fmt.Sprintf("connect server failed, %s", response.Error))
        }
        result := response.Result

        v, ok := result.(map[string]interface{
   })
        if !ok {
   
                return errors.New("response is error")
        }
        options.Opts.SetCid(v["cid"].(string))

        if v["uid"] != nil {
   
                options.Opts.SetUid(v["uid"].(string))
        }

        ak, ok := v["ak"]
        if !ok || ak == nil {
   
                logrus.Error("response data is wrong, lack ak!")
                return errors.New("accessKey or secretKey is empty")
        }

        sk, ok := v["sk"]
        if !ok || sk == nil {
   
                logrus.Error("response data is wrong, lack sk!")
                return errors.New("accessKey or secretKey is empty")
        }
        err := tools.RecordSecretKeyToFile(ak.(string), sk.(string))
        return err
}

可以分析得出,ak和sk是通过请求registry接口(API_REGISTRY)得到的。在启动agent的时候会通过该接口与chaosblade后台服务通信完成注册,从接口返回得到ak和sk并保存到本地用户路径下的.chaos.cert文件中与内存变量中,后续将直接从变量中读取。
解决办法
所以我们如果遇到了这个报错,应该是变量中的ak和sk为空导致的。为了验证这点,我们可以在报错日志上方将ak和sk变量打印,另外将.chaos.cert文件中的内容进行打印,如果发现是ak和sk变量为空而.chaos.cert文件中仍然存在ak和sk,则可以试图通过.cert文件重新装载ak和sk变量。

func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
   
    accessKey := tools.GetAccessKey()
    secureKey := tools.GetSecureKey()
    if accessKey == "" || secureKey == "" {
   
        logrus.Warnf("[zenki] accessKey or secureKey is empty, access key: %s, secure key: %s", accessKey, secureKey)
        go func() {
   
            chaosCertPath := path.Join(tools.GetUserHome(), ".chaos.cert")
            if _, err := os.Stat(chaosCertPath); os.IsNotExist(err) {
   
                logrus.Warnf("[zenki] chaos cert file not found, path: %s", chaosCertPath)
                return
            }
            ak, sk, err := tools.ReadAkSkFromFile(chaosCertPath)
            if err != nil {
   
                logrus.Warnf("[zenki] read access key or secure key from chaos cert file failed, path: %s", chaosCertPath)
                return
            }
            tools.SetAccessKey(ak)
            tools.SetSecureKey(sk)
        }()
        return ReturnFail(TokenNotFound), false
    }
    request.AddHeader(AccessKey, accessKey)
    signData := request.Headers[SignData]
    if signData == "" {
   
        bytes, err := json.Marshal(request.Params)
        if err != nil {
   
            return ReturnFail(EncodeError, err.Error()), false
        }
        signData = string(bytes)
    }
    sign := tools.Sign(signData)
    request.AddHeader(SignKey, sign)
    return nil, true
}

另外在pkg/tools/auth.go里增加从文件中读取ak和sk的方法:

// ReadAkSkFromFile [zenki] read ak and sk from file
func ReadAkSkFromFile(filePath string) (ak string, sk string, err error) {
   
    bytes, err := os.ReadFile(AppFile)
    if err != nil {
   
        return "", "", err
    }
    content := strings.TrimSpace(string(bytes))
    slice := strings.Split(content, "\n")
    if len(slice) == 0 {
   
        return "", "", fmt.Errorf("empty content")
    }
    for _, value := range slice {
   
        kv := strings.SplitN(value, Delimiter, 2)
        if len(kv) != 2 {
   
            continue
        }
        switch kv[0] {
   
        case AccessKeyName:
            ak = kv[1]
        case SecretKeyName:
            sk = kv[1]
        }
    }
    return
}

以及增加设置ak和sk的方法:

// SetAccessKey [zenki] set local access key
func SetAccessKey(accessKey string) {
   
    mutex.Lock()
    defer mutex.Unlock()
    log.Infoln("[zenki] Set access key: ", accessKey)
    localAccessKey = accessKey
}

// SetSecureKey [zenki] set local secure key
func SetSecureKey(secureKey string) {
   
    mutex.Lock()
    defer mutex.Unlock()
    log.Infoln("[zenki] Set secure key: ", secureKey)
    localSecureKey = secureKey
}

修复与打包

通过访问chaosblade平台查看探针安装的方法。

wget https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz -O chaos.tar.gz && tar -zxvf chaos.tar.gz -C /opt/ && sudo sh /opt/chaos/chaosctl.sh install -k 71ae74cbcaba356d59462c398ad940e6 -p  [应用名]  -g  [应用分组]  -P  [agent端口号]  -t  [chaosblade-box ip:port]

可以看出实际上是用到了:
https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz
下载下来并解压以后,可以看到里面的组成如下:
这里面agent是我们需要替换的chaosblade-box-agent的编译产物,其他的不用动。
所以我们只需要修改了代码以后重新执行make命令进行编译,将编译生成的agent文件替换到这里,然后重新压缩成chaos.tar.gz文件,最后将这个chaos.tar.gz上传到cdn上,把wget后面的链接替换成我们上传后cdn的链接即可。

相关实践学习
容器服务Serverless版ACK Serverless 快速入门:在线魔方应用部署和监控
通过本实验,您将了解到容器服务Serverless版ACK Serverless 的基本产品能力,即可以实现快速部署一个在线魔方应用,并借助阿里云容器服务成熟的产品生态,实现在线应用的企业级监控,提升应用稳定性。
云原生实践公开课
课程大纲 开篇:如何学习并实践云原生技术 基础篇: 5 步上手 Kubernetes 进阶篇:生产环境下的 K8s 实践 相关的阿里云产品:容器服务&nbsp;ACK 容器服务&nbsp;Kubernetes&nbsp;版(简称&nbsp;ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情:&nbsp;https://www.aliyun.com/product/kubernetes
相关文章
|
5天前
|
人工智能 决策智能 iOS开发
新Siri解锁万能Agent?魔搭开源移动端框架Mobile-Agent-v2抢先体验!
在刚结束的苹果全球开发者大会WWAC上,Apple家族AI全家桶 「Apple Intelligence」亮相,库克宣布个人智能系统将应用于新版本的 iPhone、iPad 和 Mac 操作系统。
|
25天前
|
开发框架 API 决策智能
ModelScope-Agent框架再升级!新增一键配置多人聊天,配套开源多智能体数据集和训练
ModelScope-Agent是魔搭社区推出的适配开源大语言模型(LLM)的AI Agent(智能体)开发框架,借助ModelScope-Agent,所有开发者都可基于开源 LLM 搭建属于自己的智能体应用。在最新升级完Assistant API和Tool APIs之后,我们又迎来了多智能体聊天室的升级,通过几分钟快速配置即可搭建一个全新的聊天室。
|
1月前
|
人工智能 测试技术 API
【AIGC】LangChain Agent(代理)技术分析与实践
【5月更文挑战第12天】 LangChain代理是利用大语言模型和推理引擎执行一系列操作以完成任务的工具,适用于从简单响应到复杂交互的各种场景。它能整合多种服务,如Google搜索、Wikipedia和LLM。代理通过选择合适的工具按顺序执行任务,不同于链的固定路径。代理的优势在于可以根据上下文动态选择工具和执行策略。适用场景包括网络搜索、嵌入式搜索和API集成。代理由工具组成,每个工具负责单一任务,如Web搜索或数据库查询。工具包则包含预定义的工具集合。创建代理需要定义工具、初始化执行器和设置提示词。LangChain提供了一个从简单到复杂的AI解决方案框架。
410 3
|
1月前
|
监控 Java Maven
揭秘Java Agent技术:解锁Java工具开发的新境界
作为JDK提供的关键机制,Java Agent技术不仅为Java工具的开发者提供了一个强大的框架,还为性能监控、故障诊断和动态代码修改等领域带来了革命性的变革。本文旨在全面解析Java Agent技术的应用场景以及实现方式,特别是静态加载模式和动态加载模式这两种关键模式。
123 0
|
1月前
|
人工智能 自然语言处理 搜索推荐
【AGI】智能体简介及场景分析
【4月更文挑战第14天】AI时代,智能体的意义,使用场景及对未来的意义
65 1
|
1月前
|
存储 人工智能 测试技术
【AI智能体】SuperAGI-开源AI Agent 管理平台
【4月更文挑战第9天】智能体管理平台SuperAGI简介及实践
251 0
|
1月前
|
人工智能 API 决策智能
【AI Agent系列】【阿里AgentScope框架】实战1:利用AgentScope实现动态创建Agent和自由组织讨论
【AI Agent系列】【阿里AgentScope框架】实战1:利用AgentScope实现动态创建Agent和自由组织讨论
286 2
|
1月前
|
人工智能 决策智能 C++
【AI Agent系列】【阿里AgentScope框架】5. Pipeline模块的组合使用及Pipeline模块总结
【AI Agent系列】【阿里AgentScope框架】5. Pipeline模块的组合使用及Pipeline模块总结
102 1
|
1月前
|
人工智能 决策智能
【AI Agent系列】【阿里AgentScope框架】4. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 循环结构
【AI Agent系列】【阿里AgentScope框架】4. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 循环结构
79 0
|
1月前
|
人工智能 决策智能
【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支
【AI Agent系列】【阿里AgentScope框架】3. 深入源码:Pipeline模块如何组织多智能体间的数据流?- 顺序结构与条件分支
73 2