对于阿里开源混沌工程工具chaosblade-box-agent心跳报错问题的分析与解决

本文涉及的产品
应用实时监控服务-可观测链路OpenTelemetry版,每月50GB免费额度
容器镜像服务 ACR,镜像仓库100个 不限时长
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 摘要:本文记录了一个由chaosblade-box平台后台发现的偶发的chaosblade-box-agent不发送心跳的问题,从报错日志入手,结合chaosblade-box-agent源码进行分析,最终解决问题并修复打包的过程。

背景

在chaos后台发现了agent不发心跳了的问题,大概启动agent后一个月左右这样,会发现agengt端上报心跳报错(dev和staging都发生过)。
目前解决方法是重启agent(devops环境)与chaos后台服务(dev/test/staging/prod环境)。单独重启还不行,必须都重启才能解决。

目标

解决chaosblade-box-agent不能持续稳定发射心跳的问题。

代码分析

●研读入口逻辑:cmd/chaos_agent.go,这个就是入口文件,通过go build后启动编译产物。项目构建通过Makefile进行,本质是go build cmd/chaos_agent.go。启动agent程序会先检查/opt/chaosblade/blade这个路径,如果这个路径不存在,则会启动失败,所以需要确保已安装了chaosblade工具,再启动chaosblade-box-agent
●研读接口逻辑:/AgentHeartBeat
●研读接口逻辑:/k8sPod

报错分析

从报错日志入手:"[heartbeat] send failed."出自conn/heartbeat/heartbeat.go中的sendHeartbeat方法,这个方法所在的对象heartbeatClient是在main函数中初始化并传入的:

heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)

而定时任务所依赖的心跳时间间隔就来自于HeartbeatConfig这个结构体,追根溯源,是在main函数一开始创建的:

options.NewOptions()
->
Opts.AddFlags()
->
o.Flags.DurationVar(&o.HeartbeatConfig.Period, "heartbeat.period", 5*time.Second, "the period of heartbeat")

在此设置了5秒发一次心跳。而心跳逻辑的启动逻辑入口也在main函数中:

newConn.Start()
->
func (c *Conn) Start()

该方法中遍历了c(Conn)对象中的clientHandlers,会分别调用每个ClientHandle对象的Start方法,这些handlers对象都是在main函数里注册到c对象中的:

// conn to server
connectClient := connect.NewClientConnectHandler(transportClient)
heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)
metricClient := metric.NewClientMetricHandler(transportClient, reportMetricConfigMap)
newConn := conn.NewConn()
newConn.Register(transport.API_REGISTRY, connectClient)
newConn.Register(transport.API_HEARTBEAT, heartbeatClient)
newConn.Register(transport.API_METRIC, metricClient)

最后再回到conn/heartbeat/heartbeat.go中的sendHeartbeat方法的报错日志处,根据报错日志中的关键字“Report kubernetes”继续向下搜索,找到collector/kubernetes/kubernetes.go中的报错所在函数:

func (collector *K8sBaseCollector) reportK8sMetric(namespace string, isExists bool, resource interface{
   }, size int)

其中根据报错日志,锁定报错所在逻辑:

// 开启压缩
uri.CompressVersion = fmt.Sprintf("%d", transport.AllCompress)
response, err := collector.transport.Invoke(uri, request, true) // todo 这里看下是否用这个struct
if err != nil {
   
    collector.resetIdentifierCache()
    logrus.Warningf("Report kubernetes %s infos err: %v", collector.ReportHandler, err)
    return
}

最后根据err后面的内容关键字“access token not found”搜索到日志出处所在文件transport/interceptor.go:

func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
   
        accessKey := tools.GetAccessKey()
        secureKey := tools.GetSecureKey()
        if accessKey == "" || secureKey == "" {
   
                return ReturnFail(TokenNotFound), false
        }
        request.AddHeader(AccessKey, accessKey)
        signData := request.Headers[SignData]
        if signData == "" {
   
                bytes, err := json.Marshal(request.Params)
                if err != nil {
   
                        return ReturnFail(EncodeError, err.Error()), false
                }
                signData = string(bytes)
        }
        sign := tools.Sign(signData)
        request.AddHeader(SignKey, sign)
        return nil, true
}

