Background
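The chaos console showed that the agent had stopped sending heartbeats. Roughly one month after the agent is started, the agent side begins logging errors when reporting heartbeats (this has happened in both dev and staging).
The current workaround is to restart both the agent (in the devops environment) and the chaos backend service (in the dev/test/staging/prod environments). Restarting only one of them does not help; both must be restarted.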
Goal
Make chaosblade-box-agent send heartbeats continuously and reliably.
Code Analysis
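●Entry logic: cmd/chaos_agent.go is the entry file; the agent runs as the binary produced by go build. The project is built through a Makefile, which essentially runs go build cmd/chaos_agent.go. On startup the agent first checks the path /opt/chaosblade/blade and fails to start if it does not exist, so make sure the chaosblade tool is installed before starting chaosblade-box-agent.
●Interface logic: /AgentHeartBeat
●Interface logic: /k8sPod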
Error Analysis
Start from the error log: "[heartbeat] send failed." comes from the sendHeartbeat method in conn/heartbeat/heartbeat.go. The heartbeatClient object that owns this method is initialized in main and passed in:
heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)
The heartbeat interval used by the scheduled task comes from the HeartbeatConfig struct, which, tracing back, is created at the very beginning of main:
options.NewOptions()
->
Opts.AddFlags()
->
o.Flags.DurationVar(&o.HeartbeatConfig.Period, "heartbeat.period", 5*time.Second, "the period of heartbeat")
So by default a heartbeat is sent every 5 seconds. The heartbeat logic itself is also started from main:
newConn.Start()
->
func (c *Conn) Start()
This method iterates over the clientHandlers of c (a Conn) and calls Start on each ClientHandle. These handlers are registered on c in main:
// conn to server
connectClient := connect.NewClientConnectHandler(transportClient)
heartbeatClient := heartbeat.NewClientHeartbeatHandler(options.Opts.HeartbeatConfig, transportClient)
metricClient := metric.NewClientMetricHandler(transportClient, reportMetricConfigMap)
newConn := conn.NewConn()
newConn.Register(transport.API_REGISTRY, connectClient)
newConn.Register(transport.API_HEARTBEAT, heartbeatClient)
newConn.Register(transport.API_METRIC, metricClient)
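Going back to the error log printed by sendHeartbeat in conn/heartbeat/heartbeat.go, searching further for its keyword "Report kubernetes" leads to the function in collector/kubernetes/kubernetes.go that produces the error:
func (collector *K8sBaseCollector) reportK8sMetric(namespace string, isExists bool, resource interface{}, size int)
Based on the error log, the failure can be narrowed down to this logic: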
// enable compression
uri.CompressVersion = fmt.Sprintf("%d", transport.AllCompress)
response, err := collector.transport.Invoke(uri, request, true) // todo: check whether this struct should be used here
if err != nil {
collector.resetIdentifierCache()
logrus.Warningf("Report kubernetes %s infos err: %v", collector.ReportHandler, err)
return
}
Finally, searching for the text that follows err in the log, "access token not found", leads to where the message originates, in transport/interceptor.go:
func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
accessKey := tools.GetAccessKey()
secureKey := tools.GetSecureKey()
if accessKey == "" || secureKey == "" {
return ReturnFail(TokenNotFound), false
}
request.AddHeader(AccessKey, accessKey)
signData := request.Headers[SignData]
if signData == "" {
bytes, err := json.Marshal(request.Params)
if err != nil {
return ReturnFail(EncodeError, err.Error()), false
}
signData = string(bytes)
}
sign := tools.Sign(signData)
request.AddHeader(SignKey, sign)
return nil, true
}
In doInvoker, this error is returned when accessKey or secureKey is empty. Next, look at the relevant functions in tools:
var localAccessKey = ""
var localSecureKey = ""
var mutex = sync.RWMutex{}
// GetAccessKey
func GetAccessKey() string {
mutex.RLock()
defer mutex.RUnlock()
return localAccessKey
}
// GetSecureKey
func GetSecureKey() string {
mutex.RLock()
defer mutex.RUnlock()
return localSecureKey
}
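The getter pattern can be sketched with a small type that keeps the pair behind one sync.RWMutex. keyStore is hypothetical; the agent uses package-level variables instead. Reading both keys under a single RLock returns a consistent pair, whereas two separate GetAccessKey/GetSecureKey calls can in principle straddle an update. The readers' lock only protects anything if every writer takes the write lock.

```go
package main

import (
	"fmt"
	"sync"
)

// keyStore (hypothetical) guards an AK/SK pair with one RWMutex.
type keyStore struct {
	mu sync.RWMutex
	ak string
	sk string
}

// Get returns both keys under one read lock, so the pair is consistent.
func (s *keyStore) Get() (string, string) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.ak, s.sk
}

// Set replaces both keys under the write lock.
func (s *keyStore) Set(ak, sk string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.ak, s.sk = ak, sk
}

func main() {
	var s keyStore
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); s.Get() }() // readers may hold RLock concurrently
	}
	s.Set("ak-demo", "sk-demo")
	wg.Wait()
	fmt.Println(s.Get()) // ak-demo sk-demo
}
```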
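GetAccessKey and GetSecureKey take the read lock of the RWMutex mutex so that concurrent access is safe, and return the values of localAccessKey and localSecureKey. The two keys are assigned as follows: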
// Record AK/SK to file
func RecordSecretKeyToFile(accessKey, secretKey string) error {
if accessKey == "" || secretKey == "" {
log.Warningln("key: ", accessKey, secretKey)
return errors.New("accessKey or secretKey is empty")
}
keys := map[string]string{
AccessKeyName: accessKey,
SecretKeyName: secretKey,
}
err := RecordMapToFile(keys, path.Join(GetUserHome(), ".chaos.cert"), true)
if err != nil {
return err
}
localAccessKey = accessKey
localSecureKey = secretKey
return nil
}
func RecordMapToFile(data map[string]string, filePath string, truncate bool) error {
if len(data) == 0 {
return nil
}
mutex.Lock()
defer mutex.Unlock()
flag := os.O_WRONLY | os.O_CREATE
if truncate {
flag = flag | os.O_TRUNC
}
file, err := os.OpenFile(filePath, flag, 0666)
defer file.Close()
if err != nil {
log.WithField("file", filePath).WithError(err).Errorf("record data to file failed")
return err
}
for key, value := range data {
_, err := file.WriteString(strings.Join([]string{key, value}, Delimiter) + "\n")
if err != nil {
log.WithFields(log.Fields{
"file": filePath,
"key": key,
"value": value,
}).WithError(err).Errorf("write data to file failed")
return err
}
}
return nil
}
RecordMapToFile writes the key-value pairs into the given file. Two things matter here: the values of the two keys, and the path of the file being written. Tracing back, the file path is:
path.Join(GetUserHome(), ".chaos.cert")
where:
// GetUserHome return user home.
func GetUserHome() string {
user, err := user.Current()
if err == nil {
return user.HomeDir
}
return "/root"
}
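GetUserHome returns the current user's home directory, or /root as a fallback; joined with ".chaos.cert", this means the AK/SK pair is written to that .chaos.cert file. Next, trace back where the AK/SK pair is assigned, starting from Start in conn/conn.go: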
func (c *Conn) Start() {
if len(c.clientHandlers) <= 0 {
return
}
var errCh chan error
for clientHandlerName, clientHandler := range c.clientHandlers {
go func(clientHandlerName string, clientHandler ClientHandle) {
logrus.WithField("clientHandlerName", clientHandlerName).Infof("conn start")
if err := clientHandler.Start(); err != nil {
logrus.WithField("clientHandlerName", clientHandlerName).Warnf("conn start failed, err: %s", err.Error())
errCh <- err
}
}(clientHandlerName, clientHandler)
}
go func() {
for {
select {
case err := <-errCh:
if err != nil {
logrus.Errorf("register conn failed, err: %s", err.Error())
os.Exit(1)
}
}
}
}()
}
From there, execution reaches the Start method of ClientConnectHandler in conn/connect/connect.go:
// Connect to remote
func (cc *ClientConnectHandler) Start() error {
request := transport.NewRequest()
request.AddParam("ip", options.Opts.Ip)
request.AddParam("pid", options.Opts.Pid)
request.AddParam("type", options.ProgramName)
request.AddParam("uid", options.Opts.Uid)
request.AddParam("instanceId", options.Opts.InstanceId)
request.AddParam("namespace", options.Opts.Namespace)
request.AddParam("deviceId", options.Opts.InstanceId)
request.AddParam("deviceType", strconv.Itoa(options.Host))
request.AddParam("ak", options.Opts.License)
request.AddParam("uptime", tools.GetUptime())
request.AddParam("startupMode", options.Opts.StartupMode)
request.AddParam("v", options.Opts.Version)
request.AddParam("agentMode", options.Opts.AgentMode)
request.AddParam("osType", options.Opts.InstallOperator)
request.AddParam("cpuNum", strconv.Itoa(runtime.NumCPU()))
request.AddParam("clusterId", options.Opts.ClusterId).
AddParam("clusterName", options.Opts.ClusterName)
chaosBladeVersion := options.Opts.ChaosbladeVersion
if chaosBladeVersion != "" {
request.AddParam("cbv", chaosBladeVersion)
}
// todo windows cant be work
if memInfo, err := linux.ReadMemInfo("/proc/meminfo"); err != nil {
logrus.Warnln("read proc/meminfo err:", err.Error())
} else {
memTotalKB := float64(memInfo.MemTotal)
request.AddParam("memSize", fmt.Sprintf("%f", memTotalKB))
}
// application only for host mode
request.AddParam(options.AppInstanceKeyName, options.Opts.ApplicationInstance)
request.AddParam(options.AppGroupKeyName, options.Opts.ApplicationGroup)
if options.Opts.RestrictedVpc {
request.AddParam("restrictedVpc", "true")
request.AddParam("vpcId", options.Opts.License)
} else {
request.AddParam("vpcId", options.Opts.VpcId)
}
uri := transport.TransportUriMap[transport.API_REGISTRY]
response, err := cc.transportClient.Invoke(uri, request, false)
if err != nil {
return err
}
// todo: switch this to http
return handleDirectHttpConnectResponse(*response)
}
and then to the following function:
// handler direct http response
func handleDirectHttpConnectResponse(response transport.Response) error {
if !response.Success {
return errors.New(fmt.Sprintf("connect server failed, %s", response.Error))
}
result := response.Result
v, ok := result.(map[string]interface{})
if !ok {
return errors.New("response is error")
}
options.Opts.SetCid(v["cid"].(string))
if v["uid"] != nil {
options.Opts.SetUid(v["uid"].(string))
}
ak, ok := v["ak"]
if !ok || ak == nil {
logrus.Error("response data is wrong, lack ak!")
return errors.New("accessKey or secretKey is empty")
}
sk, ok := v["sk"]
if !ok || sk == nil {
logrus.Error("response data is wrong, lack sk!")
return errors.New("accessKey or secretKey is empty")
}
err := tools.RecordSecretKeyToFile(ak.(string), sk.(string))
return err
}
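From this we can conclude that ak and sk are obtained from the registry API (API_REGISTRY). When the agent starts, it registers with the chaosblade backend service through this API, receives ak and sk in the response, and saves them both to the .chaos.cert file under the local user's home directory and to in-memory variables; afterwards they are read directly from the variables.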
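Solution
So when this error appears, the most likely cause is that the in-memory ak and sk are empty. To verify this, we can log the ak and sk values just before the error is returned, and also log the contents of the .chaos.cert file. If the variables turn out to be empty while .chaos.cert still contains ak and sk, we can try to reload the variables from the .chaos.cert file: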
func (authInterceptor *authInterceptor) doInvoker(request *Request) (*Response, bool) {
accessKey := tools.GetAccessKey()
secureKey := tools.GetSecureKey()
if accessKey == "" || secureKey == "" {
logrus.Warnf("[zenki] accessKey or secureKey is empty, access key: %s, secure key: %s", accessKey, secureKey)
go func() {
chaosCertPath := path.Join(tools.GetUserHome(), ".chaos.cert")
if _, err := os.Stat(chaosCertPath); os.IsNotExist(err) {
logrus.Warnf("[zenki] chaos cert file not found, path: %s", chaosCertPath)
return
}
ak, sk, err := tools.ReadAkSkFromFile(chaosCertPath)
if err != nil {
logrus.Warnf("[zenki] read access key or secure key from chaos cert file failed, path: %s", chaosCertPath)
return
}
tools.SetAccessKey(ak)
tools.SetSecureKey(sk)
}()
return ReturnFail(TokenNotFound), false
}
request.AddHeader(AccessKey, accessKey)
signData := request.Headers[SignData]
if signData == "" {
bytes, err := json.Marshal(request.Params)
if err != nil {
return ReturnFail(EncodeError, err.Error()), false
}
signData = string(bytes)
}
sign := tools.Sign(signData)
request.AddHeader(SignKey, sign)
return nil, true
}
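The fix above reloads the keys in a background goroutine, so the request that detects the empty keys still fails, every failing request spawns another goroutine, and SetAccessKey/SetSecureKey publish the two keys separately. An alternative design, sketched here under assumptions, reloads synchronously under the write lock with a double check, so concurrent callers trigger a single reload and the pair is published together. keyCache and its loader hook are hypothetical; loader would wrap something like ReadAkSkFromFile.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// keyCache (hypothetical) holds the AK/SK pair plus a loader that can
// rebuild it, e.g. from .chaos.cert.
type keyCache struct {
	mu     sync.RWMutex
	ak, sk string
	loader func() (string, string, error)
}

// Keys returns the cached pair, reloading it once when either key is empty.
func (c *keyCache) Keys() (string, string, error) {
	c.mu.RLock()
	ak, sk := c.ak, c.sk
	c.mu.RUnlock()
	if ak != "" && sk != "" {
		return ak, sk, nil
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.ak != "" && c.sk != "" { // another caller reloaded while we waited
		return c.ak, c.sk, nil
	}
	ak, sk, err := c.loader()
	if err != nil {
		return "", "", err
	}
	if ak == "" || sk == "" {
		return "", "", errors.New("access token not found")
	}
	c.ak, c.sk = ak, sk // both keys published under the same lock
	return ak, sk, nil
}

func main() {
	calls := 0 // only touched inside loader, which runs under the write lock
	c := &keyCache{loader: func() (string, string, error) { calls++; return "a1", "s1", nil }}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.Keys() }()
	}
	wg.Wait()
	fmt.Println(calls) // 1
}
```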
In addition, add a function to pkg/tools/auth.go that reads ak and sk from the file:
// ReadAkSkFromFile [zenki] read ak and sk from file
func ReadAkSkFromFile(filePath string) (ak string, sk string, err error) {
bytes, err := os.ReadFile(filePath)
if err != nil {
return "", "", err
}
content := strings.TrimSpace(string(bytes))
if content == "" {
return "", "", fmt.Errorf("empty content in %s", filePath)
}
for _, value := range strings.Split(content, "\n") {
kv := strings.SplitN(strings.TrimSpace(value), Delimiter, 2)
if len(kv) != 2 {
continue
}
switch kv[0] {
case AccessKeyName:
ak = kv[1]
case SecretKeyName:
sk = kv[1]
}
}
if ak == "" || sk == "" {
return "", "", fmt.Errorf("access key or secret key not found in %s", filePath)
}
return ak, sk, nil
}
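The parsing part of ReadAkSkFromFile can be exercised without touching the file system by pulling it into a pure function. parseAkSk is hypothetical, and the values of Delimiter, AccessKeyName and SecretKeyName are assumed since they are not shown in this write-up. Trimming each line also tolerates \r\n line endings.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Assumed values; the agent's real constants are not shown.
const (
	delimiter     = "="
	accessKeyName = "accessKey"
	secretKeyName = "secretKey"
)

// parseAkSk extracts the AK/SK pair from .chaos.cert-style content and
// rejects content that is missing either key.
func parseAkSk(content string) (ak, sk string, err error) {
	for _, line := range strings.Split(strings.TrimSpace(content), "\n") {
		kv := strings.SplitN(strings.TrimSpace(line), delimiter, 2)
		if len(kv) != 2 {
			continue
		}
		switch kv[0] {
		case accessKeyName:
			ak = kv[1]
		case secretKeyName:
			sk = kv[1]
		}
	}
	if ak == "" || sk == "" {
		return "", "", errors.New("ak or sk missing in cert content")
	}
	return ak, sk, nil
}

func main() {
	fmt.Println(parseAkSk("accessKey=a1\nsecretKey=s1\n")) // a1 s1 <nil>
}
```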
Also add setters for ak and sk:
// SetAccessKey [zenki] set local access key
func SetAccessKey(accessKey string) {
mutex.Lock()
defer mutex.Unlock()
log.Infoln("[zenki] Set access key: ", accessKey)
localAccessKey = accessKey
}
// SetSecureKey [zenki] set local secure key
func SetSecureKey(secureKey string) {
mutex.Lock()
defer mutex.Unlock()
log.Infoln("[zenki] Set secure key: ", secureKey)
localSecureKey = secureKey
}
Fix and Packaging
The agent (probe) installation command can be found on the chaosblade platform:
wget https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz -O chaos.tar.gz && tar -zxvf chaos.tar.gz -C /opt/ && sudo sh /opt/chaos/chaosctl.sh install -k 71ae74cbcaba356d59462c398ad940e6 -p [app name] -g [app group] -P [agent port] -t [chaosblade-box ip:port]
This shows that the package actually used is:
https://chaosblade.oss-cn-hangzhou.aliyuncs.com/platform/release/1.0.2/chaosagent.tar.gz
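After downloading and extracting it, the agent file in the package is the chaosblade-box-agent build artifact that we need to replace; the other files stay as they are.
So after changing the code, run make again to rebuild, replace the agent file in the package with the newly built one, re-compress it into chaos.tar.gz, upload that chaos.tar.gz to the CDN, and replace the link after wget with the CDN URL of the uploaded file.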