原始网站:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html
前言
Boost1.53版本中新增加了lock-free库,终于有这么一款官方的lock-free结构出来了。以前在做高性能服务器处理的时候自己费了不少精力去网上搜索代码测试,不仅浪费精力而且可靠性还不敢保证。当然在项目中我使用最多的还是one-one的circle-buffer的方式,其实也就是boost::lockfree::spsc_queue,lock-free结构减少了大量的系统调用,因此特定场合下提升的性能还是非常明显的。总而言之,Boost的lock-free库还是比较值得期待和学习的。
Non-blocking数据结构不再依赖于locks和mutexes去保证线程的安全。同步的操作完全在user-space完成,不需要直接和操作系统交互。不再依赖于guards,non-blocking数据结构需要atomic operations,尤其是CPU执行中不被中断。并不是所有的硬件支持相同系列的 atomic instructions,硬件不支持的将会使用gurard来进行模拟,当然这已经失去了lock-free的优势。
影响lock-free的三种情况:
Atomic Operations: 硬件提供atomic操作,这时候将会用spinlocks进行模拟,则会导致操作被block。
Memory Allocations:操作系统的内存分配不是lock-free的。因此没有办法去实现真正的dynamically-sized的数据结构。boost.lockfree使用memory pool去分配内部的nodes。如果memory pool耗尽,则新节点的内存需要向操作系统进行索取导致内存分配。但是对于boost.lockfree可以采用special call访问失败的方式来避免内存分配。见后续lock-free内存相关设置。
Exception Handling:C++ exception处理没有任何保证其real-time处理的行为。因此我们不鼓励在excetion的处理中使用lock-free代码。
Boost.lockfree实现了三种数据结构:
boost::lockfree::queue: a lock-free multi-produced/multi-consumer queue boost::lockfree::stack: a lock-free multi-produced/multi-consumer stack boost::lockfree::spsc_queue: a wait-free single-producer/single-consumer queue (commonly known as ringbuffer)
lock-free内存相关设置:
- boost::lockfree::fixed_sized, defaults to
boost::lockfree::fixed_sized<false>
Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way to achieve lock-freedom. - boost::lockfree::capacity, optional If this template argument is passed to the options, the size of the queue is set at compile-time. It this option implies
fixed_sized<true>
- boost::lockfree::allocator, defaults to
boost::lockfree::allocator<std::allocator<void>>
Specifies the allocator that is used for the internal freelist
Example
#include <boost/thread/thread.hpp> #include <boost/lockfree/queue.hpp> #include <iostream> #include <boost/atomic.hpp> boost::atomic_int producer_count(0); boost::atomic_int consumer_count(0); boost::lockfree::queue<int> queue(128); const int iterations = 10000000; const int producer_thread_count = 4; const int consumer_thread_count = 4; void producer(void) { for (int i = 0; i != iterations; ++i) { int value = ++producer_count; while (!queue.push(value)) ; } } boost::atomic<bool> done (false); void consumer(void) { int value; while (!done) { while (queue.pop(value)) ++consumer_count; } while (queue.pop(value)) ++consumer_count; } int main(int argc, char* argv[]) { using namespace std; cout << "boost::lockfree::queue is "; if (!queue.is_lock_free()) cout << "not "; cout << "lockfree" << endl; boost::thread_group producer_threads, consumer_threads; for (int i = 0; i != producer_thread_count; ++i) producer_threads.create_thread(producer); for (int i = 0; i != consumer_thread_count; ++i) consumer_threads.create_thread(consumer); producer_threads.join_all(); done = true; consumer_threads.join_all(); cout << "produced " << producer_count << " objects." << endl; cout << "consumed " << consumer_count << " objects." << endl; }
#include <boost/thread/thread.hpp> #include <boost/lockfree/stack.hpp> #include <iostream> #include <boost/atomic.hpp> boost::atomic_int producer_count(0); boost::atomic_int consumer_count(0); boost::lockfree::stack<int> stack(128); const int iterations = 1000000; const int producer_thread_count = 4; const int consumer_thread_count = 4; void producer(void) { for (int i = 0; i != iterations; ++i) { int value = ++producer_count; while (!stack.push(value)) ; } } boost::atomic<bool> done (false); void consumer(void) { int value; while (!done) { while (stack.pop(value)) ++consumer_count; } while (stack.pop(value)) ++consumer_count; } int main(int argc, char* argv[]) { using namespace std; cout << "boost::lockfree::stack is "; if (!stack.is_lock_free()) cout << "not "; cout << "lockfree" << endl; boost::thread_group producer_threads, consumer_threads; for (int i = 0; i != producer_thread_count; ++i) producer_threads.create_thread(producer); for (int i = 0; i != consumer_thread_count; ++i) consumer_threads.create_thread(consumer); producer_threads.join_all(); done = true; consumer_threads.join_all(); cout << "produced " << producer_count << " objects." << endl; cout << "consumed " << consumer_count << " objects." << endl; }
#include <boost/thread/thread.hpp>#include <boost/lockfree/spsc_queue.hpp> #include <iostream>#include <boost/atomic.hpp>int producer_count = 0; boost::atomic_int consumer_count (0); boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024> > spsc_queue; const int iterations = 10000000;void producer(void){ for (int i = 0; i != iterations; ++i) { int value = ++producer_count; while (!spsc_queue.push(value)) ; }} boost::atomic<bool> done (false);void consumer(void){ int value; while (!done) { while (spsc_queue.pop(value)) ++consumer_count; } while (spsc_queue.pop(value)) ++consumer_count;}int main(int argc, char* argv[]){ using namespace std; cout << "boost::lockfree::queue is "; if (!spsc_queue.is_lock_free()) cout << "not "; cout << "lockfree" << endl; boost::thread producer_thread(producer); boost::thread consumer_thread(consumer); producer_thread.join(); done = true; consumer_thread.join(); cout << "produced " << producer_count << " objects." << endl; cout << "consumed " << consumer_count << " objects." << endl;}