Linux多线程实践(8) --Posix条件变量解决生产者消费者问题

简介: Posix条件变量int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);int pt...

Posix条件变量

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,  pthread_mutex_t  *mutex,  const  struct timespec *abstime);

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

    与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞调用线程, 直到条件变量所要求的情况发生为止。通常条件变量需要和互斥锁同时使用, 利用互斥量保护条件变量;

  条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它就发送信号给关联的条件变量, 并唤醒一个或多个等待在该条件变量上的线程,这些线程将重新获得互斥锁,重新评价条件。如果将条件变量放到共享内存中, 而两进程可共享读写这段内存,则条件变量可以被用来实现两进程间的线程同步。


条件变量使用规范

1.等待条件代码

pthread_mutex_lock(&mutex);

while (条件为假)
{
    pthread_cond_wait(&cond, &mutex);
}
修改条件

pthread_mutex_unlock(&mutex);

/**解释: 为什么使用while, 而不用if?

Man-Page给出了答案: If a signal is delivered to a thread waiting for a condition variable, upon return from 

the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or 

it shall return zero due to spurious wakeup.

即是说如果正在等待条件变量的一个线程收到一个信号,从信号处理函数返回的时候线程应该重新等待条件变量就好象没有被中断一样,或者被虚假地唤醒返回0。如果是上述情形,那么其实条件并未被改变,那么此时如果没有继续判断一下条件的真假就继续向下执行的话,修改条件将会出现问题,所以需要使用while 循环再判断一下,如果条件还是为假必须继续等待。

注:在多处理器系统中,pthread_cond_signal 可能会唤醒多个等待条件的线程,这也是一种spurious wakeup。

**/

2.给条件发送信号代码

pthread_mutex_lock(&mutex);

设置条件为真
pthread_cond_signal(&cond);

pthread_mutex_unlock(&mutex);


条件变量API说明

1.pthread_cond_init

使用条件变量之前要先进行初始化:可以在单个语句中生成和初始化一个条件变量如:

  pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER; //用于进程间线程的通信;

  或用函数pthread_cond_init进行动态初始化;

 

2.pthread_cond_destroy

该函数可以用来摧毁所指定的条件变量,同时将会释放所给它分配的资源。调用该函数的进程并不要求等待在参数所指定的条件变量上;

 

3.pthread_cond_wait && pthread_cond_timedwait

cond_wait原语完成三件事:

  (1)对mutex解锁;

  (2)等待条件, 直到有线程向他发送通知;

  (3)当wait返回时, 再对mutex重新加锁;

第一个参数cond是指向一个条件变量的指针。第二个参数mutex则是对相关的互斥锁的指针。

函数pthread_cond_timedwait函数类型与函数pthread_cond_wait区别在于:timedwait多了一个超时, 超时值制订了我们愿意等待多长时间, 如果达到或是超过所引用的参数*abstime,它将结束阻塞并返回错误ETIME.

//timespec结构如下:
struct timespec
{
    time_t   tv_sec;        /* seconds */
    long     tv_nsec;       /* nanoseconds */
};

注意: 这个时间值是一个绝对数而不是相对数, 例如, 假设愿意等待三秒钟, 那么并不是把3秒钟转换成timespec结构, 而是需要将当前实践加上3分钟再转换成timespec结构, 这个获取当前时间值的函数可以是clock_gettime(我们采用这一个)也可以是gettimeofday.

 

4.pthread_cond_signal && pthread_cond_broadcast

cond_signal原语所完成的操作:

  向第一个等待条件的线程发起通知, 如果没有任何一个线程处于等待条件的状态, 那么这个通知将被忽略;

cond_broadcast:

  向所有等待在该条件上的线程发送通知;

参数cond是一个条件变量的指针。当调用signal时, 一个在相同条件变量上阻塞的线程将被解锁。如果同时有多个线程阻塞,则由调度策略确定接收通知的线程。如果调用broadcast,则将通知阻塞在这个条件变量上的所有线程。一旦被唤醒,线程仍然会要求互斥锁。如果当前没有线程等待通知,则上面两种调用实际上成为一个空操作, 内核会将条件变量的通知忽略(如果参数*cond指向非法地址,则返回值EINVAL);

 

类Condition封装

//Condition类设计
class Condition
{
public:
    Condition(const pthread_mutexattr_t *mutexAttr = NULL,
              const pthread_condattr_t  *condAttr = NULL);
    ~Condition();

    //条件变量函数
    int signal();
    int broadcast();
    int wait();
    int timedwait(int seconds);

    //互斥量函数
    int lock();
    int trylock();
    int unlock();

private:
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_cond;
};
//Condition类实现
Condition::Condition(const pthread_mutexattr_t *mutexAttr,
                     const pthread_condattr_t  *condAttr)
{
    //初始化互斥量
    pthread_mutex_init(&m_mutex, mutexAttr);
    //初始化条件变量
    pthread_cond_init(&m_cond, condAttr);
}
Condition::~Condition()
{
    //销毁互斥量
    pthread_mutex_destroy(&m_mutex);
    //销毁条件变量
    pthread_cond_destroy(&m_cond);
}
int Condition::signal()
{
    return pthread_cond_signal(&m_cond);
}
int Condition::broadcast()
{
    return pthread_cond_broadcast(&m_cond);
}
int Condition::wait()
{
    return pthread_cond_wait(&m_cond, &m_mutex);
}
int Condition::timedwait(int seconds)
{
    //获取当前时间
    struct timespec abstime;
    clock_gettime(CLOCK_REALTIME, &abstime);
    //将当前时间加上需要等待的秒数, 构成绝对时间值
    abstime.tv_sec += seconds;
    return pthread_cond_timedwait(&m_cond, &m_mutex, &abstime);
}

