muduo高性能异步日志库的实现

简介: muduo高性能异步日志库的实现

1、日志写入逻辑

fwrite 函数原型

功能:向 buffer 中, 写入 count 个大小为 size 的对象到指定的流 stream。返回已写入对象的数量.

int fwrite(const void *buffer, size_t size, size_t count, FILE *stream )

fwritewrite 的区别

  • fwrite 有缓存,write 没有缓存
  • fwrite 是库函数,每次将数据写入用户缓冲区。等缓冲区满了,一次写入磁盘。或调用 fflush 刷新缓冲区;write 是系统调用,每次将数据写到磁盘,涉及用户态和内核态的切换。
  • 批量写入的时候:应当减少调用 writefsync,避免系统开销

可以用 setbuf设置用户缓冲区的大小

void setbuf(FILE *stream, char *buf);

fwrite 配合以下结合,将日志消息写入日志文件。

  • fflush: 内部触发 write 把用户缓冲区数据刷新到内核缓冲区中
    int fflush( FILE *stream )
  • fsync: 把内核缓冲区数据刷新到磁盘
    int fsync(int fd);

各函数接口的关系原理如图所示

文件库函数

例:

#include <sstream>
 #include <unistd.h>
 using namespace std;
 int main () {
     FILE *file = fopen("0-fwrite_test.log", "wt");
     // char buffer[BUFSIZ];
     // setbuf(file, NULL);
     for (int i = 0; i < 10; ++i) {
         ostringstream oss;
         oss << "No." << i << "Root Error Message!\n";
         fwrite(oss.str().c_str(), 1, oss.str().size(), file);
         fflush(file);
     }
     // // 把数据刷到磁盘里
     // int fd = fileno(file); //把文件流指针转换成文件描述符
     // fsync(fd);
     // fclose(file);
 }

2、muduo异步日志库

2.1、异步日志机制

异步日志,用一个线程负责收集日志消息,并写入日志文件。其他业务线程只管往这个日志线程发送日志消息。

整个框架如图所示,该框架有一个日志缓冲队列来将日志前端的数据发送到后端(日志线程),这是一个典型的生产者与消费者模型。生产者前端不是将日志消息逐条分别传送给后端,而是将多条日志信息缓存拼成1个大的 buffer 发送给后端。同时,对于消费者后端线程来说,并不是每条日志消息写入都将其唤醒,而是当前端写满1个 buffer 的时候,才唤醒后端日志线程批量写入磁盘文件。

因此,后端日志线程的唤醒有两个条件

  • buffer 写满唤醒:批量写入写满1个 buffer 后,唤醒后端日志线程,减少线程被唤醒的频率,降低系统开销。
  • 超时被唤醒:为了及时将日志消息写入文件,防止系统故障导致内存中的日志消息丢失,超过规定的时间阈值,即使 buffer 未满,也会立即将 buffer 中的数据写入。

2.2、双缓冲机制

muduo日志库采用双缓冲技术,所谓双缓冲技术就是前后端各有一个(其实是各有两个)大小为4MB的Buffer,之所以各有一个Buffer,是为了当前端Buffer写满了之后,后端空闲的Buffer能够立刻和前端的Buffer互换,互换完成后就可以释放锁,让前端继续向互换后的Buffer中写入日志,而不必等待后端把日志全部写入文件后再释放锁,大大减小了锁的粒度,这也是muduo异步日志高效的关键原因之一。

需要说明的是:

muduo前后端各有两个大小为4MB的Buffer,下图为了方面明白异步日志的思想,简化为前后端各自只有一个Buffer,

这里的Buffer不是Buffer.h中定义的Buffer,而是FixedBuffer.h中定义的,可以理解为一个有4MB大小的字符数组

现在的关键是,前后端的Buffer什么时候互换呢?

  • 第一种情况:前端Buffer写满了,就会唤醒后台线程互换Buffer,并把Buffer中的日志写入文件
  • 第二种情况:为了及时把前端Buffer中的日志写入文件,日志库也会每3秒执行一次交换Buffer的操作。(面试官可能会问你为什么要3秒交换一次Buffer)

