Linux多线程实践(8) --Posix条件变量解决生产者消费者问题

简介: Posix条件变量int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);int pt...

Posix条件变量

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,  pthread_mutex_t  *mutex,  const  struct timespec *abstime);

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

    与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞调用线程, 直到条件变量所要求的情况发生为止。通常条件变量需要和互斥锁同时使用, 利用互斥量保护条件变量;

  条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它就发送信号给关联的条件变量, 并唤醒一个或多个等待在该条件变量上的线程,这些线程将重新获得互斥锁,重新评价条件。如果将条件变量放到共享内存中, 而两进程可共享读写这段内存,则条件变量可以被用来实现两进程间的线程同步。


条件变量使用规范

1.等待条件代码

pthread_mutex_lock(&mutex);

while (条件为假)
{
    pthread_cond_wait(&cond, &mutex);
}
修改条件

pthread_mutex_unlock(&mutex);

/**解释: 为什么使用while, 而不用if?

Man-Page给出了答案: If a signal is delivered to a thread waiting for a condition variable, upon return from 

the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or 

it shall return zero due to spurious wakeup.

即是说如果正在等待条件变量的一个线程收到一个信号,从信号处理函数返回的时候线程应该重新等待条件变量就好象没有被中断一样,或者被虚假地唤醒返回0。如果是上述情形,那么其实条件并未被改变,那么此时如果没有继续判断一下条件的真假就继续向下执行的话,修改条件将会出现问题,所以需要使用while 循环再判断一下,如果条件还是为假必须继续等待。

注:在多处理器系统中,pthread_cond_signal 可能会唤醒多个等待条件的线程,这也是一种spurious wakeup。

**/

2.给条件发送信号代码

pthread_mutex_lock(&mutex);

设置条件为真
pthread_cond_signal(&cond);

pthread_mutex_unlock(&mutex);


条件变量API说明

1.pthread_cond_init

使用条件变量之前要先进行初始化:可以在单个语句中生成和初始化一个条件变量如:

  pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER; //用于进程间线程的通信;

  或用函数pthread_cond_init进行动态初始化;

 

2.pthread_cond_destroy

该函数可以用来摧毁所指定的条件变量,同时将会释放所给它分配的资源。调用该函数的进程并不要求等待在参数所指定的条件变量上;

 

3.pthread_cond_wait && pthread_cond_timedwait

cond_wait原语完成三件事:

  (1)对mutex解锁;

  (2)等待条件, 直到有线程向他发送通知;

  (3)当wait返回时, 再对mutex重新加锁;

第一个参数cond是指向一个条件变量的指针。第二个参数mutex则是对相关的互斥锁的指针。

函数pthread_cond_timedwait函数类型与函数pthread_cond_wait区别在于:timedwait多了一个超时, 超时值制订了我们愿意等待多长时间, 如果达到或是超过所引用的参数*abstime,它将结束阻塞并返回错误ETIME.

//timespec结构如下:
struct timespec
{
    time_t   tv_sec;        /* seconds */
    long     tv_nsec;       /* nanoseconds */
};

注意: 这个时间值是一个绝对数而不是相对数, 例如, 假设愿意等待三秒钟, 那么并不是把3秒钟转换成timespec结构, 而是需要将当前实践加上3分钟再转换成timespec结构, 这个获取当前时间值的函数可以是clock_gettime(我们采用这一个)也可以是gettimeofday.

 

4.pthread_cond_signal && pthread_cond_broadcast

cond_signal原语所完成的操作:

  向第一个等待条件的线程发起通知, 如果没有任何一个线程处于等待条件的状态, 那么这个通知将被忽略;

cond_broadcast:

  向所有等待在该条件上的线程发送通知;

参数cond是一个条件变量的指针。当调用signal时, 一个在相同条件变量上阻塞的线程将被解锁。如果同时有多个线程阻塞,则由调度策略确定接收通知的线程。如果调用broadcast,则将通知阻塞在这个条件变量上的所有线程。一旦被唤醒,线程仍然会要求互斥锁。如果当前没有线程等待通知,则上面两种调用实际上成为一个空操作, 内核会将条件变量的通知忽略(如果参数*cond指向非法地址,则返回值EINVAL);

 

