k8s与日志--journalbeat源码解读-阿里云开发者社区

开发者社区> 店家小二> 正文

k8s与日志--journalbeat源码解读

简介: 前言 对于日志系统的重要性不言而喻,参照沪江的一篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用: 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程 日志+大数据+AI的确有很多想象空间。
+关注继续查看

前言

对于日志系统的重要性不言而喻,参照沪江的一 篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用:

  • 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案
  • 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态
  • 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程

日志+大数据+AI的确有很多想象空间。
而对于收集系统,流行的技术stack有之前的elk,到现在的efk。logstash换成了filebeat。当然日志收集agent,也有flume和fluentd,尤其fluentd属于cncf组织的产品,在k8s中有着广泛的应用。但是fluentd是ruby写的,不利于深入源码了解。当然今天我们重点讲的是另外一个agent--journalbeat。望文生义,隶属于efk stack 中beats系列中的一员,专门用于收集journald日志。

journalbeat源码解读

journald日志简介

长久以来 syslog 是每一个 Unix 系统中的重要部件。在漫长的历史中在各种 Linux 发行版中都有不同的实现去完成类似的工作,它们采取的是逻辑相近,并使用基本相同的文件格式。但是 syslog 也存在诸多的问题,随着新设备的出现以及对安全的重视,这些缺点越发显得突出,例如日志消息内容无法验证、数据格式松散、日志检索低效、有限的元数据保存、无法记录二进制数据等。
Journald是针对以上需求的解决方案。受udev事件启发,Journal 条目与环境组块相似。一个键值域,按照换行符分开,使用大写的变量名。除了支持ASCII 格式的字符串外,还能够支持二进制数据,如 ATA SMART 健康信息、SCSI 数据。应用程序和服务可以通过将项目域传递给systemd journald服务来生成项目。该服务可以为项目增加一定数量的元数据。这些受信任域的值由 Journal 服务来决定且无法由客户端来伪造。在Journald中,可以把日志数据导出,在异地读取,并不受处理器架构的影响。这对嵌入式设备是很有用的功能,方便维护人员分析设备运行状况。
大致总结就是

  • journald日志是新的linux系统的具备的
  • journald区别于传统的文件存储方式,是二进制存储。需要用journalctl查看。

docker对于journald的支持

The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.
即docker除了json等日志格式,已经增加了journald驱动。

目前本司使用场景

我们的k8s集群,所有的docker输出的日志格式都采用journald,这样主机centos系统日志和docker的日志都用journalbeat来收集。

journalbeat实现关键

journalbeat整个实现过程,基本上两点:

  • 与其他社区贡献的beats系列,比如packetbeat,mysqlbeat类似,遵循了beats的框架和约定,journalbeat实现了run和stop等方法即可,然后作为一个客户端,将收集到的数据,publish到beats中。
  • 读取journald日志,采用了coreos开源的go-systemd库中sdjournal部分。其实sdjournal是一个利用cgo 对于journald日志c接口的封装。

源码解读

程序入口:

package main

import (
 "log" "github.com/elastic/beats/libbeat/beat" "github.com/mheese/journalbeat/beater"
)

func main() {
 err := beat.Run("journalbeat", "", beater.New)
 if err != nil {
 log.Fatal(err)
 }
}

整个journalbeat共实现了3个方法即可。run,stop,和new。
run和stop顾名思义,就是beats控制journalbeat的运行和停止。
而new:
需要按照

// Creator initializes and configures a new Beater instance used to execute // the beat its run-loop. type Creator func(*Beat, *common.Config) (Beater, error)

实现Creator方法,返回的Beater实例,交由beats控制。
具体实现:

// New creates beater func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
 config := config.DefaultConfig
 var err error
 if err = cfg.Unpack(&config); err != nil {
 return nil, fmt.Errorf("Error reading config file: %v", err)
 }

 jb := &Journalbeat{
 config: config,
 done: make(chan struct{}),
 cursorChan: make(chan string),
 pending: make(chan *eventReference),
 completed: make(chan *eventReference, config.PendingQueue.CompletedQueueSize),
 }

 if err = jb.initJournal(); err != nil {
 logp.Err("Failed to connect to the Systemd Journal: %v", err)
 return nil, err
 }

 jb.client = b.Publisher.Connect()
 return jb, nil
}

