对于阿里开源混沌工程工具chaosblade-box-agent心跳报错问题的分析与解决

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
函数计算FC,每月15万CU 3个月
可观测监控 Prometheus 版,每月50GB免费额度
简介: 摘要:本文记录了一个由chaosblade-box平台后台发现的偶发的chaosblade-box-agent不发送心跳的问题,从报错日志入手,结合chaosblade-box-agent源码进行分析,最终解决问题并修复打包的过程。

背景

在chaos后台发现了agent不发心跳了的问题,大概启动agent后一个月左右这样,会发现agengt端上报心跳报错(dev和staging都发生过)。
目前解决方法是重启agent(devops环境)与chaos后台服务(dev/test/staging/prod环境)。单独重启还不行,必须都重启才能解决。

目标

解决chaosblade-box-agent不能持续稳定发射心跳的问题。

代码分析

●研读入口逻辑:cmd/chaos_agent.go,这个就是入口文件,通过go build后启动编译产物。项目构建通过Makefile进行,本质是go build cmd/chaos_agent.go。启动agent程序会先检查/opt/chaosblade/blade这个路径,如果这个路径不存在,则会启动失败,所以需要确保已安装了chaosblade工具,再启动chaosblade-box-agent
●研读接口逻辑:/AgentHeartBeat
●研读接口逻辑:/k8sPod

报错分析

从报错日志入手:"[heartbeat] send failed."出自conn/heartbeat/heartbeat.go中的sendHeartbeat方法,这个方法所在的对象heartbeatClient是在main函数中初始化并传入的:

heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)

而定时任务所依赖的心跳时间间隔就来自于HeartbeatConfig这个结构体,追根溯源,是在main函数一开始创建的:

options.NewOptions()
->
Opts.AddFlags()
->
o.Flags.DurationVar(&o.HeartbeatConfig.Period, "heartbeat.period", 5*time.Second, "the period of heartbeat")

在此设置了5秒发一次心跳。而心跳逻辑的启动逻辑入口也在main函数中:

newConn.Start()
->
func (c *Conn) Start()

该方法中遍历了c(Conn)对象中的clientHandlers,会分别调用每个ClientHandle对象的Start方法,这些handlers对象都是在main函数里注册到c对象中的:

// conn to server
connectClient := connect.NewClientConnectHandler(transportClient)
heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)
metricClient := metric.NewClientMetricHandler(transportClient, reportMetricConfigMap)
newConn := conn.NewConn()
newConn.Register(transport.API_REGISTRY, connectClient)
newConn.Register(transport.API_HEARTBEAT, heartbeatClient)
newConn.Register(transport.API_METRIC, metricClient)

最后再回到conn/heartbeat/heartbeat.go中的sendHeartbeat方法的报错日志处,根据报错日志中的关键字“Report kubernetes”继续向下搜索,找到collector/kubernetes/kubernetes.go中的报错所在函数:

func (collector *K8sBaseCollector) reportK8sMetric(namespace string, isExists bool, resource interface{
   }, size int)

其中根据报错日志,锁定报错所在逻辑:

// 开启压缩
uri.CompressVersion = fmt.Sprintf("%d", transport.AllCompress)
response, err := collector.transport.Invoke(uri, request, true) // todo 这里看下是否用这个struct
if err != nil {
   
    collector.resetIdentifierCache()
    logrus.Warningf("Report kubernetes %s infos err: %v", collector.ReportHandler, err)
    return
}

最后根据err后面的内容关键字“access token not found”搜索到日志出处所在文件transport/interceptor.go:

func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
   
        accessKey := tools.GetAccessKey()
        secureKey := tools.GetSecureKey()
        if accessKey == "" || secureKey == "" {
   
                return ReturnFail(TokenNotFound), false
        }
        request.AddHeader(AccessKey, accessKey)
        signData := request.Headers[SignData]
        if signData == "" {
   
                bytes, err := json.Marshal(request.Params)
                if err != nil {
   
                        return ReturnFail(EncodeError, err.Error()), false
                }
                signData = string(bytes)
        }
        sign := tools.Sign(signData)
        request.AddHeader(SignKey, sign)
        return nil, true
}