类Condition封装

//Condition类设计
class Condition
{
public:
    Condition(const pthread_mutexattr_t *mutexAttr = NULL,
              const pthread_condattr_t  *condAttr = NULL);
    ~Condition();

    //条件变量函数
    int signal();
    int broadcast();
    int wait();
    int timedwait(int seconds);

    //互斥量函数
    int lock();
    int trylock();
    int unlock();

private:
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_cond;
};
//Condition类实现
Condition::Condition(const pthread_mutexattr_t *mutexAttr,
                     const pthread_condattr_t  *condAttr)
{
    //初始化互斥量
    pthread_mutex_init(&m_mutex, mutexAttr);
    //初始化条件变量
    pthread_cond_init(&m_cond, condAttr);
}
Condition::~Condition()
{
    //销毁互斥量
    pthread_mutex_destroy(&m_mutex);
    //销毁条件变量
    pthread_cond_destroy(&m_cond);
}
int Condition::signal()
{
    return pthread_cond_signal(&m_cond);
}
int Condition::broadcast()
{
    return pthread_cond_broadcast(&m_cond);
}
int Condition::wait()
{
    return pthread_cond_wait(&m_cond, &m_mutex);
}
int Condition::timedwait(int seconds)
{
    //获取当前时间
    struct timespec abstime;
    clock_gettime(CLOCK_REALTIME, &abstime);
    //将当前时间加上需要等待的秒数, 构成绝对时间值
    abstime.tv_sec += seconds;
    return pthread_cond_timedwait(&m_cond, &m_mutex, &abstime);
}

int Condition::lock()
{
    return pthread_mutex_lock(&m_mutex);
}
int Condition::trylock()
{
    return pthread_mutex_trylock(&m_mutex);
}
int Condition::unlock()
{
    return pthread_mutex_unlock(&m_mutex);
}

生产者消费者问题(无界缓冲区)

/** 实现: 我们假设是缓冲区是无界的
说明:生产者可以不停地生产,使用pthread_cond_signal  发出通知的时候,如果此时没有消费者线程在等待条件,那么这个通知将被丢弃,但也不影响整体代码的执行,没有消费者线程在等待,说明产品资源充足,即while 判断失败,不会进入等待状态,直接消费产品(即修改条件)。
**/
const unsigned int PRODUCER_COUNT = 5;	//生产者个数
const unsigned int CONSUMER_COUNT = 3;	//消费者个数

//定义Condition类
Condition cond;
//缓冲区 ~O(∩_∩)O~
int nReady = 0;
//消费者
void *consumer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //锁定mutex
        while (!(nReady > 0))
        {
            printf("-- thread %d wait...\n", id);
            cond.wait();    //等待条件变量
        }

        printf("** thread %d alive, and consume product %d ...\n", id, nReady);
        -- nReady;  //消费
        printf("   thread %d end consume... \n\n", id);

        cond.unlock();  //解锁mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

//生产者
void *producer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //锁定mutex

        printf("++ thread %d signal, and produce product %d ...\n", id, nReady+1);
        ++ nReady;      //生产
        cond.signal();  //发送条件变量信号
        printf("   thread %d end produce, signal...\n\n", id);
        cond.unlock();  //解锁mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

int main()
{
    pthread_t thread[PRODUCER_COUNT+CONSUMER_COUNT];

    //首先生成消费者
    for (unsigned int i = 0; i < CONSUMER_COUNT; ++i)
        pthread_create(&thread[i], NULL, consumer, new int(i));
    sleep(1);   //使生产者等待一段时间, 加速消费者等待事件产生
    //然后生成生产者
    for (unsigned int i = 0; i < PRODUCER_COUNT; ++i)
        pthread_create(&thread[CONSUMER_COUNT+i], NULL, producer, new int(i));
    for (unsigned int i = 0; i < PRODUCER_COUNT+CONSUMER_COUNT; ++i)
        pthread_join(thread[i], NULL);
}

