c++生产者和消费者线程循环

简介: 线程安全-生产者消费者模型

注意

pthread_mutex_t互斥锁

  • 使用 PTHREAD_MUTEX_INITIALIZER 进行初始化时,默认状态是未锁定的(即解锁状态)。这种方式会创建一个静态初始化的互斥锁,适用于静态全局变量或静态局部变量的初始化。

    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;            //未锁定的(即解锁状态)
    
  • 使用 pthread_mutex_init 函数进行初始化时,可以通过指定属性参数来决定初始状态。如果属性参数为 NULL,则默认情况下互斥锁是未锁定的。如果需要创建时是锁定状态,可使用 pthread_mutexattr_settype 函数将属性设置为 PTHREAD_MUTEX_INITIALIZER

    pthread_mutex_t mutex;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_INITIALIZER); // 设置属性为初始化器;指定互斥锁的初始化状态为锁定状态
    
    pthread_mutex_init(&mutex, &attr); // 初始化互斥锁;锁定状态的互斥锁
    

pthread_cond_t条件变量

  • 使用 PTHREAD_COND_INITIALIZER 进行初始化时,默认状态是已经被唤醒的。这种方式会创建一个静态初始化的条件变量,适用于静态全局变量或静态局部变量的初始化。示例代码中的 cond 使用 PTHREAD_COND_INITIALIZER 进行了初始化,因此它在创建时就是已经被唤醒的状态。

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    
  • 使用 pthread_cond_init 函数进行初始化时,默认情况下条件变量是未被唤醒的。如果需要创建时是已经被唤醒的状态,可以在初始化前手动调用 pthread_cond_signalpthread_cond_broadcast 函数。

    pthread_cond_t cond;
    pthread_cond_init(&cond, NULL);
    

在使用互斥锁和条件变量时,确保正确的初始化状态非常重要。如果在使用之前未进行初始化,可能会导致未定义行为或错误。因此,建议在使用之前始终明确地初始化互斥锁和条件变量。

线程安全-生产者消费者模型

使用条件变量+互斥锁实现生产者和消费者线程循环执行的效果。

#include<stdio.h>
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
int flags = 0;
pthread_cond_t cond;                  //条件变量
pthread_mutex_t mutex;              //互斥锁
//生产者
void *product_thread_start(void *arg)
{
   
   
    while (1)
    {
   
   
        /*flags:定义flags的目的就是为了确定生产者和消费者的执行的顺序
        原因是如果不定义这个flags,生产者和消费者都要抢占这个锁,不一定
        谁抢占成功,加上flags之后就能够确定生产者先执行

        为什么要 while (flags != 0)而不使用 if (flags != 0)
        如果生产者第一次抢占到锁,第二次还是生产者抢占到锁
        第二次执行pthread_cond_wait会立即退出,原因上一次它执行了pthread_cond_signal
        但是由于while是个循环flags的值没有改,锁它会第二次执行pthread_cond_wait休眠
        直到消费者线程执行一次*/
        pthread_mutex_lock(&mutex);
        while (flags != 0){
   
   
            cout<<"I am product: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [生产者等待]"<<endl;

            pthread_cond_wait(&cond, &mutex);           //flags == 1等待

            cout<<"I am product: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [开启生产]"<<endl;
        }

        flags = 1;
        sleep(1);
        printf("生产了一辆超级跑车\n");
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        cout<<" [生产完]"<<endl;
        cout<<endl;
    }
    pthread_exit(NULL);
}

//消费者
void *consume_thread_start(void *arg)
{
   
   
    while (1)
    {
   
   
        pthread_mutex_lock(&mutex);
        while (flags == 0){
   
   
            cout<<"I am consume: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<"[消费者等待]"<<endl;

            pthread_cond_wait(&cond, &mutex);

            cout<<"I am consume: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [开启消费]"<<endl;
        }
        flags = 0;
        sleep(1);
        printf("我购买了一辆超级跑车\n");
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        cout<<" [消费完]"<<endl;
        cout<<endl;
    }
    pthread_exit(NULL);
}
int main(int argc, const char *argv[])
{
   
   
    pthread_t product_tid, consume_tid;

    pthread_mutex_init(&mutex, NULL);       //默认状态是未锁定的(即解锁状态)
    pthread_cond_init(&cond, NULL);           //默认情况下条件变量是未被唤醒的

    if (pthread_create(&product_tid, NULL, product_thread_start, NULL))
        perror("create tid1 error");
    if (pthread_create(&consume_tid, NULL, consume_thread_start, NULL))
        perror("create tid1 error");

    pthread_join(product_tid, NULL);
    pthread_join(consume_tid, NULL);
    printf("product_tid=%#lx,consume_tid=%#lx\n", product_tid, consume_tid);
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    return 0;
}

可能一:

  • 消耗者第一次抢占到锁,进入等待;
  • 生产者"生产完" pthread_cond_signal(&cond);唤醒后,生产者又抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&cond);唤醒后,消费者又抢占到锁,进入等待;
    1693753040096.png

