c++生产者和消费者线程循环

简介: 线程安全-生产者消费者模型

注意

pthread_mutex_t互斥锁

  • 使用 PTHREAD_MUTEX_INITIALIZER 进行初始化时,默认状态是未锁定的(即解锁状态)。这种方式会创建一个静态初始化的互斥锁,适用于静态全局变量或静态局部变量的初始化。

    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;            //未锁定的(即解锁状态)
  • 使用 pthread_mutex_init 函数进行初始化时,可以通过指定属性参数来决定初始状态。如果属性参数为 NULL,则默认情况下互斥锁是未锁定的。如果需要创建时是锁定状态,可使用 pthread_mutexattr_settype 函数将属性设置为 PTHREAD_MUTEX_INITIALIZER

    pthread_mutex_t mutex;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_INITIALIZER); // 设置属性为初始化器;指定互斥锁的初始化状态为锁定状态
    
    pthread_mutex_init(&mutex, &attr); // 初始化互斥锁;锁定状态的互斥锁

pthread_cond_t条件变量

  • 使用 PTHREAD_COND_INITIALIZER 进行初始化时,默认状态是已经被唤醒的。这种方式会创建一个静态初始化的条件变量,适用于静态全局变量或静态局部变量的初始化。示例代码中的 cond 使用 PTHREAD_COND_INITIALIZER 进行了初始化,因此它在创建时就是已经被唤醒的状态。

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 使用 pthread_cond_init 函数进行初始化时,默认情况下条件变量是未被唤醒的。如果需要创建时是已经被唤醒的状态,可以在初始化前手动调用 pthread_cond_signalpthread_cond_broadcast 函数。

    pthread_cond_t cond;
    pthread_cond_init(&cond, NULL);

在使用互斥锁和条件变量时,确保正确的初始化状态非常重要。如果在使用之前未进行初始化,可能会导致未定义行为或错误。因此,建议在使用之前始终明确地初始化互斥锁和条件变量。

线程安全-生产者消费者模型

使用条件变量+互斥锁实现生产者和消费者线程循环执行的效果。

#include<stdio.h>
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
int flags = 0;
pthread_cond_t cond;                  //条件变量
pthread_mutex_t mutex;              //互斥锁
//生产者
void *product_thread_start(void *arg)
{
    while (1)
    {
        /*flags:定义flags的目的就是为了确定生产者和消费者的执行的顺序
        原因是如果不定义这个flags,生产者和消费者都要抢占这个锁,不一定
        谁抢占成功,加上flags之后就能够确定生产者先执行

        为什么要 while (flags != 0)而不使用 if (flags != 0)
        如果生产者第一次抢占到锁,第二次还是生产者抢占到锁
        第二次执行pthread_cond_wait会立即退出,原因上一次它执行了pthread_cond_signal
        但是由于while是个循环flags的值没有改,锁它会第二次执行pthread_cond_wait休眠
        直到消费者线程执行一次*/
        pthread_mutex_lock(&mutex);
        while (flags != 0){
            cout<<"I am product: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [生产者等待]"<<endl;

            pthread_cond_wait(&cond, &mutex);           //flags == 1等待

            cout<<"I am product: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [开启生产]"<<endl;
        }

        flags = 1;
        sleep(1);
        printf("生产了一辆超级跑车\n");
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        cout<<" [生产完]"<<endl;
        cout<<endl;
    }
    pthread_exit(NULL);
}

//消费者
void *consume_thread_start(void *arg)
{
    while (1)
    {
        pthread_mutex_lock(&mutex);
        while (flags == 0){
            cout<<"I am consume: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<"[消费者等待]"<<endl;

            pthread_cond_wait(&cond, &mutex);

            cout<<"I am consume: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [开启消费]"<<endl;
        }
        flags = 0;
        sleep(1);
        printf("我购买了一辆超级跑车\n");
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        cout<<" [消费完]"<<endl;
        cout<<endl;
    }
    pthread_exit(NULL);
}
int main(int argc, const char *argv[])
{
    pthread_t product_tid, consume_tid;

    pthread_mutex_init(&mutex, NULL);       //默认状态是未锁定的(即解锁状态)
    pthread_cond_init(&cond, NULL);           //默认情况下条件变量是未被唤醒的

    if (pthread_create(&product_tid, NULL, product_thread_start, NULL))
        perror("create tid1 error");
    if (pthread_create(&consume_tid, NULL, consume_thread_start, NULL))
        perror("create tid1 error");

    pthread_join(product_tid, NULL);
    pthread_join(consume_tid, NULL);
    printf("product_tid=%#lx,consume_tid=%#lx\n", product_tid, consume_tid);
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    return 0;
}