下图就以第二种情况为例,展示日志库双缓冲原理。

在0~3秒之间,当前端的线程调用了LOG_INFO << 日志信息;,会开辟一个4KB大小的空间,用于存储“日志信息”,当该语句执行结束时,就已经把日志信息都存储到前端的Buffer(黄色Buffer)中,同时析构自己刚开辟的4KB的空间。

在3秒之后,后端线程被唤醒,为了让前端阻塞时间更短,后端把自己的红色Buffer和前端的黄色Buffer互换,然后就可以释放锁,这样前端又能及时的向红色Buffer中写入新的日志信息。后端线程也可以把来自前端的黄色Buffer中的数据写入文件。

2.3、前端日志写入

AsyncLogging::append():前端生成一条日志消息。

前端准备一个前台缓冲区队列 buffers_和两个 buffer。前台缓冲队列 buffers_用来存放积攒的日志消息。两个 buffer,一个是当前缓冲区 currentBuffer,追加的日志消息存放于此;另一个作为当前缓冲区的备份,即预备缓冲区 nextBuffer,减少内存的开销。

函数执行逻辑如下:

判断当前缓冲区 currentBuffer_是否已经写满

  • 若当前缓冲区未满,追加日志消息到当前缓冲,这是最常见的情况
  • 若当前缓冲区写满,首先,把它移入前台缓冲队列 buffers_。其次,尝试把预备缓冲区 nextBuffer_移用为当前缓冲,若失败则创建新的缓冲区作为当前缓冲。最后,追加日志消息并唤醒后端日志线程开始写入日志数据。、
void AsyncLogging::append(const char* logline, int len) {
   // 多线程加锁,线程安全
   MutexLockGuard lock(mutex_);
   // 判断当前缓冲是否已经写满,批量数据的积攒
   // 1、当前缓冲未满,还能写入数据
   if (currentBuffer_->avail() > len) {
     // 追加日消息到当前缓冲
     currentBuffer_->append(logline, len); 
   }
   // 2、当前缓冲写满,两件事
   // 其一,将写满的当前缓冲的日志消息写入前台缓冲队列 buffers
   // 其二,追加日志消息到当前缓冲,唤醒后台日志落盘线程
   else {
     // 其一、当前缓冲移入(move)前台缓冲队列 buffers
     buffers_.push_back(std::move(currentBuffer_));
     // 判断预备缓冲是否写满
     // 2.1、预备缓冲未满,复用该缓冲区
     if (nextBuffer_) {
       // 预备缓冲移用为当前缓冲
       currentBuffer_ = std::move(nextBuffer_); 
     }
     // 2.2、预备缓冲写满(极少发生),重新分配缓冲
     // 原因:前端写入速度太快,一下子把两块缓冲都用完了
     else {
       // 重新分配 buffer,用作当前缓冲
       currentBuffer_.reset(new Buffer); 
     }
     // 其二、追加日志信息到当前缓冲,唤醒日志落盘线程
     currentBuffer_->append(logline, len);    
     cond_.notify(); 
   }
 }

2.4、后端日志落盘

AsyncLogging::threadFunc():后端日志落盘线程的执行函数。

后端同样也准备了一个后台缓冲区队列 buffersToWrite 和两个备用 buffer。后台缓冲区队列 buffersToWrite 存放待写入磁盘的数据。两个备用 buffer,newBuffer1newBuffer2,分别用来替换前台的当前缓冲和预备缓冲,而这两个备用 buffer 最后会被buffersToWrite内的两个 buffer 重新填充,减少了内存的开销。

