go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

前言

在上一篇文章中,我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径,但是锁着一次读取配置的增多,不可避免的出现了一个问题:我们如何来监控多个日志文件,这样原来的tailFile模块相对于当下场景就显得有些捉襟见肘了,所以对tialFile模块进行重构就成了我们必须要做的事情了。

TailFiile模块的重构流程

储存数据结构体的重构

在上一篇博文中我们定义了collectEntry来储存我们从etcd中get到的信息,但是,这个获取的消息在tailFile模块也需要使用,所以这里我们再创建一个common模块来专门储存这个数据:

type CollectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}

tailFile模块中也需要一个结构体来储存需要的信息:

type tailTask struct{
  path string
  topic string
  TailObj *tail.Tail
}

tail初始化模块的重构

由于现在我们的配置信息全部储存到了 CollectEntry结构体中,它会给tail的初始化函数传递一个CollectEntry结构体数组,所以我们需要对之前的tail模块代码进行重构与细化,如下:

type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}

之前我们只有一个日志需要监控,所以主要的工作流程可以放在man.go中,但是现在会创建多个tailTask来监控,我们最好将他移动到tail模块中,最后tail模块的全部代码为:

package tailFile
import (
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/hpcloud/tail"
  "log-agent/Kafka"
  "log-agent/common"
  "strings"
  "time"
)
type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}
func (t *tailTask) run() {
  for {
    line, ok := <-t.TailObj.Lines
    if !ok {
      logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = t.topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}

修改模块的全部代码

  • main.go
package main
import (
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/go-ini/ini"
  "log-agent/Kafka"
  "log-agent/etcd"
  "log-agent/tailFile"
)
type Config struct {
  Kafakaddress Kafkaddress `ini:"kafka"`
  LogFilePath  LogFilePath `ini:"collect"`
  Etcdaddress  EtcdAddress `ini:"etcd"`
}
type Kafkaddress struct {
  Addr        []string `ini:"address"`
  Topic       string   `ini:"topic"`
  MessageSize int64    `ini:"chan_size"`
}
type LogFilePath struct {
  Path string `ini:"logfile_path"`
}
type EtcdAddress struct {
  Addr []string `ini:"address"`
  Key  string   `ini:"collect_key"`
}
func run() {
  select {}
}
func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  //初始化tail
  err = tailFile.InitTail(collectEntryList)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  run()
}
  • common.go
package common
type CollectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}
  • tailFile.go