可以看出这个错误是由于accessKey为空或secureKey为空导致的。进一步看下tools里面的方法:

var localAccessKey = ""
var localSecureKey = ""
var mutex = sync.RWMutex{
   }

// GetAccessKey
func GetAccessKey() string {
   
        mutex.RLock()
        defer mutex.RUnlock()
        return localAccessKey
}

// GetSecureKey
func GetSecureKey() string {
   
        mutex.RLock()
        defer mutex.RUnlock()
        return localSecureKey
}

这两个函数使用了读写锁 mutex 来保证并发访问的安全性。它们通过读取 localAccessKey 和 localSecureKey 的值来返回访问密钥和安全密钥。而这两个Key的赋值逻辑如下:

// Record AK/SK to file
func RecordSecretKeyToFile(accessKey, secretKey string) error {
   
        if accessKey == "" || secretKey == "" {
   
                log.Warningln("key: ", accessKey, secretKey)
                return errors.New("accessKey or secretKey is empty")
        }

        keys := map[string]string{
   
                AccessKeyName: accessKey,
                SecretKeyName: secretKey,
        }
        err := RecordMapToFile(keys, path.Join(GetUserHome(), ".chaos.cert"), true)
        if err != nil {
   
                return err
        }
        localAccessKey = accessKey
        localSecureKey = secretKey
        return nil
}

func RecordMapToFile(data map[string]string, filePath string, truncate bool) error {
   
        if len(data) == 0 {
   
                return nil
        }
        mutex.Lock()
        defer mutex.Unlock()
        flag := os.O_WRONLY | os.O_CREATE
        if truncate {
   
                flag = flag | os.O_TRUNC
        }
        file, err := os.OpenFile(filePath, flag, 0666)
        defer file.Close()
        if err != nil {
   
                log.WithField("file", filePath).WithError(err).Errorf("record data to file failed")
                return err
        }
        for key, value := range data {
   
                _, err := file.WriteString(strings.Join([]string{
   key, value}, Delimiter) + "\n")
                if err != nil {
   
                        log.WithFields(log.Fields{
   
                                "file":  filePath,
                                "key":   key,
                                "value": value,
                        }).WithError(err).Errorf("write data to file failed")
                        return err
                }
        }
        return nil
}

RecordMapToFile 函数的作用是将键值对数据写入指定的文件中。所以这里有两个需要关注的点,第一个点是这两个Key的值,第二个点是这个写入的文件路径,我们往回溯,找到文件写入路径:

path.Join(GetUserHome(), ".chaos.cert")

其中:

// GetUserHome return user home.
func GetUserHome() string {
   
        user, err := user.Current()
        if err == nil {
   
                return user.HomeDir
        }
        return "/root"
}

当前用户的主目录路径或/root路径,拼上".chaos.cert",所以可以认为这个aksk是写到了这个.chaos.cert文件中,接下来我们再往回溯,看下这对aksk的赋值逻辑,从conn/conn.go的Start开始:

func (c *Conn) Start() {
   
        if len(c.clientHandlers) <= 0 {
   
                return
        }

        var errCh chan error
        for clientHandlerName, clientHandler := range c.clientHandlers {
   
                go func(clientHandlerName string, clientHandler ClientHandle) {
   
                        logrus.WithField("clientHandlerName", clientHandlerName).Infof("conn start")
                        if err := clientHandler.Start(); err != nil {
   
                                logrus.WithField("clientHandlerName", clientHandlerName).Warnf("conn start failed, err: %s", err.Error())
                                errCh <- err
                        }
                }(clientHandlerName, clientHandler)
        }

        go func() {
   
                for {
   
                        select {
   
                        case err := <-errCh:
                                if err != nil {
   
                                        logrus.Errorf("register conn failed, err: %s", err.Error())
                                        os.Exit(1)
                                }
                        }
                }
        }()

}

到下面这个conn/connect/connect.go中的ClientConnectHandler的Start方法

