六、二元信号量模拟互斥功能
信号量本质是计数器,若将信号量的初始值设置为1,那么此时该信号量被称为二元信号量。信号量的初始值为1,说明信号量所描述的临界资源只有一份,此时信号量的作用基本等价于互斥锁
在主线程中创建五个新线程,让这五个新线程执行抢票逻辑,并且每次抢完票后打印输出此时剩余的票数。用全局变量tickets记录当前剩余的票数,此时tickets是会被多个执行流同时访问的临界资源,在下面的代码中并没有对tickets进行任何保护操作
#include <iostream> #include <string> #include <pthread.h> #include <unistd.h> int tickets = 2000; void* TicketGrabbing(void* arg) { std::string name = (char*)arg; while (true){ if (tickets > 0){ usleep(1000); std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl; } else{ break; } } std::cout << name << " quit..." << std::endl; pthread_exit((void*)0); } int main() { pthread_t tid[5]; pthread_create(tid, nullptr, TicketGrabbing, (void*)"thread 1"); pthread_create(tid + 1, nullptr, TicketGrabbing, (void*)"thread 2"); pthread_create(tid + 2, nullptr, TicketGrabbing, (void*)"thread 3"); pthread_create(tid + 3, nullptr, TicketGrabbing, (void*)"thread 4"); pthread_create(tid + 4, nullptr, TicketGrabbing, (void*)"thread 5"); pthread_join(tid[0], nullptr); pthread_join(tid[1], nullptr); pthread_join(tid[2], nullptr); pthread_join(tid[3], nullptr); pthread_join(tid[4], nullptr); return 0; }
运行代码后发现,线程打印输出剩余票数时出现了票数剩余为负数的情况,这并不符合预期
下面在抢票逻辑中加入二元信号量,让每个线程在访问全局变量tickets之前先申请信号量,访问完毕后再释放信号量,此时就达到了互斥的效果
#include <iostream> #include <string> #include <pthread.h> #include <semaphore.h> #include <unistd.h> class Sem{ public: Sem(int num) { sem_init(&_sem, 0, num); } ~Sem() { sem_destroy(&_sem); } void P() { sem_wait(&_sem); } void V() { sem_post(&_sem); } private: sem_t _sem; }; static Sem sem(1); int tickets = 2000; void* TicketGrabbing(void* arg) { std::string name = (char*)arg; while (true){ sem.P(); if (tickets > 0){ usleep(1000); std::cout << name << " get a ticket, tickets left: " << --tickets << std::endl; sem.V(); } else{ sem.V(); break; } } std::cout << name << " quit..." << std::endl; pthread_exit((void*)0); } int main() { pthread_t tid[5]; pthread_create(tid, nullptr, TicketGrabbing, (void*)"thread 1"); pthread_create(tid + 1, nullptr, TicketGrabbing, (void*)"thread 2"); pthread_create(tid + 2, nullptr, TicketGrabbing, (void*)"thread 3"); pthread_create(tid + 3, nullptr, TicketGrabbing, (void*)"thread 4"); pthread_create(tid + 4, nullptr, TicketGrabbing, (void*)"thread 5"); pthread_join(tid[0], nullptr); pthread_join(tid[1], nullptr); pthread_join(tid[2], nullptr); pthread_join(tid[3], nullptr); pthread_join(tid[4], nullptr); return 0; }
运行代码后就不会出现剩余票数为负的情况了,同一时刻只会有一个执行流对全局变量tickets进行访问,不会出现数据不一致的问题
七、基于RingQueue的生产者消费者模型
7.1 空间资源与数据资源
生产者关注的是空间资源,消费者关注的是数据资源
对于生产者和消费者而言,关注的资源是不同的:
生产者关注的是环形队列当中是否有空间(blank),只要有空间生产者就可以进行生产
消费者关注的是环形队列当中是否有数据(data),只要有数据消费者就可以进行消费
_space_sem和_data_sem的初始值设置
用信号量来描述环形队列当中的空间资源(_space_sem)和数据资源(_data_sem),在初始信号量时设置的初始值是有所不同的:
_space_sem的初始值应该设置为环形队列的容量,因为刚开始时环形队列中全是空间
_data_sem的初始值应该设置为0,因为刚开始时环形队列当中没有数据
7.2 资源的申请与释放
生产者申请空间资源,释放数据资源
对于生产者来说,生产者每次生产数据前都需要先申请_space_sem:
若_space_sem的值不为0,则信号量申请成功,此时生产者可以进行生产操作
若_space_sem的值为0,则信号量申请失败,此时生产者需要在_space_sem的等待队列下进行阻塞等待,直到环形队列当中有新的空间后再被唤醒
当生产者生产完数据后,应释放_data_sem:
虽然生产者在进行生产前是对_space_sem进行的P操作,但是当生产者生产完数据,应该对_data_sem进行V操作而不是_space_sem
生产者在生产数据前申请到的是空间位置,当生产者生产完数据后,该位置当中存储的是生产者生产的数据,在该数据被消费者消费之前,该位置不再是blank位置,而是data位置
当生产者生产完数据后,意味着环形队列当中多了一个data位置,因此应该对_data_sem进行V操作
消费者申请数据资源,释放空间资源
对于消费者来说,消费者每次消费数据前都需要先申请_data_sem:
若_data_sem的值不为0,则信号量申请成功,此时消费者可以进行消费操作
若_data_sem的值为0,则信号量申请失败,此时消费者需要在_data_sem的等待队列下进行阻塞等待,直到环形队列当中有新的数据后再被唤醒
当消费者消费完数据后,应该释放_space_sem:
虽然消费者在进行消费前是对_data_sem进行的P操作,但是当消费者消费完数据,应该对_space_sem进行V操作
消费者在消费数据前申请到的是data位置,当消费者消费完数据后,该位置当中的数据已经被消费过了,再次被消费就没有意义了,为了让生产者后续可以在该位置生产新的数据,应该将该位置算作blank位置,而不是data位置
当消费者消费完数据后,意味着环形队列当中多了一个blank位置,因此应该对_space_sem进行V操作
7.3 两个规则
规则一:生产者和消费者不能对同一个位置进行访问
若生产者和消费者访问的是环形队列当中的同一个位置,那么此时生产者和消费者就相当于同时对这一块临界资源进行了访问,这是不允许的
若生产者和消费者访问的是环形队列当中的不同位置,那么此时生产者和消费者是可以同时进行生产和消费的,此时不会出现数据不一致等问题
规则二:无论是生产者还是消费者,都不应该将对方套一个圈以上
生产者从消费者的位置开始一直按顺时针方向进行生产,若生产者生产的速度比消费者消费的速度快,那么当生产者绕着消费者生产了一圈数据后再次遇到消费者,此时生产者就不应该再继续生产了,因为再生产就会覆盖还未被消费者消费的数据
同理,消费者从生产者的位置开始一直按顺时针方向进行消费,若消费者消费的速度比生产者生产的速度快,那么当消费者绕着生产者消费了一圈数据后再次遇到生产者,此时消费者就不应该再继续消费了,因为再消费就会消费到缓冲区中保存的废弃数据
7.4 代码实现
RingQueue就是生产者消费者模型当中的交易场所,可以用C++STL库中的vector进行模拟
#include "ringQueue.hpp" #include <unistd.h> #include <sys/types.h> #include <cstdlib> #include <ctime> void* Producer(void* arg) { RingQueue<int>* rq = (RingQueue<int>*)arg; while(true) { sleep(1); //构建数据或任务对象(一般从外部获取) int date = rand() % 100 + 1; //Push入环形队列 rq->Push(date); std::cout << "生产:" << date << "[" << pthread_self() << "]" <<std::endl; } } void* Consumer(void* arg) { RingQueue<int>* rq = (RingQueue<int>*)arg; while(true) { sleep(1); //从环形队列中读取数据或任务 int date = 0; rq->Pop(&date); //处理数据或任务 std::cout << "消费:" << date << "[" << pthread_self() << "]" <<std::endl; } } int main() { srand((unsigned)time(nullptr) * getpid()); RingQueue<int>* rq = new RingQueue<int>(); // rq->Debug(); pthread_t producer[3],consumer[2]; pthread_create(producer,nullptr,Producer,rq); pthread_create(producer + 1,nullptr,Producer,rq); pthread_create(producer + 2,nullptr,Producer,rq); pthread_create(consumer,nullptr,Consumer,rq); pthread_create(consumer + 1,nullptr,Consumer,rq); for(int i = 0;i < 3; ++i) pthread_join(producer[i],nullptr); for(int i = 0;i < 2; ++i) pthread_join(consumer[i],nullptr); return 0; }
当不设置环形队列的大小时,默认将环形队列的容量上限设置为5
代码中的RingQueue是用vector模拟的,生产者每次生产的数据放到vector下标为_producer_index的位置,消费者每次消费的数据来源于vector下标为_consumer_index的位置
生产者每次生产数据后_producer_index都会进行++,标记下一次生产数据的存放位置,++后的下标会与环形队列的容量进行取模运算,实现"环形"的效果
消费者每次消费数据后_consumer_index都会进行++,标记下一次消费数据的来源位置,++后的下标会与环形队列的容量进行取模运算,实现"环形"的效果
_producer_index只会由生产者线程进行更新,_consumer_index只会由消费者线程进行更新
生产者线程不断生产数据放入环形队列,消费者线程不断从环形队列里取出数据进行消费
#include "ringQueue.hpp" #include <unistd.h> #include <sys/types.h> #include <cstdlib> #include <ctime> void* Producer(void* arg) { RingQueue<int>* rq = (RingQueue<int>*)arg; while(true) { sleep(1); //构建数据或任务对象(一般从外部获取) int date = rand() % 100 + 1; //Push入环形队列 rq->Push(date); std::cout << "生产:" << date << "[" << pthread_self() << "]" <<std::endl; } } void* Consumer(void* arg) { RingQueue<int>* rq = (RingQueue<int>*)arg; while(true) { sleep(1); //从环形队列中读取数据或任务 int date = 0; rq->Pop(&date); //处理数据或任务 std::cout << "消费:" << date << "[" << pthread_self() << "]" <<std::endl; } } int main() { srand((unsigned)time(nullptr) * getpid()); RingQueue<int>* rq = new RingQueue<int>(); // rq->Debug(); pthread_t producer[3],consumer[2]; pthread_create(producer,nullptr,Producer,rq); pthread_create(producer + 1,nullptr,Producer,rq); pthread_create(producer + 2,nullptr,Producer,rq); pthread_create(consumer,nullptr,Consumer,rq); pthread_create(consumer + 1,nullptr,Consumer,rq); for(int i = 0;i < 3; ++i) pthread_join(producer[i],nullptr); for(int i = 0;i < 2; ++i) pthread_join(consumer[i],nullptr); return 0; }
环形队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个环形队列必须要让这两个线程同时看到,所以在创建生产者线程和消费者线程时,需要将环形队列作为线程执行例程的参数进行传入
代码中生产者生产数据就是将获取到的随机数Push到环形队列,而消费者就是从环形队列Pop数据,为了便于观察,可以将生产者生产的数据和消费者消费的数据进行打印输出
将信号量进行封装
#ifndef _SEM_HPP #define _SEM_HPP #include <iostream> #include <semaphore.h> class Sem { public: Sem(size_t value) { sem_init(&_sem,0,value); } ~Sem() { sem_destroy(&_sem); } void P() { sem_wait(&_sem); } void V() { sem_post(&_sem); } private: sem_t _sem; }; #endif
7.5 信号量维护环形队列的原理
在_space_sem和_data_sem两个信号量的保护,该环形队列中不可能会出现数据不一致的问题
只有当生产者和消费者指向同一个位置并访问时,才会导致数据不一致的问题,而此时生产者和消费者在对环形队列进行写入或读取数据时,只有两种情况会指向同一个位置:
环形队列为空时
环形队列为满时
但在这两种情况下,生产者和消费者不会同时对环形队列进行访问:
当环形队列为空的时,消费者一定不能进行消费,因为此时数据资源为0
当环形队列为满的时,生产者一定不能进行生产,因为此时空间资源为0
即当环形队列为空和满时,已经通过信号量保证了生产者和消费者的串行化过程。而除了这两种情况之外,生产者和消费者指向的都不是同一个位置,因此该环形队列当中不可能会出现数据不一致的问题。并且大部分情况下生产者和消费者指向并不是同一个位置,因此大部分情况下该环形队列可以让生产者和消费者并发的执行