go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

前言

在上一篇文章中,我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径,但是锁着一次读取配置的增多,不可避免的出现了一个问题:我们如何来监控多个日志文件,这样原来的tailFile模块相对于当下场景就显得有些捉襟见肘了,所以对tialFile模块进行重构就成了我们必须要做的事情了。

TailFiile模块的重构流程

储存数据结构体的重构

在上一篇博文中我们定义了collectEntry来储存我们从etcd中get到的信息,但是,这个获取的消息在tailFile模块也需要使用,所以这里我们再创建一个common模块来专门储存这个数据:

type CollectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}

tailFile模块中也需要一个结构体来储存需要的信息:

type tailTask struct{
  path string
  topic string
  TailObj *tail.Tail
}

tail初始化模块的重构

由于现在我们的配置信息全部储存到了 CollectEntry结构体中,它会给tail的初始化函数传递一个CollectEntry结构体数组,所以我们需要对之前的tail模块代码进行重构与细化,如下:

type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}

之前我们只有一个日志需要监控,所以主要的工作流程可以放在man.go中,但是现在会创建多个tailTask来监控,我们最好将他移动到tail模块中,最后tail模块的全部代码为:

package tailFile
import (
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/hpcloud/tail"
  "log-agent/Kafka"
  "log-agent/common"
  "strings"
  "time"
)
type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}
func (t *tailTask) run() {
  for {
    line, ok := <-t.TailObj.Lines
    if !ok {
      logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = t.topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}

修改模块的全部代码

  • main.go
package main
import (
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/go-ini/ini"
  "log-agent/Kafka"
  "log-agent/etcd"
  "log-agent/tailFile"
)
type Config struct {
  Kafakaddress Kafkaddress `ini:"kafka"`
  LogFilePath  LogFilePath `ini:"collect"`
  Etcdaddress  EtcdAddress `ini:"etcd"`
}
type Kafkaddress struct {
  Addr        []string `ini:"address"`
  Topic       string   `ini:"topic"`
  MessageSize int64    `ini:"chan_size"`
}
type LogFilePath struct {
  Path string `ini:"logfile_path"`
}
type EtcdAddress struct {
  Addr []string `ini:"address"`
  Key  string   `ini:"collect_key"`
}
func run() {
  select {}
}
func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  //初始化tail
  err = tailFile.InitTail(collectEntryList)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  run()
}
  • common.go
package common
type CollectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}
  • tailFile.go
package tailFile
import (
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/hpcloud/tail"
  "log-agent/Kafka"
  "log-agent/common"
  "strings"
  "time"
)
type tailTask struct {
  path    string
  topic   string
  TailObj *tail.Tail
}
func NewTailTask(path, topic string) (tt *tailTask) {
  tt = &tailTask{
    path:  path,
    topic: topic,
  }
  return tt
}
func (task *tailTask) Init() (err error) {
  config := tail.Config{
    Follow:    true,
    ReOpen:    true,
    MustExist: true,
    Poll:      true,
    Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
  }
  task.TailObj, err = tail.TailFile(task.path, config)
  if err != nil {
    logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
    return
  }
  return
}
func InitTail(collectEntryList []common.CollectEntry) (err error) {
  for _, entry := range collectEntryList {
    tt := NewTailTask(entry.Path, entry.Topic)
    err = tt.Init()
    if err != nil {
      logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
      continue
    }
    go tt.run()
  }
  return
}
func (t *tailTask) run() {
  for {
    line, ok := <-t.TailObj.Lines
    if !ok {
      logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = t.topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}

运行结果

当你对不同日志文件修改都有反馈时就代表运行成功啦!

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
9天前
|
存储 JSON 监控
Viper,一个Go语言配置管理神器!
Viper 是一个功能强大的 Go 语言配置管理库,支持从多种来源读取配置,包括文件、环境变量、远程配置中心等。本文详细介绍了 Viper 的核心特性和使用方法,包括从本地 YAML 文件和 Consul 远程配置中心读取配置的示例。Viper 的多来源配置、动态配置和轻松集成特性使其成为管理复杂应用配置的理想选择。
29 2
|
7天前
|
Go 索引
go语言中的循环语句
【11月更文挑战第4天】
18 2
|
7天前
|
Go C++
go语言中的条件语句
【11月更文挑战第4天】
20 2
|
12天前
|
安全 Go
用 Zap 轻松搞定 Go 语言中的结构化日志
在现代应用程序开发中,日志记录至关重要。Go 语言中有许多日志库,而 Zap 因其高性能和灵活性脱颖而出。本文详细介绍如何在 Go 项目中使用 Zap 进行结构化日志记录,并展示如何定制日志输出,满足生产环境需求。通过基础示例、SugaredLogger 的便捷使用以及自定义日志配置,帮助你在实际开发中高效管理日志。
31 1
|
11天前
|
程序员 Go
go语言中的控制结构
【11月更文挑战第3天】
87 58
|
10天前
|
监控 Go API
Go语言在微服务架构中的应用实践
在微服务架构的浪潮中,Go语言以其简洁、高效和并发处理能力脱颖而出,成为构建微服务的理想选择。本文将探讨Go语言在微服务架构中的应用实践,包括Go语言的特性如何适应微服务架构的需求,以及在实际开发中如何利用Go语言的特性来提高服务的性能和可维护性。我们将通过一个具体的案例分析,展示Go语言在微服务开发中的优势,并讨论在实际应用中可能遇到的挑战和解决方案。
|
11天前
|
存储 编译器 Go
go语言中的变量、常量、数据类型
【11月更文挑战第3天】
28 9
|
7天前
|
Go
go语言中的 跳转语句
【11月更文挑战第4天】
15 4
|
11天前
|
数据采集 监控 Java
go语言编程学习
【11月更文挑战第3天】
30 7
|
7天前
|
JSON 安全 Go
Go语言中使用JWT鉴权、Token刷新完整示例,拿去直接用!
本文介绍了如何在 Go 语言中使用 Gin 框架实现 JWT 用户认证和安全保护。JWT(JSON Web Token)是一种轻量、高效的认证与授权解决方案,特别适合微服务架构。文章详细讲解了 JWT 的基本概念、结构以及如何在 Gin 中生成、解析和刷新 JWT。通过示例代码,展示了如何在实际项目中应用 JWT,确保用户身份验证和数据安全。完整代码可在 GitHub 仓库中查看。
38 1