函数执行逻辑如下,注意思考如何锁的粒度如何减小,起到了什么作用。

  • 唤醒日志落盘线程(超时或写满buffer),交换前台缓冲队列和后台缓冲队列。加锁
  • 日志落盘,将后台缓冲队列的所有 buffer 写入文件。不加锁,这样做的好处是日志落盘不影响前台缓冲队列的插入,不会出现阻塞问题,极大提升了系统性能。

接下来,用图表示前端和后端的具体交互情况,注意配合源码认真分析。

一开始,分配好四个缓冲区,前端和后端各持有其中的两个。同时,前端和后端各有一个缓冲队列,初始时都是空的。结合源码分析以下过程:

case 1: 超时唤醒后端线程将当前缓冲区写入文件,此时前端写日志的频率不高。

超时唤醒后端线程,先把 currentBuffer_ 送入 buffers_,再把 newbuffer1 移用为 currentBuffer_。随后,交换 buffers_buffersToWrite。离开临界区,后端开始将 buffersToWrite中的 bufferA 写入文件。写完后(write done)再重新填充 newbuffer1,等待下一次 cond_.waitForSeconds() 返回。

case 2:超时前写满当前缓冲唤醒后端线程写入文件

写满currentBuffer_唤醒后端线程,把currentBuffer_送入buffers_,再把newbuffer1移用为currentBuffer_。随后,交换 buffers_buffersToWrite,最后用newbuffer2替换nextBuffer_,始终保证前端有两个空缓冲可用。离开临界区,后端开始将buffersToWrite中的 bufferA 和 bufferB 写入文件。写完后再重新填充 newbuffer1newbuffer2,等待下一次 cond_.waitForSeconds() 返回。

上述这两种情况是最常见的。

case3: 用完了两个缓冲,需要重新分配新 buffer。可能原因:前端短时间内密集写入日志,或者后端文件写入速度较慢,导致前端耗尽了两个缓冲,并分配了新缓冲。

写满currentBuffer_唤醒后端线程,但出于种种原因,后端线程并没有立刻开始工作,接下来预备缓冲nextBuffer_也写满了,前端线程新分配了缓冲区 E。当后端线程终于获取控制权后,将缓冲 C、D交给前端,并开始将缓冲 A, B, E依次写入文件。写完后再用缓冲A、B重新填充两块空闲缓冲,从而释放了缓冲E。

