c++生产者和消费者线程循环

简介: 线程安全-生产者消费者模型

注意

pthread_mutex_t互斥锁

  • 使用 PTHREAD_MUTEX_INITIALIZER 进行初始化时,默认状态是未锁定的(即解锁状态)。这种方式会创建一个静态初始化的互斥锁,适用于静态全局变量或静态局部变量的初始化。

    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;            //未锁定的(即解锁状态)
  • 使用 pthread_mutex_init 函数进行初始化时,可以通过指定属性参数来决定初始状态。如果属性参数为 NULL,则默认情况下互斥锁是未锁定的。如果需要创建时是锁定状态,可使用 pthread_mutexattr_settype 函数将属性设置为 PTHREAD_MUTEX_INITIALIZER

    pthread_mutex_t mutex;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_INITIALIZER); // 设置属性为初始化器;指定互斥锁的初始化状态为锁定状态
    
    pthread_mutex_init(&mutex, &attr); // 初始化互斥锁;锁定状态的互斥锁

pthread_cond_t条件变量

  • 使用 PTHREAD_COND_INITIALIZER 进行初始化时,默认状态是已经被唤醒的。这种方式会创建一个静态初始化的条件变量,适用于静态全局变量或静态局部变量的初始化。示例代码中的 cond 使用 PTHREAD_COND_INITIALIZER 进行了初始化,因此它在创建时就是已经被唤醒的状态。

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 使用 pthread_cond_init 函数进行初始化时,默认情况下条件变量是未被唤醒的。如果需要创建时是已经被唤醒的状态,可以在初始化前手动调用 pthread_cond_signalpthread_cond_broadcast 函数。

    pthread_cond_t cond;
    pthread_cond_init(&cond, NULL);

在使用互斥锁和条件变量时,确保正确的初始化状态非常重要。如果在使用之前未进行初始化,可能会导致未定义行为或错误。因此,建议在使用之前始终明确地初始化互斥锁和条件变量。

线程安全-生产者消费者模型

使用条件变量+互斥锁实现生产者和消费者线程循环执行的效果。

#include<stdio.h>
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
int flags = 0;
pthread_cond_t cond;                  //条件变量
pthread_mutex_t mutex;              //互斥锁
//生产者
void *product_thread_start(void *arg)
{
    while (1)
    {
        /*flags:定义flags的目的就是为了确定生产者和消费者的执行的顺序
        原因是如果不定义这个flags,生产者和消费者都要抢占这个锁,不一定
        谁抢占成功,加上flags之后就能够确定生产者先执行

        为什么要 while (flags != 0)而不使用 if (flags != 0)
        如果生产者第一次抢占到锁,第二次还是生产者抢占到锁
        第二次执行pthread_cond_wait会立即退出,原因上一次它执行了pthread_cond_signal
        但是由于while是个循环flags的值没有改,锁它会第二次执行pthread_cond_wait休眠
        直到消费者线程执行一次*/
        pthread_mutex_lock(&mutex);
        while (flags != 0){
            cout<<"I am product: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [生产者等待]"<<endl;

            pthread_cond_wait(&cond, &mutex);           //flags == 1等待

            cout<<"I am product: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [开启生产]"<<endl;
        }

        flags = 1;
        sleep(1);
        printf("生产了一辆超级跑车\n");
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        cout<<" [生产完]"<<endl;
        cout<<endl;
    }
    pthread_exit(NULL);
}

//消费者
void *consume_thread_start(void *arg)
{
    while (1)
    {
        pthread_mutex_lock(&mutex);
        while (flags == 0){
            cout<<"I am consume: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<"[消费者等待]"<<endl;

            pthread_cond_wait(&cond, &mutex);

            cout<<"I am consume: " << pthread_self()
                << "  flags == " << flags
                << "  LINE="<<__LINE__<<" [开启消费]"<<endl;
        }
        flags = 0;
        sleep(1);
        printf("我购买了一辆超级跑车\n");
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        cout<<" [消费完]"<<endl;
        cout<<endl;
    }
    pthread_exit(NULL);
}
int main(int argc, const char *argv[])
{
    pthread_t product_tid, consume_tid;

    pthread_mutex_init(&mutex, NULL);       //默认状态是未锁定的(即解锁状态)
    pthread_cond_init(&cond, NULL);           //默认情况下条件变量是未被唤醒的

    if (pthread_create(&product_tid, NULL, product_thread_start, NULL))
        perror("create tid1 error");
    if (pthread_create(&consume_tid, NULL, consume_thread_start, NULL))
        perror("create tid1 error");

    pthread_join(product_tid, NULL);
    pthread_join(consume_tid, NULL);
    printf("product_tid=%#lx,consume_tid=%#lx\n", product_tid, consume_tid);
    pthread_cond_destroy(&cond);
    pthread_mutex_destroy(&mutex);

    return 0;
}

可能一:

  • 消耗者第一次抢占到锁,进入等待;
  • 生产者"生产完" pthread_cond_signal(&cond);唤醒后,生产者又抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&cond);唤醒后,消费者又抢占到锁,进入等待;