可以看出这个错误是由于accessKey为空或secureKey为空导致的。进一步看下tools里面的方法:

var localAccessKey = ""
var localSecureKey = ""
var mutex = sync.RWMutex{
   }

// GetAccessKey
func GetAccessKey() string {
   
        mutex.RLock()
        defer mutex.RUnlock()
        return localAccessKey
}

// GetSecureKey
func GetSecureKey() string {
   
        mutex.RLock()
        defer mutex.RUnlock()
        return localSecureKey
}

这两个函数使用了读写锁 mutex 来保证并发访问的安全性。它们通过读取 localAccessKey 和 localSecureKey 的值来返回访问密钥和安全密钥。而这两个Key的赋值逻辑如下:

// Record AK/SK to file
func RecordSecretKeyToFile(accessKey, secretKey string) error {
   
        if accessKey == "" || secretKey == "" {
   
                log.Warningln("key: ", accessKey, secretKey)
                return errors.New("accessKey or secretKey is empty")
        }

        keys := map[string]string{
   
                AccessKeyName: accessKey,
                SecretKeyName: secretKey,
        }
        err := RecordMapToFile(keys, path.Join(GetUserHome(), ".chaos.cert"), true)
        if err != nil {
   
                return err
        }
        localAccessKey = accessKey
        localSecureKey = secretKey
        return nil
}

func RecordMapToFile(data map[string]string, filePath string, truncate bool) error {
   
        if len(data) == 0 {
   
                return nil
        }
        mutex.Lock()
        defer mutex.Unlock()
        flag := os.O_WRONLY | os.O_CREATE
        if truncate {
   
                flag = flag | os.O_TRUNC
        }
        file, err := os.OpenFile(filePath, flag, 0666)
        defer file.Close()
        if err != nil {
   
                log.WithField("file", filePath).WithError(err).Errorf("record data to file failed")
                return err
        }
        for key, value := range data {
   
                _, err := file.WriteString(strings.Join([]string{
   key, value}, Delimiter) + "\n")
                if err != nil {
   
                        log.WithFields(log.Fields{
   
                                "file":  filePath,
                                "key":   key,
                                "value": value,
                        }).WithError(err).Errorf("write data to file failed")
                        return err
                }
        }
        return nil
}

RecordMapToFile 函数的作用是将键值对数据写入指定的文件中。所以这里有两个需要关注的点,第一个点是这两个Key的值,第二个点是这个写入的文件路径,我们往回溯,找到文件写入路径:

path.Join(GetUserHome(), ".chaos.cert")

其中:

// GetUserHome return user home.
func GetUserHome() string {
   
        user, err := user.Current()
        if err == nil {
   
                return user.HomeDir
        }
        return "/root"
}

当前用户的主目录路径或/root路径,拼上".chaos.cert",所以可以认为这个aksk是写到了这个.chaos.cert文件中,接下来我们再往回溯,看下这对aksk的赋值逻辑,从conn/conn.go的Start开始:

func (c *Conn) Start() {
   
        if len(c.clientHandlers) <= 0 {
   
                return
        }

        var errCh chan error
        for clientHandlerName, clientHandler := range c.clientHandlers {
   
                go func(clientHandlerName string, clientHandler ClientHandle) {
   
                        logrus.WithField("clientHandlerName", clientHandlerName).Infof("conn start")
                        if err := clientHandler.Start(); err != nil {
   
                                logrus.WithField("clientHandlerName", clientHandlerName).Warnf("conn start failed, err: %s", err.Error())
                                errCh <- err
                        }
                }(clientHandlerName, clientHandler)
        }

        go func() {
   
                for {
   
                        select {
   
                        case err := <-errCh:
                                if err != nil {
   
                                        logrus.Errorf("register conn failed, err: %s", err.Error())
                                        os.Exit(1)
                                }
                        }
                }
        }()

}

到下面这个conn/connect/connect.go中的ClientConnectHandler的Start方法

