前言
在上一篇文章中,我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径,但是锁着一次读取配置的增多,不可避免的出现了一个问题:我们如何来监控多个日志文件,这样原来的tailFile
模块相对于当下场景就显得有些捉襟见肘了,所以对tialFile
模块进行重构就成了我们必须要做的事情了。
TailFiile模块的重构流程
储存数据结构体的重构
在上一篇博文中我们定义了collectEntry
来储存我们从etcd中get到的信息,但是,这个获取的消息在tailFile
模块也需要使用,所以这里我们再创建一个common
模块来专门储存这个数据:
type CollectEntry struct { Path string `json:"path"` Topic string `json:"topic"` }
在tailFile
模块中也需要一个结构体来储存需要的信息:
type tailTask struct{ path string topic string TailObj *tail.Tail }
tail初始化模块的重构
由于现在我们的配置信息全部储存到了 CollectEntry结构体中,它会给tail的初始化函数传递一个CollectEntry
结构体数组,所以我们需要对之前的tail模块代码进行重构与细化,如下:
type tailTask struct { path string topic string TailObj *tail.Tail } func NewTailTask(path, topic string) (tt *tailTask) { tt = &tailTask{ path: path, topic: topic, } return tt } func (task *tailTask) Init() (err error) { config := tail.Config{ Follow: true, ReOpen: true, MustExist: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, } task.TailObj, err = tail.TailFile(task.path, config) if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err) return } return } func InitTail(collectEntryList []common.CollectEntry) (err error) { for _, entry := range collectEntryList { tt := NewTailTask(entry.Path, entry.Topic) err = tt.Init() if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err) continue } go tt.run() } return }
之前我们只有一个日志需要监控,所以主要的工作流程可以放在man.go
中,但是现在会创建多个tailTask
来监控,我们最好将他移动到tail模块中,最后tail模块的全部代码为:
package tailFile import ( "github.com/Shopify/sarama" "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" "github.com/hpcloud/tail" "log-agent/Kafka" "log-agent/common" "strings" "time" ) type tailTask struct { path string topic string TailObj *tail.Tail } func NewTailTask(path, topic string) (tt *tailTask) { tt = &tailTask{ path: path, topic: topic, } return tt } func (task *tailTask) Init() (err error) { config := tail.Config{ Follow: true, ReOpen: true, MustExist: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, } task.TailObj, err = tail.TailFile(task.path, config) if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err) return } return } func InitTail(collectEntryList []common.CollectEntry) (err error) { for _, entry := range collectEntryList { tt := NewTailTask(entry.Path, entry.Topic) err = tt.Init() if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err) continue } go tt.run() } return } func (t *tailTask) run() { for { line, ok := <-t.TailObj.Lines if !ok { logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path) time.Sleep(2 * time.Second) continue } if len(strings.Trim(line.Text, "\r")) == 0 { continue } msg := &sarama.ProducerMessage{} msg.Topic = t.topic msg.Value = sarama.StringEncoder(line.Text) Kafka.MesChan(msg) } }
修改模块的全部代码
- main.go
package main import ( "fmt" "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" "github.com/go-ini/ini" "log-agent/Kafka" "log-agent/etcd" "log-agent/tailFile" ) type Config struct { Kafakaddress Kafkaddress `ini:"kafka"` LogFilePath LogFilePath `ini:"collect"` Etcdaddress EtcdAddress `ini:"etcd"` } type Kafkaddress struct { Addr []string `ini:"address"` Topic string `ini:"topic"` MessageSize int64 `ini:"chan_size"` } type LogFilePath struct { Path string `ini:"logfile_path"` } type EtcdAddress struct { Addr []string `ini:"address"` Key string `ini:"collect_key"` } func run() { select {} } func main() { //读取配置文件,获取配置信息 filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini" ConfigObj := new(Config) err := ini.MapTo(ConfigObj, filename) if err != nil { logrus.Error("%s Load failed,err:", filename, err) } //初始化Kafka err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize) if err != nil { logrus.Error("InitKafka failed, err:%v", err) return } logrus.Infof("InitKafka success") //初始化etcd err = etcd.Init(ConfigObj.Etcdaddress.Addr) if err != nil { logrus.Error("InitEtcd failed, err:%v", err) return } logrus.Infof("InitEtcd success") //拉取要收集日志文件的配置项 err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key) if err != nil { logrus.Error("GetConf failed, err:%v", err) return } fmt.Println(collectEntryList) //初始化tail err = tailFile.InitTail(collectEntryList) if err != nil { logrus.Error("InitTail failed, err:%v", err) return } logrus.Infof("InitTail success") run() }
- common.go
package common type CollectEntry struct { Path string `json:"path"` Topic string `json:"topic"` }
- tailFile.go
package tailFile import ( "github.com/Shopify/sarama" "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" "github.com/hpcloud/tail" "log-agent/Kafka" "log-agent/common" "strings" "time" ) type tailTask struct { path string topic string TailObj *tail.Tail } func NewTailTask(path, topic string) (tt *tailTask) { tt = &tailTask{ path: path, topic: topic, } return tt } func (task *tailTask) Init() (err error) { config := tail.Config{ Follow: true, ReOpen: true, MustExist: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, } task.TailObj, err = tail.TailFile(task.path, config) if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err) return } return } func InitTail(collectEntryList []common.CollectEntry) (err error) { for _, entry := range collectEntryList { tt := NewTailTask(entry.Path, entry.Topic) err = tt.Init() if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err) continue } go tt.run() } return } func (t *tailTask) run() { for { line, ok := <-t.TailObj.Lines if !ok { logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path) time.Sleep(2 * time.Second) continue } if len(strings.Trim(line.Text, "\r")) == 0 { continue } msg := &sarama.ProducerMessage{} msg.Topic = t.topic msg.Value = sarama.StringEncoder(line.Text) Kafka.MesChan(msg) } }
运行结果
当你对不同日志文件修改都有反馈时就代表运行成功啦!