一般的beats中,都会有一些共同属性。例如下面的done和client属性。

// Journalbeat is the main Journalbeat struct type Journalbeat struct {
 done chan struct{}
 config config.Config
 client publisher.Client

 journal *sdjournal.Journal

 cursorChan chan string
 pending, completed chan *eventReference
 wg sync.WaitGroup
}

done是一个控制整个beater启停的信号量。
而client 是与beats平台通信的client。注意在初始化的时候,

jb.client = b.Publisher.Connect()

建立链接。
然后在收集到数据,发送的时候,也是通过该client

select {
 case <-jb.done:
 return nil
 default:
 // we need to clone to avoid races since map is a pointer...
 jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
 }

注意上边的发送姿势和对于刚才提到的done信号量使用。
其他方法都是业务相关不再详细解读了。

journalbeat如何保证发送失败的日志重新发送

关于这点,个人感觉是最优雅的部分

所有发送失败的日志是会在程序结束之前以json格式保存到文件,完成持久化。

 // on exit fully consume both queues and flush to disk the pending queue defer func() {
 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
 defer wg.Done()
 for evRef := range jb.pending {
 pending[evRef.cursor] = evRef.body
 }
 }()

 go func() {
 defer wg.Done()
 for evRef := range jb.completed {
 completed[evRef.cursor] = evRef.body
 }
 }()
 wg.Wait()

 logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
 if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
 }
 }()

程序启动以后首先会读取之前持久化的发送失败的日志,重新发送

// load the previously saved queue of unsent events and try to publish them if any
 if err := jb.publishPending(); err != nil {
 logp.Warn("could not read the pending queue: %s", err)
 }

client publish收集到的日志到beats,设置了publisher.Guaranteed模式,成功和失败都有反馈

jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)

其中publisher.Signal(&eventSignal{ref, jb.completed})类似于一个回调,凡是成功的都会写成功的ref到jb.completed中。方便客户端控制。

维护了两个chan,一个存放客户端发送的日志,一个存放服务端接受成功的日志,精确对比,可获取发送失败的日志,进入重发动作

journalbeat struct中有下面两个属性

pending, completed chan *eventReference

每次客户端发送一条日志,都会写到pending。

case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed):
 if published := <-publishedChan; published {
 jb.pending <- ref // save cursor if jb.config.WriteCursorState {
 jb.cursorChan <- rawEvent.Cursor
 }
 }
 }

publisher.Signal(&eventSignal{ref, jb.completed}),回调会将成功的写到completed。
整个程序同时会启动一个
go jb.managePendingQueueLoop()
协程,专门用来定时重发失败日志。

 // managePendingQueueLoop runs the loop which manages the set of events waiting to be acked func (jb *Journalbeat) managePendingQueueLoop() {
 jb.wg.Add(1)
 defer jb.wg.Done()
 pending := map[string]common.MapStr{}
 completed := map[string]common.MapStr{}

 // diff returns the difference between this map and the other.
 diff := func(this, other map[string]common.MapStr) map[string]common.MapStr {
 result := map[string]common.MapStr{}
 for k, v := range this {
 if _, ok := other[k]; !ok {
 result[k] = v
 }
 }
 return result
 }

 // flush saves the map[string]common.MapStr to the JSON file on disk
 flush := func(source map[string]common.MapStr, dest string) error {
 tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest)))
 if err != nil {
 return err
 }

 if err = json.NewEncoder(tempFile).Encode(source); err != nil {
 _ = tempFile.Close()
 return err
 }

 _ = tempFile.Close()
 return os.Rename(tempFile.Name(), dest)
 }

 // on exit fully consume both queues and flush to disk the pending queue defer func() {
 var wg sync.WaitGroup
 wg.Add(2)

 go func() {
 defer wg.Done()
 for evRef := range jb.pending {
 pending[evRef.cursor] = evRef.body
 }
 }()

 go func() {
 defer wg.Done()
 for evRef := range jb.completed {
 completed[evRef.cursor] = evRef.body
 }
 }()
 wg.Wait()

 logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
 if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
 }
 }()

 // flush the pending queue to disk periodically
 tick := time.Tick(jb.config.PendingQueue.FlushPeriod)
 for {
 select {
 case <-jb.done:
 return case p, ok := <-jb.pending:
 if ok {
 pending[p.cursor] = p.body
 }
 case c, ok := <-jb.completed:
 if ok {
 completed[c.cursor] = c.body
 }
 case <-tick:
 result := diff(pending, completed)
 if err := flush(result, jb.config.PendingQueue.File); err != nil {
 logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err)
 }
 pending = result
 completed = map[string]common.MapStr{}
 }
 }
}