// Connect to remote
func (cc *ClientConnectHandler) Start() error {
   
        request := transport.NewRequest()
        request.AddParam("ip", options.Opts.Ip)
        request.AddParam("pid", options.Opts.Pid)
        request.AddParam("type", options.ProgramName)
        request.AddParam("uid", options.Opts.Uid)
        request.AddParam("instanceId", options.Opts.InstanceId)
        request.AddParam("namespace", options.Opts.Namespace)
        request.AddParam("deviceId", options.Opts.InstanceId)
        request.AddParam("deviceType", strconv.Itoa(options.Host))
        request.AddParam("ak", options.Opts.License)
        request.AddParam("uptime", tools.GetUptime())
        request.AddParam("startupMode", options.Opts.StartupMode)
        request.AddParam("v", options.Opts.Version)
        request.AddParam("agentMode", options.Opts.AgentMode)
        request.AddParam("osType", options.Opts.InstallOperator)
        request.AddParam("cpuNum", strconv.Itoa(runtime.NumCPU()))

        request.AddParam("clusterId", options.Opts.ClusterId).
                AddParam("clusterName", options.Opts.ClusterName)

        chaosBladeVersion := options.Opts.ChaosbladeVersion
        if chaosBladeVersion != "" {
   
                request.AddParam("cbv", chaosBladeVersion)
        }

        // todo windows cant be work
        if memInfo, err := linux.ReadMemInfo("/proc/meminfo"); err != nil {
   
                logrus.Warnln("read proc/meminfo err:", err.Error())
        } else {
   
                memTotalKB := float64(memInfo.MemTotal)
                request.AddParam("memSize", fmt.Sprintf("%f", memTotalKB))
        }

        // application only for host mode
        request.AddParam(options.AppInstanceKeyName, options.Opts.ApplicationInstance)
        request.AddParam(options.AppGroupKeyName, options.Opts.ApplicationGroup)

        if options.Opts.RestrictedVpc {
   
                request.AddParam("restrictedVpc", "true")
                request.AddParam("vpcId", options.Opts.License)
        } else {
   
                request.AddParam("vpcId", options.Opts.VpcId)
        }

        uri := transport.TransportUriMap[transport.API_REGISTRY]

        response, err := cc.transportClient.Invoke(uri, request, false)

        if err != nil {
   
                return err
        }

        // todo 这里要换成http
        return handleDirectHttpConnectResponse(*response)
}

再到下面方法

// handler direct http response
func handleDirectHttpConnectResponse(response transport.Response) error {
   
        if !response.Success {
   
                return errors.New(fmt.Sprintf("connect server failed, %s", response.Error))
        }
        result := response.Result

        v, ok := result.(map[string]interface{
   })
        if !ok {
   
                return errors.New("response is error")
        }
        options.Opts.SetCid(v["cid"].(string))

        if v["uid"] != nil {
   
                options.Opts.SetUid(v["uid"].(string))
        }

        ak, ok := v["ak"]
        if !ok || ak == nil {
   
                logrus.Error("response data is wrong, lack ak!")
                return errors.New("accessKey or secretKey is empty")
        }

        sk, ok := v["sk"]
        if !ok || sk == nil {
   
                logrus.Error("response data is wrong, lack sk!")
                return errors.New("accessKey or secretKey is empty")
        }
        err := tools.RecordSecretKeyToFile(ak.(string), sk.(string))
        return err
}

可以分析得出,ak和sk是通过请求registry接口(API_REGISTRY)得到的。在启动agent的时候会通过该接口与chaosblade后台服务通信完成注册,从接口返回得到ak和sk并保存到本地用户路径下的.chaos.cert文件中与内存变量中,后续将直接从变量中读取。
解决办法
所以我们如果遇到了这个报错,应该是变量中的ak和sk为空导致的。为了验证这点,我们可以在报错日志上方将ak和sk变量打印,另外将.chaos.cert文件中的内容进行打印,如果发现是ak和sk变量为空而.chaos.cert文件中仍然存在ak和sk,则可以试图通过.cert文件重新装载ak和sk变量。