int Condition::lock()
{
    return pthread_mutex_lock(&m_mutex);
}
int Condition::trylock()
{
    return pthread_mutex_trylock(&m_mutex);
}
int Condition::unlock()
{
    return pthread_mutex_unlock(&m_mutex);
}

生产者消费者问题(无界缓冲区)

/** 实现: 我们假设是缓冲区是无界的
说明:生产者可以不停地生产,使用pthread_cond_signal  发出通知的时候,如果此时没有消费者线程在等待条件,那么这个通知将被丢弃,但也不影响整体代码的执行,没有消费者线程在等待,说明产品资源充足,即while 判断失败,不会进入等待状态,直接消费产品(即修改条件)。
**/
const unsigned int PRODUCER_COUNT = 5;	//生产者个数
const unsigned int CONSUMER_COUNT = 3;	//消费者个数

//定义Condition类
Condition cond;
//缓冲区 ~O(∩_∩)O~
int nReady = 0;
//消费者
void *consumer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //锁定mutex
        while (!(nReady > 0))
        {
            printf("-- thread %d wait...\n", id);
            cond.wait();    //等待条件变量
        }

        printf("** thread %d alive, and consume product %d ...\n", id, nReady);
        -- nReady;  //消费
        printf("   thread %d end consume... \n\n", id);

        cond.unlock();  //解锁mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

//生产者
void *producer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //锁定mutex

        printf("++ thread %d signal, and produce product %d ...\n", id, nReady+1);
        ++ nReady;      //生产
        cond.signal();  //发送条件变量信号
        printf("   thread %d end produce, signal...\n\n", id);
        cond.unlock();  //解锁mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

int main()
{
    pthread_t thread[PRODUCER_COUNT+CONSUMER_COUNT];

    //首先生成消费者
    for (unsigned int i = 0; i < CONSUMER_COUNT; ++i)
        pthread_create(&thread[i], NULL, consumer, new int(i));
    sleep(1);   //使生产者等待一段时间, 加速消费者等待事件产生
    //然后生成生产者
    for (unsigned int i = 0; i < PRODUCER_COUNT; ++i)
        pthread_create(&thread[CONSUMER_COUNT+i], NULL, producer, new int(i));
    for (unsigned int i = 0; i < PRODUCER_COUNT+CONSUMER_COUNT; ++i)
        pthread_join(thread[i], NULL);
}

目录
相关文章
|
3月前
|
存储 Shell Linux
八、Linux Shell 脚本:变量与字符串
Shell脚本里的变量就像一个个贴着标签的“箱子”。装东西(赋值)时,=两边千万不能有空格。用单引号''装进去的东西会原封不动,用双引号""则会让里面的$变量先“变身”再装箱。默认箱子只能在当前“房间”(Shell进程)用,想让隔壁房间(子进程)也能看到,就得给箱子盖个export的“出口”戳。此外,Shell还自带了$?(上条命令的成绩单)和$1(别人递进来的第一个包裹)等许多特殊箱子,非常有用。
392 2
|
10月前
|
存储 Linux API
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
在计算机系统的底层架构中,操作系统肩负着资源管理与任务调度的重任。当我们启动各类应用程序时,其背后复杂的运作机制便悄然展开。程序,作为静态的指令集合,如何在系统中实现动态执行?本文带你一探究竟!
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
|
7月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
2536 57
|
8月前
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
310 67
|
10月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
211 26
|
10月前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
213 17
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
248 1
Java—多线程实现生产消费者
|
缓存 监控 网络协议
Linux操作系统的内核优化与实践####
本文旨在探讨Linux操作系统内核的优化策略与实际应用案例,深入分析内核参数调优、编译选项配置及实时性能监控的方法。通过具体实例讲解如何根据不同应用场景调整内核设置,以提升系统性能和稳定性,为系统管理员和技术爱好者提供实用的优化指南。 ####
|
监控 算法 Linux
Linux内核锁机制深度剖析与实践优化####
本文作为一篇技术性文章,深入探讨了Linux操作系统内核中锁机制的工作原理、类型及其在并发控制中的应用,旨在为开发者提供关于如何有效利用这些工具来提升系统性能和稳定性的见解。不同于常规摘要的概述性质,本文将直接通过具体案例分析,展示在不同场景下选择合适的锁策略对于解决竞争条件、死锁问题的重要性,以及如何根据实际需求调整锁的粒度以达到最佳效果,为读者呈现一份实用性强的实践指南。 ####
|
关系型数据库 MySQL Linux
Linux环境下MySQL数据库自动定时备份实践
数据库备份是确保数据安全的重要措施。在Linux环境下,实现MySQL数据库的自动定时备份可以通过多种方式完成。本文将介绍如何使用`cron`定时任务和`mysqldump`工具来实现MySQL数据库的每日自动备份。
759 3