void AsyncLogging::threadFunc() {
   assert(running_ == true);
   latch_.countDown();
   // logFile 类负责将数据写入磁盘
   LogFile output(basename_, rollSize_, false);
   BufferPtr newBuffer1(new Buffer); // 用于替换前台的当前缓冲 currentbuffer
   BufferPtr newBuffer2(new Buffer); // 用于替换前台的预备缓冲 nextbuffer 
   newBuffer1->bzero();  
   newBuffer2->bzero();
   BufferVector buffersToWrite;  // 后台缓冲队列
   buffersToWrite.reserve(16);   // 两个不同的缓冲队列,涉及到锁的粒度问题
   // 异步日志开启,则循环执行
   while (running_) {
     assert(newBuffer1 && newBuffer1->length() == 0);
     assert(newBuffer2 && newBuffer2->length() == 0);
     assert(buffersToWrite.empty());
     // <---------- 交换前台缓冲队列和后台缓冲队列 ---------->
     { // 锁的作用域,放在外面,锁的粒度就大了,日志落盘的时候都会阻塞 append
       // 1、多线程加锁,线程安全,注意锁的作用域
       MutexLockGuard lock(mutex_);
       // 2、判断前台缓冲队列 buffers 是否有数据可读
       // buffers 没有数据可读,休眠
       if (buffers_.empty()) {
         // 触发日志的落盘 (唤醒) 的两个条件:1.超时 or 2.被唤醒,即前台写满 buffer
         cond_.waitForSeconds(flushInterval_); // 内部封装 pthread_cond_timedwait
       }
       // 只要触发日志落盘,不管当前的 buffer 是否写满都必须取出来,写入磁盘
       // 3、将当前缓冲区 currentbuffer 移入前台缓冲队列 buffers。
       // currentbuffer 被锁住 -> currentBuffer 被置空  
       buffers_.push_back(std::move(currentBuffer_)); 
       // 4、将空闲的 newbuffer1 移为当前缓冲,复用已经分配的空间
       currentBuffer_ = std::move(newBuffer1); // currentbuffer 需要内存空间
       // 5、核心:把前台缓冲队列的所有buffer交换(互相转移)到后台缓冲队列 
       // 这样在后续的日志落盘过程中不影响前台缓冲队列的插入
       buffersToWrite.swap(buffers_);      
       // 若预备缓冲为空,则将空闲的 newbuffer2 移为预备缓冲,复用已经分配的空间
       // 这样前台始终有一个预备缓冲可供调配
       if (!nextBuffer_) { 
         nextBuffer_ = std::move(newBuffer2);  
       }
     } // 注意这里加锁的粒度,日志落盘的时候不需要加锁了,主要是双队列的功劳
     // <-------- 日志落盘,将buffersToWrite中的所有buffer写入文件 -------->
     assert(!buffersToWrite.empty());
     // 6、异步日志消息堆积的处理。
     // 同步日志,阻塞io,不存在堆积问题;异步日志,直接删除多余的日志,并插入提示信息
     if (buffersToWrite.size() > 25) {
       printf("Dropped\n");
       // 插入提示信息
       char buf[256];
       snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
                Timestamp::now().toFormattedString().c_str(),
                buffersToWrite.size()-2);    
       fputs(buf, stderr);
       output.append(buf, static_cast<int>(strlen(buf)));
       // 只保留2个buffer(默认4M)
       buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());   
     }
     // 7、循环写入 buffersToWrite 的所有 buffer
     for (const auto& buffer : buffersToWrite) {
       // 内部封装 fwrite,将 buffer中的一行日志数据,写入用户缓冲区,等待写入文件
       output.append(buffer->data(), buffer->length());  
     }
     // 8、刷新数据到磁盘文件?这里应该保证数据落到磁盘,但事实上并没有,需要改进 fsync
     // 内部调用flush,只能将数据刷新到内核缓冲区,不能保证数据落到磁盘(断电问题)
     output.flush();   
     // 9、重新填充 newBuffer1 和 newBuffer2
     // 改变后台缓冲队列的大小,始终只保存两个 buffer,多余的 buffer 被释放
     // 为什么不直接保存到当前和预备缓冲?这是因为加锁的粒度,二者需要加锁操作
     if (buffersToWrite.size() > 2) {
        // 只保留2个buffer,分别用于填充备用缓冲 newBuffer1 和 newBuffer2
       buffersToWrite.resize(2);  
     }
     // 用 buffersToWrite 内的 buffer 重新填充 newBuffer1
     if (!newBuffer1) {
       assert(!buffersToWrite.empty());
       newBuffer1 = std::move(buffersToWrite.back()); // 复用 buffer
       buffersToWrite.pop_back();
       newBuffer1->reset();    // 重置指针,置空
     }
     // 用 buffersToWrite 内的 buffer 重新填充 newBuffer2
     if (!newBuffer2) {
       assert(!buffersToWrite.empty());
       newBuffer2 = std::move(buffersToWrite.back()); // 复用 buffer
       buffersToWrite.pop_back();
       newBuffer2->reset();   // 重置指针,置空
     }
     // 清空 buffersToWrite
     buffersToWrite.clear();  
   }
   // 存在问题
   output.flush();
 }

2.5、coredump查找未落盘的日志

具体找哪里要看实际的情况写日志线程停在哪里。不能光按照本文说描述的位置,里面的log 序号不一定对的上

通过异步日志的实现可以知道,日志消息并不是生成后立刻就会写出,而是先存放在前端缓冲区currentBuffer或者前端缓冲区队列buffers中,每过一段时间才会将缓冲区中的日志消息写到日志文件中。那么这就会有问题了:如果程序在中途core dump了,那么在缓冲区中还未来得及写出的日志消息该如何找回呢?

