Golang:将日志以Json格式输出到Kafka2

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Golang:将日志以Json格式输出到Kafka2

写日志到文件

这里定义一个名为fileWriter的类型,它需要实现LoggerWriter的接口。

先看类型的定义:

type fileWriter struct {
  file     *os.File
  lastHour int64
  Path     string
}

包含四个字段:

  • file 要输出的文件对象。
  • lastHour 按照小时创建文件的需要。
  • Path 日志文件的根路径。

再看其实现的接口:

func (w *fileWriter) Ensure(entry *logEntry) (err error) {
  if w.file == nil {
    f, err := w.createFile(w.Path, entry.Ts)
    if err != nil {
      return err
    }
    w.lastHour = w.getTimeHour(entry.Ts)
    w.file = f
    return nil
  }
  currentHour := w.getTimeHour(entry.Ts)
  if w.lastHour != currentHour {
    _ = w.file.Close()
    f, err := w.createFile(w.Path, entry.Ts)
    if err != nil {
      return err
    }
    w.lastHour = currentHour
    w.file = f
  }
  return
}
func (w *fileWriter) Write(buf []byte) (err error) {
  buf = append(buf, '\n')
  _, err = w.file.Write(buf)
  return
}
func (w *fileWriter) Sync() error {
  return w.file.Sync()
}
func (w *fileWriter) Close() error {
  return w.file.Close()
}

Ensure 中的主要逻辑是创建当前要写入的文件对象,如果小时数变了,先把之前的关闭,再创建一个新的文件。

Write 把数据写入到文件对象,这里加了一个换行符,也就是说对于文件日志,其每条日志最后都会有一个换行符,这样比较方便阅读。

Sync 调用文件对象的Sync方法,将日志从操作系统缓存刷到磁盘。

Close 关闭当前文件对象。

写日志到Kafka

这里定义一个名为kafkaWriter的类型,它也需要实现LoggerWriter的接口。

先看其结构体定义:

type kafkaWriter struct {
  Topic     string
  Address   string
  writer    *kafka.Writer
  batchSize int
}

这里包含四个字段:

Topic 写Kafka时需要一个主题,这里默认当前Logger中所有日志使用同一个主题。

Address Kafka的访问地址。

writer  向Kafka写数据时使用的Writer,这里集成的是:github.com/segmentio/kafka-go,支持自动重试和重连。

batchSize Kafka写日志的批次大小,批量写可以提高日志的写效率。

再看其实现的接口:

func (w *kafkaWriter) Ensure(curTime time.Time) (err error) {
  if w.writer == nil {
    w.writer = &kafka.Writer{
      Addr:      kafka.TCP(w.Address),
      Topic:     w.Topic,
      BatchSize: w.batchSize,
      Async:     true,
    }
  }
  return
}
func (w *kafkaWriter) Write(buf []byte) (err error) {
  // buf will be reused by ylog when this method return,
  // with aysnc write, we need copy data to a new slice
  kbuf := append([]byte(nil), buf...)
  err = w.writer.WriteMessages(context.Background(),
    kafka.Message{Value: kbuf},
  )
  return
}
func (w *kafkaWriter) Sync() error {
  return nil
}
func (w *kafkaWriter) Close() error {
  return w.writer.Close()
}

这里采用的是异步发送到Kafka的方式,WriteMessages方法不会阻塞,因为传入的buf要被ylog重用,所以这里copy了一下。异步还会存在的一个问题就是不会返回错误,可能丢失数据,不过对于日志这种数据,没有那么严格的要求,也可以接受。

如果采用同步发送,因为批量发送比较有效率,这里可以攒几条再发,但日志比较稀疏时,可能短时间很难攒够,就会出现长时间等不到日志的情况,所以还要有个超时机制,这有点麻烦,不过我也写了一个版本,有兴趣的可以去看看:github.com/bosima/ylog…

接口的组装

有了格式化接口和写日志接口,下一步就是将它们组装起来,以实现相应的处理能力。

首先是创建它们,因为我这里也没有动态配置的需求,所以就放到创建Logger实例的时候了,这样比较简单。


func NewYesLogger(opts ...Option) (logger *YesLogger) {
  logger = &YesLogger{}
  ...
  logger.writer = NewFileWriter("logs")
  logger.formatter = NewTextFormatter()
  for _, opt := range opts {
    opt(logger)
  }
  ...
  return
}

可以看到默认的formatter是textFormatter,默认的writer是fileWriter。这个函数传入的Option其实是个函数,在下边的opt(logger)中会执行它们,所以使用其它的Formatter或者Writer可以这样做:

logger := ylog.NewYesLogger(
    ...
    ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)),
    ylog.Formatter(ylog.NewJsonFormatter()),
)

这里 ylog.Writer 和 ylog.Formatter 就是符合Option类型的函数,调用它们可以设置不同的Formatter和Writer。

然后怎么使用它们呢?

...
l.formatter.Format(entry, &buf)
l.writer.Ensure(entry)
err := l.writer.Write(buf)
...

当 logEntry 进入消息处理环节后,首先调用formatter的Format方法格式化logEntry;然后调用了writer的Ensure方法确保writer已经准备好,最后调用writer的Write方法将格式化之后的数据输出到对应的目标。

