k8s与监控--从telegraf改造谈golang多协程精确控制

本文涉及的产品
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: 从telegraf改造谈golang多协程精确控制 前言 telegraf是infuxdb公司开源出来的一个基于插件机制的收集metrics的项目。整个架构和elastic公司的日志收集系统极其类似,具备良好的扩展性。

从telegraf改造谈golang多协程精确控制


前言

telegraf是infuxdb公司开源出来的一个基于插件机制的收集metrics的 项目。整个架构和elastic公司的日志收集系统极其类似,具备良好的扩展性。与现在流行的各种exporter+promethues监控方案相比:

  1. 大致具备良好的可扩展性。很容易增加自己的处理逻辑,在input,output,process,filter等环境定制自己专属的插件。
  2. 统一了各种exporter,减少了部署各种exporter的工作量和维护成本。

目前telegraf改造工作基本上是两大部分:

  1. 增加了一些telegraf不支持的插件,比如虚拟化(kvm,vmware等),数据库(oracle),k8s和openstack等input插件。
  2. telegraf是基于配置文件的,所以会有两个问题,很难做分布式和无停机动态调度input任务。所以我们的工作就是将获取配置接口化,所有的配置文件来源于统一配置中心。然后就是改造无停机动态调度input。

在改造改造无停机动态调度input就涉及到golang多协程精确控制的问题。

一些golang常用并发手段

sync包下WaitGroup

具体事例:

 var wg sync.WaitGroup

 wg.Add(len(a.Config.Outputs))
 for _, o := range a.Config.Outputs {
 go func(output *models.RunningOutput) {
 defer wg.Done()
 err := output.Write()
 if err != nil {
 log.Printf("E! Error writing to output [%s]: %s\n",
 output.Name, err.Error())
 }
 }(o)
 }

 wg.Wait()

WaitGroup内部维护了一个counter,当counter数值为0时,表明添加的任务都已经完成。
总共有三个方法:

func (wg *WaitGroup) Add(delta int)

添加任务,delta参数表示添加任务的数量。

func (wg *WaitGroup) Done()

任务执行完成,调用Done方法,一般使用姿势都是defer wg.Done(),此时counter中会减一。

func (wg *WaitGroup) Wait()

通过使用sync.WaitGroup,可以阻塞主线程,直到相应数量的子线程结束。

chan struct{},控制协程退出

启动协程的时候,传递一个shutdown chan struct{},需要关闭该协程的时候,直接close(shutdown)。struct{}在golang中是一个消耗接近0的对象。
具体事例:

// gatherer runs the inputs that have been configured with their own
// reporting interval.
func (a *Agent) gatherer(
 shutdown chan struct{},
 kill chan struct{},
 input *models.RunningInput,
 interval time.Duration,
 metricC chan telegraf.Metric,
) {
 defer panicRecover(input)

 GatherTime := selfstat.RegisterTiming("gather",
 "gather_time_ns",
 map[string]string{"input": input.Config.Name},
 )

 acc := NewAccumulator(input, metricC)
 acc.SetPrecision(a.Config.Agent.Precision.Duration,
 a.Config.Agent.Interval.Duration)

 ticker := time.NewTicker(interval)
 defer ticker.Stop()

 for {
 internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration, shutdown)

 start := time.Now()
 gatherWithTimeout(shutdown, kill, input, acc, interval)
 elapsed := time.Since(start)

 GatherTime.Incr(elapsed.Nanoseconds())

 select {
 case <-shutdown:
 return case <-kill:
 return case <-ticker.C:
 continue
 }
 }
}

借助chan 实现指定数量的协程或动态调整协程数量

当然这里必须是每个协程是幂等,也就是所有协程做的是同样的工作。
首先创建 一个 pool:= make(chan chan struct{}, maxWorkers),maxWorkers为目标协程数量。
然后启动协程:

 for i := 0; i < s.workers; i++ {
 go func() {
 wQuit := make(chan struct{})
 s.pool <- wQuit
 s.sFlowWorker(wQuit)
 }()
 }

关闭协程:

 func (s *SFlow) sFlowWorker(wQuit chan struct{}) {
LOOP:
 for {

 select {
 case <-wQuit:
 break LOOP
 case msg, ok = <-sFlowUDPCh:
 if !ok {
 break LOOP
 }
 }

 // 此处执行任务操作
 
}

动态调整:

 for n = 0; n < 10; n++ {
 if len(s.pool) > s.workers {
 wQuit := <-s.pool
 close(wQuit)
 }
 }

多协程精确控制

在改造telegraf过程中,要想动态调整input,每个input都是唯一的,分属不同类型插件。就必须实现精准控制指定的协程的启停。
这个时候实现思路就是:实现一个kills map[string]chan struct{},k为每个任务的唯一ID。添加任务时候,传递一个chan struct{},这个时候关闭指定ID的chan struct{},就能控制指定的协程。

// DelInput add input func (a *Agent) DelInput(inputs []*models.RunningInput) error {
 a.storeMutex.Lock()
 defer a.storeMutex.Unlock()

 for _, v := range inputs {
 if _, ok := a.kills[v.Config.ID]; !ok {
 return fmt.Errorf("input: %s,未找到,无法删除", v.Config.ID)
 }
 }

 for _, input := range inputs {
 if kill, ok := a.kills[input.Config.ID]; ok {
 delete(a.kills, input.Config.ID)
 close(kill)
 }
 }
 return nil
}