core dump的原因有多种,现在来构造这样的场景:主线程开启日志线程,写入100万条日志,然后休眠一小段时间,在日志还没有完全写入前人为往空指针写数据制造异常退出,如下所示:

void testCoredump()
{
    AsyncLogging log("coredump", 200*1000*1000);
    log.start();   //开启日志线程
    g_asyncLog = &log;
    int msgcnt = 0;
    Logger::setOutput(asyncOutput);//设置日志输出函数
    for(int i=0;i<1000000;i++)   {//写入100万条日志消息
      LOG_INFO << "0voice 0123456789" << " abcdefghijklmnopqrstuvwxyz "
               << ++msgcnt;
      if(i == 500000) {
         int *ptr = NULL;
        *ptr = 0x1234;  // 人为制造异常
      }
    } 
}

这样一来,当程序执行到*ptr = 0x1234;时,就会自动异常,也就引发了core dump。

下面就来看看,如何在这种情况下,去找回还未来得及写出到日志文件中的日志消息。

完整的代码参考课程提供的muduo_log,将main_log_test.cc的main函数进行修改如下:

2.5.1、生成core文件

要寻找还未写出的日志消息,需要用到core dump时生成的core文件,但是在默认情况下是不生成的,因此需要先开启。

通过ulimit -c可查看core文件是否开启:

如果显示为0,说明未开启,此时可以通过ulimit -c unlimited开启,这里unlimited指的是core文件的最大大小,可以设置为其它数字:

ulimit -c unlimited

可以看到,运行程序生成的可执行文件core_dump,程序发生了异常退出,此时生成了名为"core"的文件,这个文件就记录了程序崩溃的信息。此外还可以看到,已经生成了一个.log日志文件,打开这个日志文件,如下所示:

如果没有发生core dump,那么日志文件也应当有1000000条日志消息,而现在可以看到,只有465957条日志消息,也就是说,剩下的部分日志消息都还未来得及写出,这里我们是故意LOG_INFO 在500000的时候制造的崩溃, 所以下面就来寻找这500000-465957=34043条消息(这里不是固定的,要看崩溃的时间点)。

2.5.2、gdb调试core文件

从core文件中去获得需要的信息。

通过gdb [execfile] [corefile]进行调试,如下所示:(注意:调试所用的execfile必须是debug生成的!!!否则很多关键信息都看不到)

特别需要注意gdb core文件的格式,是gdb 执行程序 coredump文件

gdb main_log_test core

通过gdb信息可以看到,Program terminated with signal SIGSEGV, Segmentation fault.。LWP是线程的标识,这里当前线程的LWP为87016,一共有两个线程:LWP 87016和87017,这个数字相当于线程的tid,每个线程都是不同的,无论是否在同一进程。从这里我们可以看出来崩溃是在主线程,但我们关注的是日志写入线程

通过infothread 查看线程信息:

看不到什么有效的信息,那我们直接用thread id切换线程栈,然后用bt命令查看是否是日志写入栈。

我们直接用thread 2切换后可以看到,这里有两个线程,可以看到,Id2线程位于pthread_cond_timewait,因此这显然就是后端日志线程,异常的线程显然就是Id 1线程了。

bt(即backtrace)查看线程的调用栈信息:

说明此时日志线程并没有再写,而且此时buffers_为空,我们则只需要关心currentBuffer_

然后通过frame 2切换栈空间,然后属于l查看源码

此时所在的环境,就相当于是append函数的栈帧未写出的日志消息,只可能存在于currentBuffer_buffers_中,而currentBuffer实际上是一个unique_ptr,因此可以通过currentBuffer.get()来获取currentBuffer所指向的LogStream,用print打印变量,用p打印指针如下所示:

*print currentBuffer_.get()

