1、日志写入逻辑
fwrite
函数原型
功能:向 buffer 中, 写入 count 个大小为 size 的对象到指定的流 stream。返回已写入对象的数量.
int fwrite(const void *buffer, size_t size, size_t count, FILE *stream )
fwrite
与 write
的区别
- fwrite 有缓存,write 没有缓存
- fwrite 是库函数,每次将数据写入用户缓冲区。等缓冲区满了,一次写入磁盘。或调用 fflush 刷新缓冲区;write 是系统调用,每次将数据写到磁盘,涉及用户态和内核态的切换。
- 批量写入的时候:应当减少调用
write
和fsync
,避免系统开销
可以用 setbuf
设置用户缓冲区的大小
void setbuf(FILE *stream, char *buf);
fwrite 配合以下结合,将日志消息写入日志文件。
fflush
: 内部触发write
把用户缓冲区数据刷新到内核缓冲区中
int fflush( FILE *stream )fsync
: 把内核缓冲区数据刷新到磁盘
int fsync(int fd);
各函数接口的关系原理如图所示
文件库函数
例:
#include <sstream> #include <unistd.h> using namespace std; int main () { FILE *file = fopen("0-fwrite_test.log", "wt"); // char buffer[BUFSIZ]; // setbuf(file, NULL); for (int i = 0; i < 10; ++i) { ostringstream oss; oss << "No." << i << "Root Error Message!\n"; fwrite(oss.str().c_str(), 1, oss.str().size(), file); fflush(file); } // // 把数据刷到磁盘里 // int fd = fileno(file); //把文件流指针转换成文件描述符 // fsync(fd); // fclose(file); }
2、muduo异步日志库
2.1、异步日志机制
异步日志,用一个线程负责收集日志消息,并写入日志文件。其他业务线程只管往这个日志线程发送日志消息。
整个框架如图所示,该框架有一个日志缓冲队列来将日志前端的数据发送到后端(日志线程),这是一个典型的生产者与消费者模型。生产者前端不是将日志消息逐条分别传送给后端,而是将多条日志信息缓存拼成1个大的 buffer 发送给后端。同时,对于消费者后端线程来说,并不是每条日志消息写入都将其唤醒,而是当前端写满1个 buffer 的时候,才唤醒后端日志线程批量写入磁盘文件。
因此,后端日志线程的唤醒有两个条件
- buffer 写满唤醒:批量写入写满1个 buffer 后,唤醒后端日志线程,减少线程被唤醒的频率,降低系统开销。
- 超时被唤醒:为了及时将日志消息写入文件,防止系统故障导致内存中的日志消息丢失,超过规定的时间阈值,即使 buffer 未满,也会立即将 buffer 中的数据写入。
2.2、双缓冲机制
muduo日志库采用双缓冲技术,所谓双缓冲技术就是前后端各有一个(其实是各有两个)大小为4MB的Buffer,之所以各有一个Buffer,是为了当前端Buffer写满了之后,后端空闲的Buffer能够立刻和前端的Buffer互换,互换完成后就可以释放锁,让前端继续向互换后的Buffer中写入日志,而不必等待后端把日志全部写入文件后再释放锁,大大减小了锁的粒度,这也是muduo异步日志高效的关键原因之一。
需要说明的是:
muduo前后端各有两个大小为4MB的Buffer,下图为了方面明白异步日志的思想,简化为前后端各自只有一个Buffer,
这里的Buffer不是Buffer.h中定义的Buffer,而是FixedBuffer.h中定义的,可以理解为一个有4MB大小的字符数组
现在的关键是,前后端的Buffer什么时候互换呢?
- 第一种情况:前端Buffer写满了,就会唤醒后台线程互换Buffer,并把Buffer中的日志写入文件
- 第二种情况:为了及时把前端Buffer中的日志写入文件,日志库也会每3秒执行一次交换Buffer的操作。(面试官可能会问你为什么要3秒交换一次Buffer)
下图就以第二种情况为例,展示日志库双缓冲原理。
在0~3秒之间,当前端的线程调用了LOG_INFO << 日志信息;
,会开辟一个4KB大小的空间,用于存储“日志信息”,当该语句执行结束时,就已经把日志信息都存储到前端的Buffer(黄色Buffer)中,同时析构自己刚开辟的4KB的空间。
在3秒之后,后端线程被唤醒,为了让前端阻塞时间更短,后端把自己的红色Buffer和前端的黄色Buffer互换,然后就可以释放锁,这样前端又能及时的向红色Buffer中写入新的日志信息。后端线程也可以把来自前端的黄色Buffer中的数据写入文件。
2.3、前端日志写入
AsyncLogging::append()
:前端生成一条日志消息。
前端准备一个前台缓冲区队列 buffers_
和两个 buffer。前台缓冲队列 buffers_
用来存放积攒的日志消息。两个 buffer,一个是当前缓冲区 currentBuffer
,追加的日志消息存放于此;另一个作为当前缓冲区的备份,即预备缓冲区 nextBuffer
,减少内存的开销。
函数执行逻辑如下:
判断当前缓冲区 currentBuffer_
是否已经写满
- 若当前缓冲区未满,追加日志消息到当前缓冲,这是最常见的情况
- 若当前缓冲区写满,首先,把它移入前台缓冲队列
buffers_
。其次,尝试把预备缓冲区nextBuffer_
移用为当前缓冲,若失败则创建新的缓冲区作为当前缓冲。最后,追加日志消息并唤醒后端日志线程开始写入日志数据。、
void AsyncLogging::append(const char* logline, int len) { // 多线程加锁,线程安全 MutexLockGuard lock(mutex_); // 判断当前缓冲是否已经写满,批量数据的积攒 // 1、当前缓冲未满,还能写入数据 if (currentBuffer_->avail() > len) { // 追加日消息到当前缓冲 currentBuffer_->append(logline, len); } // 2、当前缓冲写满,两件事 // 其一,将写满的当前缓冲的日志消息写入前台缓冲队列 buffers // 其二,追加日志消息到当前缓冲,唤醒后台日志落盘线程 else { // 其一、当前缓冲移入(move)前台缓冲队列 buffers buffers_.push_back(std::move(currentBuffer_)); // 判断预备缓冲是否写满 // 2.1、预备缓冲未满,复用该缓冲区 if (nextBuffer_) { // 预备缓冲移用为当前缓冲 currentBuffer_ = std::move(nextBuffer_); } // 2.2、预备缓冲写满(极少发生),重新分配缓冲 // 原因:前端写入速度太快,一下子把两块缓冲都用完了 else { // 重新分配 buffer,用作当前缓冲 currentBuffer_.reset(new Buffer); } // 其二、追加日志信息到当前缓冲,唤醒日志落盘线程 currentBuffer_->append(logline, len); cond_.notify(); } }
2.4、后端日志落盘
AsyncLogging::threadFunc()
:后端日志落盘线程的执行函数。
后端同样也准备了一个后台缓冲区队列 buffersToWrite
和两个备用 buffer。后台缓冲区队列 buffersToWrite
存放待写入磁盘的数据。两个备用 buffer,newBuffer1
和newBuffer2
,分别用来替换前台的当前缓冲和预备缓冲,而这两个备用 buffer 最后会被buffersToWrite
内的两个 buffer 重新填充,减少了内存的开销。
函数执行逻辑如下,注意思考如何锁的粒度如何减小,起到了什么作用。
- 唤醒日志落盘线程(超时或写满buffer),交换前台缓冲队列和后台缓冲队列。加锁
- 日志落盘,将后台缓冲队列的所有 buffer 写入文件。不加锁,这样做的好处是日志落盘不影响前台缓冲队列的插入,不会出现阻塞问题,极大提升了系统性能。
接下来,用图表示前端和后端的具体交互情况,注意配合源码认真分析。
一开始,分配好四个缓冲区,前端和后端各持有其中的两个。同时,前端和后端各有一个缓冲队列,初始时都是空的。结合源码分析以下过程:
case 1: 超时唤醒后端线程将当前缓冲区写入文件,此时前端写日志的频率不高。
超时唤醒后端线程,先把
currentBuffer_
送入buffers_
,再把newbuffer1
移用为currentBuffer_
。随后,交换buffers_
和buffersToWrite
。离开临界区,后端开始将buffersToWrite
中的 bufferA 写入文件。写完后(write done)再重新填充newbuffer1
,等待下一次cond_.waitForSeconds()
返回。
case 2:超时前写满当前缓冲唤醒后端线程写入文件
写满
currentBuffer_
唤醒后端线程,把currentBuffer_
送入buffers_
,再把newbuffer1
移用为currentBuffer_
。随后,交换buffers_
和buffersToWrite
,最后用newbuffer2
替换nextBuffer_
,始终保证前端有两个空缓冲可用。离开临界区,后端开始将buffersToWrite
中的 bufferA 和 bufferB 写入文件。写完后再重新填充newbuffer1
和newbuffer2
,等待下一次cond_.waitForSeconds()
返回。
上述这两种情况是最常见的。
case3: 用完了两个缓冲,需要重新分配新 buffer。可能原因:前端短时间内密集写入日志,或者后端文件写入速度较慢,导致前端耗尽了两个缓冲,并分配了新缓冲。
写满
currentBuffer_
唤醒后端线程,但出于种种原因,后端线程并没有立刻开始工作,接下来预备缓冲nextBuffer_
也写满了,前端线程新分配了缓冲区 E。当后端线程终于获取控制权后,将缓冲 C、D交给前端,并开始将缓冲 A, B, E依次写入文件。写完后再用缓冲A、B重新填充两块空闲缓冲,从而释放了缓冲E。
void AsyncLogging::threadFunc() { assert(running_ == true); latch_.countDown(); // logFile 类负责将数据写入磁盘 LogFile output(basename_, rollSize_, false); BufferPtr newBuffer1(new Buffer); // 用于替换前台的当前缓冲 currentbuffer BufferPtr newBuffer2(new Buffer); // 用于替换前台的预备缓冲 nextbuffer newBuffer1->bzero(); newBuffer2->bzero(); BufferVector buffersToWrite; // 后台缓冲队列 buffersToWrite.reserve(16); // 两个不同的缓冲队列,涉及到锁的粒度问题 // 异步日志开启,则循环执行 while (running_) { assert(newBuffer1 && newBuffer1->length() == 0); assert(newBuffer2 && newBuffer2->length() == 0); assert(buffersToWrite.empty()); // <---------- 交换前台缓冲队列和后台缓冲队列 ----------> { // 锁的作用域,放在外面,锁的粒度就大了,日志落盘的时候都会阻塞 append // 1、多线程加锁,线程安全,注意锁的作用域 MutexLockGuard lock(mutex_); // 2、判断前台缓冲队列 buffers 是否有数据可读 // buffers 没有数据可读,休眠 if (buffers_.empty()) { // 触发日志的落盘 (唤醒) 的两个条件:1.超时 or 2.被唤醒,即前台写满 buffer cond_.waitForSeconds(flushInterval_); // 内部封装 pthread_cond_timedwait } // 只要触发日志落盘,不管当前的 buffer 是否写满都必须取出来,写入磁盘 // 3、将当前缓冲区 currentbuffer 移入前台缓冲队列 buffers。 // currentbuffer 被锁住 -> currentBuffer 被置空 buffers_.push_back(std::move(currentBuffer_)); // 4、将空闲的 newbuffer1 移为当前缓冲,复用已经分配的空间 currentBuffer_ = std::move(newBuffer1); // currentbuffer 需要内存空间 // 5、核心:把前台缓冲队列的所有buffer交换(互相转移)到后台缓冲队列 // 这样在后续的日志落盘过程中不影响前台缓冲队列的插入 buffersToWrite.swap(buffers_); // 若预备缓冲为空,则将空闲的 newbuffer2 移为预备缓冲,复用已经分配的空间 // 这样前台始终有一个预备缓冲可供调配 if (!nextBuffer_) { nextBuffer_ = std::move(newBuffer2); } } // 注意这里加锁的粒度,日志落盘的时候不需要加锁了,主要是双队列的功劳 // <-------- 日志落盘,将buffersToWrite中的所有buffer写入文件 --------> assert(!buffersToWrite.empty()); // 6、异步日志消息堆积的处理。 // 同步日志,阻塞io,不存在堆积问题;异步日志,直接删除多余的日志,并插入提示信息 if (buffersToWrite.size() > 25) { printf("Dropped\n"); // 插入提示信息 char buf[256]; snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n", Timestamp::now().toFormattedString().c_str(), buffersToWrite.size()-2); fputs(buf, stderr); output.append(buf, static_cast<int>(strlen(buf))); // 只保留2个buffer(默认4M) buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end()); } // 7、循环写入 buffersToWrite 的所有 buffer for (const auto& buffer : buffersToWrite) { // 内部封装 fwrite,将 buffer中的一行日志数据,写入用户缓冲区,等待写入文件 output.append(buffer->data(), buffer->length()); } // 8、刷新数据到磁盘文件?这里应该保证数据落到磁盘,但事实上并没有,需要改进 fsync // 内部调用flush,只能将数据刷新到内核缓冲区,不能保证数据落到磁盘(断电问题) output.flush(); // 9、重新填充 newBuffer1 和 newBuffer2 // 改变后台缓冲队列的大小,始终只保存两个 buffer,多余的 buffer 被释放 // 为什么不直接保存到当前和预备缓冲?这是因为加锁的粒度,二者需要加锁操作 if (buffersToWrite.size() > 2) { // 只保留2个buffer,分别用于填充备用缓冲 newBuffer1 和 newBuffer2 buffersToWrite.resize(2); } // 用 buffersToWrite 内的 buffer 重新填充 newBuffer1 if (!newBuffer1) { assert(!buffersToWrite.empty()); newBuffer1 = std::move(buffersToWrite.back()); // 复用 buffer buffersToWrite.pop_back(); newBuffer1->reset(); // 重置指针,置空 } // 用 buffersToWrite 内的 buffer 重新填充 newBuffer2 if (!newBuffer2) { assert(!buffersToWrite.empty()); newBuffer2 = std::move(buffersToWrite.back()); // 复用 buffer buffersToWrite.pop_back(); newBuffer2->reset(); // 重置指针,置空 } // 清空 buffersToWrite buffersToWrite.clear(); } // 存在问题 output.flush(); }
2.5、coredump查找未落盘的日志
具体找哪里要看实际的情况写日志线程停在哪里。不能光按照本文说描述的位置,里面的log 序号不一定对的上。
通过异步日志的实现可以知道,日志消息并不是生成后立刻就会写出,而是先存放在前端缓冲区currentBuffer或者前端缓冲区队列buffers中,每过一段时间才会将缓冲区中的日志消息写到日志文件中。那么这就会有问题了:如果程序在中途core dump了,那么在缓冲区中还未来得及写出的日志消息该如何找回呢?
core dump的原因有多种,现在来构造这样的场景:主线程开启日志线程,写入100万条日志,然后休眠一小段时间,在日志还没有完全写入前人为往空指针写数据制造异常退出,如下所示:
void testCoredump() { AsyncLogging log("coredump", 200*1000*1000); log.start(); //开启日志线程 g_asyncLog = &log; int msgcnt = 0; Logger::setOutput(asyncOutput);//设置日志输出函数 for(int i=0;i<1000000;i++) {//写入100万条日志消息 LOG_INFO << "0voice 0123456789" << " abcdefghijklmnopqrstuvwxyz " << ++msgcnt; if(i == 500000) { int *ptr = NULL; *ptr = 0x1234; // 人为制造异常 } } }
这样一来,当程序执行到*ptr = 0x1234;时,就会自动异常,也就引发了core dump。
下面就来看看,如何在这种情况下,去找回还未来得及写出到日志文件中的日志消息。
完整的代码参考课程提供的muduo_log,将main_log_test.cc的main函数进行修改如下:
2.5.1、生成core文件
要寻找还未写出的日志消息,需要用到core dump时生成的core文件,但是在默认情况下是不生成的,因此需要先开启。
通过ulimit -c可查看core文件是否开启:
如果显示为0,说明未开启,此时可以通过ulimit -c unlimited开启,这里unlimited指的是core文件的最大大小,可以设置为其它数字:
ulimit -c unlimited
可以看到,运行程序生成的可执行文件core_dump,程序发生了异常退出,此时生成了名为"core"的文件,这个文件就记录了程序崩溃的信息。此外还可以看到,已经生成了一个.log日志文件,打开这个日志文件,如下所示:
如果没有发生core dump,那么日志文件也应当有1000000条日志消息,而现在可以看到,只有465957条日志消息,也就是说,剩下的部分日志消息都还未来得及写出,这里我们是故意LOG_INFO 在500000的时候制造的崩溃, 所以下面就来寻找这500000-465957=34043条消息(这里不是固定的,要看崩溃的时间点)。
2.5.2、gdb调试core文件
从core文件中去获得需要的信息。
通过gdb [execfile] [corefile]进行调试,如下所示:(注意:调试所用的execfile必须是debug生成的!!!否则很多关键信息都看不到)
特别需要注意gdb core文件的格式,是gdb 执行程序 coredump文件
gdb main_log_test core
通过gdb信息可以看到,Program terminated with signal SIGSEGV, Segmentation fault.。LWP是线程的标识,这里当前线程的LWP为87016,一共有两个线程:LWP 87016和87017,这个数字相当于线程的tid,每个线程都是不同的,无论是否在同一进程。从这里我们可以看出来崩溃是在主线程,但我们关注的是日志写入线程:
通过infothread 查看线程信息:
看不到什么有效的信息,那我们直接用thread id切换线程栈,然后用bt命令查看是否是日志写入栈。
我们直接用thread 2切换后可以看到,这里有两个线程,可以看到,Id2线程位于pthread_cond_timewait,因此这显然就是后端日志线程,异常的线程显然就是Id 1线程了。
用bt(即backtrace)查看线程的调用栈信息:
说明此时日志线程并没有再写,而且此时buffers_为空,我们则只需要关心currentBuffer_
。
然后通过frame 2切换栈空间,然后属于l查看源码
此时所在的环境,就相当于是append函数的栈帧未写出的日志消息,只可能存在于currentBuffer_或buffers_中,而currentBuffer实际上是一个unique_ptr,因此可以通过currentBuffer.get()来获取currentBuffer所指向的LogStream,用print打印变量,用p打印指针如下所示:
*print currentBuffer_.get()
这里报错是因为FixedBuffer中的值太大,超过了max-value-size,因此重新设置max-value-size为无限大:
set max-value-size unlimited
然后再*print currentBuffer_.get()
可以看到此时已经在data_后面显示了一部分日志消息,不过还有很多被省略了,这是因为gdb的终端输出长度有限制,默认为200个字符,可以修改这个限制,如下所示:
show print elements
setprint elementsunlimited
show print elements
然后重新打印变量值print *currentBuffer_.get(),如下所示(此时会打印会比较慢,需要等待一小会):
可以看到满屏幕都是一条一条的日志消息,如果我们想把这些消息全部输出到外部的文件中,可以通过logging实现,即在print之前通过set logging file [filename]来指定输出文件,然后set logging on来开启拷贝,此后终端上的所有打印的信息都会拷贝到指定的输出文件中,通过set logging off来停止拷贝。
set logging file gdbinfo.txt
setlogging on
然后再*print currentBuffer_.get()
随后一直按回车,以打印buffer所有的数据。
此时就可以进行print了,此后直到调用set logging off,所有终端上的打印信息都会拷贝到gdbInfo.txt中。
然后就可以用vim打开gdbinfo.txt:
此时所有数据都被当作了一行,这是因为在拷贝时,将‘\n’作为了两个普通的字符而不是换行符。因此在vim中将其进行查找替换即可:在命令模式下输入:%s/\\n/\r/g
回车即可。
这是文件尾部内容,可以看到里面有日志,然后再在命令模式下输入0跳转到文件头部:
(该文件可以/1000000 查找第100万条日志)
但并不是我们说的应该是973469的下一条,973470
2.6、高性能原因总结
如何实现高性能的日志
- 批量写入:攒够数据,一次写入,glog / muduo
- 唤醒机制:通知唤醒
notify
+ 超时唤醒wait_timeout
- 锁的粒度:刷新磁盘时,日志接口不会阻塞。这是通过双队列实现的,前台队列实现日志接口,后台队列实现刷新磁盘。
- 内存分配:移动语义,避免深拷贝;双缓冲,前台后台都设有。