Golang:将日志以Json格式输出到Kafka2

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Golang:将日志以Json格式输出到Kafka2

写日志到文件

这里定义一个名为fileWriter的类型,它需要实现LoggerWriter的接口。

先看类型的定义:

type fileWriter struct {
  file     *os.File
  lastHour int64
  Path     string
}

包含四个字段:

  • file 要输出的文件对象。
  • lastHour 按照小时创建文件的需要。
  • Path 日志文件的根路径。

再看其实现的接口:

func (w *fileWriter) Ensure(entry *logEntry) (err error) {
  if w.file == nil {
    f, err := w.createFile(w.Path, entry.Ts)
    if err != nil {
      return err
    }
    w.lastHour = w.getTimeHour(entry.Ts)
    w.file = f
    return nil
  }
  currentHour := w.getTimeHour(entry.Ts)
  if w.lastHour != currentHour {
    _ = w.file.Close()
    f, err := w.createFile(w.Path, entry.Ts)
    if err != nil {
      return err
    }
    w.lastHour = currentHour
    w.file = f
  }
  return
}
func (w *fileWriter) Write(buf []byte) (err error) {
  buf = append(buf, '\n')
  _, err = w.file.Write(buf)
  return
}
func (w *fileWriter) Sync() error {
  return w.file.Sync()
}
func (w *fileWriter) Close() error {
  return w.file.Close()
}

Ensure 中的主要逻辑是创建当前要写入的文件对象,如果小时数变了,先把之前的关闭,再创建一个新的文件。

Write 把数据写入到文件对象,这里加了一个换行符,也就是说对于文件日志,其每条日志最后都会有一个换行符,这样比较方便阅读。

Sync 调用文件对象的Sync方法,将日志从操作系统缓存刷到磁盘。

Close 关闭当前文件对象。

写日志到Kafka

这里定义一个名为kafkaWriter的类型,它也需要实现LoggerWriter的接口。

先看其结构体定义:

type kafkaWriter struct {
  Topic     string
  Address   string
  writer    *kafka.Writer
  batchSize int
}

这里包含四个字段:

Topic 写Kafka时需要一个主题,这里默认当前Logger中所有日志使用同一个主题。

Address Kafka的访问地址。

writer  向Kafka写数据时使用的Writer,这里集成的是:github.com/segmentio/kafka-go,支持自动重试和重连。

batchSize Kafka写日志的批次大小,批量写可以提高日志的写效率。

再看其实现的接口:

func (w *kafkaWriter) Ensure(curTime time.Time) (err error) {
  if w.writer == nil {
    w.writer = &kafka.Writer{
      Addr:      kafka.TCP(w.Address),
      Topic:     w.Topic,
      BatchSize: w.batchSize,
      Async:     true,
    }
  }
  return
}
func (w *kafkaWriter) Write(buf []byte) (err error) {
  // buf will be reused by ylog when this method return,
  // with aysnc write, we need copy data to a new slice
  kbuf := append([]byte(nil), buf...)
  err = w.writer.WriteMessages(context.Background(),
    kafka.Message{Value: kbuf},
  )
  return
}
func (w *kafkaWriter) Sync() error {
  return nil
}
func (w *kafkaWriter) Close() error {
  return w.writer.Close()
}

这里采用的是异步发送到Kafka的方式,WriteMessages方法不会阻塞,因为传入的buf要被ylog重用,所以这里copy了一下。异步还会存在的一个问题就是不会返回错误,可能丢失数据,不过对于日志这种数据,没有那么严格的要求,也可以接受。

如果采用同步发送,因为批量发送比较有效率,这里可以攒几条再发,但日志比较稀疏时,可能短时间很难攒够,就会出现长时间等不到日志的情况,所以还要有个超时机制,这有点麻烦,不过我也写了一个版本,有兴趣的可以去看看:github.com/bosima/ylog…

接口的组装

有了格式化接口和写日志接口,下一步就是将它们组装起来,以实现相应的处理能力。

首先是创建它们,因为我这里也没有动态配置的需求,所以就放到创建Logger实例的时候了,这样比较简单。


func NewYesLogger(opts ...Option) (logger *YesLogger) {
  logger = &YesLogger{}
  ...
  logger.writer = NewFileWriter("logs")
  logger.formatter = NewTextFormatter()
  for _, opt := range opts {
    opt(logger)
  }
  ...
  return
}

可以看到默认的formatter是textFormatter,默认的writer是fileWriter。这个函数传入的Option其实是个函数,在下边的opt(logger)中会执行它们,所以使用其它的Formatter或者Writer可以这样做:

logger := ylog.NewYesLogger(
    ...
    ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)),
    ylog.Formatter(ylog.NewJsonFormatter()),
)

这里 ylog.Writer 和 ylog.Formatter 就是符合Option类型的函数,调用它们可以设置不同的Formatter和Writer。

然后怎么使用它们呢?

...
l.formatter.Format(entry, &buf)
l.writer.Ensure(entry)
err := l.writer.Write(buf)
...