// Connect to remote
func (cc *ClientConnectHandler) Start() error {
   
        request := transport.NewRequest()
        request.AddParam("ip", options.Opts.Ip)
        request.AddParam("pid", options.Opts.Pid)
        request.AddParam("type", options.ProgramName)
        request.AddParam("uid", options.Opts.Uid)
        request.AddParam("instanceId", options.Opts.InstanceId)
        request.AddParam("namespace", options.Opts.Namespace)
        request.AddParam("deviceId", options.Opts.InstanceId)
        request.AddParam("deviceType", strconv.Itoa(options.Host))
        request.AddParam("ak", options.Opts.License)
        request.AddParam("uptime", tools.GetUptime())
        request.AddParam("startupMode", options.Opts.StartupMode)
        request.AddParam("v", options.Opts.Version)
        request.AddParam("agentMode", options.Opts.AgentMode)
        request.AddParam("osType", options.Opts.InstallOperator)
        request.AddParam("cpuNum", strconv.Itoa(runtime.NumCPU()))

        request.AddParam("clusterId", options.Opts.ClusterId).
                AddParam("clusterName", options.Opts.ClusterName)

        chaosBladeVersion := options.Opts.ChaosbladeVersion
        if chaosBladeVersion != "" {
   
                request.AddParam("cbv", chaosBladeVersion)
        }

        // todo windows cant be work
        if memInfo, err := linux.ReadMemInfo("/proc/meminfo"); err != nil {
   
                logrus.Warnln("read proc/meminfo err:", err.Error())
        } else {
   
                memTotalKB := float64(memInfo.MemTotal)
                request.AddParam("memSize", fmt.Sprintf("%f", memTotalKB))
        }

        // application only for host mode
        request.AddParam(options.AppInstanceKeyName, options.Opts.ApplicationInstance)
        request.AddParam(options.AppGroupKeyName, options.Opts.ApplicationGroup)

        if options.Opts.RestrictedVpc {
   
                request.AddParam("restrictedVpc", "true")
                request.AddParam("vpcId", options.Opts.License)
        } else {
   
                request.AddParam("vpcId", options.Opts.VpcId)
        }

        uri := transport.TransportUriMap[transport.API_REGISTRY]

        response, err := cc.transportClient.Invoke(uri, request, false)

        if err != nil {
   
                return err
        }

        // todo 这里要换成http
        return handleDirectHttpConnectResponse(*response)
}

再到下面方法

// handler direct http response
func handleDirectHttpConnectResponse(response transport.Response) error {
   
        if !response.Success {
   
                return errors.New(fmt.Sprintf("connect server failed, %s", response.Error))
        }
        result := response.Result

        v, ok := result.(map[string]interface{
   })
        if !ok {
   
                return errors.New("response is error")
        }
        options.Opts.SetCid(v["cid"].(string))

        if v["uid"] != nil {
   
                options.Opts.SetUid(v["uid"].(string))
        }

        ak, ok := v["ak"]
        if !ok || ak == nil {
   
                logrus.Error("response data is wrong, lack ak!")
                return errors.New("accessKey or secretKey is empty")
        }

        sk, ok := v["sk"]
        if !ok || sk == nil {
   
                logrus.Error("response data is wrong, lack sk!")
                return errors.New("accessKey or secretKey is empty")
        }
        err := tools.RecordSecretKeyToFile(ak.(string), sk.(string))
        return err
}

可以分析得出,ak和sk是通过请求registry接口(API_REGISTRY)得到的。在启动agent的时候会通过该接口与chaosblade后台服务通信完成注册,从接口返回得到ak和sk并保存到本地用户路径下的.chaos.cert文件中与内存变量中,后续将直接从变量中读取。
解决办法
所以我们如果遇到了这个报错,应该是变量中的ak和sk为空导致的。为了验证这点,我们可以在报错日志上方将ak和sk变量打印,另外将.chaos.cert文件中的内容进行打印,如果发现是ak和sk变量为空而.chaos.cert文件中仍然存在ak和sk,则可以试图通过.cert文件重新装载ak和sk变量。