可能二:

  • 生产者第一次抢占到锁,进入生产;
  • 生产者"生产完" pthread_cond_signal(&cond);唤醒后,生产者又抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&cond);唤醒后,消费者又抢占到锁,进入等待;

1693753109444.png

线程安全队列

参考: http://t.csdn.cn/8jJYf

#include<stdio.h>
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>
using namespace std;
#define THREAD_COUNT 1              //生产者和消费者数量

//创建线程安全队列
class RingQueue{
   
   
public:
    RingQueue(){
   
   
        capacity = 1;
        pthread_mutex_init(&que_lock, NULL);
        pthread_cond_init(&consum_cond, NULL);
        pthread_cond_init(&product_cond, NULL);
    }
    ~RingQueue(){
   
   
        pthread_mutex_destroy(&que_lock);
        pthread_cond_destroy(&consum_cond);
        pthread_cond_destroy(&product_cond);
    }

    //往队列中放数据,生产
    void Push(int data){
   
   
        pthread_mutex_lock(&que_lock);
        while(que.size()>=capacity){
   
   
            cout<<"  que.size() == " << que.size()
               << " LINE="<<__LINE__<<" [生产者等待]"<<endl;

            pthread_cond_wait(&product_cond, &que_lock);

            /*为什么要用while循环呢?
        因为当生产者被唤醒后,需要再次判断队列是否可以满足生产的条件
        生产者或者消费者都是需要在等待结束后再次判断的*/
            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [开启生产]"<<endl;
        }
        que.push(data);                           //生产,往队列中放入数据

        cout<<"I am product: " << pthread_self()
           << "I product number is " << data << endl;

        pthread_mutex_unlock(&que_lock);
        //生产者完成生产后唤醒消费者线程让消费者进行消费
        pthread_cond_signal(&consum_cond);
        cout<<endl;
    }

    //从队列中取数据,消费
    int Pop(){
   
   
        pthread_mutex_lock(&que_lock);
        while(que.size() <= 0){
   
   
            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [消费者等待]"<<endl;

            pthread_cond_wait(&consum_cond, &que_lock);

            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [开启消费]"<<endl;
        }
        int data = que.front();
        que.pop();

        cout<<"I am consume: " << pthread_self()
           << "I consume number is " << data << endl;

        pthread_mutex_unlock(&que_lock);
        //消费者线程消费之后通知生产者来生产
        pthread_cond_signal(&product_cond);

        cout<<endl;
        return data;
    }

private:
    queue<int> que;            //线程安全的队列
    //给队列一把锁,保证互斥,保证同一时刻只有一个线程对队列进行操作
    pthread_mutex_t que_lock;

    /*同步的条件变量,队列有元素,消息,没有元素等待,唤醒生产者
  保证生产者在队列中没有元素的时候进行生产(插入元素)*/
    pthread_cond_t consum_cond;
    pthread_cond_t product_cond;
    int capacity;               //队列容量,队列元素大于容量表示队满,不再往里插入元素
};

int g_val = 0;
//静态初始化保护g_val的互斥锁;默认状态是未锁定的(即解锁状态)
pthread_mutex_t g_val_lock = PTHREAD_MUTEX_INITIALIZER; //互斥锁

//生产者
void* product_thread_start(void* arg){
   
   
    RingQueue *q = (RingQueue*)arg;
    while(1){
   
   
        pthread_mutex_lock(&g_val_lock);//获取g_val的互斥锁
        q->Push(g_val);
        g_val++;
        sleep(1);
        pthread_mutex_unlock(&g_val_lock);
    }
}

//消费者
void* consum_thread_start(void* arg){
   
   
    RingQueue *q = (RingQueue*)arg;
    while(1){
   
   
        q->Pop();
    }
}

int main(){
   
   
    pthread_t consum_tid[THREAD_COUNT];
    pthread_t product_tid[THREAD_COUNT];
    RingQueue* q = new RingQueue();
    for(int i=0; i<THREAD_COUNT; ++i){
   
   
        int ret = pthread_create(&consum_tid[i], NULL, consum_thread_start, (void*)q);
        if(ret < 0){
   
   
            perror("pthread_create");
            return 0;
        }
        ret = pthread_create(&product_tid[i], NULL, product_thread_start, (void*)q);
        if(ret < 0){
   
   
            perror("pthread_create");
            return 0;
        }
    }
    for(int i=0; i<THREAD_COUNT; ++i){
   
   
        pthread_join(consum_tid[i], NULL);
        pthread_join(product_tid[i], NULL);
    }
    delete q;
    return 0;
}

可能一:

  • 生产者第一次抢占到锁,进入生产;
  • 消费者"消费完" pthread_cond_signal(&product_cond);唤醒后,消费者又抢占到锁,进入等待;
    1693754737092.png

可能二:

  • 消耗者第一次抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&product_cond);唤醒后,消费者又抢占到锁,进入等待;

1693754783995.png