package tailFile
import (
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/hpcloud/tail"
  "log-agent/Kafka"
  "log-agent/common"
  "strings"
  "time"
)
type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}
func (t *tailTask) run() {
  for {
    line, ok := <-t.TailObj.Lines
    if !ok {
      logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = t.topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}

运行结果

当你对不同日志文件修改都有反馈时就代表运行成功啦!

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
人工智能 安全 算法
Go入门实战:并发模式的使用
本文详细探讨了Go语言的并发模式,包括Goroutine、Channel、Mutex和WaitGroup等核心概念。通过具体代码实例与详细解释,介绍了这些模式的原理及应用。同时分析了未来发展趋势与挑战,如更高效的并发控制、更好的并发安全及性能优化。Go语言凭借其优秀的并发性能,在现代编程中备受青睐。
110 33
|
1月前
|
Go
【LeetCode 热题100】DP 实战进阶:最长递增子序列、乘积最大子数组、分割等和子集(力扣300 / 152/ 416 )(Go语言版)
本文深入解析三道经典的动态规划问题:**最长递增子序列(LIS)**、**乘积最大子数组** 和 **分割等和子集**。 - **300. LIS** 通过 `dp[i]` 表示以第 `i` 个元素结尾的最长递增子序列长度,支持 O(n²) 动态规划与 O(n log n) 的二分优化。 - **152. 乘积最大子数组** 利用正负数特性,同时维护最大值与最小值的状态转移方程。 - **416. 分割等和子集** 转化为 0-1 背包问题,通过布尔型 DP 实现子集和判断。 总结对比了三题的状态定义与解法技巧,并延伸至相关变种问题,助你掌握动态规划的核心思想与灵活应用!
66 1
|
1月前
|
分布式计算 算法 Go
【LeetCode 热题100】BFS/DFS 实战:岛屿数量 & 腐烂的橘子(力扣200 / 994 )(Go语言版)
本文讲解了两道经典的图论问题:**岛屿数量(LeetCode 200)** 和 **腐烂的橘子(LeetCode 994)**,分别通过 DFS/BFS 实现。在“岛屿数量”中,利用深度或广度优先搜索遍历二维网格,标记连通陆地并计数;“腐烂的橘子”则采用多源 BFS,模拟腐烂传播过程,计算最短时间。两者均需掌握访问标记技巧,是学习网格搜索算法的绝佳实践。
68 1
|
1月前
|
Go
【LeetCode 热题100】BFS/DFS 实战:岛屿数量 & 腐烂的橘子(力扣200 / 994 )(Go语言版)
本篇博客详细解析了三道经典的动态规划问题:198. 打家劫舍(线性状态转移)、279. 完全平方数与322. 零钱兑换(完全背包问题)。通过 Go 语言实现,帮助读者掌握动态规划的核心思想及其实战技巧。从状态定义到转移方程,逐步剖析每道题的解法,并总结其异同点,助力解决更复杂的 DP 问题。适合初学者深入理解动态规划的应用场景和优化方法。
45 0
|
3月前
|
存储 NoSQL Redis
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 +  无锁架构 +  EDA架构  + 异步日志 + 集群架构
|
3月前
|
Go API 定位技术
MCP 实战:用 Go 语言开发一个查询 IP 信息的 MCP 服务器
随着 MCP 的快速普及和广泛应用,MCP 服务器也层出不穷。大多数开发者使用的 MCP 服务器开发库是官方提供的 typescript-sdk,而作为 Go 开发者,我们也可以借助优秀的第三方库去开发 MCP 服务器,例如 ThinkInAIXYZ/go-mcp。 本文将详细介绍如何在 Go 语言中使用 go-mcp 库来开发一个查询 IP 信息的 MCP 服务器。
195 0
|
4月前
|
存储 JSON Go
PHP 日志系统的最佳搭档:一个 Go 写的远程日志收集服务
为了不再 SSH 上去翻日志,我写了个 Go 小脚本,用来接收远程日志。PHP 负责记录日志,Go 负责存储和展示,按天存储、支持 API 访问、可远程管理,终于能第一时间知道项目炸了。
78 10
|
8月前
|
XML 安全 Java
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
本文介绍了Java日志框架的基本概念和使用方法,重点讨论了SLF4J、Log4j、Logback和Log4j2之间的关系及其性能对比。SLF4J作为一个日志抽象层,允许开发者使用统一的日志接口,而Log4j、Logback和Log4j2则是具体的日志实现框架。Log4j2在性能上优于Logback,推荐在新项目中使用。文章还详细说明了如何在Spring Boot项目中配置Log4j2和Logback,以及如何使用Lombok简化日志记录。最后,提供了一些日志配置的最佳实践,包括滚动日志、统一日志格式和提高日志性能的方法。
2427 31
【日志框架整合】Slf4j、Log4j、Log4j2、Logback配置模板
|
7月前
|
监控 安全 Apache
什么是Apache日志?为什么Apache日志分析很重要?
Apache是全球广泛使用的Web服务器软件,支持超过30%的活跃网站。它通过接收和处理HTTP请求,与后端服务器通信,返回响应并记录日志,确保网页请求的快速准确处理。Apache日志分为访问日志和错误日志,对提升用户体验、保障安全及优化性能至关重要。EventLog Analyzer等工具可有效管理和分析这些日志,增强Web服务的安全性和可靠性。
192 9
|
5月前
|
存储 SQL 关系型数据库
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log、原理、写入过程;binlog与redolog区别、update语句的执行流程、两阶段提交、主从复制、三种日志的使用场景;查询日志、慢查询日志、错误日志等其他几类日志
420 35
MySQL日志详解——日志分类、二进制日志bin log、回滚日志undo log、重做日志redo log