这是个难找的bug,c++的bug真是防不胜防。若不是单点调试,在生产环境中可真不好找。以下是我排查此bug的一个过程记录,留作备忘,在以后的使用过程中要小心避坑。
问题产生
我们知道c++的queue和map等数据结构是线程并发不安全的,为此我们常封装实现了线程安全的priority_queue,姑且叫做 thread_safe::priority_queue。(关于c++并发编程这块儿推荐经典书籍《C++并发编程实战》)。本以为封装后就可以放心在多线程中使用了,结果崩溃了,且还是偶发的。
先看以下示例:
#include <iostream> #include <queue> using namespace std; int main() { cout<<"Hello World"<<endl; priority_queue<int> pqueue; //pushing value in pqueue. cout<<pqueue.top(); cout<<"Hello World End"<<endl; return 0; }
再看以下示例:
#include <iostream> #include <queue> using namespace std; int main() { std::cout<<"Hello World\n"; pqueue.push(10); std::cout<<pqueue.top(); pqueue.pop(); pqueue.pop(); if(pqueue.empty()){ std::cout<<"\n pqueue is empty\n"; }else{ std::cout<<"\n pqueue is not empty\n"; } std::cout<<pqueue.top(); std::cout<<"Hello World End\n"; return 0; }
提出几个问题。这两个示例分别会输出什么?
做下测试会发现,第一个示例直接就崩了,第二个会输出 pqueue is not empty,有点儿跟想象的不一样。可能你回说这样的测试无意义吧,正常使用中,连基本的queue是否是empty都不判断吗?
这也是本次bug的导火索。请看在多线程中的示例:
thread_safe::priority_queue<int> priorityQueue_; void task_A() { LOGGING_DEBUG(" task_A start..."); while (true) { // 1 if (priorityQueue_.empty()) { LOGGING_WARN("priorityQueue is empty."); return; } // 2 auto taskPtr = priorityQueue_.top(); if (taskPtr) { } else { LOGGING_ERROR("taskPtr is nullptr."); priorityQueue_.pop(); } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } }
在多线程环境下, 即便有priorityQueue_.empty()的判断,但是也已经失去了意义。因为并发的情况下,执行到2时,能保证priorityQueue_非空?可能它已经是empty了。
以下是本次测试thread_safe_queue的实现:
namespace thread_safe { template <class T, class Container = std::deque<T>> class queue { public: explicit queue(const Container &ctnr = Container()) : storage(ctnr) {} bool empty() const { std::lock_guard<std::mutex> lock(mutex); return storage.empty(); } size_t size() const { std::lock_guard<std::mutex> lock(mutex); return storage.size(); } T &back() { std::lock_guard<std::mutex> lock(mutex); return storage.back(); } const T &back() const { std::lock_guard<std::mutex> lock(mutex); return storage.back(); } T &front() { std::lock_guard<std::mutex> lock(mutex); return storage.front(); } const T &front() const { std::lock_guard<std::mutex> lock(mutex); return storage.front(); } void push(const T &u) { std::lock_guard<std::mutex> lock(mutex); storage.push(u); } void pop() { std::lock_guard<std::mutex> lock(mutex); storage.pop(); } private: std::queue<T, Container> storage; mutable std::mutex mutex; }; template <class T, class Container = std::vector<T>, class Compare = std::less<typename Container::value_type>> class priority_queue { public: explicit priority_queue(const Compare &x = Compare(), const Container &y = Container()) : storage(x, y) {} template <class InputIterator> priority_queue(InputIterator first, InputIterator last, const Compare &x = Compare(), const Container &y = Container()) : storage(first, last, x, y) { } bool empty() const { std::lock_guard<std::mutex> lock(mutex); return storage.empty(); } size_t size() const { std::lock_guard<std::mutex> lock(mutex); return storage.size(); } // T &top(void) // { // std::lock_guard<std::mutex> lock(mutex); // return storage.top(); // } const T &top() const { std::lock_guard<std::mutex> lock(mutex); return storage.top(); } void push(const T &u) { std::lock_guard<std::mutex> lock(mutex); storage.push(u); } void pop() { std::lock_guard<std::mutex> lock(mutex); if (!storage.empty()) storage.pop(); } private: std::priority_queue<T, Container, Compare> storage; mutable std::mutex mutex; }; } // namespace thread_safe
可以基于此封装在多线程中测试验证下。
结论
一定要多做测试,尤其是在多线程的环境下。涉及全局资源的访问要谨慎,必要时要加锁给予保护。不能因为封装实现了thread_safe_queue就认为真的safe了。以上的那个示例,priorityQueue_做了封装,但它也是全局资源的一种,并不能放心的在多线程下使用,该加锁的地方还是得加锁。
此外类似queue的这种使用,要确保在一个原子操作内完成,不可被打断。试想一个线程刚好pop,另外一个线程却刚要执行top会怎样?逻辑就错了。 还有队列为了入队出队的准确性,最好只有一对一的生产和消费,否则结果未必准确。