go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

前言

在上一篇文章中,我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径,但是锁着一次读取配置的增多,不可避免的出现了一个问题:我们如何来监控多个日志文件,这样原来的tailFile模块相对于当下场景就显得有些捉襟见肘了,所以对tialFile模块进行重构就成了我们必须要做的事情了。

TailFiile模块的重构流程

储存数据结构体的重构

在上一篇博文中我们定义了collectEntry来储存我们从etcd中get到的信息,但是,这个获取的消息在tailFile模块也需要使用,所以这里我们再创建一个common模块来专门储存这个数据:

type CollectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}

tailFile模块中也需要一个结构体来储存需要的信息:

type tailTask struct{
  path string
  topic string
  TailObj *tail.Tail
}

tail初始化模块的重构

由于现在我们的配置信息全部储存到了 CollectEntry结构体中,它会给tail的初始化函数传递一个CollectEntry结构体数组,所以我们需要对之前的tail模块代码进行重构与细化,如下:

type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}

之前我们只有一个日志需要监控,所以主要的工作流程可以放在man.go中,但是现在会创建多个tailTask来监控,我们最好将他移动到tail模块中,最后tail模块的全部代码为:

package tailFile
import (
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/hpcloud/tail"
  "log-agent/Kafka"
  "log-agent/common"
  "strings"
  "time"
)
type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}
func (t *tailTask) run() {
  for {
    line, ok := <-t.TailObj.Lines
    if !ok {
      logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = t.topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}

修改模块的全部代码

  • main.go
package main
import (
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/go-ini/ini"
  "log-agent/Kafka"
  "log-agent/etcd"
  "log-agent/tailFile"
)
type Config struct {
  Kafakaddress Kafkaddress `ini:"kafka"`
  LogFilePath  LogFilePath `ini:"collect"`
  Etcdaddress  EtcdAddress `ini:"etcd"`
}
type Kafkaddress struct {
  Addr        []string `ini:"address"`
  Topic       string   `ini:"topic"`
  MessageSize int64    `ini:"chan_size"`
}
type LogFilePath struct {
  Path string `ini:"logfile_path"`
}
type EtcdAddress struct {
  Addr []string `ini:"address"`
  Key  string   `ini:"collect_key"`
}
func run() {
  select {}
}
func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  //初始化tail
  err = tailFile.InitTail(collectEntryList)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  run()
}
  • common.go
package common
type CollectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}
  • tailFile.go
package tailFile
import (
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/hpcloud/tail"
  "log-agent/Kafka"
  "log-agent/common"
  "strings"
  "time"
)
type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}
func (t *tailTask) run() {
  for {
    line, ok := <-t.TailObj.Lines
    if !ok {
      logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = t.topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}

运行结果

当你对不同日志文件修改都有反馈时就代表运行成功啦!

相关实践学习
通过日志服务实现云资源OSS的安全审计
本实验介绍如何通过日志服务实现云资源OSS的安全审计。
相关文章
|
1月前
|
Prometheus 监控 Cloud Native
基于docker搭建监控系统&日志收集
Prometheus 是一款由 SoundCloud 开发的开源监控报警系统及时序数据库(TSDB),支持多维数据模型和灵活查询语言,适用于大规模集群监控。它通过 HTTP 拉取数据,支持服务发现、多种图表展示(如 Grafana),并可结合 Loki 实现日志聚合。本文介绍其架构、部署及与 Docker 集成的监控方案。
248 122
基于docker搭建监控系统&日志收集
|
27天前
|
Prometheus 监控 Java
日志收集和Spring 微服务监控的最佳实践
在微服务架构中,日志记录与监控对系统稳定性、问题排查和性能优化至关重要。本文介绍了在 Spring 微服务中实现高效日志记录与监控的最佳实践,涵盖日志级别选择、结构化日志、集中记录、服务ID跟踪、上下文信息添加、日志轮转,以及使用 Spring Boot Actuator、Micrometer、Prometheus、Grafana、ELK 堆栈等工具进行监控与可视化。通过这些方法,可提升系统的可观测性与运维效率。
105 1
日志收集和Spring 微服务监控的最佳实践
|
14天前
|
存储 缓存 监控
用 C++ 红黑树给公司电脑监控软件的日志快速排序的方法
本文介绍基于C++红黑树算法实现公司监控电脑软件的日志高效管理,利用其自平衡特性提升日志排序、检索与动态更新效率,并结合实际场景提出优化方向,增强系统性能与稳定性。
44 4
|
2月前
|
数据采集 Go API
Go语言实战案例:多协程并发下载网页内容
本文是《Go语言100个实战案例 · 网络与并发篇》第6篇,讲解如何使用 Goroutine 和 Channel 实现多协程并发抓取网页内容,提升网络请求效率。通过实战掌握高并发编程技巧,构建爬虫、内容聚合器等工具,涵盖 WaitGroup、超时控制、错误处理等核心知识点。
|
2月前
|
数据采集 JSON Go
Go语言实战案例:实现HTTP客户端请求并解析响应
本文是 Go 网络与并发实战系列的第 2 篇,详细介绍如何使用 Go 构建 HTTP 客户端,涵盖请求发送、响应解析、错误处理、Header 与 Body 提取等流程,并通过实战代码演示如何并发请求多个 URL,适合希望掌握 Go 网络编程基础的开发者。
|
5月前
|
监控 容灾 算法
阿里云 SLS 多云日志接入最佳实践:链路、成本与高可用性优化
本文探讨了如何高效、经济且可靠地将海外应用与基础设施日志统一采集至阿里云日志服务(SLS),解决全球化业务扩展中的关键挑战。重点介绍了高性能日志采集Agent(iLogtail/LoongCollector)在海外场景的应用,推荐使用LoongCollector以获得更优的稳定性和网络容错能力。同时分析了多种网络接入方案,包括公网直连、全球加速优化、阿里云内网及专线/CEN/VPN接入等,并提供了成本优化策略和多目标发送配置指导,帮助企业构建稳定、低成本、高可用的全球日志系统。
629 54
|
11月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
2986 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
10月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
262 9
|
8月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
642 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log