可能一:

  • 消耗者第一次抢占到锁,进入等待;
  • 生产者"生产完" pthread_cond_signal(&cond);唤醒后,生产者又抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&cond);唤醒后,消费者又抢占到锁,进入等待;

1693753040096.png

可能二:

  • 生产者第一次抢占到锁,进入生产;
  • 生产者"生产完" pthread_cond_signal(&cond);唤醒后,生产者又抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&cond);唤醒后,消费者又抢占到锁,进入等待;

1693753109444.png

线程安全队列

#include<stdio.h>
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>
using namespace std;
#define THREAD_COUNT 1              //生产者和消费者数量

//创建线程安全队列
class RingQueue{
public:
    RingQueue(){
        capacity = 1;
        pthread_mutex_init(&que_lock, NULL);
        pthread_cond_init(&consum_cond, NULL);
        pthread_cond_init(&product_cond, NULL);
    }
    ~RingQueue(){
        pthread_mutex_destroy(&que_lock);
        pthread_cond_destroy(&consum_cond);
        pthread_cond_destroy(&product_cond);
    }

    //往队列中放数据,生产
    void Push(int data){
        pthread_mutex_lock(&que_lock);
        while(que.size()>=capacity){
            cout<<"  que.size() == " << que.size()
               << " LINE="<<__LINE__<<" [生产者等待]"<<endl;

            pthread_cond_wait(&product_cond, &que_lock);

            /*为什么要用while循环呢?
        因为当生产者被唤醒后,需要再次判断队列是否可以满足生产的条件
        生产者或者消费者都是需要在等待结束后再次判断的*/
            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [开启生产]"<<endl;
        }
        que.push(data);                           //生产,往队列中放入数据

        cout<<"I am product: " << pthread_self()
           << "I product number is " << data << endl;

        pthread_mutex_unlock(&que_lock);
        //生产者完成生产后唤醒消费者线程让消费者进行消费
        pthread_cond_signal(&consum_cond);
        cout<<endl;
    }

    //从队列中取数据,消费
    int Pop(){
        pthread_mutex_lock(&que_lock);
        while(que.size() <= 0){
            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [消费者等待]"<<endl;

            pthread_cond_wait(&consum_cond, &que_lock);

            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [开启消费]"<<endl;
        }
        int data = que.front();
        que.pop();

        cout<<"I am consume: " << pthread_self()
           << "I consume number is " << data << endl;

        pthread_mutex_unlock(&que_lock);
        //消费者线程消费之后通知生产者来生产
        pthread_cond_signal(&product_cond);

        cout<<endl;
        return data;
    }

private:
    queue<int> que;            //线程安全的队列
    //给队列一把锁,保证互斥,保证同一时刻只有一个线程对队列进行操作
    pthread_mutex_t que_lock;

    /*同步的条件变量,队列有元素,消息,没有元素等待,唤醒生产者
  保证生产者在队列中没有元素的时候进行生产(插入元素)*/
    pthread_cond_t consum_cond;
    pthread_cond_t product_cond;
    int capacity;               //队列容量,队列元素大于容量表示队满,不再往里插入元素
};

int g_val = 0;
//静态初始化保护g_val的互斥锁;默认状态是未锁定的(即解锁状态)
pthread_mutex_t g_val_lock = PTHREAD_MUTEX_INITIALIZER; //互斥锁

//生产者
void* product_thread_start(void* arg){
    RingQueue *q = (RingQueue*)arg;
    while(1){
        pthread_mutex_lock(&g_val_lock);//获取g_val的互斥锁
        q->Push(g_val);
        g_val++;
        sleep(1);
        pthread_mutex_unlock(&g_val_lock);
    }
}

//消费者
void* consum_thread_start(void* arg){
    RingQueue *q = (RingQueue*)arg;
    while(1){
        q->Pop();
    }
}

int main(){
    pthread_t consum_tid[THREAD_COUNT];
    pthread_t product_tid[THREAD_COUNT];
    RingQueue* q = new RingQueue();
    for(int i=0; i<THREAD_COUNT; ++i){
        int ret = pthread_create(&consum_tid[i], NULL, consum_thread_start, (void*)q);
        if(ret < 0){
            perror("pthread_create");
            return 0;
        }
        ret = pthread_create(&product_tid[i], NULL, product_thread_start, (void*)q);
        if(ret < 0){
            perror("pthread_create");
            return 0;
        }
    }
    for(int i=0; i<THREAD_COUNT; ++i){
        pthread_join(consum_tid[i], NULL);
        pthread_join(product_tid[i], NULL);
    }
    delete q;
    return 0;
}