这里报错是因为FixedBuffer中的值太大,超过了max-value-size,因此重新设置max-value-size为无限大:

set max-value-size unlimited

然后再*print currentBuffer_.get()

可以看到此时已经在data_后面显示了一部分日志消息,不过还有很多被省略了,这是因为gdb的终端输出长度有限制,默认为200个字符,可以修改这个限制,如下所示:

show print elements

setprint elementsunlimited

show print elements

然后重新打印变量值print *currentBuffer_.get(),如下所示(此时会打印会比较慢,需要等待一小会):

可以看到满屏幕都是一条一条的日志消息,如果我们想把这些消息全部输出到外部的文件中,可以通过logging实现,即在print之前通过set logging file [filename]来指定输出文件,然后set logging on来开启拷贝,此后终端上的所有打印的信息都会拷贝到指定的输出文件中,通过set logging off来停止拷贝。

set logging file gdbinfo.txt

setlogging on

然后再*print currentBuffer_.get()

随后一直按回车,以打印buffer所有的数据。

此时就可以进行print了,此后直到调用set logging off,所有终端上的打印信息都会拷贝到gdbInfo.txt中。

然后就可以用vim打开gdbinfo.txt:

此时所有数据都被当作了一行,这是因为在拷贝时,将‘\n’作为了两个普通的字符而不是换行符。因此在vim中将其进行查找替换即可:在命令模式下输入:%s/\\n/\r/g回车即可。

这是文件尾部内容,可以看到里面有日志,然后再在命令模式下输入0跳转到文件头部:

(该文件可以/1000000 查找第100万条日志)

但并不是我们说的应该是973469的下一条,973470

2.6、高性能原因总结

如何实现高性能的日志

  • 批量写入:攒够数据,一次写入,glog / muduo
  • 唤醒机制:通知唤醒 notify + 超时唤醒 wait_timeout
  • 锁的粒度:刷新磁盘时,日志接口不会阻塞。这是通过双队列实现的,前台队列实现日志接口,后台队列实现刷新磁盘。
  • 内存分配:移动语义,避免深拷贝;双缓冲,前台后台都设有。
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
2月前
|
存储 Go
Go 浅析主流日志库:从设计层学习如何集成日志轮转与切割功能
本文将探讨几个热门的 go 日志库如 logrus、zap 和官网的 slog,我将分析这些库的的关键设计元素,探讨它们是如何支持日志轮转与切割功能的配置。
53 0
Go 浅析主流日志库:从设计层学习如何集成日志轮转与切割功能
|
5月前
|
JSON Go 数据格式
Go 1.21.0 中新增的结构化日志记录标准库 log/slog 详解
Go 1.21.0 中新增的结构化日志记录标准库 log/slog 详解
60 0
|
14天前
|
消息中间件 设计模式 Java
spdlog中的异步日志方案
spdlog中的异步日志方案
37 2
|
2月前
|
前端开发
muduo源码剖析之AsyncLogging异步日志类
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer_和nextBuffer_,还有一个存放BufferPtr的vector(buffers_)。多个前端线程往currentBuffer_写数据,currentBuffer_写满了将其放入buffers_,通知后端线程读。前端线程将currentBuffer_和nextBuffer_替换继续写currentBuffer_。
17 0
|
2月前
|
JSON Go 开发工具
Golang日志库Zap基本使用
Golang日志库Zap基本使用
23 0
|
2月前
|
Java Linux
异步日志方案log4cpp
异步日志方案log4cpp
20 0
|
2月前
|
缓存 前端开发 NoSQL
muduo异步日志库模块的实现
muduo异步日志库模块的实现
|
3月前
|
消息中间件 JSON Go
Go日志库——logrus
Go日志库——logrus
|
3月前
|
JSON 缓存 Go
Go日志库-zap
Go日志库-zap
|
4月前
|
存储 缓存 前端开发
两种异步日志方案的介绍
两种异步日志方案的介绍
51 0

相关产品

  • 云迁移中心