go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(十一)基于etcd来监视配置文件的变化

前言

在我们实际生产中,我们常常因为新的项目或者新的功能进而要对配置文件进行修改,但是在生产环境下我们不是每次配置文件发生变化都重启一次系统,这无疑是不切实际的,所以我们需要对配置文件进行实时监控,而今天我们所要展示的也就是如何基于etcd来监控配置文件的变化。

etcd对配置项监控的流程

需求分析

首先我们来看我们日志收集服务的主要工作流程:

func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  //初始化tail
  err = tailFile.InitTail(collectEntryList)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  run()
}

在上述主要工作逻辑的基础上,现在我们需要etcd来实现对配置文件的实时监控,而这就需要我们在后态去运行一个监控程序来实时监控查看需要见监控的配置文件是否变化。并且将变化发送到tailFile模块中

实现Watch监控

所以这里我们对main.go进行一点简单的修改,添加一个后台程序 go etcd.WatchConf(ConfigObj.Etcdaddress.Key):

package main
import (
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/go-ini/ini"
  "log-agent/Kafka"
  "log-agent/etcd"
  "log-agent/tailFile"
)
type Config struct {
  Kafakaddress Kafkaddress `ini:"kafka"`
  LogFilePath  LogFilePath `ini:"collect"`
  Etcdaddress  EtcdAddress `ini:"etcd"`
}
type Kafkaddress struct {
  Addr        []string `ini:"address"`
  Topic       string   `ini:"topic"`
  MessageSize int64    `ini:"chan_size"`
}
type LogFilePath struct {
  Path string `ini:"logfile_path"`
}
type EtcdAddress struct {
  Addr []string `ini:"address"`
  Key  string   `ini:"collect_key"`
}
func run() {
  select {}
}
func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  go etcd.WatchConf(ConfigObj.Etcdaddress.Key)
  //初始化tail
  err = tailFile.InitTail(collectEntryList)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  run()
}

我们在来看这个函数的具体逻辑:

func WatchConf(key string) {
  rch := client.Watch(context.Background(), key)
  var newConf []common.CollectEntry
  for wresp := range rch {
    logrus.Infof("get new conf fromn etcd")
    for _, ev := range wresp.Events {
      fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
      err := json.Unmarshal(ev.Kv.Value, &newConf)
      if err != nil {
        logrus.Error("json unmarshal failed,err:%v", err)
        continue
      }
      tailFile.SendNewConf(newConf)
    }
  }
}

与之前有关etcd的文章中的操作例子不同,这里我们并没有定义上下文,主要是因为这里我们不确定什么时候终止这个程序,所以不使用上下文了。

发送新配置到tailFile中

在上面我们已经完成etcd的监控,现在我们需要把新的配置消息发送到tailFile,这里我们第一反应是写一个死循环一直独缺,但是这样其实不大方便,毕竟储蓄一直运行会占掉大量不必要消耗的资源,这里我们可以让双方使用管道来进行通信,平时管道处于阻塞状态,只有监测到新配置才会进行通信,这样会使资源得到最大化的利用,我们来看一看具体的代码实现:

  • 首先我们来定义一下用于通信的管道
var (
  confchan chan []common.CollectEntry
)
  • 然后我们要对管道进行初始化,并且读取管道中新的配置信息:
confchan = make(chan []common.CollectEntry)
  newConf := <-confchan
  logrus.Infof("get newconf from etcd", newConf)

最后,由于我们这里管道只用于etcd模块与tailFile模块之间的通信,所以这里我们就不暴露管道,而是选择暴露函数:

func SendNewConf(newConf []common.CollectEntry) {
  confchan <- newConf
}

结语

最后附上上述变化模块的代码:

  • main.go
package main
import (
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/go-ini/ini"
  "log-agent/Kafka"
  "log-agent/etcd"
  "log-agent/tailFile"
)
type Config struct {
  Kafakaddress Kafkaddress `ini:"kafka"`
  LogFilePath  LogFilePath `ini:"collect"`
  Etcdaddress  EtcdAddress `ini:"etcd"`
}
type Kafkaddress struct {
  Addr        []string `ini:"address"`
  Topic       string   `ini:"topic"`
  MessageSize int64    `ini:"chan_size"`
}
type LogFilePath struct {
  Path string `ini:"logfile_path"`
}
type EtcdAddress struct {
  Addr []string `ini:"address"`
  Key  string   `ini:"collect_key"`
}
func run() {
  select {}
}
func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  go etcd.WatchConf(ConfigObj.Etcdaddress.Key)
  //初始化tail
  err = tailFile.InitTail(collectEntryList)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  run()
}
  • etcd.go
