1. 背景
当我们在编写程序时,尤其是在多核心处理器普及的今天,多线程编程是一个重要的概念,可以让我们更充分地利用计算机的处理能力,实现并行处理任务,从而提高程序的效率和性能。
多线程是指在一个单一的程序中可以同时运行多个不同的执行线程。每个线程可以看作是程序执行的一个独立的路径。在多线程环境中,操作系统负责多个线程的调度,使它们可以在一个或多个核心上并发运行。相比于单线程程序,多线程程序能更有效地利用多核处理器的计算资源,执行多任务或处理并发请求。
多线程能够提升程序的效能,但也引入了复杂的同步问题。锁是解决这些问题的传统方法,而无锁化编程是一种更高级但复杂的技术,它能够在某些情况下提供更优的性能和可扩展性。正确选择和实现适合应用场景的并发策略,是高效多线程编程的关键。
iLogtail 作为一款阿里云日志服务(SLS)团队自研的可观测数据采集器,目前已经在 Github 开源。作为端上的应用,iLogtail在技术演进的过程中,对于资源占用和性能都在追求极致的优化。
2. 跟着 iLogtail 回顾多线程基础概念
2.1 线程模型
我们平时所说的并发编程、多线程、共享资源等概念都是与线程相关的,这里所说的线程实际上应该叫作“用户线程”,而对应到操作系统,还有另外一种线程叫作“内核线程”。
在操作系统和编程语言层面上,线程模型可以分为几种类型:
1.1:1 模型:每个用户级线程对应一个内核级线程。这种模型下,线程管理由操作系统内核完成,提供了良好的并发性能,但可能在创建线程或上下文切换时引入更多的开销。大部分编程语言的线程库(如linux的pthread,Java的java.lang.Thread,C++11的std::thread等等)都是 1:1 模型。
2.N:1 模型:多个用户级线程映射到一个内核级线程。这种模型下 ,线程管理由用户级库执行,线程操作开销较小,但无法利用多核处理器的优势,且一个线程的阻塞会阻塞整个进程。
3.M:N 模型:多个用户级线程映射到多个内核级线程。这种模型旨在结合1:1和N:1模型的优点,可以由用户级线程库和操作系统内核共同管理。Go语言中的goroutine调度器就是采用的这种实现方案,在Go语言中一个进程可以启动成千上万个goroutine,这也是其出道以来就自带“高并发”光环的重要原因。
选择合适的线程模型对于开发者来说至关重要,因为它直接影响到程序的性能、可扩展性、响应性和简易性。比如 Go 语言的并发哲学鼓励使用通道(channels)来在 goroutines 之间进行通信,而不是使用传统的锁机制和共享内存。通过通道传递消息,设计者意图减少锁的使用,从而避免这类问题。此外,Go 的设计哲学倾向于简单和清晰的并发模型,而比如可重入锁这样的复杂性通常与这种哲学背道而驰。
2.2 如何确保代码按照预期执行
同步的目的是保证不同执行流对共享数据并发操作的一致性。在单核时代,使用原子变量就很容易达成这一目的。甚至因为CPU的一些访存特性,对某些内存对齐数据的读或写也具有原子的特性。但在多核架构下即使操作是原子的,仍然会因为其他原因导致同步失效。
首先是现代编译器的代码优化和编译器指令重排可能会影响到代码的执行顺序。其次还有指令执行级别的乱序优化,流水线、乱序执行、分支预测都可能导致实际执行的次序不一致。
2.2.1 基础概念
volatile 关键字
在 C++ 中,volatile 关键字是一个类型修饰符,用于告诉编译器某个变量的值可能在程序的控制之外被改变。这意味着编译器应该防止对这些变量的访问被优化掉,即每次对 volatile 变量的读写都应该直接从内存中进行,而不能使用缓存的值。
- volatile 不保证原子性,对 volatile 变量的操作可能不是线程安全的。
- volatile 不防止由于 CPU 缓存导致的可见性问题,因此不足以处理多线程中的内存顺序问题。
内存屏障(Memory Barriers)
内存屏障,又称为内存栅栏,是一种同步机制,确保指定的内存操作在屏障前后有一个明确的执行顺序。它是在硬件层面实现的,用来防止编译器和CPU对指令进行不当的重排。内存屏障通常分为以下几种:
- 全屏障(Full Barrier):确保所有先于屏障的读写操作完成后,才执行屏障后的读写操作。
- 读屏障(Read Barrier):确保所有先于屏障的读操作完成后,才执行屏障后的读写操作。
- 写屏障(Write Barrier):确保所有先于屏障的写操作完成后,才执行屏障后的读写操作。
Memory Order
在 C++ 中,memory_order 是一个枚举类型,用于指示原子操作的内存顺序语义。自 C++11 起,标准库提供了一系列原子类型和操作,它们位于 头文件中。memory_order 指定了编译器和处理器如何处理原子操作周围的内存访问,这对于正确编写无锁数据结构和算法至关重要。
以下是一些不同的 memory_order 选项,以及它们的含义:
1.memory_order_relaxed:最弱的内存顺序保证。它只保证原子操作的原子性,不保证任何操作的顺序。这意味着,除了对特定的原子变量的操作是原子的外,编译器和处理器可以随意地重排操作。适用于计数器或统计数据,其中顺序不是特别重要。
2.memory_order_consume(在 C++17 中弃用)保证在依赖于原子变量的特定操作之间保持顺序。但是,由于其复杂性和实现的困难,它在 C++17 中已被弃用。
3.memory_order_acquire:防止操作在原子操作之前进行重排。换句话说,所有在原子操作之前的读或写操作都不会被重排到原子操作之后。
4.memory_order_release:防止操作在原子操作之后进行重排。所有在原子操作之后的读或写操作都不会被重排到原子操作之前。
5.memory_order_acq_rel:结合了 memory_order_acquire 和 memory_order_release 的效果。适用于同时充当获取和释放操作的原子操作。6.memory_order_seq_cst:提供最严格的内存顺序保证。所有的 memory_order_seq_cst 操作在单个程序中看起来就像是按照某种顺序执行一样,即这些操作是顺序一致的。几乎所有的原子操作默认都是 memory_order_seq_cst,这也是最安全、最直观,但可能是效率最低的选项。
理解和准确使用更弱的内存序可以带来性能上的提升。iLogtail 中自旋锁的实现,就是使用了memory_order_acquire和memory_order_release
class SpinLock { std::atomic_flag v_ = ATOMIC_FLAG_INIT; public: SpinLock() {} bool try_lock() { return !v_.test_and_set(std::memory_order_acquire); } void lock() { for (unsigned k = 0; !try_lock(); ++k) { boost::detail::yield(k); } } void unlock() { v_.clear(std::memory_order_release); } }; using ScopedSpinLock = std::lock_guard<SpinLock>;
2.2.2 抑制编译器重排
所谓编译器重排,这里是指编译器在生成目标代码的过程中交换没有依赖关系的内存访问顺序的行为。抑制编译器重排具体来说可以通过以下三种方式来实现:
- 把对应的变量声明为 volatile 的,C++ 标准保证对 volatile 变量间的访问编译器不会进行重排,不过仅仅是 volatile 变量之间, volatile 变量和其他变量间还是有可能会重排的;
- 在需要的地方手动添加合适的 Memory Barrier 指令,Memory Barrier 指令的语义保证了编译器不会进行错误的重排操作;
- 把对应变量声明为 atomic 的, 与 volatile 类似,C++ 标准也保证 atomic 变量间的访问编译器不会进行重排。
抑制编译器优化
编译器优化有时候会优化掉对变量的读写。比如下面例子中,如果data_ready 不用volatile 修饰,那么主程序循环中编译器优化的时候,可能会优化掉对data_ready 的读取检查,从而导致线程无法停止。
// 由中断服务例程修改的标志 volatile bool data_ready = false; // 中断服务例程 void ISR() { // 数据准备好了,设置标志 data_ready = true; } int main() { // 主程序循环 while (true) { if (data_ready) { // 处理数据 // ... // 重置标志 data_ready = false; } // 执行其他任务... } }
把对应变量声明为 volatile 或 atomic 都可以抑制编译器对变量读取的优化,C++ 保证对 volatile 或 atomic 内存的访问肯定会发生,不会被优化掉。
2.2.3 抑制 CPU 乱序
在现代多核 CPU 的架构中,为了提高处理速度,通常会采用一种称为乱序执行(Out-of-Order Execution)的技术。乱序执行指的是 CPU 在执行机器指令时不一定按照程序中指令的原始顺序执行,而是会对指令流进行重新排序,以更有效地利用 CPU 的资源。
#include <thread> #include <atomic> #include <cassert> #include <iostream> std::atomic<bool> x(false), y(false); int a = 0, b = 0; void thread1() { x.store(true, std::memory_order_relaxed); // A1 a = y.load(std::memory_order_relaxed); // A2 } void thread2() { y.store(true, std::memory_order_relaxed); // B1 b = x.load(std::memory_order_relaxed); // B2 } int main() { std::thread t1(thread1); std::thread t2(thread2); t1.join(); t2.join(); std::cout << "a: " << a << ", b: " << b << std::endl; return 0; }
在这个例子中,我们使用了 std::memory_order_relaxed 来告诉编译器和 CPU,我们不需要任何内存顺序保证。这允许编译器和处理器以最优化的方式生成机器指令,包括可能的乱序执行。
现在,让我们考虑一些由乱序执行引起的可能的执行序列:
1.CPU 可能在 thread1 中先执行 A2,再执行 A1,同样在 thread2 中先执行 B2,再执行 B1。这种情况下,结果可能是 a == 0 并且 b == 0,即使我们可能期望至少有一个变量为 1。
2.CPU 也可能在两个线程中按照代码的顺序执行指令,这样就会得到 a == 1 或者 b == 1,或者两者都为 1。
3.由于编译器和处理器的重排,存在多种执行组合,可能导致任意一个线程先执行存储操作,然后另一个线程执行加载操作,结果可能是 a == 1 && b == 0,a == 0 && b == 1,或者 a == 1 && b == 1。
为了 CPU 乱序的问题,从根本上来说只有通过插入所谓的 Memory Barrier 内存屏障指令来解决,这些指令会使得 CPU 保证特定的内存访问序及内存写入操作在多核间的可见性。然而由于不同处理器架构间的内存模型和具体 Memory Barrier 指令均不相同,需要在什么位置添加哪条指令并不具有通用性,因此 C++ 11 在此基础上做了一层抽象,引入了 atomic 类型及 Memory Order 的概念,有助于写出更通用的代码。从本质上看就是靠编译器来根据代码中指定的高层次 Memory Order 来自动选择是否需要插入特定处理器架构上低层次的内存屏障指令。
锁的概念
在多线程程序中,锁是一种同步机制,用来控制对共享资源的访问。它可以避免多个线程在同一时刻读取或修改同一个数据,从而防止数据不一致和竞争条件等问题。最常见的锁有互斥锁(Mutex)和读写锁(Read-Write Lock),它们能够确保在任意时刻,资源要么被单个线程独占(写操作),要么可被多个线程共享(读操作)。
然而,使用锁也会带来一些问题,如死锁(Deadlock)、饥饿(Starvation)和锁竞争(Lock Contention),这些都可能导致程序性能下降,甚至完全失去响应。
2.2.4 互斥锁
互斥锁用于控制多个线程对它们之间共享资源互斥访问,也就是说为了避免多个线程在某一时刻同时操作一个共享资源。在某一时刻只有一个线程可以获得互斥锁,在释放互斥锁之前其它线程都不能获得互斥锁,以阻塞的状态在一个等待队列中等待。
std::mutex 在锁定和解锁操作时,内部隐含地提供了所需的内存序保证,确保在锁定和解锁操作之间的内存读写操作不会被重排(提供了序列化效果)。具体来说,std::mutex 的 lock() 操作在成功获取锁之后会执行一个内存屏障(memory barrier),保证锁定操作之前的所有内存写入对于获得锁的线程来说都是可见的。解锁操作 unlock() 在释放锁之前也会执行一个内存屏障,确保所有对共享数据的修改在锁释放之后对其他线程都是可见的。
在 iLogtail 中std::mutex使用的比较多,比如最简单的,对全局 Map 的复杂操作需要搭配一个互斥锁来实现。
class LogFileProfiler { private: typedef std::unordered_map<std::string, LogStoreStatistic*> LogstoreSenderStatisticsMap; // key : region, value :unordered_map<std::string, LogStoreStatistic*> std::map<std::string, LogstoreSenderStatisticsMap*> mAllStatisticsMap; std::mutex mStatisticLock; };
在使用 std::mutex 时,通常会搭配 std::lock_guard 或 std::unique_lock 这样的 RAII (Resource Acquisition Is Initialization) 包装器,以确保在作用域结束时自动释放锁,防止死锁或忘记释放锁的情况发生。
2.2.5 信号量Semaphore
std::condition_variable 是 C++ 中的一种同步原语,它用于在多线程程序中实现线程间的条件等待。条件变量通常与互斥锁(std::mutex)结合使用,以等待某个条件成为真。它的主要作用是阻塞一个或多个线程,直到收到另一个线程发送的通知或者直到某个条件被满足。
以下是iLogtail一个典型的使用 std::condition_variable 的实例,互斥锁加信号量一起,来控制线程的生命周期,避免了while (true)死循环空耗 CPU 的情况。
std::condition_variable mStopCV; void LogtailAlarm::Stop() { { lock_guard<mutex> lock(mThreadRunningMux); mIsThreadRunning = false; } mStopCV.notify_one(); } bool LogtailAlarm::SendAlarmLoop() { { unique_lock<mutex> lock(mThreadRunningMux); while (mIsThreadRunning) { SendAllRegionAlarm(); if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) { break; } } } SendAllRegionAlarm(); return true; }
2.2.6 递归互斥锁(recursive mutex)
std::recursive_mutex 是 C++ 标准库中的一种互斥锁,它允许同一个线程多次获取同一互斥锁。与 std::mutex 相比,它提供了可重入性,这意味着如果一个线程已经拥有了锁,它仍然可以再次锁定而不会产生死锁。std::recursive_mutex 是 C++ 标准库中的一种互斥锁,它允许同一个线程多次获取同一互斥锁。与 std::mutex 相比,它提供了可重入性,这意味着如果一个线程已经拥有了锁,它仍然可以再次锁定而不会产生死锁。
这种递归锁对于处理那些在同一线程中多次需要获取同一锁的场景非常有用,例如,当递归函数需要在每次递归调用中访问共享资源时。使用 std::recursive_mutex 可以避免因线程尝试重新获取已持有的锁而导致的死锁问题。
2.2.7 共享锁/独占锁
共享锁指的是我们同一把锁可以被多个线程同时获得,而独占锁则只允许一个线程获取锁。读写锁是共享锁和独占锁概念的典型应用,其中读锁是共享锁,写锁是独占锁。读锁可以被多个线程同时持有,而写锁最多只能同时被一个线程持有。
读写锁是一种适用于特定场景的锁,它可以提高资源利用率并减少等待时间,但也可能引入更复杂的同步问题,如写者饥饿(写操作等待过长)。
在 iLogtail 中,指标模块是比较典型的一写多读的场景。ReadMetrics 提供对外的数据,因此外部访问的时候,存在多读的情况,如果使用简单的互斥锁,那么读和读之间也会有竞争关系。因此这里采用了读写锁,保证了多读的性能。
void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const { ReadLock lock(mReadWriteLock); // 读链表,加读锁,允许多读 MetricsRecord* tmp = mHead; while (tmp) { ... tmp = tmp->GetNext(); } } void ReadMetrics::Clear() { WriteLock lock(mReadWriteLock); // 改链表,加写锁 while (mHead) { MetricsRecord* toDelete = mHead; mHead = mHead->GetNext(); delete toDelete; } }
2.2.8 自旋锁
自旋锁的特点是,当线程无法立即获取锁时,并不直接陷入阻塞或者释放 CPU 资源,它会通过循环不断尝试获取锁这个过程被称为“自旋”。非自旋锁则没有自旋过程,如果无法获取锁,线程会直接放弃或执行其他处理逻辑,例如去排队、陷入阻塞等。
- 自旋锁是一种死等的锁机制。当发生访问资源冲突的时候,可以有两个选择:一个是自旋等待,一个是挂起当前进程,调度其他进程执行。自旋锁当前的执行线程会不断的重新尝试直到获取锁进入临界区。
- 只允许一个线程进入。一次只能有一个线程获取锁并进入临界区,其他的线程都是在门口不断的尝试。
- 执行时间短。由于自旋锁等这种特性,因此它适用在那些代码不是非常复杂的临界区,如果临界区执行时间太长,那么不断在临界区门口“死等”的那些线程会浪费大量的CPU资源。
- 可以在中断上下文执行。由于不睡眠,因此自旋锁可以在中断上下文中适用。
iLogtail 在一些比较快速的数据 Set Get 中使用的自旋锁,主要是因为这些操作比较迅速,自旋花费的时间代价小于上下文切换的代价。比如如下例子中,针对一个 map 的查找与插入操作就使用的自旋锁
int32_t ConfigManager::FindAllMatch(vector<FileDiscoveryConfig>& allConfig, const std::string& path, const std::string& name /*= ""*/) { ... { // 自旋锁加锁,数据查找 ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock); auto iter = mCacheFileAllConfigMap.find(cachedFileKey); if (iter != mCacheFileAllConfigMap.end()) { if (iter->second.second == 0 || time(NULL) - iter->second.second < INT32_FLAG(multi_config_alarm_interval)) { allConfig = iter->second.first; return (int32_t)allConfig.size(); } } } ... { // 自旋锁加锁,数据插入 ScopedSpinLock cachedLock(mCacheFileAllConfigMapLock); mCacheFileAllConfigMap[cachedFileKey] = std::make_pair(allConfig, alarmFlag ? (int32_t)time(NULL) : (int32_t)0); } return (int32_t)allConfig.size(); }
3. 谈谈 iLogtail 中的无锁化(Lock-Free)编程实践
为了解决锁带来的问题,无锁化编程提供了另一种方法。无锁编程不依赖于传统的锁同步,而是通过原子操作来确保多个线程能够安全地并发访问共享资源。原子操作是不可分割的,保证在执行过程中不会被其他线程中断,因此可以用来实现无锁的数据结构。
无锁化编程的优势包括减少线程上下文切换的开销、避免死锁和提高程序的并发性能。但是,无锁编程通常更复杂,并且需要精确的设计和良好的理解内存模型。并不是所有的情况都适合无锁编程,有时使用适当的锁机制会更加简单和高效。
3.1 原子类型
class Application { private: Application(); ~Application() = default; // 是否接受到SigTerm信号的标识 std::atomic_bool mSigTermSignalFlag = false; };
iLogtail 中有一个std::atomic_bool 的变量,进程如果接收到SigTerm信号,会将mSigTermSignalFlag 变量设置成 true,iLogtail 的主线程中,会不断的调用mSigTermSignalFlag.load(),如果判断为 true,则执行退出
void Application::Start() { 。。。 while (true) { 。。。 if (mSigTermSignalFlag.load()) { LOG_INFO(sLogger, ("received SIGTERM signal", "exit process")); Exit(); } } }
3.2 降低锁的粒度
分割锁策略将数据结构划分成多个独立的分段(stripes),例如,一个大的哈希表可以分割成多个部分,每个部分有自己的锁。这样,不同的线程可以同时访问表的不同部分,从而在某种程度上实现无锁访问。
3.2.1 旧的指标计算模块
之前 iLogtail 指标统计的线程模型是这样的
iLogtail 的指标存储在LogFileProfiler中的一个全局 Map 中,多个处理线程,循环处理的过程中,每次处理都会去访问这个全局 Map,进行指标的计算。
void LogFileProfiler::AddProfilingData(const std::string& configName, const std::string& region, const std::string& projectName, const std::string& category, const std::string& convertedPath, const std::string& hostLogPath, const std::vector<sls_logs::LogTag>& tags, uint64_t readBytes, uint64_t skipBytes, uint64_t splitLines, uint64_t parseFailures, uint64_t regexMatchFailures, uint64_t parseTimeFailures, uint64_t historyFailures, uint64_t sendFailures, const std::string& errorLine) { string key = projectName + "_" + category + "_" + configName + "_" + hostLogPath; // 锁 std::lock_guard<std::mutex> lock(mStatisticLock); // 从全局map中找到每个地域的指标Map LogstoreSenderStatisticsMap& statisticsMap = *MakesureRegionStatisticsMapUnlocked(region); // 从地域的指标Map中找到当前采集配置的指标 std::unordered_map<string, LogStoreStatistic*>::iterator iter = statisticsMap.find(key); // 指标计算 ... }
可以看到在AddProfilingData 中,有一把范围比较大的锁,从 Map 查找对应的指标对象开始,一直到指标计算结束。在多个 LogProcessThread 的情况下,锁的竞争是比较频繁和激烈的,因此这样的实现性能肯定是比较差的。
3.2.2 新的指标计算模块
新的指标计算模块,将指标数据结构下放到了每一个 Plugin 的实例中,如图举例是 ProcessorInstance 中。
class Plugin { protected: // 指标存储数据结构 mutable MetricsRecordRef mMetricsRecordRef; };
每个 LogProcess 线程处理数据的时候,实际上调用的是ProcessorInstance::Process 函数,这个函数内部会去进行自己实例的指标计算
void ProcessorInstance::Process(std::vector<PipelineEventGroup>& logGroupList) { if (logGroupList.empty()) { return; } // 计算插件输入数据指标 for (const auto& logGroup : logGroupList) { mProcInRecordsTotal->Add(logGroup.GetEvents().size()); } uint64_t startTime = GetCurrentTimeInMicroSeconds(); mPlugin->Process(logGroupList); uint64_t durationTime = GetCurrentTimeInMicroSeconds() - startTime; // 计算插件处理时间指标 mProcTimeMS->Add(durationTime); // 计算插件输出数据指标 for (const auto& logGroup : logGroupList) { mProcOutRecordsTotal->Add(logGroup.GetEvents().size()); } }
每个指标内部使用的是原子类型,比如Counter 中的指标就是std::atomic_long,这样即使在多线程计算的时候,也可以通过原子类型的特点,保证计算结果的正确性,从而避免了使用开销比较大的锁,整个指标计算的过程实现了真正的无锁化。
class Counter { private: std::string mName; std::atomic_long mVal; public: Counter(const std::string& name, uint64_t val); uint64_t GetValue() const; const std::string& GetName() const; void Add(uint64_t val); Counter* CopyAndReset(); };
3.3 双Buffer切换实现读写分离
读写分离,写操作在一个复制的数组上进行,读操作还是在原始数组上进行,读写分离,互不影响。写操作需要加锁,防止并发写入时导致数据丢失。写操作结束之后需要让数组指针指向新的复制数组。
试想一个这样的场景,有两个线程,线程A负责处理用户的请求,相应用户请求,其间要从线程B的缓冲区获取一些数据,线程B的缓冲区是一块内存,这块内存定时或者不间断会更新一次,这块内存就成为了两个线程的临界区(线程B刷新此内存时,线程A可能正在读取)。
按照常规的做法,我们可能通过加锁来避免同时访问,但这样的话,因为这块内存可能被每秒问几百次,而每分钟更新一次,这样的话,每次访问都加锁和解锁,会带来很多无用功,性能也会下降。所以我们这里采用一种优化策略: 双Buffer切换。 顾名思义,就是有两个同样结构的内存块,同一时刻只有一个内存块提供服务,如果发生了更新或写入,则把新数据放在另一个内存块做,等到做完了,通过"指针"(此处的指针泛指指向资源的句柄)改变指向,指向新的内存块,即完成了切换,原内存块在新内存块完之前,仍然提供服务。
iLogtail 中的指标模块就很好的体现了这种的思想,具体过程如下:
iLogtail 指标模块有两个类,WriteMetrics 和 ReadMetircs,每个类中都各有一个单向链表
class WriteMetrics { private: WriteMetrics() = default; std::mutex mMutex; MetricsRecord* mHead = nullptr; void Clear(); MetricsRecord* GetHead(); public: ~WriteMetrics(); static WriteMetrics* GetInstance() { static WriteMetrics* ptr = new WriteMetrics(); return ptr; } void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels); MetricsRecord* DoSnapshot(); }; class ReadMetrics { private: ReadMetrics() = default; mutable ReadWriteLock mReadWriteLock; MetricsRecord* mHead = nullptr; void Clear(); MetricsRecord* GetHead(); public: ~ReadMetrics(); static ReadMetrics* GetInstance() { static ReadMetrics* ptr = new ReadMetrics(); return ptr; } void ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const; void UpdateMetrics(); };
每个 Plugin 初始化的时候,会构造指标对象,并且放到WriteMetrics 中的链表中进行存储。整个链表中,只有链表的头节点存在竞争,但是 Plugin 初始化的时候,通常是单线程顺序执行的,因此存储的时候,采用头插法。
void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels) { MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(labels)); ref.SetMetricsRecord(cur); std::lock_guard<std::mutex> lock(mMutex); cur->SetNext(mHead); mHead = cur; }
ReadMetircs 中的链表是WriteMetrics 中链表的副本,负责对外提供数据。在对WriteMetrics 中的链表进行 Snapshot 的时候,大概分了这下几步:
1.加锁,从当前头部遍历,找到第一个非 delete 节点,对节点进行复制,成为新的ReadMetircs 链表头
2.跳出锁的范围,继续遍历至最后,对非 delete 节点进行复制,放到新的ReadMetircs 链表里
3.前面两步如果是标记删除的节点,放到一个待删除的链表中
4.待删除的链表进行删除操作
ReadMetrics 中调用WriteMetrics::DoSnapshot()获取新的链表之后,加锁,将链表头设置成新链表,原链表无锁删除。
void ReadMetrics::UpdateMetrics() { MetricsRecord* snapshot = WriteMetrics::GetInstance()->DoSnapshot(); MetricsRecord* toDelete; { // Only lock when change head WriteLock lock(mReadWriteLock); toDelete = mHead; mHead = snapshot; } // delete old linklist while (toDelete) { MetricsRecord* obj = toDelete; toDelete = toDelete->GetNext(); delete obj; } }
3.4 延迟释放(Deferred Reclamation)
链表中的节点,如果要进行线程安全的删除,那需要对前后节点的指针操作都加锁。如果在 Plugin 析构的时候,进行同步的删除,那么就需要频繁的加锁,性能会很差。因此, Plugin 在进行析构的时候,指标对象,也就是上面链表中的节点会被标记删除,这样再定期的进行统一删除,可以减少锁的竞争。
3.4.1 标记删除
MetricsRecordRef 对象的析构函数中,会把内部的mMetrics 标记为待删除的节点。
MetricsRecordRef::~MetricsRecordRef() { if (mMetrics) { mMetrics->MarkDeleted(); } }
3.4.2 统一释放
WriteMetrics 在DoSnapshot 函数内部,遍历链表的过程中,会把待删除的节点临时存储下来,最后统一执行删除操作,删除操作是无锁的,因为该对象不会被其他线程访问。
MetricsRecord* WriteMetrics::DoSnapshot() { // new read head MetricsRecord* toDeleteHead = nullptr; // 遍历链表,将待删除的节点加到toDeleteHead中 // 执行删除 while (toDeleteHead) { MetricsRecord* toDelete = toDeleteHead; toDeleteHead = toDeleteHead->GetNext(); delete toDelete; writeMetricsDeleteTotal ++; } return snapshot; }
ReadMetrics 在每次执行UpdateMetrics 函数的时候,会调用WriteMetrics 的DoSnapshot,获取新的链表,头部指针转换之后,旧的链表会统一删除,删除操作也是无锁的。
void ReadMetrics::UpdateMetrics() { MetricsRecord* snapshot = WriteMetrics::GetInstance()->DoSnapshot(); MetricsRecord* toDelete; { // Only lock when change head WriteLock lock(mReadWriteLock); toDelete = mHead; mHead = snapshot; } // 删除旧链表 while (toDelete) { MetricsRecord* obj = toDelete; toDelete = toDelete->GetNext(); delete obj; } }
4. 参考文章
4、iLogtail
来源|阿里云开发者公众号
作者|太业