前言
在之前我们j基于sarama
,tail
还有go-ini
实现了日志收集系统客户端的编写,但是我们梳理一下可以发现,该客户端还存在一些问题:
- 客户端一次只能读取一个日志文件,无法同时读取多个分区
- 无法管理日志存放的分区(topic)
那我们一个如何去解决这个问题呢?在前两篇文章中我们介绍了etcd,它通过可以存储键值对并且通过watch
操作来实现对键值对的实时监控,那我们能不能尝试用`etcd``来储存日志文件信息与对应分区信息?这就是我们今天这篇文章所探究的主题.
初步实现的流程
存储数据格式
这里为了存储数据方便,我们利用json格式来存储数据,示例如下:
[ { "path": "G:/goproject/-goroutine-/log-agent/log/log1", "topic": "web.log" }, { "path": "G:/goproject/-goroutine-/log-agent/log/log2", "topic": "s4.log" } ]
etcd初始化的编写
在之前有段etcd的博文中,我们已经介绍过etcd的基本使用,这里不做赘述,首先我们在log-agent文件夹下创建etcd文件夹,创建etcd.go
文件,编写etcd的初始化:
func Init(address []string) (err error) { client, err = clientv3.New(clientv3.Config{ Endpoints: address, DialTimeout: 5 * time.Second, }) if err != nil { logrus.Error("etcd client connect failed,err:%v", err) return } return }
然后 在main.go
文件中调用:
err = etcd.Init(ConfigObj.Etcdaddress.Addr) if err != nil { logrus.Error("InitEtcd failed, err:%v", err) return } logrus.Infof("InitEtcd success")
通过etcd拉取要收集文件的配置项
在初始化etcd后,我们就要通过etcd来拉取要收集文件的配置项了,首先定义一个结构体来接收信息:
type collectEntry struct { Path string `json:"path"` Topic string `json:"topic"` }
然后创建拉取配置项的函数:
func GetConf(key string) (err error, collectEntryList []collectEntry) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() response, err := client.Get(ctx, key) if err != nil { logrus.Error("get conf from etcd failed,err:%v", err) return } if len(response.Kvs) == 0 { logrus.Warningf("get len:0 conf from etcd failed,err:%v", err) return } fmt.Println(response.Kvs[0].Value) //此时还是json字符串 err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList if err != nil { logrus.Error("json unmarshal failed,err:%v", err) return } return }
然后在main.go
里面调用一下就可以了:
//拉取要收集日志文件的配置项 err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key) if err != nil { logrus.Error("GetConf failed, err:%v", err) return } fmt.Println(collectEntryList)
尝试用之前的demo设置一下配置文件中的key对应的value:
package main import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "time" ) func main() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, //服务端通信端口 DialTimeout: 5 * time.Second, //连接超时时间 }) //put ctx, cancel := context.WithTimeout(context.Background(), time.Second) str := "[{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log1\",\"topic\":\"web.log\"},{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log2\",\"topic\":\"s4.log\"}]" _, err = cli.Put(ctx, "collect_log_conf", str) cancel() if err != nil { fmt.Println("put failed,err:%v", err) return } }
运行就可以看到我们接收到了配置项了:
涉及改动处的源代码
- 配置文件
[kafka] address=127.0.0.1:9092 topic=test1.log chan_size=100000 [etcd] address=127.0.0.1:2379 collect_key=collect_log_conf [collect] logfile_path:G:\goproject\-goroutine-\log-agent\log\log1
main.go
package main import ( "fmt" "github.com/Shopify/sarama" "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" "github.com/go-ini/ini" "log-agent/Kafka" "log-agent/etcd" "log-agent/tailFile" "strings" "time" ) type Config struct { Kafakaddress Kafkaddress `ini:"kafka"` LogFilePath LogFilePath `ini:"collect"` Etcdaddress EtcdAddress `ini:"etcd"` } type Kafkaddress struct { Addr []string `ini:"address"` Topic string `ini:"topic"` MessageSize int64 `ini:"chan_size"` } type LogFilePath struct { Path string `ini:"logfile_path"` } type EtcdAddress struct { Addr []string `ini:"address"` Key string `ini:"collect_key"` } func run(config *Config) (err error) { for { line, ok := <-tailFile.TailObj.Lines if !ok { logrus.Error("read from tail failed,err:", err) time.Sleep(2 * time.Second) continue } if len(strings.Trim(line.Text, "\r")) == 0 { continue } msg := &sarama.ProducerMessage{} msg.Topic = config.Kafakaddress.Topic msg.Value = sarama.StringEncoder(line.Text) Kafka.MesChan(msg) } } func main() { //读取配置文件,获取配置信息 filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini" ConfigObj := new(Config) err := ini.MapTo(ConfigObj, filename) if err != nil { logrus.Error("%s Load failed,err:", filename, err) } //初始化Kafka err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize) if err != nil { logrus.Error("InitKafka failed, err:%v", err) return } logrus.Infof("InitKafka success") //初始化etcd err = etcd.Init(ConfigObj.Etcdaddress.Addr) if err != nil { logrus.Error("InitEtcd failed, err:%v", err) return } logrus.Infof("InitEtcd success") //拉取要收集日志文件的配置项 err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key) if err != nil { logrus.Error("GetConf failed, err:%v", err) return } fmt.Println(collectEntryList) //初始化tail err = tailFile.InitTail(ConfigObj.LogFilePath.Path) if err != nil { logrus.Error("InitTail failed, err:%v", err) return } logrus.Infof("InitTail success") //利用sarama报发送消息到Kafka中 err = run(ConfigObj) if err != nil { logrus.Error("run failed, err:%v", err) return } }
etcd.go
package etcd import ( "encoding/json" "fmt" "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus" clientv3 "go.etcd.io/etcd/client/v3" "golang.org/x/net/context" "time" ) var client *clientv3.Client type collectEntry struct { Path string `json:"path"` Topic string `json:"topic"` } func Init(address []string) (err error) { client, err = clientv3.New(clientv3.Config{ Endpoints: address, DialTimeout: 5 * time.Second, }) if err != nil { logrus.Error("etcd client connect failed,err:%v", err) return } return } func GetConf(key string) (err error, collectEntryList []collectEntry) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) response, err := client.Get(ctx, key) cancel() if err != nil { logrus.Error("get conf from etcd failed,err:%v", err) return } if len(response.Kvs) == 0 { logrus.Warningf("get len:0 conf from etcd failed,err:%v", err) return } fmt.Println(response.Kvs[0].Value) //此时还是json字符串 err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList if err != nil { logrus.Error("json unmarshal failed,err:%v", err) return } return }