总结

当然还有一些其他的细节,不再一一讲述了。比如定时写Cursor的功能和日志格式转换等。具体的大家可以看源码。主要是讲了我认为其优雅的部分和为beats编写beater的要点。

本文转自SegmentFault-k8s与日志--journalbeat源码解读

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
阿里云服务器怎么设置密码?怎么停机?怎么重启服务器?
如果在创建实例时没有设置密码,或者密码丢失,您可以在控制台上重新设置实例的登录密码。本文仅描述如何在 ECS 管理控制台上修改实例登录密码。
9955 0
PHP源码加密- php-beast
php-beast 详细介绍 使用案例: http://www.beastcoder.com/ PHP Beast是一个源码加密模块,使用这个模块可以把PHP源码加密并在此模块下运行。 为什么要用PHP-Beast?     有时候我们的代码会放到代理商上, 所以很有可能代码被盗取, 或者我们写了一个商业系统而且不希望代码开源, 所以这时候就需要加密我们的代码.
911 0
java B2B2C Springboot电子商务平台源码-统一日志管理ELK
什么是ELK?ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。新增了一个FileBeat,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。
1022 0
一文看懂 K8s 日志系统设计和实践
上一篇中我们介绍了为什么需要一个日志系统、为什么云原生下的日志系统如此重要以及云原生下日志系统的建设难点,相信DevOps、SRE、运维等同学看了是深有体会的。本篇文章单刀直入,会直接跟大家分享一下如何在云原生的场景下搭建一个灵活、功能强大、可靠、可扩容的日志系统。
4299 0
源码分析 RocketMQ DLedger 多副本之 Leader 选主
本文将按照《RocketMQ 多副本前置篇:初探raft协议》的思路来学习RocketMQ选主逻辑。首先先回顾一下关于Leader的一些思考: 节点状态需要引入3种节点状态:Follower(跟随者)、Candidate(候选者),该状态下的节点会发起投票请求,Leader(主节点)。
2206 0
源码分析 RocketMQ DLedger(多副本) 之日志复制(传播)
本文紧接着 源码分析 RocketMQ DLedger(多副本) 之日志追加流程 ,继续 Leader 处理客户端 append 的请求流程中最至关重要的一环:日志复制。 DLedger 多副本的日志转发由 DLedgerEntryPusher 实现,接下来将对其进行详细介绍。
3143 0
仿酷狗音乐播放器开发日志二十二 动态调色板控件第二版(性能大幅提升附源码)
转载请说明原出处,谢谢~~         在上次写的博客《仿酷狗音乐播放器开发日志二十一 开发动态调色板控件(附源码)》发布后,我在群里和网友讨论这个控件的性能和优 缺点,发现了他很多不足,还有很多提升空间,之后我简单的修改了代码提升了控件的响应速度。
869 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
13705 0
一起谈.NET技术,Xml日志记录文件最优方案(附源代码)
  Xml作为数据存储的一种方式,当数据非常大的时候,我们将碰到很多Xml处理的问题。通常,我们对Xml文件进行编辑的最直接的方式是将xml文件加载到XmlDocument,在内存中来对XmlDocument进行修改,然后再保存到磁盘中。
581 0
Spring IOC 容器源码分析 - 填充属性到 bean 原始对象
1. 简介 本篇文章,我们来一起了解一下 Spring 是如何将配置文件中的属性值填充到 bean 对象中的。我在前面几篇文章中介绍过 Spring 创建 bean 的流程,即 Spring 先通过反射创建一个原始的 bean 对象,然后再向这个原始的 bean 对象中填充属性。
1606 0
+关注
651
文章
0
问答
文章排行榜
最热
最新