func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
   
    accessKey := tools.GetAccessKey()
    secureKey := tools.GetSecureKey()
    if accessKey == "" || secureKey == "" {
   
        logrus.Warnf("[zenki] accessKey or secureKey is empty, access key: %s, secure key: %s", accessKey, secureKey)
        go func() {
   
            chaosCertPath := path.Join(tools.GetUserHome(), ".chaos.cert")
            if _, err := os.Stat(chaosCertPath); os.IsNotExist(err) {
   
                logrus.Warnf("[zenki] chaos cert file not found, path: %s", chaosCertPath)
                return
            }
            ak, sk, err := tools.ReadAkSkFromFile(chaosCertPath)
            if err != nil {
   
                logrus.Warnf("[zenki] read access key or secure key from chaos cert file failed, path: %s", chaosCertPath)
                return
            }
            tools.SetAccessKey(ak)
            tools.SetSecureKey(sk)
        }()
        return ReturnFail(TokenNotFound), false
    }
    request.AddHeader(AccessKey, accessKey)
    signData := request.Headers[SignData]
    if signData == "" {
   
        bytes, err := json.Marshal(request.Params)
        if err != nil {
   
            return ReturnFail(EncodeError, err.Error()), false
        }
        signData = string(bytes)
    }
    sign := tools.Sign(signData)
    request.AddHeader(SignKey, sign)
    return nil, true
}

另外在pkg/tools/auth.go里增加从文件中读取ak和sk的方法:

// ReadAkSkFromFile [zenki] read ak and sk from file
func ReadAkSkFromFile(filePath string) (ak string, sk string, err error) {
   
    bytes, err := os.ReadFile(AppFile)
    if err != nil {
   
        return "", "", err
    }
    content := strings.TrimSpace(string(bytes))
    slice := strings.Split(content, "\n")
    if len(slice) == 0 {
   
        return "", "", fmt.Errorf("empty content")
    }
    for _, value := range slice {
   
        kv := strings.SplitN(value, Delimiter, 2)
        if len(kv) != 2 {
   
            continue
        }
        switch kv[0] {
   
        case AccessKeyName:
            ak = kv[1]
        case SecretKeyName:
            sk = kv[1]
        }
    }
    return
}

以及增加设置ak和sk的方法:

// SetAccessKey [zenki] set local access key
func SetAccessKey(accessKey string) {
   
    mutex.Lock()
    defer mutex.Unlock()
    log.Infoln("[zenki] Set access key: ", accessKey)
    localAccessKey = accessKey
}

// SetSecureKey [zenki] set local secure key
func SetSecureKey(secureKey string) {
   
    mutex.Lock()
    defer mutex.Unlock()
    log.Infoln("[zenki] Set secure key: ", secureKey)
    localSecureKey = secureKey
}

修复与打包

通过访问chaosblade平台查看探针安装的方法。

wget https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz -O chaos.tar.gz && tar -zxvf chaos.tar.gz -C /opt/ && sudo sh /opt/chaos/chaosctl.sh install -k 71ae74cbcaba356d59462c398ad940e6 -p  [应用名]  -g  [应用分组]  -P  [agent端口号]  -t  [chaosblade-box ip:port]