线程互斥锁API

  1. 定义互斥锁的变量

    pthread_mutex_t mutex;
    
  1. 初始化线程互斥锁

    int pthread_mutex_init(pthread_mutex_t * mutex, const pthread_mutexattr_t * attr); //动态初始化
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //静态初始化;未锁定的(即解锁状态)
    功能:初始化线程的互斥锁
    参数:
        @mutex:互斥锁变量的地址
        @attr:缺省属性,默认填写为NULL; 未锁定的(即解锁状态)
    返回值:成功返回0,失败返回错误码
    
  1. 上锁

    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_trylock(pthread_mutex_t *mutex);     //如果获取不到资源,不会阻塞,立即返回
    功能:上锁(如果互斥锁可用直接能占用锁,如果互斥锁不可用将会阻塞等待)
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码
    
  1. 解锁

    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    功能:解锁
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码
    
  1. 销毁互斥锁

    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    功能:销毁互斥锁
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码
    

线程同步:条件变量的API

  1. 定义条件变量

    pthread_cond_t cond;
    
  1. 初始化条件变量

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;            //静态初始化 //已经被唤醒的
    int pthread_cond_init(pthread_cond_t *restrict cond,  const pthread_condattr_t *restrict attr);
    功能:动态初始化一个条件变量
    参数:
       @cond    :    条件变量的指针
       @attr    :        NULL使用默认属性  ;未被唤醒的
    返回值:成功返回0,失败返回非0
    
  1. 阻塞等待条件变量

    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    功能:阻塞等待条件变量,在条件变量中维护了一个队列,
          这里的互斥锁就是为了解决在往队列中放线程的时候出现竞态问题的。
    参数:
        @cond    :条件变量的地址
        @mutex    :互斥锁
    返回值:成功返回0,失败返回非零
    

    其实现的步骤:
    1.使用pthread_mutex_lock上锁
    2.调用pthread_cond_wait

      2.1将当前线程放入队列
      2.2解锁----------------------------------使用同一个锁的线程可获得锁
      2.3休眠----------------------------------等待
      2.4获取锁
      2.5休眠状态退出
    

    3.线程执行我的程序
    4.使用pthread_mutex_unlock解锁

  1. 给休眠的线程发信号或者广播

    int pthread_cond_signal(pthread_cond_t *cond);
    功能:唤醒一个休眠的线程
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零
    
    int pthread_cond_broadcast(pthread_cond_t *cond);
    功能:唤醒所有休眠的线程
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零
    
  1. 销毁条件变量

    int pthread_cond_destroy(pthread_cond_t *cond);
    功能:销毁条件变量
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零
    
相关文章
|
1月前
|
存储 前端开发 Java
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
【C++ 多线程 】C++并发编程:精细控制数据打印顺序的策略
45 1
|
1月前
|
数据可视化 关系型数据库 编译器
【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南
【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南
114 2
|
1月前
|
存储 算法 Java
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
【C/C++ 线程池设计思路】 深入探索线程池设计:任务历史记录的高效管理策略
74 0
|
1月前
|
消息中间件 Linux 调度
【Linux 进程/线程状态 】深入理解Linux C++中的进程/线程状态:阻塞,休眠,僵死
【Linux 进程/线程状态 】深入理解Linux C++中的进程/线程状态:阻塞,休眠,僵死
69 0
|
1月前
|
存储 并行计算 Java
C++线程 并发编程:std::thread、std::sync与std::packaged_task深度解析(二)
C++线程 并发编程:std::thread、std::sync与std::packaged_task深度解析
66 0
|
24天前
|
C++
C++ While 和 For 循环:流程控制全解析
本文介绍了C++中的`switch`语句和循环结构。`switch`语句根据表达式的值执行匹配的代码块,可以使用`break`终止执行并跳出`switch`。`default`关键字用于处理没有匹配`case`的情况。接着,文章讲述了三种类型的循环:`while`循环在条件满足时执行代码,`do/while`至少执行一次代码再检查条件,`for`循环适用于已知循环次数的情况。`for`循环包含初始化、条件和递增三个部分。此外,还提到了嵌套循环和C++11引入的`foreach`循环,用于遍历数组元素。最后,鼓励读者关注微信公众号`Let us Coding`获取更多内容。
21 0
|
1月前
|
安全 Java 调度
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
【C/C++ 线程池设计思路 】设计与实现支持优先级任务的C++线程池 简要介绍
45 2
|
1月前
|
Linux API C++
【C++ 线程包裹类设计】跨平台C++线程包装类:属性设置与平台差异的全面探讨
【C++ 线程包裹类设计】跨平台C++线程包装类:属性设置与平台差异的全面探讨
51 2
|
1月前
|
设计模式 安全 C++
【C++ const 函数 的使用】C++ 中 const 成员函数与线程安全性:原理、案例与最佳实践
【C++ const 函数 的使用】C++ 中 const 成员函数与线程安全性:原理、案例与最佳实践
71 2
|
1月前
|
算法 安全 Unix
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
【C++ 20 信号量 】C++ 线程同步新特性 C++ 20 std::counting_semaphore 信号量的用法 控制对共享资源的并发访问
30 0