k8s与监控--改造telegraf的buffer实现

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
数据安全中心,免费版
简介: 改造telegraf的buffer实现 前言 最近在使用telegraf的场景中,要求数据在程序意外终止的时候不丢失。按照telegraf最初的原始实现,在running_output内部维护了两个buffer,分别是metrics和failMetrics。

改造telegraf的buffer实现

前言

最近在使用telegraf的场景中,要求数据在程序意外终止的时候不丢失。按照telegraf最初的原始实现,在running_output内部维护了两个buffer,分别是metrics和failMetrics。这两个buffer是基于go中channel实现的。由于没 有持久化机制,在意外退出的时候,存在丢失数据的风险。所以这篇文章主要讲述之前telegraf保证数据安全的一些措施和我们对代码的一些优化。

telegraf关于数据安全的处理办法

关于两个buffer,定义在running_output.go的struct中。

// RunningOutput contains the output configuration
type RunningOutput struct {
 Name string Output telegraf.Output Config *OutputConfig MetricBufferLimit int MetricBatchSize int MetricsFiltered selfstat.Stat MetricsWritten selfstat.Stat BufferSize selfstat.Stat BufferLimit selfstat.Stat WriteTime selfstat.Stat metrics *buffer.Buffer failMetrics *buffer.Buffer

 // Guards against concurrent calls to the Output as described in #3009
 sync.Mutex
}

这个两个buffer的大小提供了配置参数可以设置。

metrics: buffer.NewBuffer(batchSize),
failMetrics: buffer.NewBuffer(bufferLimit),

顾名思义。metrics存放要发送到指定output的metric,而failMetrics存放发送失败的metric。当然失败的metrics会在telegraf重发机制下再次发送。

 if ro.metrics.Len() == ro.MetricBatchSize {
 batch := ro.metrics.Batch(ro.MetricBatchSize)
 err := ro.write(batch)
 if err != nil {
 ro.failMetrics.Add(batch...)
 }
 }

在向metrics增加metrics的时候,做是否达到批量发送的数量,如果达到就调用发送方法。当然还有定时的解决方案,如果一直没有达到MetricBatchSize,也会在一定时间后发送数据。具体实现代码在agent.go中

 ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
 semaphore := make(chan struct{}, 1)
 for {
 select {
 case <-shutdown:
 log.Println("I! Hang on, flushing any cached metrics before shutdown")
 // wait for outMetricC to get flushed before flushing outputs
 wg.Wait()
 a.flush()
 return nil case <-ticker.C:
 go func() {
 select {
 case semaphore <- struct{}{}:
 internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
 a.flush()
 <-semaphore
 default:
 // skipping this flush because one is already happening
 log.Println("W! Skipping a scheduled flush because there is" +
 " already a flush ongoing.")
 }
 }()

在程序接受到停止信号后,程序会首先flush剩下的数据到output中,然后退出进程。这样可以保证一定的数据安全。

基于redis实现buffer的持久化

在持久化机制的选型中,优先实现redis。本身redis性能高,而且具备完善的持久化。
具体的实现架构如下:

964664a1e79b884bde2e01cba61d27f6aba10303

将原buffer中功能抽象出buffer.go接口。
具体代码:

package buffer

import (
 "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer/memory" "github.com/influxdata/telegraf/internal/buffer/redis"
)

const (
 BufferTypeForMemory = "memory"
 BufferTypeForRedis = "redis"
)

type Buffer interface {
 IsEmpty() bool
 Len() int
 Add(metrics ...telegraf.Metric)
 Batch(batchSize int) []telegraf.Metric
}

func NewBuffer(mod string, size int, key, addr string) Buffer {
 switch mod {
 case BufferTypeForRedis:
 return redis.NewBuffer(size, key, addr)
 default:
 return memory.NewBuffer(size)
 }
}

然后分别内存和redis实现了Buffer接口。
其中NewBuffer相当于一个工厂方法。
当然在后期可以实现基于file和db等buffer实现,来满足不同的场景和要求。

redis实现buffer的要点

由于要满足先进先出的要求,选择了redis的list数据结构。redis中的list是一个字符串list,所以telegraf中metric数据接口要符合序列化的要求。比如属性需要可导出,即public。所以这点需要改动telegraf对于metric struct的定义。另外可以选择json或是msgpack等序列化方式。我们这边是采用的json序列化的方式。

结语

改造以后,可以根据自己的需求通过配置文件来决定使用channel或是redis来实现buffer。各有优劣,内存实现的话,性能高,受到的依赖少。而redis这种分布式存储,决定了数据安全,但是性能会有一定的损耗,毕竟有大量的序列化和反序列化以及网络传输,当然依赖也增加了,取决于redis的可靠性,建议redis集群部署。

本文转自中文社区-k8s与监控--改造telegraf的buffer实现

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
9月前
|
Prometheus Kubernetes 监控
Kubernetes监控:Prometheus与AlertManager结合,配置邮件告警。
完成这些步骤之后,您就拥有了一个可以用邮件通知你的Kubernetes监控解决方案了。当然,所有的这些配置都需要相互照应,还要对你的Kubernetes集群状况有深入的了解。希望这份指南能帮助你创建出适合自己场景的监控系统,让你在首次发现问题时就能做出响应。
475 22
|
Kubernetes 监控 Cloud Native
"解锁K8s新姿势!Cobra+Client-go强强联手,打造你的专属K8s监控神器,让资源优化与性能监控尽在掌握!"
【8月更文挑战第14天】在云原生领域,Kubernetes以出色的扩展性和定制化能力引领潮流。面对独特需求,自定义插件成为必要。本文通过Cobra与Client-go两大利器,打造一款监测特定标签Pods资源使用的K8s插件。Cobra简化CLI开发,Client-go则负责与K8s API交互。从初始化项目到实现查询逻辑,一步步引导你构建个性化工具,开启K8s集群智能化管理之旅。
247 2
|
Prometheus Kubernetes 监控
Kubernetes(K8S) 监控 Prometheus + Grafana
Kubernetes(K8S) 监控 Prometheus + Grafana
662 2
|
人工智能 运维 Kubernetes
智能化运维:KoPylot为k8S带来AI监控诊断
智能化运维:KoPylot为k8S带来AI监控诊断
|
运维 Kubernetes 监控
Loki+Promtail+Grafana监控K8s日志
综上,Loki+Promtail+Grafana 监控组合对于在 K8s 环境中优化日志管理至关重要,它不仅提供了强大且易于扩展的日志收集与汇总工具,还有可视化这些日志的能力。通过有效地使用这套工具,可以显著地提高对应用的运维监控能力和故障诊断效率。
1633 0
|
Prometheus 监控 Kubernetes
在k8S中,状态码监控是怎么做的?
在k8S中,状态码监控是怎么做的?
|
Prometheus 监控 Kubernetes
在k8S中,blackbox主要是监控什么的?
在k8S中,blackbox主要是监控什么的?
|
Prometheus Kubernetes 监控
在k8S中,etcd是怎么监控的?
在k8S中,etcd是怎么监控的?
|
数据采集 监控 Kubernetes
在k8S中,kubelet监控Worker节点资源是使用什么组件来实现的?
在k8S中,kubelet监控Worker节点资源是使用什么组件来实现的?

热门文章

最新文章

推荐镜像

更多