可能一:

  • 生产者第一次抢占到锁,进入生产;
  • 消费者"消费完" pthread_cond_signal(&product_cond);唤醒后,消费者又抢占到锁,进入等待;

1693754737092.png

可能二:

  • 消耗者第一次抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&product_cond);唤醒后,消费者又抢占到锁,进入等待;

1693754783995.png

线程互斥锁API

  1. 定义互斥锁的变量

    pthread_mutex_t mutex;
  2. 初始化线程互斥锁

    int pthread_mutex_init(pthread_mutex_t * mutex, const pthread_mutexattr_t * attr); //动态初始化
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //静态初始化;未锁定的(即解锁状态)
    功能:初始化线程的互斥锁
    参数:
        @mutex:互斥锁变量的地址
        @attr:缺省属性,默认填写为NULL; 未锁定的(即解锁状态)
    返回值:成功返回0,失败返回错误码
  3. 上锁

    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_trylock(pthread_mutex_t *mutex);     //如果获取不到资源,不会阻塞,立即返回
    功能:上锁(如果互斥锁可用直接能占用锁,如果互斥锁不可用将会阻塞等待)
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码
  4. 解锁

    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    功能:解锁
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码 
  5. 销毁互斥锁

    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    功能:销毁互斥锁
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码

线程同步:条件变量的API

  1. 定义条件变量

    pthread_cond_t cond;
  1. 初始化条件变量

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;            //静态初始化 //已经被唤醒的
    int pthread_cond_init(pthread_cond_t *restrict cond,  const pthread_condattr_t *restrict attr);
    功能:动态初始化一个条件变量
    参数:
       @cond    :    条件变量的指针
       @attr    :        NULL使用默认属性  ;未被唤醒的
    返回值:成功返回0,失败返回非0
  2. 阻塞等待条件变量

    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    功能:阻塞等待条件变量,在条件变量中维护了一个队列,
          这里的互斥锁就是为了解决在往队列中放线程的时候出现竞态问题的。
    参数:
        @cond    :条件变量的地址
        @mutex    :互斥锁
    返回值:成功返回0,失败返回非零

    其实现的步骤:
    1.使用pthread_mutex_lock上锁
    2.调用pthread_cond_wait

      2.1将当前线程放入队列
      2.2解锁----------------------------------使用同一个锁的线程可获得锁
      2.3休眠----------------------------------等待
      2.4获取锁
      2.5休眠状态退出

    3.线程执行我的程序
    4.使用pthread_mutex_unlock解锁

  3. 给休眠的线程发信号或者广播

    int pthread_cond_signal(pthread_cond_t *cond);
    功能:唤醒一个休眠的线程
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零
    
    int pthread_cond_broadcast(pthread_cond_t *cond);
    功能:唤醒所有休眠的线程
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零  
  4. 销毁条件变量

    int pthread_cond_destroy(pthread_cond_t *cond);
    功能:销毁条件变量
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零     
相关文章
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
20 1
|
2月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
25 0
|
4月前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
4月前
|
消息中间件 设计模式 安全
多线程魔法:揭秘一个JVM中如何同时运行多个消费者
【8月更文挑战第22天】在Java虚拟机(JVM)中探索多消费者模式,此模式解耦生产与消费过程,提升系统性能。通过`ExecutorService`和`BlockingQueue`构建含2个生产者及4个消费者的系统,实现实时消息处理。多消费者模式虽增强处理能力,但也引入线程安全与资源竞争等挑战,需谨慎设计以确保高效稳定运行。
97 2
|
4月前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。
|
3月前
|
JavaScript 安全 前端开发
ArkTS线程中通过napi创建的C++线程
需要注意的是,N-API和ArkTS的具体使用会随Node.js的版本不断更新和变化,所以在实际编写代码前,查看最新的官方文档是很重要的,以了解最新的最佳实践和使用模式。此外,C++线程的使用在Node.js插件中应当慎重,过多地使用它们可能会造成资源争用,并可能降低应用程序的性能。
83 0
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
51 1
C++ 多线程之初识多线程
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
21 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
19 2
|
2月前
|
Java
Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口
【10月更文挑战第20天】《JAVA多线程深度解析:线程的创建之路》介绍了Java中多线程编程的基本概念和创建线程的两种主要方式:继承Thread类和实现Runnable接口。文章详细讲解了每种方式的实现方法、优缺点及适用场景,帮助读者更好地理解和掌握多线程编程技术,为复杂任务的高效处理奠定基础。
32 2