go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: go语言并发实战——日志收集系统(九) 基于etcd的代码重构思考与初步实现

前言

在之前我们j基于sarama,tail还有go-ini实现了日志收集系统客户端的编写,但是我们梳理一下可以发现,该客户端还存在一些问题:

  • 客户端一次只能读取一个日志文件,无法同时读取多个分区
  • 无法管理日志存放的分区(topic)
    那我们一个如何去解决这个问题呢?在前两篇文章中我们介绍了etcd,它通过可以存储键值对并且通过watch操作来实现对键值对的实时监控,那我们能不能尝试用`etcd``来储存日志文件信息与对应分区信息?这就是我们今天这篇文章所探究的主题.

初步实现的流程

存储数据格式

这里为了存储数据方便,我们利用json格式来存储数据,示例如下:

[
    {
        "path": "G:/goproject/-goroutine-/log-agent/log/log1",
        "topic": "web.log"
    },
    {
        "path": "G:/goproject/-goroutine-/log-agent/log/log2",
        "topic": "s4.log"
    }
]

etcd初始化的编写

在之前有段etcd的博文中,我们已经介绍过etcd的基本使用,这里不做赘述,首先我们在log-agent文件夹下创建etcd文件夹,创建etcd.go文件,编写etcd的初始化:

func Init(address []string) (err error) {
  client, err = clientv3.New(clientv3.Config{
    Endpoints:   address,
    DialTimeout: 5 * time.Second,
  })
  if err != nil {
    logrus.Error("etcd client connect failed,err:%v", err)
    return
  }
  return
}

然后 在main.go文件中调用:

err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")

通过etcd拉取要收集文件的配置项

在初始化etcd后,我们就要通过etcd来拉取要收集文件的配置项了,首先定义一个结构体来接收信息:

type collectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}

然后创建拉取配置项的函数:

func GetConf(key string) (err error, collectEntryList []collectEntry) {
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  defer cancel()
  response, err := client.Get(ctx, key)
  if err != nil {
    logrus.Error("get conf from etcd failed,err:%v", err)
    return
  }
  if len(response.Kvs) == 0 {
    logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
    return
  }
  fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
  err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
  if err != nil {
    logrus.Error("json unmarshal failed,err:%v", err)
    return
  }
  return
}

然后在main.go里面调用一下就可以了:

//拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)

尝试用之前的demo设置一下配置文件中的key对应的value:

package main
import (
  "context"
  "fmt"
  clientv3 "go.etcd.io/etcd/client/v3"
  "time"
)
func main() {
  cli, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"127.0.0.1:2379"}, //服务端通信端口
    DialTimeout: 5 * time.Second,            //连接超时时间
  })
  //put
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  str := "[{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log1\",\"topic\":\"web.log\"},{\"path\":\"G:/goproject/-goroutine-/log-agent/log/log2\",\"topic\":\"s4.log\"}]"
  _, err = cli.Put(ctx, "collect_log_conf", str)
  cancel()
  if err != nil {
    fmt.Println("put failed,err:%v", err)
    return
  }
}

运行就可以看到我们接收到了配置项了:

涉及改动处的源代码

  • 配置文件
[kafka]
address=127.0.0.1:9092
topic=test1.log
chan_size=100000
[etcd]
address=127.0.0.1:2379
collect_key=collect_log_conf
[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1
  • main.go
package main
import (
  "fmt"
  "github.com/Shopify/sarama"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  "github.com/go-ini/ini"
  "log-agent/Kafka"
  "log-agent/etcd"
  "log-agent/tailFile"
  "strings"
  "time"
)
type Config struct {
  Kafakaddress Kafkaddress `ini:"kafka"`
  LogFilePath  LogFilePath `ini:"collect"`
  Etcdaddress  EtcdAddress `ini:"etcd"`
}
type Kafkaddress struct {
  Addr        []string `ini:"address"`
  Topic       string   `ini:"topic"`
  MessageSize int64    `ini:"chan_size"`
}
type LogFilePath struct {
  Path string `ini:"logfile_path"`
}
type EtcdAddress struct {
  Addr []string `ini:"address"`
  Key  string   `ini:"collect_key"`
}
func run(config *Config) (err error) {
  for {
    line, ok := <-tailFile.TailObj.Lines
    if !ok {
      logrus.Error("read from tail failed,err:", err)
      time.Sleep(2 * time.Second)
      continue
    }
    if len(strings.Trim(line.Text, "\r")) == 0 {
      continue
    }
    msg := &sarama.ProducerMessage{}
    msg.Topic = config.Kafakaddress.Topic
    msg.Value = sarama.StringEncoder(line.Text)
    Kafka.MesChan(msg)
  }
}
func main() {
  //读取配置文件,获取配置信息
  filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
  ConfigObj := new(Config)
  err := ini.MapTo(ConfigObj, filename)
  if err != nil {
    logrus.Error("%s Load failed,err:", filename, err)
  }
  //初始化Kafka
  err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
  if err != nil {
    logrus.Error("InitKafka failed, err:%v", err)
    return
  }
  logrus.Infof("InitKafka success")
  //初始化etcd
  err = etcd.Init(ConfigObj.Etcdaddress.Addr)
  if err != nil {
    logrus.Error("InitEtcd failed, err:%v", err)
    return
  }
  logrus.Infof("InitEtcd success")
  //拉取要收集日志文件的配置项
  err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
  if err != nil {
    logrus.Error("GetConf failed, err:%v", err)
    return
  }
  fmt.Println(collectEntryList)
  //初始化tail
  err = tailFile.InitTail(ConfigObj.LogFilePath.Path)
  if err != nil {
    logrus.Error("InitTail failed, err:%v", err)
    return
  }
  logrus.Infof("InitTail success")
  //利用sarama报发送消息到Kafka中
  err = run(ConfigObj)
  if err != nil {
    logrus.Error("run failed, err:%v", err)
    return
  }
}
  • etcd.go
package etcd
import (
  "encoding/json"
  "fmt"
  "github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
  clientv3 "go.etcd.io/etcd/client/v3"
  "golang.org/x/net/context"
  "time"
)
var client *clientv3.Client
type collectEntry struct {
  Path  string `json:"path"`
  Topic string `json:"topic"`
}
func Init(address []string) (err error) {
  client, err = clientv3.New(clientv3.Config{
    Endpoints:   address,
    DialTimeout: 5 * time.Second,
  })
  if err != nil {
    logrus.Error("etcd client connect failed,err:%v", err)
    return
  }
  return
}
func GetConf(key string) (err error, collectEntryList []collectEntry) {
  ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  response, err := client.Get(ctx, key)
  cancel()
  if err != nil {
    logrus.Error("get conf from etcd failed,err:%v", err)
    return
  }
  if len(response.Kvs) == 0 {
    logrus.Warningf("get len:0 conf from etcd failed,err:%v", err)
    return
  }
  fmt.Println(response.Kvs[0].Value)                             //此时还是json字符串
  err = json.Unmarshal(response.Kvs[0].Value, &collectEntryList) //把值反序列化到collectEntryList
  if err != nil {
    logrus.Error("json unmarshal failed,err:%v", err)
    return
  }
  return
}
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
2月前
|
安全 Go
用 Zap 轻松搞定 Go 语言中的结构化日志
在现代应用程序开发中,日志记录至关重要。Go 语言中有许多日志库,而 Zap 因其高性能和灵活性脱颖而出。本文详细介绍如何在 Go 项目中使用 Zap 进行结构化日志记录,并展示如何定制日志输出,满足生产环境需求。通过基础示例、SugaredLogger 的便捷使用以及自定义日志配置,帮助你在实际开发中高效管理日志。
62 1
|
1月前
|
存储 监控 安全
什么是事件日志管理系统?事件日志管理系统有哪些用处?
事件日志管理系统是IT安全的重要工具,用于集中收集、分析和解释来自组织IT基础设施各组件的事件日志,如防火墙、路由器、交换机等,帮助提升网络安全、实现主动威胁检测和促进合规性。系统支持多种日志类型,包括Windows事件日志、Syslog日志和应用程序日志,通过实时监测、告警及可视化分析,为企业提供强大的安全保障。然而,实施过程中也面临数据量大、日志管理和分析复杂等挑战。EventLog Analyzer作为一款高效工具,不仅提供实时监测与告警、可视化分析和报告功能,还支持多种合规性报告,帮助企业克服挑战,提升网络安全水平。
|
2月前
|
缓存 监控 前端开发
在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统
本文深入探讨了在 Go 语言中实现 WebSocket 实时通信的应用,包括 WebSocket 的简介、Go 语言的优势、基本实现步骤、应用案例、注意事项及性能优化策略,旨在帮助开发者构建高效稳定的实时通信系统。
115 1
|
2月前
|
存储 负载均衡 监控
如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
在数字化时代,构建高可靠性服务架构至关重要。本文探讨了如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
45 1
|
2月前
|
数据库连接 Go 数据库
Go语言中的错误注入与防御编程。错误注入通过模拟网络故障、数据库错误等,测试系统稳定性
本文探讨了Go语言中的错误注入与防御编程。错误注入通过模拟网络故障、数据库错误等,测试系统稳定性;防御编程则强调在编码时考虑各种错误情况,确保程序健壮性。文章详细介绍了这两种技术在Go语言中的实现方法及其重要性,旨在提升软件质量和可靠性。
40 1
|
2月前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
2月前
|
存储 Linux Docker
centos系统清理docker日志文件
通过以上方法,可以有效清理和管理CentOS系统中的Docker日志文件,防止日志文件占用过多磁盘空间。选择合适的方法取决于具体的应用场景和需求,可以结合手动清理、logrotate和调整日志驱动等多种方式,确保系统的高效运行。
172 2
|
3月前
|
Java 大数据 Go
Go语言:高效并发的编程新星
【10月更文挑战第21】Go语言:高效并发的编程新星
58 7
|
2月前
|
并行计算 安全 Go
Go语言的并发特性
【10月更文挑战第26天】Go语言的并发特性
20 1
|
3月前
|
安全 Go 调度
探索Go语言的并发模式:协程与通道的协同作用
Go语言以其并发能力闻名于世,而协程(goroutine)和通道(channel)是实现并发的两大利器。本文将深入了解Go语言中协程的轻量级特性,探讨如何利用通道进行协程间的安全通信,并通过实际案例演示如何将这两者结合起来,构建高效且可靠的并发系统。