目录
相关文章
|
1月前
|
Java 开发者
解锁并发编程新姿势!深度揭秘AQS独占锁&ReentrantLock重入锁奥秘,Condition条件变量让你玩转线程协作,秒变并发大神!
【8月更文挑战第4天】AQS是Java并发编程的核心框架,为锁和同步器提供基础结构。ReentrantLock基于AQS实现可重入互斥锁,比`synchronized`更灵活,支持可中断锁获取及超时控制。通过维护计数器实现锁的重入性。Condition接口允许ReentrantLock创建多个条件变量,支持细粒度线程协作,超越了传统`wait`/`notify`机制,助力开发者构建高效可靠的并发应用。
64 0
|
7天前
|
存储 Java 程序员
优化Java多线程应用:是创建Thread对象直接调用start()方法?还是用个变量调用?
这篇文章探讨了Java中两种创建和启动线程的方法,并分析了它们的区别。作者建议直接调用 `Thread` 对象的 `start()` 方法,而非保持强引用,以避免内存泄漏、简化线程生命周期管理,并减少不必要的线程控制。文章详细解释了这种方法在使用 `ThreadLocal` 时的优势,并提供了代码示例。作者洛小豆,文章来源于稀土掘金。
|
8天前
|
存储 Ubuntu Linux
C语言 多线程编程(1) 初识线程和条件变量
本文档详细介绍了多线程的概念、相关命令及线程的操作方法。首先解释了线程的定义及其与进程的关系,接着对比了线程与进程的区别。随后介绍了如何在 Linux 系统中使用 `pidstat`、`top` 和 `ps` 命令查看线程信息。文档还探讨了多进程和多线程模式各自的优缺点及适用场景,并详细讲解了如何使用 POSIX 线程库创建、退出、等待和取消线程。此外,还介绍了线程分离的概念和方法,并提供了多个示例代码帮助理解。最后,深入探讨了线程间的通讯机制、互斥锁和条件变量的使用,通过具体示例展示了如何实现生产者与消费者的同步模型。
|
28天前
|
算法 Java
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
该博客文章综合介绍了Java并发编程的基础知识,包括线程与进程的区别、并发与并行的概念、线程的生命周期状态、`sleep`与`wait`方法的差异、`Lock`接口及其实现类与`synchronized`关键字的对比,以及生产者和消费者问题的解决方案和使用`Condition`对象替代`synchronized`关键字的方法。
JUC(1)线程和进程、并发和并行、线程的状态、lock锁、生产者和消费者问题
|
20天前
|
消息中间件 设计模式 安全
多线程魔法:揭秘一个JVM中如何同时运行多个消费者
【8月更文挑战第22天】在Java虚拟机(JVM)中探索多消费者模式,此模式解耦生产与消费过程,提升系统性能。通过`ExecutorService`和`BlockingQueue`构建含2个生产者及4个消费者的系统,实现实时消息处理。多消费者模式虽增强处理能力,但也引入线程安全与资源竞争等挑战,需谨慎设计以确保高效稳定运行。
48 2
|
29天前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。
|
20天前
|
Linux Shell
在Linux中,如何将二进制文件添加到 $PATH 变量中?
在Linux中,如何将二进制文件添加到 $PATH 变量中?
|
2月前
|
存储 SQL Java
(七)全面剖析Java并发编程之线程变量副本ThreadLocal原理分析
在之前的文章:彻底理解Java并发编程之Synchronized关键字实现原理剖析中我们曾初次谈到线程安全问题引发的"三要素":多线程、共享资源/临界资源、非原子性操作,简而言之:在同一时刻,多条线程同时对临界资源进行非原子性操作则有可能产生线程安全问题。
|
2月前
|
Java Linux
linux 对子用户配置java 环境变量
linux 对子用户配置java 环境变量
23 3
|
2月前
|
存储 Python 容器
Node中的AsyncLocalStorage 使用问题之在Python中,线程内变量的问题如何解决
Node中的AsyncLocalStorage 使用问题之在Python中,线程内变量的问题如何解决