func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
   
    accessKey := tools.GetAccessKey()
    secureKey := tools.GetSecureKey()
    if accessKey == "" || secureKey == "" {
   
        logrus.Warnf("[zenki] accessKey or secureKey is empty, access key: %s, secure key: %s", accessKey, secureKey)
        go func() {
   
            chaosCertPath := path.Join(tools.GetUserHome(), ".chaos.cert")
            if _, err := os.Stat(chaosCertPath); os.IsNotExist(err) {
   
                logrus.Warnf("[zenki] chaos cert file not found, path: %s", chaosCertPath)
                return
            }
            ak, sk, err := tools.ReadAkSkFromFile(chaosCertPath)
            if err != nil {
   
                logrus.Warnf("[zenki] read access key or secure key from chaos cert file failed, path: %s", chaosCertPath)
                return
            }
            tools.SetAccessKey(ak)
            tools.SetSecureKey(sk)
        }()
        return ReturnFail(TokenNotFound), false
    }
    request.AddHeader(AccessKey, accessKey)
    signData := request.Headers[SignData]
    if signData == "" {
   
        bytes, err := json.Marshal(request.Params)
        if err != nil {
   
            return ReturnFail(EncodeError, err.Error()), false
        }
        signData = string(bytes)
    }
    sign := tools.Sign(signData)
    request.AddHeader(SignKey, sign)
    return nil, true
}

另外在pkg/tools/auth.go里增加从文件中读取ak和sk的方法:

// ReadAkSkFromFile [zenki] read ak and sk from file
func ReadAkSkFromFile(filePath string) (ak string, sk string, err error) {
   
    bytes, err := os.ReadFile(AppFile)
    if err != nil {
   
        return "", "", err
    }
    content := strings.TrimSpace(string(bytes))
    slice := strings.Split(content, "\n")
    if len(slice) == 0 {
   
        return "", "", fmt.Errorf("empty content")
    }
    for _, value := range slice {
   
        kv := strings.SplitN(value, Delimiter, 2)
        if len(kv) != 2 {
   
            continue
        }
        switch kv[0] {
   
        case AccessKeyName:
            ak = kv[1]
        case SecretKeyName:
            sk = kv[1]
        }
    }
    return
}

以及增加设置ak和sk的方法:

// SetAccessKey [zenki] set local access key
func SetAccessKey(accessKey string) {
   
    mutex.Lock()
    defer mutex.Unlock()
    log.Infoln("[zenki] Set access key: ", accessKey)
    localAccessKey = accessKey
}

// SetSecureKey [zenki] set local secure key
func SetSecureKey(secureKey string) {
   
    mutex.Lock()
    defer mutex.Unlock()
    log.Infoln("[zenki] Set secure key: ", secureKey)
    localSecureKey = secureKey
}

修复与打包

通过访问chaosblade平台查看探针安装的方法。

wget https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz -O chaos.tar.gz && tar -zxvf chaos.tar.gz -C /opt/ && sudo sh /opt/chaos/chaosctl.sh install -k 71ae74cbcaba356d59462c398ad940e6 -p  [应用名]  -g  [应用分组]  -P  [agent端口号]  -t  [chaosblade-box ip:port]