为什么不将Ensure方法放到Write中呢?这是因为目前写文本日志的时候需要根据logEntry中的日志时间创建日志文件,这样就需要给Writer传递两个参数,有点别扭,所以这里将它们分开了。

如何提高日志处理的吞吐量

Kafka的吞吐量是很高的,那么如果放到ylog自身来说,如何提高它的吞吐量呢?

首先想到的就是Channel,可以使用有缓冲的Channel模拟一个队列,生产者不停的向Channel发送数据,如果Writer可以一直在缓冲被填满之前将数据取走,那么理论上说生产者就是非阻塞的,相比同步输出到某个Writer,没有直接磁盘IO、网络IO,日志处理的吞吐量必将大幅提升。

定义一个Channel,其容量默认为当前机器逻辑处理器的数量:

logger.pipe = make(chan *logEntry, runtime.NumCPU())

接收数据的代码:

  for {
    select {
    case entry := <-l.pipe:
      // reuse the slice memory
      buf = buf[:0]
      l.formatter.Format(entry, &buf)
      l.writer.Ensure(entry.Ts)
      err := l.writer.Write(buf)
    ...
    }
  }

实际效果怎么样呢?看下Benchmark:

goos: darwin
goarch: amd64
pkg: github.com/bosima/ylog
cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
BenchmarkInfo-8      1332333         871.6 ns/op       328 B/op        4 allocs/op

这个结果可以和zerolog、zap等高性能日志库一较高下了,当然目前可以做的事情要比它们简单很多。

如果对Java有所了解的同学应该听说过log4j,在log4j2中引入了一个名为Disruptor的组件,它让日志处理飞快了起来,受到很多Java开发者的追捧。Disruptor之所以这么厉害,是因为它使用了无锁并发、环形队列、缓存行填充等多种高级技术。

相比之下,Golang的Channel虽然也使用了环形缓冲,但是还是使用了锁,作为队列来说性能并不是最优的。

Golang中有没有类似的东西呢?最近出来的ZenQ可能是一个不错的选择,不过看似还不太稳定,过段时间再尝试下。有兴趣的可以去看看:github.com/alphadose/Z…


好了,以上就是本文的主要内容。关于ylog的介绍也告一段落了,后续会在Github上持续更新,增加更多有用的功能,并不断优化处理性能,欢迎关注:github.com/bosima/ylog


相关文章
|
11天前
|
JSON 人工智能 算法
探索大型语言模型LLM推理全阶段的JSON格式输出限制方法
本篇文章详细讨论了如何确保大型语言模型(LLMs)输出结构化的JSON格式,这对于提高数据处理的自动化程度和系统的互操作性至关重要。
|
2月前
|
消息中间件 JSON NoSQL
从 ES Kafka Mongodb Restful ... 取到 json 之后
JSON 是一种广泛使用的数据交换格式,但其计算和处理能力有限。esProc SPL 是一款强大的开源计算引擎,能够高效解析 JSON 数据,并支持复杂的过滤、分组、连接等操作。它不仅兼容多种数据源,如 RESTful、ElasticSearch、MongoDB 和 Kafka,还提供了游标对象处理大数据流,支持与 Java 应用无缝集成,实现灵活的业务逻辑处理。
|
2月前
|
JSON JavaScript Java
对比JSON和Hessian2的序列化格式
通过以上对比分析,希望能够帮助开发者在不同场景下选择最适合的序列化格式,提高系统的整体性能和可维护性。
62 3
|
2月前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索API接口返回数据的JSON格式示例
拍立淘按图搜索API接口允许用户通过上传图片来搜索相似的商品,该接口返回的通常是一个JSON格式的响应,其中包含了与上传图片相似的商品信息。以下是一个基于淘宝平台的拍立淘按图搜索API接口返回数据的JSON格式示例,同时提供对其关键字段的解释
|
2月前
|
JSON 数据格式 索引
Python中序列化/反序列化JSON格式的数据
【11月更文挑战第4天】本文介绍了 Python 中使用 `json` 模块进行序列化和反序列化的操作。序列化是指将 Python 对象(如字典、列表)转换为 JSON 字符串,主要使用 `json.dumps` 方法。示例包括基本的字典和列表序列化,以及自定义类的序列化。反序列化则是将 JSON 字符串转换回 Python 对象,使用 `json.loads` 方法。文中还提供了具体的代码示例,展示了如何处理不同类型的 Python 对象。
|
2月前
|
JSON 人工智能 算法
探索LLM推理全阶段的JSON格式输出限制方法
文章详细讨论了如何确保大型语言模型(LLMs)输出结构化的JSON格式,这对于提高数据处理的自动化程度和系统的互操作性至关重要。
218 12
|
2月前
|
JSON Java 数据格式
springboot中表字段映射中设置JSON格式字段映射
springboot中表字段映射中设置JSON格式字段映射
166 1
|
3月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
56 4
|
3月前
|
存储 消息中间件 大数据
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
大数据-70 Kafka 高级特性 物理存储 日志存储 日志清理: 日志删除与日志压缩
54 1
|
3月前
|
存储 消息中间件 大数据
大数据-68 Kafka 高级特性 物理存储 日志存储概述
大数据-68 Kafka 高级特性 物理存储 日志存储概述
35 1