前言
- 将可能被多个执行流同时访问的资源叫做临界资源,临界资源需要进行保护否则会出现数据不一致等问题。
- 当我们仅用一个互斥锁对临界资源进行保护时,相当于我们将这块临界资源看作一个整体,同一时刻只允许一个执行流对这块临界资源进行访问。
- 但实际我们可以将这块临界资源再分割为多个区域,当多个执行流需要访问临界资源时,如果这些执行流访问的是临界资源的不同区域,那么我们可以让这些执行流同时访问临界资源的不同区域,此时不会出现数据不一致等问题。
POSIX信号量
信号量的概念
信号量本质是一个计数器(不设置全局变量是因为进程间是相互独立的,而这不一定能看到,看到也不能保证++引用计数为原子操作),用于多进程对共享数据对象的读取,它和管道有所不同,它不以传送数据为主要目的,它主要是用来保护共享资源(信号量也属于临界资源),使得资源在一个时刻只有一个进程独享。
每个执行流在进入临界区之前都应该先申请信号量,申请成功就有了操作特点的临界资源的权限,当操作完毕后就应该释放信号量。
信号量的工作原理
由于信号量只能进行两种操作等待和发送信号,即P(sv)和V(sv),他们的行为是这样的:
(1)P(sv):我们将申请信号量称为P操作,申请信号量的本质就是申请获得临界资源中某块资源的使用权限,当申请成功时临界资源中资源的数目应该减去一,因此P操作的本质就是让计数器减一,如果sv的值大于零,就给它减1;如果它的值为零,就挂起该进程的执行
(2)V(sv):我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作本质就是让计数器加一,如果有其他进程因等待sv而被挂起,就让它恢复运行,如果没有进程因等待sv而挂起,就给他加1
PV操作必须是原子操作
多个执行流为了访问临界资源会竞争式的申请信号量, 因此信号量是会被多个执行流同时访问的,也就是说信号量本质也是临界资源。
信号量本质就是用于保护临界资源的,我们不可能再用信号量去保护信号量,所以信号量的PV操作必须是原子操作。
注意: 内存当中变量的++
、--
操作并不是原子操作,因此信号量不可能只是简单的对一个全局变量进行++
、--
操作。
申请信号量失败被挂起等待
当执行流在申请信号量时,可能此时信号量的值为0,也就是说信号量描述的临界资源已经全部被申请了,此时该执行流就应该在该信号量的等待队列当中进行等待,直到有信号量被释放时再被唤醒。
注意: 信号量的本质是计数器,但不意味着只有计数器,信号量还包括一个等待队列。
信号量函数
初始化信号量
初始化信号量的函数叫做sem_init,该函数的函数原型如下:
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数说明:
- sem:需要初始化的信号量。
- pshared:传入0值表示线程间共享,传入非零值表示进程间共享。
- value:信号量的初始值(计数器的初始值)。
返回值说明:
- 初始化信号量成功返回0,失败返回-1。
注意: POSIX信号量和System V信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的,但POSIX信号量可以用于线程间同步。
销毁信号量
销毁信号量的函数叫做sem_destroy,该函数的函数原型如下:
int sem_destroy(sem_t *sem);
参数说明:
- sem:需要销毁的信号量。
返回值说明:
- 销毁信号量成功返回0,失败返回-1。
等待信号量(申请信号量)
等待信号量的函数叫做sem_wait,该函数的函数原型如下:
int sem_wait(sem_t *sem);
参数说明:
- sem:需要等待的信号量。
返回值说明:
- 等待信号量成功返回0,信号量的值减一。
- 等待信号量失败返回-1,信号量的值保持不变。
发布信号量(释放信号量)
发布信号量的函数叫做sem_post,该函数的函数原型如下:
int sem_post(sem_t *sem);
参数说明
- sem:需要发布的信号量。
返回值说明:
- 发布信号量成功返回0,信号量的值加一。
- 发布信号量失败返回-1,信号量的值保持不变。
二元信号量模拟实现互斥功能
二元信号量是最简单的一种锁(互斥锁),它只用两种状态:占用与非占用,所以它的引用计数为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本就等价于互斥锁
例如,下面我们实现一个多线程抢票系统,其中我们用二元信号量模拟实现多线程互斥。
我们在主线程当中创建四个新线程,让这四个新线程执行抢票逻辑,并且每次抢完票后打印输出此时剩余的票数,其中我们用全局变量tickets记录当前剩余的票数,此时tickets是会被多个执行流同时访问的临界资源,在下面的代码中我们并没有对tickets进行任何保护操作。
#include <iostream> #include <string> #include <unistd.h> #include <pthread.h> int tickets = 2000; void* TicketGrabbing(void* arg) { std::string name = (char*)arg; while (true){ if (tickets > 0){ usleep(1000); std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl; } else{ break; } } std::cout << name << " quit..." << std::endl; pthread_exit((void*)0); } int main() { pthread_t tid1, tid2, tid3, tid4; pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1"); pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2"); pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3"); pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4"); pthread_join(tid1, nullptr); pthread_join(tid2, nullptr); pthread_join(tid3, nullptr); pthread_join(tid4, nullptr); return 0; }
运行代码后可以看到,线程打印输出剩余票数时出现了票数剩余为负数的情况,这是不符合我们预期的。
下面我们在抢票逻辑当中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时二元信号量就达到了互斥的效果。
#include <iostream> #include <string> #include <unistd.h> #include <pthread.h> #include <semaphore.h> class Sem{ public: Sem(int num) { sem_init(&_sem, 0, num); } ~Sem() { sem_destroy(&_sem); } void P() { sem_wait(&_sem); } void V() { sem_post(&_sem); } private: sem_t _sem; }; Sem sem(1); //二元信号量 int tickets = 2000; void* TicketGrabbing(void* arg) { std::string name = (char*)arg; while (true){ sem.P(); if (tickets > 0){ usleep(1000); std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl; sem.V(); } else{ sem.V(); break; } } std::cout << name << " quit..." << std::endl; pthread_exit((void*)0); } int main() { pthread_t tid1, tid2, tid3, tid4; pthread_create(&tid1, nullptr, TicketGrabbing, (void*)"thread 1"); pthread_create(&tid2, nullptr, TicketGrabbing, (void*)"thread 2"); pthread_create(&tid3, nullptr, TicketGrabbing, (void*)"thread 3"); pthread_create(&tid4, nullptr, TicketGrabbing, (void*)"thread 4"); pthread_join(tid1, nullptr); pthread_join(tid2, nullptr); pthread_join(tid3, nullptr); pthread_join(tid4, nullptr); return 0; }
运行代码后就不会出现剩余票数为负的情况了,因为此时同一时刻只会有一个执行流对全局变量tickets进行访问,不会出现数据不一致的问题。
基于环形队列的生产消费模型
空间资源和数据资源
生产者关注的是空间资源,消费者关注的是数据资源
对于生产者和消费者来说,它们关注的资源是不同的:
- 生产者关注的是环形队列当中是否有空间(blank),只要有空间生产者就可以进行生产。
- 消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费。
blank_sem和data_sem的初始值设置
现在我们用信号量来描述环形队列当中的空间资源(blank_sem)和数据资源(data_sem),在我们初始信号量时给它们设置的初始值是不同的:
- blank_sem的初始值我们应该设置为环形队列的容量,因为刚开始时环形队列当中全是空间
- data_sem的初始值我们应该设置为0,因为刚开始时环形队列当中没有数据。
生产者和消费者申请和释放资源
生产者申请空间资源,释放数据资源
对于生产者来说,生产者每次生产数据前都需要先申请blank_sem:
如果blank_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作。
如果blank_sem的值为0,则信号量申请失败,此时生产者需要在blank_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间再被唤醒。
当生产者生产完数据后,应该释放data_sem:
虽然生产者在进行生产前是对blank_sem进行的P操作,但是当生产者生产完数据,应该对data_sem进行V操作而不是blank_sem.
生产者在生产数据前申请到的是blank位置,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是blank位置,儿应该是data位置。
- 当生产者生产完数据后,意味着环形队列当中多了一个data位置,因此我们应该对data_sem进行v操作。
消费者申请数据资源,释放空间资源
对于消费者来说,消费者每次消费数据前都需要先申请data_sem:
- 如果data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作。
- 如果data_sem的值为0,则信号量申请失败,此时消费者需要在data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒。
当消费者消费完数据后,应该释放blank_sem:
- 虽然消费者在进行消费前是对data_sem进行的P操作,但是当消费者消费完数据,应该对blank_sem进行V操作而不是data_sem。
- 消费者在消费数据前申请到的是data位置,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,我们应该将该位置算作blank位置,而不是data位置。
- 当消费者消费完数据后,意味着环形队列当中多了一个blank位置,因此我们应该对blank_sem进行V操作。
必须遵守的两个规则
在基于环形队列的生产者和消费者模型当中,生产者和消费者必须遵守如下两个规则。
第一个规则:生产者和消费者不能对同一个位置进行访问。
生产者和消费者在访问环形队列时:
- 如果生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这当然是不允许的。
- 而如果生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题。
第二个规则:无论是生产者还是消费者,都不应该将对方套一个圈以上。
- 生产者从消费者的位置开始一直按顺时针方向进行生产,如果生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据。
- 同理,消费者从生产者的位置开始一直按顺时针方向进行消费,如果消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据。
代码实现
其中的RingQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的vector进行实现。
#pragma once #include <iostream> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #include <vector> #define NUM 8 template<class T> class RingQueue { private: //P操作 void P(sem_t& s) { sem_wait(&s); } //V操作 void V(sem_t& s) { sem_post(&s); } public: RingQueue(int cap = NUM) : _cap(cap), _p_pos(0), _c_pos(0) { _q.resize(_cap); sem_init(&_blank_sem, 0, _cap); //blank_sem初始值设置为环形队列的容量 sem_init(&_data_sem, 0, 0); //data_sem初始值设置为0 } ~RingQueue() { sem_destroy(&_blank_sem); sem_destroy(&_data_sem); } //向环形队列插入数据(生产者调用) void Push(const T& data) { P(_blank_sem); //生产者关注空间资源 _q[_p_pos] = data; V(_data_sem); //生产 //更新下一次生产的位置 _p_pos++; _p_pos %= _cap; } //从环形队列获取数据(消费者调用) void Pop(T& data) { P(_data_sem); //消费者关注数据资源 data = _q[_c_pos]; V(_blank_sem); //更新下一次消费的位置 _c_pos++; _c_pos %= _cap; } private: std::vector<T> _q; //环形队列 int _cap; //环形队列的容量上限 int _p_pos; //生产位置 int _c_pos; //消费位置 sem_t _blank_sem; //描述空间资源 sem_t _data_sem; //描述数据资源 };
相关说明:
- 当不设置环形队列的大小时,我们默认将环形队列的容量上限设置为8。代码中的RingQueue是用vector实现的,生产者每次生产的数据放到vector下标为p_pos的位置,消费者每次消费的数据来源于vector下标为c_pos的位置。
- 生产者每次生产数据后p_pos都会进行++,标记下一次生产数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
- 消费者每次消费数据后c_pos都会进行++,标记下一次消费数据的来源位置,++后的下标会与环形队列的容量进行取模运算,实现“环形”的效果。
- p_pos只会由生产者线程进行更新,c_pos只会由消费者线程进行更新,对这两个变量访问时不需要进行保护,因此代码中将p_pos和c_pos的更新放到了V操作之后,就是为了尽量减少临界区的代码。
为了方便理解,我们这里实现单生产者、单消费者的生产者消费者模型。于是在主函数我们就只需要创建一个生产者线程和一个消费者线程,生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费。
#include "RingQueue.hpp" void* Producer(void* arg) { RingQueue<int>* rq = (RingQueue<int>*)arg; while (true){ sleep(1); int data = rand() % 100 + 1; rq->Push(data); std::cout << "Producer: " << data << std::endl; } } void* Consumer(void* arg) { RingQueue<int>* rq = (RingQueue<int>*)arg; while (true){ sleep(1); int data = 0; rq->Pop(data); std::cout << "Consumer: " << data << std::endl; } } int main() { srand((unsigned int)time(nullptr)); pthread_t producer, consumer; RingQueue<int>* rq = new RingQueue<int>; pthread_create(&producer, nullptr, Producer, rq); pthread_create(&consumer, nullptr, Consumer, rq); pthread_join(producer, nullptr); pthread_join(consumer, nullptr); delete rq; return 0; }
相关说明:
- 环形队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个环形队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将环形队列作为线程执行例程的参数进行传入。
- 代码中生产者生产数据就是将获取到的随机数Push到环形队列,而消费者就是从环形队列Pop数据,为了便于观察,我们可以将生产者生产的数据和消费者消费的数据进行打印输出。
生产者消费者步调一致
由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的。
生产者生产的快,消费者消费的慢
我们可以让生产者不停的进行生产,而消费者每隔一秒进行消费。
此时由于生产者生产的很快,运行代码后一瞬间生产者就将环形队列打满了,此时生产者想要再进行生产,但空间资源已经为0了,于是生产者只能在blank_sem的等待队列下进行阻塞等待,直到由消费者消费完一个数据后对blank_sem进行了V操作,生产者才会被唤醒进而继续进行生产。
但由于生产者的生产速度很快,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
生产者生产的慢,消费者消费的快
当然我们也可以让生产者每隔一秒进行生产,而消费者不停的进行消费。
虽然消费者消费的很快,但一开始环形队列当中的数据资源为0,因此消费者只能在data_sem的等待队列下进行阻塞等待,直到生产者生产完一个数据后对data_sem进行了V操作,消费者才会被唤醒进而进行消费。
但由于消费者的消费速度很快,消费者消费完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了。
信号量保护环形队列的原理
在blank_sem和data_sem两个信号量的保护后,该环形队列中不可能会出现数据不一致的问题。
因为只有当生产者和消费者指向同一个位置并访问时,才会导致数据不一致的问题,而此时生产者和消费者在对环形队列进行写入或读取数据时,只有两种情况会指向同一个位置:
- 环形队列为空时
- 环形队列为满时。
但是在这两种情况下,生产者和消费者不会同时对环形队列进行访问:
- 当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0。
- 当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0。
也就是说,当环形队列为空和满时,我们已经通过信号量保证了生产者和消费者的串行化过程。而除了这两种情况之外,生产者和消费者指向的都不是同一个位置,因此该环形队列当中不可能会出现数据不一致的问题。并且大部分情况下生产者和消费者指向并不是同一个位置,因此大部分情况下该环形队列可以让生产者和消费者并发的执行。