【转】生产者与消费者

简介:

下面是一个生产者消费者问题,来介绍condition_variable的用法。当线程间的共享数据发生变化的时候,可以通过condition_variable来通知其他的线程。消费者wait 直到生产者通知其状态发生改变,Condition_variable是使用方法如下:

·当持有锁之后,线程调用wait

·wait解开持有的互斥锁(mutex),阻塞本线程,并将自己加入到唤醒队列中

·当收到通知(notification),该线程从阻塞中恢复,并加入互斥锁队列(mutex queue)

 线程被唤醒之后继续持有锁运行。

 

Condition variable有两种类型:condition_variable 和 condition_variable_any,前一种效率更高,但是使用不够灵活,只支持std::unique_lock<std::mutex>类型的互斥锁;后一种比较灵活,支持所有类型的锁,但是效率稍微低一些。

有一点需要注意的是使用condition variable进行通信的线程,condition variable 需要使用相同的互斥信号量(mutex)。

下面来看例子:(当按下回车键之后停止)

复制代码
#include <thread>

#include <iostream>

#include <mutex>

#include <queue>

#include <condition_variable>

#include <atomic>

using namespace std;

int main()
{

    mutex lockBuffer; //申明互斥信号量

    volatile bool ArretDemande = false; //使生产、消费过程的结束

    queue<long> buffer;       

    condition_variable_any cndNotifierConsommateurs;//condition variable

    condition_variable_any cndNotifierProducteur;   
 
    thread ThreadProducteur([&]()//生产者线程
    {
       
        std::atomic<long> interlock;//对interlock的操作将是原子的

        interlock=1;   

        while(true)
        {               

                std::this_thread::sleep_for (chrono::milliseconds (15));               

                long element=interlock.fetch_add (1);//【1】

                lockBuffer.lock ();

                while(buffer.size()==10 && ArretDemande ==false)
                {
                   
                    cndNotifierProducteur.wait (lockBuffer);//【2】

                }

                if (ArretDemande==true)

                {                   

                    lockBuffer.unlock ();

                    cndNotifierConsommateurs.notify_one ();//【3】

                    break;

                }

                buffer.push(element);

                cout << "Production unlement :" << element << " size :" << buffer.size() << endl;

                lockBuffer.unlock ();

                cndNotifierConsommateurs.notify_one ();

        }

    } );

    thread ThreadConsommateur([&]()
    {
      
        while(true)
            {
               
                lockBuffer.lock ();

                while(buffer.empty () && ArretDemande==false)

                {                   

                    cndNotifierConsommateurs.wait(lockBuffer);

                }

                if (ArretDemande==true && buffer.empty ())

                {

                    lockBuffer.unlock();

                    cndNotifierProducteur.notify_one ();

                    break;

                }

                long element=buffer.front();

                buffer.pop ();

                cout << "Consommation element :" << element << " size :" << buffer.size() << endl;

                lockBuffer.unlock ();

                cndNotifierProducteur.notify_one ();

            }           

    } );

    std::cout << "Pour arreter pressez [ENTREZ]" << std::endl;

    getchar();

    std::cout << "Arret demande" << endl
    ArretDemande=true;

    ThreadProducteur.join();
    ThreadConsommateur.join();

    cout<<"Main Thread"<<endl;

    return 0;

}
复制代码

运行结果:

对程序进行一下说明,程序中有三个线程,主线程、生产者线程、消费者线程,三个线程之间乱序执行,通过一些全局变量来控制他们的执行顺序。主线程的作用是控制生产消费过程是否结束,当程序运行之后,主线程通过getchar()接收一个输入,接收到输入后会将ArretDemande设置为true,另外两个线程会终止。生产者线程将生产出来的数据放在一个queue类型的buffer中,并解锁,通知消费之线程,buffer中最多“能”存10个数据,如果buffer中已经有10个数据还没有被取走,则会通知消费者线程“消费”,如果ArretDmande被置位,则打开锁,并通知消费之线程。消费者线程主要是将buffer中的数据取出来,当buffer为空的时候阻塞自己,并通知生产者线程,当ArretDemande被置位,且已经消费完产品则解锁,并通知生产者线程。需要注意的是需要通信的生产者和消费者这两个线程通过condition variable来实现通信,必须操作同一个mutex,这里是lockbuffer,并且每次Notify都会打开当前锁。

程序中对interlock进行的操作是原子的,interlock.fet_add(N),效果是将interlock加N,然后返回interlock在加N之前的值,atomic类型是通过一定的内存顺序规则来实现这个过程的。

虽然conditon_variable 只能支持std::unique_lock<std::mutex>类型的互斥锁,但是在大部分情况下已经够用,而且使用std::unique_lock<std::mutex>会比较简单,因为std::unique_lock<std::mutex>在声明的时候就会初始化,在生命周期结束之后就会自动解锁,因此我们不用太花精力来考虑什么时候解锁。我们来看看下面这段程序:

复制代码
#include <condition_variable>
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <chrono>
 
int main()
{
    std::queue<int> produced_nums;
    std::mutex m;;
    std::condition_variable cond_var;
    bool done = false;
    bool notified = false;
 
    std::thread producer([&]() {
        for ( int i = 0; i < 5; ++i) {
            std::this_thread::sleep_for(std::chrono:: seconds(1));
            std:: unique_lock<std::mutex > lock(m);  //May lock mutex after construction, unlock before destruction.
            std::cout << "producing " << i << '\n' ;
            produced_nums.push(i);
            notified = true;
        cond_var.notify_one();
        }  
 
        done = true;
        cond_var.notify_one();
    });
    //cond_var.notify_one();
    std::thread consumer([&]() {
        while (!done) {
            std:: unique_lock<std::mutex > lock(m);
            while (!notified) {  // loop to avoid spurious wakeups
                cond_var.wait(lock);
            }  
            while (!produced_nums.empty()) {
                std::cout << "consuming " << produced_nums.front() << '\n';
                produced_nums.pop();
            }  
            notified = false;
        }  
    });
 
    producer.join();
    consumer.join();

        return 0;
}
复制代码

运行结果:

C:\Windows\system32\cmd.exe /c producer_consumer.exe
producing 0
consuming 0
producing 1
consuming 1
producing 2
consuming 2
producing 3
consuming 3
producing 4
consuming 4
Hit any key to close this window...

本文转自编程小翁博客园博客,原文链接:http://www.cnblogs.com/wengzilin/p/3680020.html,如需转载请自行联系原作者

相关文章
|
5天前
|
弹性计算 人工智能 安全
云上十五年——「弹性计算十五周年」系列客户故事(第二期)
阿里云弹性计算十五年深耕,以第九代ECS g9i实例引领算力革新。携手海尔三翼鸟、小鹏汽车、微帧科技等企业,实现性能跃升与成本优化,赋能AI、物联网、智能驾驶等前沿场景,共绘云端增长新图景。
|
11天前
|
存储 弹性计算 人工智能
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
2025年9月24日,阿里云弹性计算团队多位产品、技术专家及服务器团队技术专家共同在【2025云栖大会】现场带来了《通用计算产品发布与行业实践》的专场论坛,本论坛聚焦弹性计算多款通用算力产品发布。同时,ECS云服务器安全能力、资源售卖模式、计算AI助手等用户体验关键环节也宣布升级,让用云更简单、更智能。海尔三翼鸟云服务负责人刘建锋先生作为特邀嘉宾,莅临现场分享了关于阿里云ECS g9i推动AIoT平台的场景落地实践。
【2025云栖精华内容】 打造持续领先,全球覆盖的澎湃算力底座——通用计算产品发布与行业实践专场回顾
|
3天前
|
云安全 人工智能 安全
Dify平台集成阿里云AI安全护栏,构建AI Runtime安全防线
阿里云 AI 安全护栏加入Dify平台,打造可信赖的 AI
|
10天前
|
人工智能 自然语言处理 自动驾驶
关于举办首届全国大学生“启真问智”人工智能模型&智能体大赛决赛的通知
关于举办首届全国大学生“启真问智”人工智能模型&智能体大赛决赛的通知
|
6天前
|
人工智能 运维 Java
Spring AI Alibaba Admin 开源!以数据为中心的 Agent 开发平台
Spring AI Alibaba Admin 正式发布!一站式实现 Prompt 管理、动态热更新、评测集构建、自动化评估与全链路可观测,助力企业高效构建可信赖的 AI Agent 应用。开源共建,现已上线!
500 13
|
4天前
|
编解码 文字识别 算法
一张图能装下“千言万语”?DeepSeek-OCR 用视觉压缩长文本,效率提升10倍!
一张图能装下“千言万语”?DeepSeek-OCR 用视觉压缩长文本,效率提升10倍!
389 10
|
11天前
|
编解码 自然语言处理 文字识别
Qwen3-VL再添丁!4B/8B Dense模型开源,更轻量,仍强大
凌晨,Qwen3-VL系列再添新成员——Dense架构的Qwen3-VL-8B、Qwen3-VL-4B 模型,本地部署友好,并完整保留了Qwen3-VL的全部表现,评测指标表现优秀。
688 7
Qwen3-VL再添丁!4B/8B Dense模型开源,更轻量,仍强大
|
13天前
|
机器学习/深度学习 缓存 自然语言处理
【万字长文】大模型训练推理和性能优化算法总结和实践
我们是阿里云公共云 AI 汽车行业大模型技术团队,致力于通过专业的全栈 AI 技术推动 AI 的落地应用。
614 37
【万字长文】大模型训练推理和性能优化算法总结和实践