添加任务:

// AddInput add input func (a *Agent) AddInput(shutdown chan struct{}, inputs []*models.RunningInput) error {
 a.storeMutex.Lock()
 defer a.storeMutex.Unlock()
 for _, v := range inputs {
 if _, ok := a.kills[v.Config.ID]; ok {
 return fmt.Errorf("input: %s,已经存在无法新增", v.Config.ID)
 }
 }

 for _, input := range inputs {
 interval := a.Config.Agent.Interval.Duration
 // overwrite global interval if this plugin has it's own. if input.Config.Interval != 0 {
 interval = input.Config.Interval
 }
 if input.Config.ID == "" {
 continue
 }
 
 a.wg.Add(1)

 kill := make(chan struct{})
 a.kills[input.Config.ID] = kill

 go func(in *models.RunningInput, interv time.Duration) {
 defer a.wg.Done()
 a.gatherer(shutdown, kill, in, interv, a.metricC)
 }(input, interval)
 }

 return nil
}

总结

简单介绍了一下telegraf项目。后续的优化和改造工作还在继续。主要是分布式telegraf的调度算法。毕竟集中化所有exporter以后,telegraf的负载能力受单机能力限制,而且也不符合高可用的使用目标。

本文转自中文社区-k8s与监控--从telegraf改造谈golang多协程精确控制

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
2月前
|
安全 Go
Golang语言goroutine协程并发安全及锁机制
这篇文章是关于Go语言中多协程操作同一数据问题、互斥锁Mutex和读写互斥锁RWMutex的详细介绍及使用案例,涵盖了如何使用这些同步原语来解决并发访问共享资源时的数据安全问题。
86 4
|
13天前
|
存储 安全 测试技术
GoLang协程Goroutiney原理与GMP模型详解
本文详细介绍了Go语言中的Goroutine及其背后的GMP模型。Goroutine是Go语言中的一种轻量级线程,由Go运行时管理,支持高效的并发编程。文章讲解了Goroutine的创建、调度、上下文切换和栈管理等核心机制,并通过示例代码展示了如何使用Goroutine。GMP模型(Goroutine、Processor、Machine)是Go运行时调度Goroutine的基础,通过合理的调度策略,实现了高并发和高性能的程序执行。
76 29
|
11天前
|
Go 计算机视觉
在Golang高并发环境中如何进行协程同步?
在此示例中,使用互斥锁来保护对共享计数器变量 c 的访问,确保并发的 HTTP 请求不会产生数据竞争。
32 3
|
11天前
|
负载均衡 算法 Go
GoLang协程Goroutiney原理与GMP模型详解
【11月更文挑战第4天】Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理,创建和销毁开销小,适合高并发场景。其调度采用非抢占式和协作式多任务处理结合的方式。GMP 模型包括 G(Goroutine)、M(系统线程)和 P(逻辑处理器),通过工作窃取算法实现负载均衡,确保高效利用系统资源。
|
2月前
|
Go 调度
Golang语言goroutine协程篇
这篇文章是关于Go语言goroutine协程的详细教程,涵盖了并发编程的常见术语、goroutine的创建和调度、使用sync.WaitGroup控制协程退出以及如何通过GOMAXPROCS设置程序并发时占用的CPU逻辑核心数。
50 4
Golang语言goroutine协程篇
|
3月前
|
Kubernetes 监控 Cloud Native
"解锁K8s新姿势!Cobra+Client-go强强联手,打造你的专属K8s监控神器,让资源优化与性能监控尽在掌握!"
【8月更文挑战第14天】在云原生领域,Kubernetes以出色的扩展性和定制化能力引领潮流。面对独特需求,自定义插件成为必要。本文通过Cobra与Client-go两大利器,打造一款监测特定标签Pods资源使用的K8s插件。Cobra简化CLI开发,Client-go则负责与K8s API交互。从初始化项目到实现查询逻辑,一步步引导你构建个性化工具,开启K8s集群智能化管理之旅。
53 2
|
3月前
|
Prometheus Kubernetes 监控
Kubernetes(K8S) 监控 Prometheus + Grafana
Kubernetes(K8S) 监控 Prometheus + Grafana
252 2
|
2月前
|
运维 Kubernetes 监控
Loki+Promtail+Grafana监控K8s日志
综上,Loki+Promtail+Grafana 监控组合对于在 K8s 环境中优化日志管理至关重要,它不仅提供了强大且易于扩展的日志收集与汇总工具,还有可视化这些日志的能力。通过有效地使用这套工具,可以显著地提高对应用的运维监控能力和故障诊断效率。
290 0
|
3月前
|
NoSQL Unix 编译器
Golang协程goroutine的调度与状态变迁分析
文章深入分析了Golang中goroutine的调度和状态变迁,包括Grunnable、Gwaiting、Grunning和Gsyscall等状态,以及它们之间的转换条件和原理,帮助理解Go调度器的内部机制。
44 0
|
3月前
|
Prometheus 监控 Kubernetes
在k8S中,状态码监控是怎么做的?
在k8S中,状态码监控是怎么做的?