go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现

前言

在之前我们j基于sarama,tail还有go-ini实现了日志收集系统客户端的编写,但是我们梳理一下可以发现,该客户端还存在一些问题:

  • 客户端一次只能读取一个日志文件,无法同时读取多个分区
  • 无法管理日志存放的分区(topic)
    那我们一个如何去解决这个问题呢?在前两篇文章中我们介绍了etcd,它通过可以存储键值对并且通过watch操作来实现对键值对的实时监控,那我们能不能尝试用`etcd``来储存日志文件信息与对应分区信息?这就是我们今天这篇文章所探究的主题.

初步实现的流程

存储数据格式

这里为了存储数据方便,我们利用json格式来存储数据,示例如下:

[
    {
        "path": "G:/goproject/-goroutine-/log-agent/log/log1",
        "topic": "web.log"
    },
    {
        "path": "G:/goproject/-goroutine-/log-agent/log/log2",
        "topic": "s4.log"
    }
]

etcd初始化的编写

在之前有段etcd的博文中,我们已经介绍过etcd的基本使用,这里不做赘述,首先我们在log-agent文件夹下创建etcd文件夹,创建etcd.go文件,编写etcd的初始化:

func Init(address []string) (err error) {
  client, err = clientv3.New(clientv3.Config{
    Endpoints:   address,
    DialTimeout: 5 * time.Second,
  })
  if err != nil {
    logrus.Error("etcd client connect failed,err:%v", err)
    return
  }
  return
}

然后 在main.go文件中调用:

err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")

通过etcd拉取要收集文件的配置项

在初始化etcd后,我们就要通过etcd来拉取要收集文件的配置项了,首先定义一个结构体来接收信息:

type collectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}

然后创建拉取配置项的函数:

func GetConf(key string) (err error, collectEntryList []collectEntry) {
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  response, err := client.Get(ctx, key)
  if err != nil {
    logrus.Error("get conf from etcd failed,err:%v", err)
    return
  }
  if len(response.Kvs) == 0 {
    logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
    return
  }
  fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
  err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
  if err != nil {
    logrus.Error("json unmarshal failed,err:%v", err)
    return
  }
  return
}

然后在main.go里面调用一下就可以了:

//拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)

尝试用之前的demo设置一下配置文件中的key对应的value:

package main
import (
  "context"
  "fmt"
  clientv3 "go.etcd.io/etcd/client/v3"
  "time"
)
func main() {
  cli, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"127.0.0.1:2379"}, //服务端通信端口
    DialTimeout: 5 * time.Second,            //连接超时时间
  })
  //put
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  str := "[{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log1\",\"topic\":\"web.log\"},{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log2\",\"topic\":\"s4.log\"}]"
  _, err = cli.Put(ctx, "collect_log_conf", str)
  cancel()
  if err != nil {
    fmt.Println("put failed,err:%v", err)
    return
  }
}

运行就可以看到我们接收到了配置项了:

涉及改动处的源代码

  • 配置文件
[kafka]
address=127.0.0.1:9092
topic=test1.log
chan_size=100000
[etcd]
address=127.0.0.1:2379
collect_key=collect_log_conf
[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1
  • main.go
package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/go-ini/ini"
  "log-agent/Kafka"
  "log-agent/etcd"
  "log-agent/tailFile"
  "strings"
  "time"
)
type Config struct {
  Kafakaddress Kafkaddress `ini:"kafka"`
  LogFilePath  LogFilePath `ini:"collect"`
  Etcdaddress  EtcdAddress `ini:"etcd"`
}
type Kafkaddress struct {
  Addr        []string `ini:"address"`
  Topic       string   `ini:"topic"`
  MessageSize int64    `ini:"chan_size"`
}
type LogFilePath struct {
  Path string `ini:"logfile_path"`
}
type EtcdAddress struct {
  Addr []string `ini:"address"`
  Key  string   `ini:"collect_key"`
}
func run(config *Config) (err error) {
  for {
    line, ok := <-tailFile.TailObj.Lines
    if !ok {
      logrus.Error("read from tail failed,err:", err)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = config.Kafakaddress.Topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}
func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  //初始化tail
  err = tailFile.InitTail(ConfigObj.LogFilePath.Path)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  //利用sarama报发送消息到Kafka中
  err = run(ConfigObj)
  if err != nil {
    logrus.Error("run failed, err:%v", err)
    return
  }
}
  • etcd.go
package etcd
import (
  "encoding/json"
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  clientv3 "go.etcd.io/etcd/client/v3"
  "golang.org/x/net/context"
  "time"
)
var client *clientv3.Client
type collectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}
func Init(address []string) (err error) {
  client, err = clientv3.New(clientv3.Config{
    Endpoints:   address,
    DialTimeout: 5 * time.Second,
  })
  if err != nil {
    logrus.Error("etcd client connect failed,err:%v", err)
    return
  }
  return
}
func GetConf(key string) (err error, collectEntryList []collectEntry) {
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  response, err := client.Get(ctx, key)
  cancel()
  if err != nil {
    logrus.Error("get conf from etcd failed,err:%v", err)
    return
  }
  if len(response.Kvs) == 0 {
    logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
    return
  }
  fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
  err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
  if err != nil {
    logrus.Error("json unmarshal failed,err:%v", err)
    return
  }
  return
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2天前
|
JSON 中间件 Go
Go语言Web框架Gin介绍
【7月更文挑战第19天】Gin是一个功能强大、高性能且易于使用的Go语言Web框架。它提供了路由、中间件、参数绑定等丰富的功能,帮助开发者快速构建高质量的Web应用。通过本文的介绍,你应该对Gin框架有了初步的了解,并能够使用它来开发简单的Web服务。随着你对Gin的深入学习和实践,你将能够利用它构建更复杂、更强大的Web应用。
|
7天前
|
Cloud Native Java Go
为什么要学习Go语言?
GO logo的核心理念,即简单胜于复杂。使用现代斜体无衬线字体与三条简单的运动线相结合,形成一个类似于快速运动的两个轮子的标记,传达速度和效率。字母的圆形暗示了GO地鼠的眼睛,创造了一个熟悉的形状,让标记和吉祥物很好地搭配在一起。
23 4
|
2天前
|
Oracle 关系型数据库 MySQL
|
9天前
|
安全 Go
Go语言map并发安全,互斥锁和读写锁谁更优?
Go并发编程中,`sync.Mutex`提供独占访问,适合读写操作均衡或写操作频繁的场景;`sync.RWMutex`允许多个读取者并行,适用于读多写少的情况。明智选择锁可提升程序性能和稳定性。示例展示了如何在操作map时使用这两种锁。
12 0
|
9天前
|
安全 Go 开发者
Go语言map并发安全使用的正确姿势
在Go并发编程中,由于普通map不是线程安全的,多goroutine访问可能导致数据竞态。为保证安全,可使用`sync.Mutex`封装map或使用从Go 1.9开始提供的`sync.Map`。前者通过加锁手动同步,后者内置并发控制,适用于多goroutine共享。选择哪种取决于具体场景和性能需求。
10 0
|
11天前
|
JSON 测试技术 Go
零值在go语言和初始化数据
【7月更文挑战第10天】本文介绍在Go语言中如何初始化数据,未初始化的变量会有对应的零值:bool为`false`,int为`0`,byte和string为空,pointer、function、interface及channel为`nil`,slice和map也为`nil`。。本文档作为指南,帮助理解Go的数据结构和正确使用它们。
58 22
零值在go语言和初始化数据
|
13天前
|
安全 算法 程序员
在go语言中使用泛型和反射
【7月更文挑战第8天】本文介绍go支持泛型后,提升了代码复用,如操作切片、映射、通道的函数,以及自定义数据结构。 泛型适用于通用数据结构和函数,减少接口使用和类型断言。
79 1
在go语言中使用泛型和反射
|
15天前
|
缓存 编译器 Shell
回顾go语言基础中一些特别的概念
【7月更文挑战第6天】本文介绍Go语言基础涵盖包声明、导入、函数、变量、语句和表达式以及注释。零值可用类型如切片、互斥锁和缓冲,支持预分配容量以优化性能。
43 2
回顾go语言基础中一些特别的概念
|
11天前
|
JSON Java Go
Go 语言性能优化技巧
在Go语言中优化性能涉及数字字符串转换(如用`strconv.Itoa()`代替`fmt.Sprintf()`)、避免不必要的字符串到字节切片转换、预分配切片容量、使用`strings.Builder`拼接、有效利用并发(`goroutine`和`sync.WaitGroup`)、减少内存分配、对象重用(`sync.Pool`)、无锁编程、I/O缓冲、正则预编译和选择高效的序列化方法。这些策略能显著提升代码执行效率和系统资源利用率。
46 13