五、内存序问题
5.1 什么是内存序问题
内存序(memory order)问题是由于多线程的并行执行可能导致的对共享变量的读写操作无法按照程序员预期的顺序进行。
简单来说,编译器为了提高运算速度,有时候会做出违背代码原有顺序的优化。虽然顺序改变了,但执行的结果不会变。比如下面一段代码
int i=10; int j=20; i+=2; j+=3;
我们以为执行顺序是从上往下,但编译器任务i和j没有关联,可能会优化成
int i=10; i+=2; int j=20; j+=3;
在单核处理器的情况下,这种优化没问题,因为执行的结果不会变。但是如果是多核处理器,多线程并行执行,就会出现一些难以预知的问题。比如编译器和处理器可能会重排共享变量的写操作,使得其中一个线程的写操作先于另一个线程的写操作执行。这样会导致增量值丢失或重复计算,最终的结果可能小于预期的值。
总结来说,就是编译器和CPU会优化重排指令,改变原始程序中指令的执行顺序。这可能会导致多线程间的竞态条件和数据依赖关系出现问题,从而使得程序的行为产生难以预测的结果。
需要使用适当的内存序来指定对共享变量的读写操作的顺序和同步行为。
5.2 内存序
内存徐规定了多个线程访问同一个内存地址的语义,即
1)某个线程对内存地址的更新何时能被其他线程看见;
2)某个线程对内存地址访问附近可以做怎么样的优化;
5.2.1. memory_order_relaxed
松散内存序,只用来保证对原子对象的操作是原子的,在不需要保证顺序时使用。(保证原子性,不保证顺序性和同步性)
5.2.2. memory_order_release
释放操作,在写入某原子对象时,当前线程的任何前面的读写操作都不允许重排到这个操作的后面去,并且当前线程的所有内存写入都在对同一个原子对象进行获取的其他线程可见。(保证原子性和同步性,顺序是当前线程的前面不能写到后面;但当前线程的后面可以写到前面)
5.2.3. memory_order_acquire
获得操作,在读取某原子对象时,当前线程的任何后面的读写操作都不允许重排到这个操作的前面去,并且其他线程在对同一个原子对象释放之前的所有内存写入都在当前线程可见。(保证原子性和同步性,顺序是当前线程的后面不能写到前面;但当前线程的前面可以写到后面)
5.2.4. memory_order_acq_rel
获得释放操作,一个读‐修改‐写操作同时具有获得语义和释放语义,即它前后的任何读写操作都不允许重排,并且其他线程在对同一个原子对象释放之前的所有内存写入都在当前线程可见,当前线程的所有内存写入都在对同一个原子对象进行获取的其他线程可见;
5.2.5. memory_order_seq_cst
顺序一致性语义,对于读操作相当于获得,对于写操作相当于释放,对于读‐修改‐写操作相当于获得释放,是所有原子操作的默认内存序,并且会对所有使用此模型的原子操作建立一个全局顺序,保证了多个原子变量的操作在所有线程里观察到的操作顺序相同,当然它是最慢的同步模型。
5.3 内存屏障
内存屏障(Memory Barrier)是一种硬件或软件指令,用于控制处理器和内存系统中对内存操作的重新排序和优化。它们的作用是确保在屏障之前和之后的内存访问按照预期的顺序进行。
内存屏障主要有两种类型:读屏障(Read Barrier)和写屏障(Write Barrier)。
读屏障(也称为加载屏障):确保在读取一个变量的值之前,所有之前的读取操作和加载操作都已经完成。这可以防止读取过期的或无效的数据。
写屏障(也称为存储屏障):确保在写入一个变量的值之前,所有之前的写入操作和存储操作都已经完成。这可以防止将新的值预先存储到缓存而不是实际写入到内存中。
内存屏障的使用可以避免在多线程或并发环境下出现的一些问题,例如数据竞争、乱序执行和原子操作的正确性。通过插入内存屏障,可以使得代码在一个屏障之前或之后的内存访问按照预期的顺序执行,从而确保正确的内存可见性和一致性。
5.3.1atomic_thread_fence()
创建一个内存屏障(memory barrier),用于限制内存访问的重新排序和优化。它可以保证在屏障之前的所有内存操作都在屏障完成之前完成。
void atomic_thread_fence(std::memory_order order);
常见的 memory_order 参数包括:
std::memory_order_relaxed:最轻量级的内存顺序,允许重排和优化。
std::memory_order_acquire:在屏障之前的内存读操作必须在屏障完成之前完成。
std::memory_order_release:在屏障之前的内存写操作必须在屏障完成之前完成。
std::memory_order_acq_rel:同时具有 acquire 和 release 语义,适用于同时进行读写操作的屏障。
std::memory_order_seq_cst:对于读操作相当于获得,对于写操作相当于释放。
六、测试代码
6.1 多线程加锁
四个线程,每个线程实现count++ 500次。最后结果应该是cout = 2000;
在没有加锁的情况下,会出现cout ≠ 2000
#include <chrono> #include <iostream> #include <thread> #include <assert.h> #define USE_ATOMIC 1 #if USE_MUTEX #include <mutex> std::mutex mtx; int count = 0; #elif USE_SPINLOCK #include "spinlock.h" using spinlock_t = struct spinlock; spinlock_t spin; int count = 0; #elif USE_ATOMIC #include <atomic> std::atomic<int> count{0}; #else int count = 0; #endif void incrby(int num) { for (int i=0; i < num; i++) { #if USE_MUTEX mtx.lock(); ++count; mtx.unlock(); #elif USE_SPINLOCK spinlock_lock(&spin); ++count; spinlock_unlock(&spin); #elif USE_ATOMIC count.fetch_add(1); #else ++count; #endif } } int main() { #ifdef USE_SPINLOCK spinlock_init(&spin); #endif for (int i = 0; i < 100; i++) { #ifdef USE_ATOMIC count.store(0); #else count = 0; #endif std::thread a(incrby, 500); std::thread b(incrby, 500); std::thread c(incrby, 500); std::thread d(incrby, 500); a.join(); b.join(); c.join(); d.join(); #ifdef USE_ATOMIC if (count.load() != 2000) { #else if (count != 2000) { #endif std::cout << "i:" << i << " count:" << count << std::endl; break; } } return 0; }
6.2 内存序问题
1)实现功能:
初始x,y都为false;z为0;
线程a:先将x写为true,再将y写为true
线程b:自旋等待,知道读到的y为true,再判断达到x是否为true,若x为true,++z
2)若函数write_x_then_y()和read_y_then_x()的内存序如下
则在极少数情况下,结果z不为1.
这是因为memory_order_relaxed不保证执行顺序,因此编译器和处理器可能会优化重排。此时会出现一种执行顺序:4~> 1 ~> 2 ~> 3 。
这就是线程b中对共享变量y的读操作,先于线程a对y的写操作执行。这样会导致某个线程读取到较旧的值,而不是最新的值,影响最终结果的准确性。
3)若函数write_x_then_y()和read_y_then_x()的内存序如下
这时结果z一定为1。
这是因为内存序memory_order_release有顺序性,保守当前线程前面的操作不能优化到后面,即1一定在2前面。并且还有同步性,即2若执行了,1一定执行了。
内存序memory_order_acquire有顺序性,保守当前线程后面的操作不能优化到前面,即3一定在4前面.
#include <atomic> #include <thread> #include <assert.h> #include <iostream> std::atomic<bool> x,y; std::atomic<int> z; void write_x_then_y() { x.store(true,std::memory_order_relaxed); // 1 y.store(true,std::memory_order_release); // 2 y = true x= true } void read_y_then_x() { while(!y.load(std::memory_order_acquire)); // 3 自旋,等待y被设置为true if(x.load(std::memory_order_relaxed)) // 4 ++z; } int main() { x=false; y=false; z=0; std::thread a(write_x_then_y); std::thread b(read_y_then_x); a.join(); b.join(); std::cout << z.load(std::memory_order_relaxed) << std::endl; return 0; }
6.3 多线程同步问题
实现功能:
创建2个线程,线程t1执行i从0加到9999,结果按原子操作写入x;线程t2执行i从0减到-9999,结果按原子操作写入x;最后打印出x的值。
6.3.1 问题
代码1的执行结果如图,结果出现错乱。有时候是9999,有时候是-9999。这是因为两个线程将结果存入主存的顺序是不确定的。
代码1
#include <atomic> #include <thread> #include <iostream> std::atomic<int> x(0); void thread_func1() { for (int i = 0; i < 100000; ++i) { x.store(i, std::memory_order_relaxed); } } void thread_func2() { for (int i = 0; i < 100000; ++i) { x.store(-i, std::memory_order_relaxed); } } int main() { std::thread t1(thread_func1); std::thread t2(thread_func2); t1.join(); t2.join(); std::cout << "Final value of x = " << x.load(std::memory_order_relaxed) << std::endl; return 0; }
6.3.2 解法一:标志位
代码2通过添加一个标志位来控制线程t1的执行
代码2
#include <atomic> #include <thread> #include <iostream> std::atomic<int> x(0); std::atomic<bool> t1_finished(false); // 标志位,表示线程t1是否已完成 void thread_func1() { for (int i = 0; i < 100000; ++i) { x.store(i, std::memory_order_release); } t1_finished.store(true, std::memory_order_release); // 在线程t1完成后设置标志位为true } void thread_func2() { while (!t1_finished.load(std::memory_order_acquire)) // 检查标志位,如果线程t1未完成,则等待 { std::this_thread::yield(); } for (int i = 0; i < 100000; ++i) { x.store(-i, std::memory_order_release); } } int main() { std::thread t1(thread_func1); std::thread t2(thread_func2); t1.join(); t2.join(); std::cout << "Final value of x = " << x.load(std::memory_order_acquire) << std::endl; return 0; }
6.3.3 解法二:互斥锁
代码3,在线程t2中,每次修改x之前使用了std::lock_guardstd::mutex来自动加锁,以确保线程t2的操作在同一时刻只有一个线程进行。这样就能够保证最后只输出线程t2的结果。
代码3
#include <atomic> #include <thread> #include <iostream> #include <mutex> std::atomic<int> x(0); std::mutex mtx; // 互斥锁 void thread_func1() { for (int i = 0; i < 100000; ++i) { x.store(i, std::memory_order_relaxed); } } void thread_func2() { for (int i = 0; i < 100000; ++i) { std::lock_guard<std::mutex> lock(mtx); // 加锁 x.store(-i, std::memory_order_relaxed); } } int main() { std::thread t1(thread_func1); std::thread t2(thread_func2); t1.join(); t2.join(); std::cout << "Final value of x = " << x.load(std::memory_order_acquire) << std::endl; return 0; }
6.3.4 解法三:内存屏障
在线程t2的循环中插入了std::atomic_thread_fence(std::memory_order_release)内存屏障指令。该指令用于确保在执行x.store(-i, std::memory_order_relaxed)之前的所有先行写操作对于其它线程的读操作都可见。这样我们就能够保证最终只输出线程t2的结果。
#include <atomic> #include <thread> #include <iostream> std::atomic<int> x(0); void thread_func1() { for (int i = 0; i < 100000; ++i) { x.store(i, std::memory_order_release); } } void thread_func2() { for (int i = 0; i < 100000; ++i) { x.store(-i, std::memory_order_release); std::atomic_thread_fence(std::memory_order_release); // 插入内存屏障 } } int main() { std::thread t1(thread_func1); std::thread t2(thread_func2); t1.join(); t2.join(); std::cout << "Final value of x = " << x.load(std::memory_order_acquire) << std::endl; return 0; }