package etcd
import (
  "encoding/json"
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  clientv3 "go.etcd.io/etcd/client/v3"
  "golang.org/x/net/context"
  "log-agent/common"
  "log-agent/tailFile"
  "time"
)
var client *clientv3.Client
func Init(address []string) (err error) {
  client, err = clientv3.New(clientv3.Config{
    Endpoints:   address,
    DialTimeout: 5 * time.Second,
  })
  if err != nil {
    logrus.Error("etcd client connect failed,err:%v", err)
    return
  }
  return
}
func GetConf(key string) (err error, collectEntryList []common.CollectEntry) {
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  response, err := client.Get(ctx, key)
  cancel()
  if err != nil {
    logrus.Error("get conf from etcd failed,err:%v", err)
    return
  }
  if len(response.Kvs) == 0 {
    logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
    return
  }
  fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
  err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
  if err != nil {
    logrus.Error("json unmarshal failed,err:%v", err)
    return
  }
  return
}
func WatchConf(key string) {
  rch := client.Watch(context.Background(), key)
  var newConf []common.CollectEntry
  for wresp := range rch {
    logrus.Infof("get new conf fromn etcd")
    for _, ev := range wresp.Events {
      fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
      err := json.Unmarshal(ev.Kv.Value, &newConf)
      if err != nil {
        logrus.Error("json unmarshal failed,err:%v", err)
        continue
      }
      tailFile.SendNewConf(newConf)
    }
  }
  • tailFile.go
package tailFile
import (
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/hpcloud/tail"
  "log-agent/Kafka"
  "log-agent/common"
  "strings"
  "time"
)
type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
var (
  confchan chan []common.CollectEntry
)
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  //初始化新配置的管道
  confchan = make(chan []common.CollectEntry)
  newConf := <-confchan
  logrus.Infof("get newconf from etcd", newConf)
  return
}
func (t *tailTask) run() {
  for {
    line, ok := <-t.TailObj.Lines
    if !ok {
      logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = t.topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}
func SendNewConf(newConf []common.CollectEntry) {
  confchan <- newConf
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
打赏
0
0
0
0
13
分享
相关文章
|
1天前
|
公司局域网管理系统里的 Go 语言 Bloom Filter 算法,太值得深挖了
本文探讨了如何利用 Go 语言中的 Bloom Filter 算法提升公司局域网管理系统的性能。Bloom Filter 是一种高效的空间节省型数据结构,适用于快速判断元素是否存在于集合中。文中通过具体代码示例展示了如何在 Go 中实现 Bloom Filter,并应用于局域网的 IP 访问控制,显著提高系统响应速度和安全性。随着网络规模扩大和技术进步,持续优化算法和结合其他安全技术将是企业维持网络竞争力的关键。
14 1
公司局域网管理系统里的 Go 语言 Bloom Filter 算法,太值得深挖了
探秘员工泄密行为防线:基于Go语言的布隆过滤器算法解析
在信息爆炸时代,员工泄密行为对企业构成重大威胁。本文聚焦布隆过滤器(Bloom Filter)这一高效数据结构,结合Go语言实现算法,帮助企业识别和预防泄密风险。通过构建正常操作“指纹库”,实时监测员工操作,快速筛查可疑行为。示例代码展示了如何利用布隆过滤器检测异常操作,并提出优化建议,如调整参数、结合日志分析系统等,全方位筑牢企业信息安全防线,守护核心竞争力。
|
8天前
|
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
57 20
【01】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-硬件设备实时监控系统运营版发布-本产品基于企业级开源项目Zabbix深度二开-分步骤实现预计10篇合集-自营版
【01】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-硬件设备实时监控系统运营版发布-本产品基于企业级开源项目Zabbix深度二开-分步骤实现预计10篇合集-自营版
22 0
图解MySQL【日志】——Undo Log
Undo Log(回滚日志)是 MySQL 中用于实现事务原子性和一致性的关键机制。在默认的自动提交模式下,MySQL 隐式开启事务,每条增删改语句都会记录到 Undo Log 中。其主要作用包括:
8 0
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
MySQL事务日志-Undo Log工作原理分析
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
115 2
什么是日志管理,如何进行日志管理?
日志管理是对IT系统生成的日志数据进行收集、存储、分析和处理的实践,对维护系统健康、确保安全及获取运营智能至关重要。本文介绍了日志管理的基本概念、常见挑战、工具的主要功能及选择解决方案的方法,强调了定义管理目标、日志收集与分析、警报和报告、持续改进等关键步骤,以及如何应对数据量大、安全问题、警报疲劳等挑战,最终实现日志数据的有效管理和利用。
369 0

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等