上节回顾
在上一篇文章中我们介绍了编写客户端的四个步骤,分别是:
- 读取配置文件,寻找日志路径
- 初始化服务
- 根据日志路径l来收集日志
- 将收集到的日志发送Kafka中
关于上述的内容博主画了一个思维导图(有点丑,大家勉强看看,以前没画过):
对了,为了画这个思维导图昨天博主找了好久思维导图的软件,最后发现了Vscode上面有一个非常不错的插件:drawio
,样子大概是这样的:
大家如果没有合适的思维导图绘制根据,可以试试这个。好了,话不多说,开始今天的内容。
读取配置信息,获取日志信息
前言
这里读取日志信息我们选择的是go-ini
这一第三方包,具体的使用方法在我前面的博文这种有所介绍,大家不了解的话可以参考:
go语言并发实战——日志收集系统(五) 基于go-ini包读取日志收集服务的配置文件
需求分析
这里配置文件中我们主要要知道两个消息,一个Kafka的配置信息,一个是日志文件的路径,配置文件应该是这样的:
[kafka] address=127.0.0.1:9092 topic=web.log chan_size=100000 [collect] logfile_path:G:\goproject\-goroutine-\log-agent\log\log1
而为了方便我们利用反射来读取配置文件,我们来创建几个结构体来存储我们读到的配置信息:
- Kafka结构体
type Kafkaddress struct { Addr []string `ini:"address"` Topic string `ini:"topic"` MessageSize int64 `ini:"chan_size"` }
- tail结构体
type LogFilePath struct { Path string `ini:"logfile_path"` }
- 总的结构体
type Config struct { Kafakaddress Kafkaddress `ini:"kafka"` LogFilePath LogFilePath `ini:"collect"` }
然后读取配置信息放入结构体中:
//读取配置文件,获取配置信息 filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini" ConfigObj := new(Config) err := ini.MapTo(ConfigObj, filename) if err != nil { logrus.Error("%s Load failed,err:", filename, err) }
这样我们就获得我们所需要的配置消息了
初始化服务
前言
这里我们初始服务主要是初始化Kafka以及tail包,利用它们读取日志信息并将其发送Kafka中,具体介绍可以参考前面的几篇文章:
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(四) 利用tail包实现对日志文件的实时监控
Kafka的初始化
//初始化Kafka err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize) if err != nil { logrus.Error("InitKafka failed, err:%v", err) return } logrus.Infof("InitKafka success")
tail的初始化
func InitTail(filename string) (err error) { config := tail.Config{ Follow: true, ReOpen: true, MustExist: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, } TailObj, err = tail.TailFile(filename, config) if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", filename, err) return } return }
根据路径来读取日志
需求分析
一般我们常见的想法会是我们先将日志消息读取出来然后发送给Kafka但是这样的串行操作无疑会大大增加程序的运行时间,所以这里我们选择将读到的日志信息打包发送到管道中,然后再看起一个协程来发送数据,这样实现了读取与发送的一步操作,可以有效降低程序的运行时间,而上面出现的MessageSiz
也就是我们设置的管道大小
func run(config *Config) (err error) { for { line, ok := <-tailFile.TailObj.Lines if !ok { logrus.Error("read from tail failed,err:", err) time.Sleep(2 * time.Second) continue } msg := &sarama.ProducerMessage{} msg.Topic = config.Kafakaddress.Topic msg.Value = sarama.StringEncoder(line.Text) Kafka.MsgChan <- msg }
发送消息到KafKa
func SendMsg() { for { select { case msg := <-MsgChan: pid, offset, err := client.SendMessage(msg) if err != nil { logrus.Error("send msg to kafka failed,err:%v", err) return } logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset) } } }
完整代码
- main.go
package main import ( "github.com/Shopify/sarama" "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" "github.com/go-ini/ini" "log-agent/Kafka" "log-agent/tailFile" "time" ) type Config struct { Kafakaddress Kafkaddress `ini:"kafka"` LogFilePath LogFilePath `ini:"collect"` } type Kafkaddress struct { Addr []string `ini:"address"` Topic string `ini:"topic"` MessageSize int64 `ini:"chan_size"` } type LogFilePath struct { Path string `ini:"logfile_path"` } func run(config *Config) (err error) { for { line, ok := <-tailFile.TailObj.Lines if !ok { logrus.Error("read from tail failed,err:", err) time.Sleep(2 * time.Second) continue } msg := &sarama.ProducerMessage{} msg.Topic = config.Kafakaddress.Topic msg.Value = sarama.StringEncoder(line.Text) Kafka.MsgChan <- msg } } func main() { //读取配置文件,获取配置信息 filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini" ConfigObj := new(Config) err := ini.MapTo(ConfigObj, filename) if err != nil { logrus.Error("%s Load failed,err:", filename, err) } //初始化Kafka err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize) if err != nil { logrus.Error("InitKafka failed, err:%v", err) return } logrus.Infof("InitKafka success") //初始化tail err = tailFile.InitTail(ConfigObj.LogFilePath.Path) if err != nil { logrus.Error("InitTail failed, err:%v", err) return } logrus.Infof("InitTail success") //利用sarama报发送消息到Kafka中 err = run(ConfigObj) }
- Kafka.go
package Kafka import ( "github.com/Shopify/sarama" "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" ) var ( client sarama.SyncProducer MsgChan chan *sarama.ProducerMessage ) func InitKafka(address []string, Chan_size int64) (err error) { //初始化MsgChan MsgChan = make(chan *sarama.ProducerMessage, Chan_size) //初始化config config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true //连接Kafka client, err = sarama.NewSyncProducer(address, config) if err != nil { logrus.Error("kafka connect error,err:%v", err) return } go SendMsg() return } func SendMsg() { for { select { case msg := <-MsgChan: pid, offset, err := client.SendMessage(msg) if err != nil { logrus.Error("send msg to kafka failed,err:%v", err) return } logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset) } } }
- tailFile.go
package tailFile import ( "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" "github.com/hpcloud/tail" ) var TailObj *tail.Tail func InitTail(filename string) (err error) { config := tail.Config{ Follow: true, ReOpen: true, MustExist: true, Poll: true, Location: &tail.SeekInfo{Offset: 0, Whence: 2}, } TailObj, err = tail.TailFile(filename, config) if err != nil { logrus.Error("tail create tailObj for path:%s,err:%v", filename, err) return } return }
运行结果
在运行前打开ZooKeeper与Kafka,然后对日志文件进行操作,会出现:
出现
2024/04/22 20:26:34 Seeked G:\goproject\-goroutine-\log-agent\log\log1 - &{Offset:0 Whence:2} INFO[0013] send msg to kafka success,pid:%d,offset:%d0 3 INFO[0013] send msg to kafka success,pid:%d,offset:%d0 4
就代表运行成功了。
结语
今天的有关内容就到此为止啦,有问题的话欢迎在评论区评论,大家可以集思广益,如果你觉得博主的内容对你有帮助,欢迎三连一下和订阅专栏
如果博主文章里面有什么错误页欢迎斧正(毕竟博主页只是个小蒟蒻鸡),下篇文章我们要进入etcd的有关学习了,好了,大家下篇文章见!