简介
AsyncLogging是muduo的日志,程序如果直接让文件写日志可能会发生阻塞,muduo前端设计了2个BufferPtr,分别是currentBuffer和nextBuffer,还有一个存放BufferPtr的vector(buffers_)。
多个前端线程往currentBuffer写数据,currentBuffer写满了将其放入buffers,通知后端线程读。前端线程将currentBuffer和nextBuffer替换继续写currentBuffer。
后端也有2个BufferPtr,分别为newBuffer1和newBuffer2,还有一个BufferVector(buffersToWrite)。后端线程在收到前端通知之后,利用buffersToWrite和buffers进行交换,并且用newBuffer1和newBuffer2归还给前端的currentBuffer和nextBuffer_,然后把日志写入文件。
muduo日志文件只提供写入本地文件
后端线程写入条件:
- 前端线程缓冲区写完后通过条件变量通知后端线程写入
- 超时,muduo设置的是默认时间是3秒(AsyncLogging构造函数第三个参数flushInterval_),
源码剖析
AsyncLogging.h
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#ifndef MUDUO_BASE_ASYNCLOGGING_H
#define MUDUO_BASE_ASYNCLOGGING_H
#include "muduo/base/BlockingQueue.h"
#include "muduo/base/BoundedBlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Mutex.h"
#include "muduo/base/Thread.h"
#include "muduo/base/LogStream.h"
#include <atomic>
#include <vector>
namespace muduo
{
class AsyncLogging : noncopyable
{
public:
AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval = 3);
~AsyncLogging()
{
if (running_)
{
stop();
}
}
void append(const char* logline, int len);
void start()
{
running_ = true;
thread_.start();
latch_.wait();//保证后端线程已经开始运行
}
void stop() NO_THREAD_SAFETY_ANALYSIS
{
running_ = false;
cond_.notify();
thread_.join();
}
private:
void threadFunc();
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;
//存储超时时间变量
const int flushInterval_;
//后端线程启动标志
std::atomic<bool> running_;
//日志文件名
const string basename_;
//预留的日志大小
const off_t rollSize_;
//调用后端写入线程
muduo::Thread thread_;
//该变量保证后端日志写入线程已经运行
muduo::CountDownLatch latch_;
//条件变量与互斥锁,用来保证前端线程与后端线程的线程同步
muduo::MutexLock mutex_;
muduo::Condition cond_ GUARDED_BY(mutex_);
//前端线程当前写入缓冲区
BufferPtr currentBuffer_ GUARDED_BY(mutex_);
//前端线程下一个备用缓冲区
BufferPtr nextBuffer_ GUARDED_BY(mutex_);
//带写入文件已填满的缓冲区队列
BufferVector buffers_ GUARDED_BY(mutex_);
};
} // namespace muduo
#endif // MUDUO_BASE_ASYNCLOGGING_H
AsyncLogging.cc
前端线程
前端在生成一条日志消息的时候会调用AsyncLogging::append()。在这个函数中,如果当前缓冲(currentBuff)剩余的空间足够大,则会直接把日志消息拷贝(追加)到当前缓冲中,这是最常见的情况。
这里拷贝一条日志消息并不会带来多大开销。前后端代码的其余部分都没有拷贝,而是简单的指针交换。否则,说明当前缓冲已经写满,就把它送入(移入)buffers,并试图把预备好的另一块缓冲(nextBuffer)移用(move)为当前缓冲,然后追加日志消息并通知(唤醒)后端开始写入日志数据。
以上两种情况在临界区之内都没有耗时的操作,运行时间为常数。
如果前端写入速度太快,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作为当前缓冲,这是极少发生的情况
后端线程
首先准备好两块空闲的buffer,以备在临界区内交换。
在临界区内,等待条件触发,这里的条件有两个:其一是超时,其二是前端写满了一个或多个buffer。
注意这里是非常规的conditionvariable用法,它没有使用while循环,而且等待时间有上限。当“条件”满足时,先将当前缓冲(currentBuffe)移入buffers,并立刻将空闲的newBuffer1移为当前缓冲。
注意这整段代码位于临界区之内,因此不会有任何race condition。接下来将buffer与buffersToWrite交换,后面的代码可以在临界区之外安全地访问buffersToWrite,将其中的日志数据写入文件。
临界区里最后干的一件事情是用newBuffer2替换nextBuffer,这样前端始终有一个预备buffer可供调配。nextBuffer可以减少前端临界区分配内存的概率,缩短前端临界区长度。注意到后端临界区内也没有耗时的操作,运行时间为常数。会buffersToWrite内的buffer重新填充newBuffer1和newBuffer2,这样下一次执行的时候还有两个空闲buffer可用于替换前端的当前缓冲和预备缓冲。
最后,这四个缓冲在程序启动的时候会全部填充为0,这样可以避免程序热身时page fault引发性能不稳定。
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
#include "muduo/base/AsyncLogging.h"
#include "muduo/base/LogFile.h"
#include "muduo/base/Timestamp.h"
#include <stdio.h>
using namespace muduo;
//异步日志
AsyncLogging::AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval)
: flushInterval_(flushInterval),//后端线程超时时间变量,默认为3s
running_(false),//后端线程启动标志
basename_(basename),//设置输出日志文件名
rollSize_(rollSize),// 预留的日志大小
thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),// 执行该异步日志记录器的线程
latch_(1),//该变量作用是保证后端线程已进入
mutex_(),
cond_(mutex_),
currentBuffer_(new Buffer), //当前缓冲区
nextBuffer_(new Buffer),//预备缓冲区
buffers_()//缓冲区队列
{
currentBuffer_->bzero(); //清空
nextBuffer_->bzero();//清空
buffers_.reserve(16);//设置缓冲区队列大小为16
}
//所有LOG_*最终都会调用append函数
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len) // 如果当前buffer还有空间,就添加到当前日志
{
currentBuffer_->append(logline, len);//调用vector的append
}
else
{
//将使用完后的buffer添加到 buffer vector后
buffers_.push_back(std::move(currentBuffer_));
if (nextBuffer_) // 重新设置当前buffer
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
//如果前端写入速度太快了,一下子把两块缓冲都用完了,那么只好分配一块新的buffer,作当前缓冲,这是极少发生的情况
}
currentBuffer_->append(logline, len);
// 通知日志线程,有数据可写
cond_.notify();
}
}
void AsyncLogging::threadFunc() // 线程调用的函数,主要用于周期性的flush数据到日志文件中
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false);//打开日志文件
BufferPtr newBuffer1(new Buffer); //这两个是后台线程的buffer
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();//clear
newBuffer2->bzero();//clear
BufferVector buffersToWrite; //用来和前台线程的buffers_进行swap
buffersToWrite.reserve(16); //预留空间
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
muduo::MutexLockGuard lock(mutex_);
//如果buffer为空,那么表示没有数据需要写入文件,那么就等待指定的时间(默认三秒)
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
//无论cond是因何而醒来,都要将currentBuffer_放到buffers_中。
//如果是因为时间到而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。
//如果已经有一个前台buffer满了,那么在前台线程中就已经把一个前台buffer放到buffers_中
//了。此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,
//因为在前台线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)
buffers_.push_back(std::move(currentBuffer_)); //currentBuffer_是当前缓冲区
/*---归还一个buffer---*/ // 将新的buffer转成当前缓冲区
currentBuffer_ = std::move(newBuffer1);
//使用新的未使用的 buffersToWrite 交换 buffers_,将buffers_中的数据在异步线程中写入LogFile中
buffersToWrite.swap(buffers_);//内部指针交换,而非复制
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2);/*-----假如需要,归还第二个----*/
}
}
assert(!buffersToWrite.empty());
// 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除
// 消息堆积
//前端陷入死循环,拼命发送日志消息,超过后端的处理能力
//这是典型的生产速度超过消费速度,会造成数据在内存中的堆积
//严重时引发性能问题(可用内存不足),
//或程序崩溃(分配内存失败)
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
// 丢掉多余日志,以腾出内存,仅保留两块缓冲区
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
// 将buffersToWrite的数据写入到日志中
for (const auto& buffer : buffersToWrite)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());
}
// 重新调整buffersToWrite的大小
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
// 从buffersToWrite中弹出一个作为newBuffer1
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();// 清理newBuffer1
}
//前台buffer是由newBuffer1 2 归还的。现在把buffersToWrite的buffer归还给后台buffer
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();//刷新日志文件(写入)
}
output.flush();
}
如果日志消息堆积怎么办
万一前端陷入死循环,拼命发送日志消息,超过后端的处理(输出)能力,会导致什么后果?对于同步日志来说,这不是问题,因为阻塞IO自然就限制了前端的写入速度,起到了节流阀的作用。但是对于异步日志来说,这就是典型的生产速度高于消费速度问题,会造成数据在内存中堆积,严重时引发性能问题(可用内存不足)或程序崩溃(分配内存失败)。
muduo日志库处理日志堆积的方法很简单:直接丢掉多余的日志buffer,以腾出内存,见这样可以防止日志库本身引起程序故障,是一种自我保护措施。