muduo源码剖析之AsyncLogging异步日志类

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。

简介

AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer和nextBuffer,还有一个存放BufferPtr的vector(buffers_)。

多个前端线程往currentBuffer写数据,currentBuffer写满了将其放入buffers,通知后端线程读。前端线程将currentBuffer和nextBuffer替换继续写currentBuffer
后端也有2个BufferPtr,分别为newBuffer1和newBuffer2,还有一个BufferVector(buffersToWrite)。后端线程在收到前端通知之后,利用buffersToWrite和buffers进行交换,并且用newBuffer1和newBuffer2归还给前端的currentBuffer和nextBuffer_,然后把日志写入文件。

muduo日志文件只提供写入本地文件

后端线程写入条件:

  1. 前端线程缓冲区写完后通过条件变量通知后端线程写入
  2. 超时,muduo设置的是默认时间是3秒(AsyncLogging构造函数第三个参数flushInterval_),

源码剖析

AsyncLogging.h

// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_ASYNCLOGGING_H
#define MUDUO_BASE_ASYNCLOGGING_H

#include "muduo/base/BlockingQueue.h"
#include "muduo/base/BoundedBlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Mutex.h"
#include "muduo/base/Thread.h"
#include "muduo/base/LogStream.h"

#include <atomic>
#include <vector>

namespace muduo
{
   

class AsyncLogging : noncopyable
{
   
 public:

  AsyncLogging(const string& basename,
               off_t rollSize,
               int flushInterval = 3);

  ~AsyncLogging()
  {
   
    if (running_)
    {
   
      stop();
    }
  }

  void append(const char* logline, int len);

  void start()
  {
   
    running_ = true;
    thread_.start();
    latch_.wait();//保证后端线程已经开始运行
  }

  void stop() NO_THREAD_SAFETY_ANALYSIS
  {
   
    running_ = false;
    cond_.notify();
    thread_.join();
  }

 private:

  void threadFunc();

  typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
  typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
  typedef BufferVector::value_type BufferPtr;

  //存储超时时间变量
  const int flushInterval_;        
  //后端线程启动标志
  std::atomic<bool> running_;
  //日志文件名
  const string basename_;
  //预留的日志大小
  const off_t rollSize_;
  //调用后端写入线程
  muduo::Thread thread_;
  //该变量保证后端日志写入线程已经运行
  muduo::CountDownLatch latch_;

  //条件变量与互斥锁,用来保证前端线程与后端线程的线程同步
  muduo::MutexLock mutex_;
  muduo::Condition cond_ GUARDED_BY(mutex_);

  //前端线程当前写入缓冲区
  BufferPtr currentBuffer_ GUARDED_BY(mutex_);
  //前端线程下一个备用缓冲区
  BufferPtr nextBuffer_ GUARDED_BY(mutex_);
  //带写入文件已填满的缓冲区队列
  BufferVector buffers_ GUARDED_BY(mutex_);
};

}  // namespace muduo

#endif  // MUDUO_BASE_ASYNCLOGGING_H

AsyncLogging.cc

前端线程

前端在生成一条日志消息的时候会调用AsyncLogging::append()。在这个函数中,如果当前缓冲(currentBuff)剩余的空间足够大,则会直接把日志消息拷贝(追加)到当前缓冲中,这是最常见的情况。
这里拷贝一条日志消息并不会带来多大开销。前后端代码的其余部分都没有拷贝,而是简单的指针交换。否则,说明当前缓冲已经写满,就把它送入(移入)buffers,并试图把预备好的另一块缓冲(nextBuffer
)移用(move)为当前缓冲,然后追加日志消息并通知(唤醒)后端开始写入日志数据。
以上两种情况在临界区之内都没有耗时的操作,运行时间为常数。
如果前端写入速度太快,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作为当前缓冲,这是极少发生的情况

后端线程

首先准备好两块空闲的buffer,以备在临界区内交换。
在临界区内,等待条件触发,这里的条件有两个:其一是超时,其二是前端写满了一个或多个buffer。
注意这里是非常规的conditionvariable用法,它没有使用while循环,而且等待时间有上限。当“条件”满足时,先将当前缓冲(currentBuffe)移入buffers,并立刻将空闲的newBuffer1移为当前缓冲。
注意这整段代码位于临界区之内,因此不会有任何race condition。接下来将buffer与buffersToWrite交换,后面的代码可以在临界区之外安全地访问buffersToWrite,将其中的日志数据写入文件。
临界区里最后干的一件事情是用newBuffer2替换nextBuffer,这样前端始终有一个预备buffer可供调配。nextBuffer
可以减少前端临界区分配内存的概率,缩短前端临界区长度。注意到后端临界区内也没有耗时的操作,运行时间为常数。会buffersToWrite内的buffer重新填充newBuffer1和newBuffer2,这样下一次执行的时候还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。
最后,这四个缓冲在程序启动的时候会全部填充为0,这样可以避免程序热身时page fault引发性能不稳定。

// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/base/AsyncLogging.h"
#include "muduo/base/LogFile.h"
#include "muduo/base/Timestamp.h"

#include <stdio.h>

using namespace muduo;

//异步日志
AsyncLogging::AsyncLogging(const string& basename,
                           off_t rollSize,
                           int flushInterval)
  : flushInterval_(flushInterval),//后端线程超时时间变量,默认为3s
    running_(false),//后端线程启动标志
    basename_(basename),//设置输出日志文件名
    rollSize_(rollSize),// 预留的日志大小
    thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),// 执行该异步日志记录器的线程
    latch_(1),//该变量作用是保证后端线程已进入
    mutex_(),
    cond_(mutex_),
    currentBuffer_(new Buffer), //当前缓冲区
    nextBuffer_(new Buffer),//预备缓冲区
    buffers_()//缓冲区队列
{
   
  currentBuffer_->bzero(); //清空
  nextBuffer_->bzero();//清空
  buffers_.reserve(16);//设置缓冲区队列大小为16  
}

//所有LOG_*最终都会调用append函数
void AsyncLogging::append(const char* logline, int len)
{
   
  muduo::MutexLockGuard lock(mutex_);
  if (currentBuffer_->avail() > len)     // 如果当前buffer还有空间,就添加到当前日志
  {
   
    currentBuffer_->append(logline, len);//调用vector的append
  }
  else
  {
   
    //将使用完后的buffer添加到 buffer vector后
    buffers_.push_back(std::move(currentBuffer_));

    if (nextBuffer_)    // 重新设置当前buffer
    {
   
      currentBuffer_ = std::move(nextBuffer_);
    }
    else
    {
   
      currentBuffer_.reset(new Buffer); // Rarely happens
      //如果前端写入速度太快了,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作当前缓冲,这是极少发生的情况
    }
    currentBuffer_->append(logline, len);

    // 通知日志线程,有数据可写
    cond_.notify();
  }
}

void AsyncLogging::threadFunc()             // 线程调用的函数,主要用于周期性的flush数据到日志文件中
{
   
  assert(running_ == true);
  latch_.countDown();
  LogFile output(basename_, rollSize_, false);//打开日志文件
  BufferPtr newBuffer1(new Buffer);           //这两个是后台线程的buffer 
  BufferPtr newBuffer2(new Buffer);
  newBuffer1->bzero();//clear
  newBuffer2->bzero();//clear
  BufferVector buffersToWrite;            //用来和前台线程的buffers_进行swap
  buffersToWrite.reserve(16);            //预留空间
  while (running_)
  {
   
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());

    {
   
      muduo::MutexLockGuard lock(mutex_);
      //如果buffer为空,那么表示没有数据需要写入文件,那么就等待指定的时间(默认三秒)
      if (buffers_.empty())  // unusual usage!
      {
   
        cond_.waitForSeconds(flushInterval_);
      }

      //无论cond是因何而醒来,都要将currentBuffer_放到buffers_中。  
      //如果是因为时间到而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。  
      //如果已经有一个前台buffer满了,那么在前台线程中就已经把一个前台buffer放到buffers_中  
      //了。此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,  
      //因为在前台线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)
      buffers_.push_back(std::move(currentBuffer_));   //currentBuffer_是当前缓冲区

      /*---归还一个buffer---*/ // 将新的buffer转成当前缓冲区
      currentBuffer_ = std::move(newBuffer1);
      //使用新的未使用的 buffersToWrite 交换 buffers_,将buffers_中的数据在异步线程中写入LogFile中
      buffersToWrite.swap(buffers_);//内部指针交换,而非复制
      if (!nextBuffer_)
      {
   
        nextBuffer_ = std::move(newBuffer2);/*-----假如需要,归还第二个----*/
      }
    }

    assert(!buffersToWrite.empty());

    // 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除  
    // 消息堆积
    //前端陷入死循环,拼命发送日志消息,超过后端的处理能力
    //这是典型的生产速度超过消费速度,会造成数据在内存中的堆积
    //严重时引发性能问题(可用内存不足),
    //或程序崩溃(分配内存失败)
    if (buffersToWrite.size() > 25)
    {
   
      char buf[256];
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);
      fputs(buf, stderr);
      output.append(buf, static_cast<int>(strlen(buf)));

      // 丢掉多余日志,以腾出内存,仅保留两块缓冲区
      buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
    }

     // 将buffersToWrite的数据写入到日志中
    for (const auto& buffer : buffersToWrite)
    {
   
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      output.append(buffer->data(), buffer->length());
    }

    // 重新调整buffersToWrite的大小 
    if (buffersToWrite.size() > 2)
    {
   
      // drop non-bzero-ed buffers, avoid trashing
      buffersToWrite.resize(2);
    }

    if (!newBuffer1)
    {
   
      assert(!buffersToWrite.empty());
      // 从buffersToWrite中弹出一个作为newBuffer1
      newBuffer1 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer1->reset();// 清理newBuffer1
    }

    //前台buffer是由newBuffer1 2 归还的。现在把buffersToWrite的buffer归还给后台buffer
    if (!newBuffer2)
    {
   
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }

    buffersToWrite.clear();
    output.flush();//刷新日志文件(写入)
  }
  output.flush();
}