1693753040096.png

可能二:

  • 生产者第一次抢占到锁,进入生产;
  • 生产者"生产完" pthread_cond_signal(&cond);唤醒后,生产者又抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&cond);唤醒后,消费者又抢占到锁,进入等待;

1693753109444.png

线程安全队列

#include<stdio.h>
#include<iostream>
#include<queue>
#include<unistd.h>
#include<pthread.h>
using namespace std;
#define THREAD_COUNT 1              //生产者和消费者数量

//创建线程安全队列
class RingQueue{
public:
    RingQueue(){
        capacity = 1;
        pthread_mutex_init(&que_lock, NULL);
        pthread_cond_init(&consum_cond, NULL);
        pthread_cond_init(&product_cond, NULL);
    }
    ~RingQueue(){
        pthread_mutex_destroy(&que_lock);
        pthread_cond_destroy(&consum_cond);
        pthread_cond_destroy(&product_cond);
    }

    //往队列中放数据,生产
    void Push(int data){
        pthread_mutex_lock(&que_lock);
        while(que.size()>=capacity){
            cout<<"  que.size() == " << que.size()
               << " LINE="<<__LINE__<<" [生产者等待]"<<endl;

            pthread_cond_wait(&product_cond, &que_lock);

            /*为什么要用while循环呢?
        因为当生产者被唤醒后,需要再次判断队列是否可以满足生产的条件
        生产者或者消费者都是需要在等待结束后再次判断的*/
            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [开启生产]"<<endl;
        }
        que.push(data);                           //生产,往队列中放入数据

        cout<<"I am product: " << pthread_self()
           << "I product number is " << data << endl;

        pthread_mutex_unlock(&que_lock);
        //生产者完成生产后唤醒消费者线程让消费者进行消费
        pthread_cond_signal(&consum_cond);
        cout<<endl;
    }

    //从队列中取数据,消费
    int Pop(){
        pthread_mutex_lock(&que_lock);
        while(que.size() <= 0){
            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [消费者等待]"<<endl;

            pthread_cond_wait(&consum_cond, &que_lock);

            cout<< "  que.size() == " << que.size()
                << "  LINE="<<__LINE__<<" [开启消费]"<<endl;
        }
        int data = que.front();
        que.pop();

        cout<<"I am consume: " << pthread_self()
           << "I consume number is " << data << endl;

        pthread_mutex_unlock(&que_lock);
        //消费者线程消费之后通知生产者来生产
        pthread_cond_signal(&product_cond);

        cout<<endl;
        return data;
    }

private:
    queue<int> que;            //线程安全的队列
    //给队列一把锁,保证互斥,保证同一时刻只有一个线程对队列进行操作
    pthread_mutex_t que_lock;

    /*同步的条件变量,队列有元素,消息,没有元素等待,唤醒生产者
  保证生产者在队列中没有元素的时候进行生产(插入元素)*/
    pthread_cond_t consum_cond;
    pthread_cond_t product_cond;
    int capacity;               //队列容量,队列元素大于容量表示队满,不再往里插入元素
};

int g_val = 0;
//静态初始化保护g_val的互斥锁;默认状态是未锁定的(即解锁状态)
pthread_mutex_t g_val_lock = PTHREAD_MUTEX_INITIALIZER; //互斥锁

//生产者
void* product_thread_start(void* arg){
    RingQueue *q = (RingQueue*)arg;
    while(1){
        pthread_mutex_lock(&g_val_lock);//获取g_val的互斥锁
        q->Push(g_val);
        g_val++;
        sleep(1);
        pthread_mutex_unlock(&g_val_lock);
    }
}

//消费者
void* consum_thread_start(void* arg){
    RingQueue *q = (RingQueue*)arg;
    while(1){
        q->Pop();
    }
}

int main(){
    pthread_t consum_tid[THREAD_COUNT];
    pthread_t product_tid[THREAD_COUNT];
    RingQueue* q = new RingQueue();
    for(int i=0; i<THREAD_COUNT; ++i){
        int ret = pthread_create(&consum_tid[i], NULL, consum_thread_start, (void*)q);
        if(ret < 0){
            perror("pthread_create");
            return 0;
        }
        ret = pthread_create(&product_tid[i], NULL, product_thread_start, (void*)q);
        if(ret < 0){
            perror("pthread_create");
            return 0;
        }
    }
    for(int i=0; i<THREAD_COUNT; ++i){
        pthread_join(consum_tid[i], NULL);
        pthread_join(product_tid[i], NULL);
    }
    delete q;
    return 0;
}

可能一:

  • 生产者第一次抢占到锁,进入生产;
  • 消费者"消费完" pthread_cond_signal(&product_cond);唤醒后,消费者又抢占到锁,进入等待;

1693754737092.png

可能二:

  • 消耗者第一次抢占到锁,进入等待;
  • 消费者"消费完" pthread_cond_signal(&product_cond);唤醒后,消费者又抢占到锁,进入等待;

1693754783995.png