可以看出实际上是用到了:
https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz
下载下来并解压以后,可以看到里面的组成如下:
这里面agent是我们需要替换的chaosblade-box-agent的编译产物,其他的不用动。
所以我们只需要修改了代码以后重新执行make命令进行编译,将编译生成的agent文件替换到这里,然后重新压缩成chaos.tar.gz文件,最后将这个chaos.tar.gz上传到cdn上,把wget后面的链接替换成我们上传后cdn的链接即可。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
30天前
|
数据采集 自然语言处理 安全
控制电脑手机的智能体人人都能造,微软开源OmniParser
微软研究团队推出OmniParser,旨在提升GPT-4V等多模态模型在用户界面操作方面的性能。通过解析用户界面截图为结构化元素,OmniParser显著增强了模型的交互能力,使其在多种基准测试中表现出色。该技术开源,促进了社区合作与技术创新,但同时也面临数据质量、计算资源及安全隐私等挑战。
62 14
|
30天前
|
API 数据库 决策智能
基于百炼平台qwen-max的api 打造一套 检索增强 图谱增强 智能工具调用决策的智能体
本文介绍了一种基于阿里云百炼平台的`qwen-max` API构建的智能体方案,该方案集成了检索增强、图谱增强及智能工具调用决策三大模块,旨在通过结合外部数据源、知识图谱和自动化决策提高智能回答的准确性和丰富度。通过具体代码示例展示了如何实现这些功能,最终形成一个能灵活应对多种查询需求的智能系统。
115 11
|
30天前
|
自然语言处理 NoSQL API
基于百炼平台qwen-max的api 打造一套 检索增强 图谱增强 基于指令的智能工具调用决策 智能体
基于百炼平台的 `qwen-max` API,设计了一套融合检索增强、图谱增强及指令驱动的智能工具调用决策系统。该系统通过解析用户指令,智能选择调用检索、图谱推理或模型生成等工具,以提高问题回答的准确性和丰富性。系统设计包括指令解析、工具调用决策、检索增强、图谱增强等模块,旨在通过多种技术手段综合提升智能体的能力。
142 5
|
3月前
|
前端开发 API 决策智能
多智能体微调实践:α-UMi 开源
近年来,为了加强大型语言模型(Large-Language Models, LLM)实时信息处理、解决专业问题的能力,催生了工具调用智能体(Tool Integrated Agent)概念
|
23天前
|
人工智能 API 语音技术
TEN Agent:开源的实时多模态 AI 代理框架,支持语音、文本和图像的实时通信交互
TEN Agent 是一个开源的实时多模态 AI 代理框架,集成了 OpenAI Realtime API 和 RTC 技术,支持语音、文本和图像的多模态交互,具备实时通信、模块化设计和多语言支持等功能,适用于智能客服、实时语音助手等多种场景。
132 15
TEN Agent:开源的实时多模态 AI 代理框架,支持语音、文本和图像的实时通信交互
|
20天前
|
监控 Linux Windows
如何在主机安装主机管理工具WGCLOUD的agent端
WGCloud 是一款用于主机监控的工具,通过在目标主机上部署 agent 实现监控。无论是 server 主机还是被监控主机,均需运行 agent。安装包中包含 server 和 agent,只需启动相应 agent 即可。支持多种部署方式,包括压缩包复制、一键部署脚本等。确保 server 和 agent 版本一致,并正确配置 `application.properties` 中的 `serverUrl` 和 `wgToken`。
|
2月前
|
JSON 数据可视化 知识图谱
基于百炼 qwen plus 、开源qwen2.5 7B Instruct 建非schema限定的图谱 用于agent tool的图谱形式结构化 文本资料方案
基于百炼 qwen plus 的上市企业ESG图谱构建工作,通过调用阿里云的 OpenAI 服务,从 Excel 文件读取上市公司 ESG 报告数据,逐条处理并生成知识图谱,最终以 YAML 格式输出。该过程包括数据读取、API 调用、结果处理和文件保存等步骤,确保生成的知识图谱全面、动态且结构清晰。此外,还提供了基于 Pyvis 的可视化工具,将生成的图谱以交互式图形展示,便于进一步分析和应用。
384 3
|
3月前
|
人工智能 运维 自然语言处理
对话蚂蚁开源蒋炜:让 Agent 把运维人员从 24 小时的待命中解放出来
当整个行业的智慧都集中在一件事情上时,比起闭门造车,开源一定能带来更好的技术迭代和发展。CodeFuse 「编码挑战季」活动火热进行中,诚邀广大开发者们参与编码挑战
138 3
对话蚂蚁开源蒋炜:让 Agent 把运维人员从 24 小时的待命中解放出来
|
2月前
|
传感器 机器学习/深度学习 自然语言处理
智能代理(Agent)在工具调用与协作中的应用实践
随着人工智能技术的飞速发展,智能代理(Agent)技术已成为解决复杂任务的关键手段。本文深入探讨了如何设计灵活的工具调用机制和构建高效的单/多Agent系统以提升任务执行效率。文章不仅涵盖了相关的理论知识,还提供了丰富的实践案例和代码实现,旨在帮助读者深入理解和应用智能代理技术。
188 2
|
3月前
LangChain-26 Custom Agent 自定义一个Agent并通过@tool绑定对应的工具 同时让大模型自己调用编写的@tools函数
LangChain-26 Custom Agent 自定义一个Agent并通过@tool绑定对应的工具 同时让大模型自己调用编写的@tools函数
109 3
LangChain-26 Custom Agent 自定义一个Agent并通过@tool绑定对应的工具 同时让大模型自己调用编写的@tools函数