如果日志消息堆积怎么办

万一前端陷入死循环,拼命发送日志消息,超过后端的处理(输出)能力,会导致什么后果?对于同步日志来说,这不是问题,因为阻塞IO自然就限制了前端的写入速度,起到了节流阀的作用。但是对于异步日志来说,这就是典型的生产速度高于消费速度问题,会造成数据在内存中堆积,严重时引发性能问题(可用内存不足)或程序崩溃(分配内存失败)。
muduo日志库处理日志堆积的方法很简单:直接丢掉多余的日志buffer,以腾出内存,见这样可以防止日志库本身引起程序故障,是一种自我保护措施。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
3月前
|
设计模式 SQL 安全
PHP中的设计模式:单例模式的深入探索与实践在PHP的编程实践中,设计模式是解决常见软件设计问题的最佳实践。单例模式作为设计模式中的一种,确保一个类只有一个实例,并提供全局访问点,广泛应用于配置管理、日志记录和测试框架等场景。本文将深入探讨单例模式的原理、实现方式及其在PHP中的应用,帮助开发者更好地理解和运用这一设计模式。
在PHP开发中,单例模式通过确保类仅有一个实例并提供一个全局访问点,有效管理和访问共享资源。本文详细介绍了单例模式的概念、PHP实现方式及应用场景,并通过具体代码示例展示如何在PHP中实现单例模式以及如何在实际项目中正确使用它来优化代码结构和性能。
52 2
|
4月前
|
XML 监控 Java
异步日志:性能优化的金钥匙
本文主要介绍了Log4j2框架的核心原理、实践应用以及一些实用的小Tips,力图揭示Log4j2这一强大日志记录工具在现代分布式服务架构运维中的关键作用。
|
3月前
|
存储 运维 监控
超级好用的C++实用库之日志类
超级好用的C++实用库之日志类
49 0
|
4月前
|
SQL JavaScript 前端开发
【Azure 应用服务】Azure JS Function 异步方法中执行SQL查询后,Callback函数中日志无法输出问题
【Azure 应用服务】Azure JS Function 异步方法中执行SQL查询后,Callback函数中日志无法输出问题
|
4月前
|
数据采集 监控 Kubernetes
Job类日志采集问题之iLogtail以减小容器发现和开始采集的延时如何优化
Job类日志采集问题之iLogtail以减小容器发现和开始采集的延时如何优化
|
4月前
|
数据采集 Kubernetes Java
Job类日志采集问题之在日志中添加容器的元信息标签,如何操作
Job类日志采集问题之在日志中添加容器的元信息标签,如何操作
|
4月前
|
存储 容器
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
Job类日志采集问题之DaemonSet采集方式的参数以减小采集延时如何调整
|
4月前
|
容器
Job类日志采集问题之ECI产品采集方式对于弹性扩缩容是如何支持的
Job类日志采集问题之ECI产品采集方式对于弹性扩缩容是如何支持的
|
4月前
|
存储 数据采集 容器
Job类日志采集问题之DaemonSet采集方式在Job日志采集上如何表现
Job类日志采集问题之DaemonSet采集方式在Job日志采集上如何表现
|
4月前
|
存储 Kubernetes 数据处理
Job类日志采集问题之为什么Job容器的日志采集要考虑容器发现速度和开始采集延时,如何理解
Job类日志采集问题之为什么Job容器的日志采集要考虑容器发现速度和开始采集延时,如何理解
下一篇
DataWorks