当 logEntry 进入消息处理环节后,首先调用formatter的Format方法格式化logEntry;然后调用了writer的Ensure方法确保writer已经准备好,最后调用writer的Write方法将格式化之后的数据输出到对应的目标。

为什么不将Ensure方法放到Write中呢?这是因为目前写文本日志的时候需要根据logEntry中的日志时间创建日志文件,这样就需要给Writer传递两个参数,有点别扭,所以这里将它们分开了。

如何提高日志处理的吞吐量

Kafka的吞吐量是很高的,那么如果放到ylog自身来说,如何提高它的吞吐量呢?

首先想到的就是Channel,可以使用有缓冲的Channel模拟一个队列,生产者不停的向Channel发送数据,如果Writer可以一直在缓冲被填满之前将数据取走,那么理论上说生产者就是非阻塞的,相比同步输出到某个Writer,没有直接磁盘IO、网络IO,日志处理的吞吐量必将大幅提升。

定义一个Channel,其容量默认为当前机器逻辑处理器的数量:

logger.pipe = make(chan *logEntry, runtime.NumCPU())

接收数据的代码:

  for {
    select {
    case entry := <-l.pipe:
      // reuse the slice memory
      buf = buf[:0]
      l.formatter.Format(entry, &buf)
      l.writer.Ensure(entry.Ts)
      err := l.writer.Write(buf)
    ...
    }
  }

实际效果怎么样呢?看下Benchmark:

goos: darwin
goarch: amd64
pkg: github.com/bosima/ylog
cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
BenchmarkInfo-8      1332333         871.6 ns/op       328 B/op        4 allocs/op

这个结果可以和zerolog、zap等高性能日志库一较高下了,当然目前可以做的事情要比它们简单很多。

如果对Java有所了解的同学应该听说过log4j,在log4j2中引入了一个名为Disruptor的组件,它让日志处理飞快了起来,受到很多Java开发者的追捧。Disruptor之所以这么厉害,是因为它使用了无锁并发、环形队列、缓存行填充等多种高级技术。

相比之下,Golang的Channel虽然也使用了环形缓冲,但是还是使用了锁,作为队列来说性能并不是最优的。

Golang中有没有类似的东西呢?最近出来的ZenQ可能是一个不错的选择,不过看似还不太稳定,过段时间再尝试下。有兴趣的可以去看看:github.com/alphadose/Z…


好了,以上就是本文的主要内容。关于ylog的介绍也告一段落了,后续会在Github上持续更新,增加更多有用的功能,并不断优化处理性能,欢迎关注:github.com/bosima/ylog


相关文章
|
24天前
|
运维 监控 Cloud Native
一行代码都不改,Golang 应用链路指标日志全知道
本文将通过阿里云开源的 Golang Agent,帮助用户实现“一行代码都不改”就能获取到应用产生的各种观测数据,同时提升运维团队和研发团队的幸福感。
|
1月前
|
消息中间件 JSON NoSQL
从 ES Kafka Mongodb Restful ... 取到 json 之后
JSON 是一种广泛使用的数据交换格式,但其计算和处理能力有限。esProc SPL 是一款强大的开源计算引擎,能够高效解析 JSON 数据,并支持复杂的过滤、分组、连接等操作。它不仅兼容多种数据源,如 RESTful、ElasticSearch、MongoDB 和 Kafka,还提供了游标对象处理大数据流,支持与 Java 应用无缝集成,实现灵活的业务逻辑处理。
|
3月前
|
Prometheus Cloud Native Go
Golang语言之Prometheus的日志模块使用案例
这篇文章是关于如何在Golang语言项目中使用Prometheus的日志模块的案例,包括源代码编写、编译和测试步骤。
80 3
Golang语言之Prometheus的日志模块使用案例
|
3月前
|
JSON Go 数据格式
Golang语言结构体链式编程与JSON序列化
这篇文章是关于Go语言中结构体链式编程与JSON序列化的教程,详细介绍了JSON格式的基本概念、结构体的序列化与反序列化、结构体标签的使用以及如何实现链式编程。
47 4
|
4月前
|
JSON Go API
一文搞懂 Golang 高性能日志库 - Zap
一文搞懂 Golang 高性能日志库 - Zap
354 2
|
4月前
分享一种接口的日志格式
分享一种接口的日志格式
57 13
|
4月前
|
人工智能 数据库连接 Go
Golang 搭建 WebSocket 应用(五) - 消息推送日志
Golang 搭建 WebSocket 应用(五) - 消息推送日志
47 1
|
4月前
|
存储 JSON Go
一文搞懂 Golang 高性能日志库 Zerolog
一文搞懂 Golang 高性能日志库 Zerolog
480 0
|
4月前
|
JSON 安全 Go
[golang]使用logrus自定义日志模块
[golang]使用logrus自定义日志模块
|
4月前
|
JSON Go 数据格式
[golang]标准库-json
[golang]标准库-json

热门文章

最新文章