线程同步
在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题就叫做同步
也就是说当一个线程申请锁成功后,一旦它解锁了就不能够再申请锁,而是要到整个线程队尾进行排队,让下一个线程去申请锁。这样有序的去申请锁就叫做同步。
条件变量
条件变量的使用:一个线程等待条件变量的条件成立而被挂起;另一个线程使条件成立后唤醒等待的线程。
也就是说使用条件变量后,所有的线程必须同步去执行,但条件满足时线程就挂起直到另一个线程唤醒它。
条件变量相当于一个线程执行的必要条件,只有满足条件的线程才能继续执行
条件变量的接口
定义条件变量:pthread_cond_t XXX
全局初始化 = PTHREAD_COND_INITALIZER
局部初始化使用:pthread_cond_init
#include <pthread.h> int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
参数一为条件变量的地址,参数二为可以不关心设为nullptr
销毁条件变量:pthread_cond_destroy
int pthread_cond_destroy(pthread_cond_t *cond);
满足条件变量则挂起等待:pthread_cond_wait
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
参数一为条件变量的地址,参数二为锁的地址(后面会谈到为什么有锁作为参数)
唤醒线程:pthread_cond_signal
int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒全部线程 int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个线程
参数都为条件变量的地址
生产者消费者场景
在日常的生活中,这个场景并不会陌生。例如:供货商 -> 超市 -> 顾客。而供货商就相当于生产者,顾客就相当于消费者,超市就充当一个中间商。顾客不可能直接去跟供货商买东西,而供货商也不会直接卖给顾客东西,超市就充当了这样一个中间的角色。
在线程的角度也是如此,假设现在一批线程充当着生产者的角色,另一批线程充当着消费者的角色。那么生产者线程不会直接就将数据传给消费者线程,而是会将数据放入到一个相当于缓冲区中,而这个过程又可以称为 生产者和消费者之间的解耦
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题
那么对于这种模型又会衍生出三种关系:
消费者和消费者的关系
对于消费者而言,因为缓冲区中的空间有限,而消费者线程只需要将数据写入到缓冲区,可是当缓冲区中已经有线程在写入中了,其他的线程就不能往缓冲区里写了,而是要等到前面的线程写完后再判断缓冲区是否还没满才可以写入。这就形成了消费者和消费者之间的互斥关系
生产者和生产者的关系
对于生产者也是同理,一个线程读时其他线程同样需要等待。这也就形成了互斥关系
生产者和消费者的关系
生产者和消费者既有互斥又有同步,当一个线程写时,另一个线程去读这种情况就会导致数据的不安全性,因此互斥就是为了保证共享资源的安全性。而每次缓冲区都只有一个线程去执行时,其他线程在等待一旦执行缓冲区的线程解锁了,等待的线程就可以马上申请锁去执行缓冲区,这样效率就会大大提高,因此同步是为了提高效率的。
从何体现出效率的提高
上面谈到缓冲区每次都只有一个线程在执行。那么在这个线程访问执行时其他线程在干什么呢?
首先线程在读或写之前肯定是会有其他的任务需要做的,比如创建写的数据,创建存放读到的数据空间等等。那么在一个线程访问缓冲区时,其他的线程就可以做这些访问缓冲区前的任务,一旦访问缓冲区的线程完成了,其他的线程就不需要再去完成访问缓冲区前的任务直接就可以访问缓冲区了,这就是效率提高的表现
因此:效率的提高真正体现的并不是访问缓冲区的过程,而是访问缓存区之前的过程,这也就是多消费者多生产者的意义
Blockqueue
阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
阻塞队列为空时,消费者线程将被阻塞,直到阻塞队列被放入元素
阻塞队列已满时,生产者线程将被阻塞,直到有元素被取出
那么利用这个阻塞队列结合条件变量和锁,就可以编写出一套简单的模型。
blockqueue.hpp
#pragma once #include <iostream> #include <pthread.h> #include <queue> // 设置默认的最大容量 static int max = 10; template <class T> class blockqueue { public: blockqueue(const int &maxnum = max) : _maxnum(maxnum) { pthread_mutex_init(&_lock, nullptr); pthread_cond_init(&_pcond, nullptr); pthread_cond_init(&_ccond, nullptr); } // 插入数据 void push(const T &in) { // 加锁 pthread_mutex_lock(&_lock); // 判断队列是否满了,如果为空则等待 // 充当条件判断的语法必须是while,不能用if while (_q.size() == _maxnum) pthread_cond_wait(&_pcond, &_lock); // 插入数据 _q.push(in); // 走到这里说明队列一定有数据,就可以唤醒消费者的线程 pthread_cond_signal(&_ccond); // 解锁 pthread_mutex_unlock(&_lock); } // 拿到头部数据并删除 void pop(T *out) { // 加锁 pthread_mutex_lock(&_lock); // 判断队列是否满了,如果为空则等待 // 充当条件判断的语法必须是while,不能用if while (_q.size() == 0) pthread_cond_wait(&_ccond, &_lock); // 拿到头部数据并删除 *out = _q.front(); _q.pop(); // 走到这里说明队列一定不会满,就可以唤醒生产者的线程 pthread_cond_signal(&_pcond); // 解锁 pthread_mutex_unlock(&_lock); } ~blockqueue() { pthread_mutex_destroy(&_lock); pthread_cond_destroy(&_pcond); pthread_cond_destroy(&_ccond); } private: std::queue<T> _q; int _maxnum; // 最大容量 pthread_mutex_t _lock; pthread_cond_t _pcond; // 生产者的条件变量 pthread_cond_t _ccond; // 消费者的条件变量 };
注意:为什么上面的代码里判断条件需要用循环而不是if呢,这就要说到上面的为什么pthread_cond_wait的参数里会有锁了。
为什么条件变量的接口有锁作为参数
首先,能够执行到这个接口说明该线程必定申请锁成功了。如果现在线程执行到了wait这个接口,那么线程就会被阻塞。但是这个线程已经申请了锁其他的线程就没有办法再去申请锁了,那么此时这个线程就一定要解锁,而pthread_cond_wait这个接口就会自动的帮这个线程解锁。
当这个线程阻塞后被其他的线程唤醒后,pthread_cond_wait这个接口就会自动帮这个线程再次加锁,所以为了确保再次加锁的锁是和之前的一样的,pthread_cond_wait接口就会带上锁的地址作为参数。
那么为什么要用循环而不是if呢?最主要的原因是,即使这个线程被唤醒了,但是它仍有可能还是处于不满足条件的情况,因此为了确保数据的安全要再次对这个线程进行判断,直到该线程满足条件才能继续往下执行。