线程互斥锁API

  1. 定义互斥锁的变量

    pthread_mutex_t mutex;
  2. 初始化线程互斥锁

    int pthread_mutex_init(pthread_mutex_t * mutex, const pthread_mutexattr_t * attr); //动态初始化
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; //静态初始化;未锁定的(即解锁状态)
    功能:初始化线程的互斥锁
    参数:
        @mutex:互斥锁变量的地址
        @attr:缺省属性,默认填写为NULL; 未锁定的(即解锁状态)
    返回值:成功返回0,失败返回错误码
  3. 上锁

    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_trylock(pthread_mutex_t *mutex);     //如果获取不到资源,不会阻塞,立即返回
    功能:上锁(如果互斥锁可用直接能占用锁,如果互斥锁不可用将会阻塞等待)
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码
  4. 解锁

    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    功能:解锁
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码 
  5. 销毁互斥锁

    int pthread_mutex_destroy(pthread_mutex_t *mutex);
    功能:销毁互斥锁
    参数:
        @mutex:互斥锁变量的地址
    返回值:成功返回0,失败返回错误码

线程同步:条件变量的API

  1. 定义条件变量

    pthread_cond_t cond;
  1. 初始化条件变量

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;            //静态初始化 //已经被唤醒的
    int pthread_cond_init(pthread_cond_t *restrict cond,  const pthread_condattr_t *restrict attr);
    功能:动态初始化一个条件变量
    参数:
       @cond    :    条件变量的指针
       @attr    :        NULL使用默认属性  ;未被唤醒的
    返回值:成功返回0,失败返回非0
  2. 阻塞等待条件变量

    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
    功能:阻塞等待条件变量,在条件变量中维护了一个队列,
          这里的互斥锁就是为了解决在往队列中放线程的时候出现竞态问题的。
    参数:
        @cond    :条件变量的地址
        @mutex    :互斥锁
    返回值:成功返回0,失败返回非零

    其实现的步骤:
    1.使用pthread_mutex_lock上锁
    2.调用pthread_cond_wait

      2.1将当前线程放入队列
      2.2解锁----------------------------------使用同一个锁的线程可获得锁
      2.3休眠----------------------------------等待
      2.4获取锁
      2.5休眠状态退出

    3.线程执行我的程序
    4.使用pthread_mutex_unlock解锁

  3. 给休眠的线程发信号或者广播

    int pthread_cond_signal(pthread_cond_t *cond);
    功能:唤醒一个休眠的线程
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零
    
    int pthread_cond_broadcast(pthread_cond_t *cond);
    功能:唤醒所有休眠的线程
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零  
  4. 销毁条件变量

    int pthread_cond_destroy(pthread_cond_t *cond);
    功能:销毁条件变量
    参数:
         @cond:条件变量的地址
    返回值:成功返回0,失败返回非零     
相关文章
|
1月前
|
缓存 安全 C++
C++无锁队列:解锁多线程编程新境界
【10月更文挑战第27天】
50 7
|
1月前
|
消息中间件 存储 安全
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
58 1
C++ 多线程之初识多线程
|
2月前
|
存储 并行计算 安全
C++多线程应用
【10月更文挑战第29天】C++ 中的多线程应用广泛,常见场景包括并行计算、网络编程中的并发服务器和图形用户界面(GUI)应用。通过多线程可以显著提升计算速度和响应能力。示例代码展示了如何使用 `pthread` 库创建和管理线程。注意事项包括数据同步与互斥、线程间通信和线程安全的类设计,以确保程序的正确性和稳定性。
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
24 1
|
2月前
|
存储 前端开发 C++
C++ 多线程之带返回值的线程处理函数
这篇文章介绍了在C++中使用`async`函数、`packaged_task`和`promise`三种方法来创建带返回值的线程处理函数。
80 6
|
2月前
|
缓存 负载均衡 Java
c++写高性能的任务流线程池(万字详解!)
本文介绍了一种高性能的任务流线程池设计,涵盖多种优化机制。首先介绍了Work Steal机制,通过任务偷窃提高资源利用率。接着讨论了优先级任务,使不同优先级的任务得到合理调度。然后提出了缓存机制,通过环形缓存队列提升程序负载能力。Local Thread机制则通过预先创建线程减少创建和销毁线程的开销。Lock Free机制进一步减少了锁的竞争。容量动态调整机制根据任务负载动态调整线程数量。批量处理机制提高了任务处理效率。此外,还介绍了负载均衡、避免等待、预测优化、减少复制等策略。最后,任务组的设计便于管理和复用多任务。整体设计旨在提升线程池的性能和稳定性。
84 5
|
2月前
|
C++
C++ 多线程之线程管理函数
这篇文章介绍了C++中多线程编程的几个关键函数,包括获取线程ID的`get_id()`,延时函数`sleep_for()`,线程让步函数`yield()`,以及阻塞线程直到指定时间的`sleep_until()`。
39 0
C++ 多线程之线程管理函数
|
2月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
34 0
Linux C/C++之线程基础
|
2月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
31 0