可以看出实际上是用到了:
https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz
下载下来并解压以后,可以看到里面的组成如下:
这里面agent是我们需要替换的chaosblade-box-agent的编译产物,其他的不用动。
所以我们只需要修改了代码以后重新执行make命令进行编译,将编译生成的agent文件替换到这里,然后重新压缩成chaos.tar.gz文件,最后将这个chaos.tar.gz上传到cdn上,把wget后面的链接替换成我们上传后cdn的链接即可。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
2月前
|
前端开发 API 决策智能
多智能体微调实践:α-UMi 开源
近年来,为了加强大型语言模型(Large-Language Models, LLM)实时信息处理、解决专业问题的能力,催生了工具调用智能体(Tool Integrated Agent)概念
|
5月前
|
人工智能 自然语言处理 搜索推荐
微软开源基于ChatGPT的,超级文本代码智能体
【7月更文挑战第17天】微软的TaskWeaver是开源的LLM框架,聚焦领域特定数据分析与个性化需求。它以代码优先,将用户请求转为可执行代码,增强处理复杂任务的效率和准确性。通过用户定义插件实现定制,适应多种场景。然而,转化请求可能引入复杂性和错误,非技术用户使用插件有难度,且开源带来的安全与隐私问题需关注。[论文链接](https://arxiv.org/abs/2311.17541)**
67 4
|
2月前
|
人工智能 运维 自然语言处理
对话蚂蚁开源蒋炜:让 Agent 把运维人员从 24 小时的待命中解放出来
当整个行业的智慧都集中在一件事情上时,比起闭门造车,开源一定能带来更好的技术迭代和发展。CodeFuse 「编码挑战季」活动火热进行中,诚邀广大开发者们参与编码挑战
107 3
对话蚂蚁开源蒋炜:让 Agent 把运维人员从 24 小时的待命中解放出来
|
2月前
LangChain-26 Custom Agent 自定义一个Agent并通过@tool绑定对应的工具 同时让大模型自己调用编写的@tools函数
LangChain-26 Custom Agent 自定义一个Agent并通过@tool绑定对应的工具 同时让大模型自己调用编写的@tools函数
76 3
LangChain-26 Custom Agent 自定义一个Agent并通过@tool绑定对应的工具 同时让大模型自己调用编写的@tools函数
|
2月前
|
人工智能 JSON 数据可视化
OpenAI调用接口多轮对话Multi Turn Multi Agent 多轮多角色对话调试和可视化工具Dialogue Visualization
伴随着生成式人工智能技术发展,进2年涌现出大语言模型LLM/Agent系统/AI推理等众多方向的技术项目和论文。其中对话系统,智能体交互是用户通过UX界面和AI系统进行交互,这种交互有时候也是多模态(用户输入文字/语音/图像)等等。在调用OpenAI 的对话接口时候,有时候需要把对话结果打印出来检查是否有bug,JSON数据格式就比较难看出来了,尤其是有多角色的多轮对话。这个时候可以借助一些在线的"对话"可视化的工具 (Online Dialogue Visualization Tool) DeepNLP Dialogue Visualization Tool,方便产品经理,算法研发,学术研究
OpenAI调用接口多轮对话Multi Turn Multi Agent 多轮多角色对话调试和可视化工具Dialogue Visualization
|
2月前
|
人工智能 JSON 自然语言处理
开源模型+Orchestrating Agents多智能体框架,易用、强大且可控
本文采用开源Qwen2.5-14B-instruct-GGUF来体验多智能体编排和交接,希望在体验多智能体编排和交接框架的同时,一起评估中小参数规模的模型(14B)能否较好的完成多智能体任务。
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
Hadoop-19 Flume Agent批量采集数据到HDFS集群 监听Hive的日志 操作则把记录写入到HDFS 方便后续分析
48 2
|
4月前
|
人工智能 自然语言处理 算法
可自主进化的Agent?首个端到端智能体符号化训练框架开源了
【8月更文挑战第13天】近年来,AI领域在构建能自主完成复杂任务的智能体方面取得重大突破。这些智能体通常基于大型语言模型,可通过学习适应环境。为简化设计流程,AIWaves Inc.提出智能体符号化学习框架,使智能体能在数据中心模式下自我优化,以推进通向通用人工智能的道路。该框架将智能体视作符号网络,利用提示、工具及其组合方式定义可学习的权重,并采用自然语言模拟反向传播和梯度下降等学习过程,指导智能体的自我改进。实验显示,此框架能有效促进智能体的自主进化。尽管如此,该框架仍面临高质量提示设计及计算资源需求高等挑战。论文详情参见:https://arxiv.org/pdf/2406.18532。
180 58
|
2月前
LangChain-24 Agengts 通过TavilySearch Agent实现检索内容并回答 AgentExecutor转换Search 借助Prompt Tools工具
LangChain-24 Agengts 通过TavilySearch Agent实现检索内容并回答 AgentExecutor转换Search 借助Prompt Tools工具
36 0
|
4月前
|
数据采集 存储 Java
Flume Agent 的内部原理分析:深入探讨 Flume 的架构与实现机制
【8月更文挑战第24天】Apache Flume是一款专为大规模日志数据的收集、聚合及传输而设计的分布式、可靠且高可用系统。本文深入解析Flume Agent的核心机制并提供实际配置与使用示例。Flume Agent由三大组件构成:Source(数据源)、Channel(数据缓存)与Sink(数据目的地)。工作流程包括数据采集、暂存及传输。通过示例配置文件和Java代码片段展示了如何设置这些组件以实现日志数据的有效管理。Flume的强大功能与灵活性使其成为大数据处理及实时数据分析领域的优选工具。
126 1