Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(一)

简介: Linux线程的生产者消费者模型 --- 阻塞队列(blockqueue)(一)

线程同步

在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题就叫做同步

也就是说当一个线程申请锁成功后,一旦它解锁了就不能够再申请锁,而是要到整个线程队尾进行排队,让下一个线程去申请锁。这样有序的去申请锁就叫做同步。

条件变量

条件变量的使用:一个线程等待条件变量的条件成立而被挂起;另一个线程使条件成立后唤醒等待的线程。

也就是说使用条件变量后,所有的线程必须同步去执行,但条件满足时线程就挂起直到另一个线程唤醒它。

条件变量相当于一个线程执行的必要条件,只有满足条件的线程才能继续执行

条件变量的接口

定义条件变量:pthread_cond_t XXX

全局初始化 = PTHREAD_COND_INITALIZER

局部初始化使用:pthread_cond_init

#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond,
      const pthread_condattr_t *restrict attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

参数一为条件变量的地址,参数二为可以不关心设为nullptr

销毁条件变量:pthread_cond_destroy

int pthread_cond_destroy(pthread_cond_t *cond);

满足条件变量则挂起等待:pthread_cond_wait

int pthread_cond_wait(pthread_cond_t *restrict cond,
              pthread_mutex_t *restrict mutex);

参数一为条件变量的地址,参数二为锁的地址(后面会谈到为什么有锁作为参数)

唤醒线程:pthread_cond_signal

 int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒全部线程
 int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个线程

参数都为条件变量的地址

生产者消费者场景

在日常的生活中,这个场景并不会陌生。例如:供货商 -> 超市 -> 顾客。而供货商就相当于生产者,顾客就相当于消费者,超市就充当一个中间商。顾客不可能直接去跟供货商买东西,而供货商也不会直接卖给顾客东西,超市就充当了这样一个中间的角色。


d37a2c3138d0169b1fa0ed6e02da018c.png

在线程的角度也是如此,假设现在一批线程充当着生产者的角色,另一批线程充当着消费者的角色。那么生产者线程不会直接就将数据传给消费者线程,而是会将数据放入到一个相当于缓冲区中,而这个过程又可以称为 生产者和消费者之间的解耦

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

那么对于这种模型又会衍生出三种关系:

消费者和消费者的关系

对于消费者而言,因为缓冲区中的空间有限,而消费者线程只需要将数据写入到缓冲区,可是当缓冲区中已经有线程在写入中了,其他的线程就不能往缓冲区里写了,而是要等到前面的线程写完后再判断缓冲区是否还没满才可以写入。这就形成了消费者和消费者之间的互斥关系

生产者和生产者的关系

对于生产者也是同理,一个线程读时其他线程同样需要等待。这也就形成了互斥关系

生产者和消费者的关系

生产者和消费者既有互斥又有同步,当一个线程写时,另一个线程去读这种情况就会导致数据的不安全性,因此互斥就是为了保证共享资源的安全性。而每次缓冲区都只有一个线程去执行时,其他线程在等待一旦执行缓冲区的线程解锁了,等待的线程就可以马上申请锁去执行缓冲区,这样效率就会大大提高,因此同步是为了提高效率的。

从何体现出效率的提高

上面谈到缓冲区每次都只有一个线程在执行。那么在这个线程访问执行时其他线程在干什么呢?

首先线程在读或写之前肯定是会有其他的任务需要做的,比如创建写的数据,创建存放读到的数据空间等等。那么在一个线程访问缓冲区时,其他的线程就可以做这些访问缓冲区前的任务,一旦访问缓冲区的线程完成了,其他的线程就不需要再去完成访问缓冲区前的任务直接就可以访问缓冲区了,这就是效率提高的表现

因此:效率的提高真正体现的并不是访问缓冲区的过程,而是访问缓存区之前的过程,这也就是多消费者多生产者的意义

Blockqueue

阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

阻塞队列为空时,消费者线程将被阻塞,直到阻塞队列被放入元素

阻塞队列已满时,生产者线程将被阻塞,直到有元素被取出

那么利用这个阻塞队列结合条件变量和锁,就可以编写出一套简单的模型。

blockqueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
// 设置默认的最大容量
static int max = 10;
template <class T>
class blockqueue
{
public:
    blockqueue(const int &maxnum = max)
        : _maxnum(maxnum)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_pcond, nullptr);
        pthread_cond_init(&_ccond, nullptr);
    }
    // 插入数据
    void push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == _maxnum)
            pthread_cond_wait(&_pcond, &_lock);
        // 插入数据
        _q.push(in);
        // 走到这里说明队列一定有数据,就可以唤醒消费者的线程
        pthread_cond_signal(&_ccond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    // 拿到头部数据并删除
    void pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_lock);
        // 判断队列是否满了,如果为空则等待
        // 充当条件判断的语法必须是while,不能用if
        while (_q.size() == 0)
            pthread_cond_wait(&_ccond, &_lock);
        // 拿到头部数据并删除
        *out = _q.front();
        _q.pop();
        // 走到这里说明队列一定不会满,就可以唤醒生产者的线程
        pthread_cond_signal(&_pcond);
        // 解锁
        pthread_mutex_unlock(&_lock);
    }
    ~blockqueue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    std::queue<T> _q;
    int _maxnum; // 最大容量
    pthread_mutex_t _lock;
    pthread_cond_t _pcond; // 生产者的条件变量
    pthread_cond_t _ccond; // 消费者的条件变量
};

注意:为什么上面的代码里判断条件需要用循环而不是if呢,这就要说到上面的为什么pthread_cond_wait的参数里会有锁了。

为什么条件变量的接口有锁作为参数

首先,能够执行到这个接口说明该线程必定申请锁成功了。如果现在线程执行到了wait这个接口,那么线程就会被阻塞。但是这个线程已经申请了锁其他的线程就没有办法再去申请锁了,那么此时这个线程就一定要解锁,而pthread_cond_wait这个接口就会自动的帮这个线程解锁。

当这个线程阻塞后被其他的线程唤醒后,pthread_cond_wait这个接口就会自动帮这个线程再次加锁,所以为了确保再次加锁的锁是和之前的一样的,pthread_cond_wait接口就会带上锁的地址作为参数。

那么为什么要用循环而不是if呢?最主要的原因是,即使这个线程被唤醒了,但是它仍有可能还是处于不满足条件的情况,因此为了确保数据的安全要再次对这个线程进行判断,直到该线程满足条件才能继续往下执行。


相关实践学习
CentOS 7迁移Anolis OS 7
龙蜥操作系统Anolis OS的体验。Anolis OS 7生态上和依赖管理上保持跟CentOS 7.x兼容,一键式迁移脚本centos2anolis.py。本文为您介绍如何通过AOMS迁移工具实现CentOS 7.x到Anolis OS 7的迁移。
目录
相关文章
|
2月前
|
并行计算 JavaScript 前端开发
单线程模型
【10月更文挑战第15天】
|
2月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
24 1
|
2月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
33 0
Linux C/C++之线程基础
|
2月前
|
NoSQL Redis 数据库
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
本文解释了Redis为什么采用单线程模型,以及为什么Redis单线程模型的效率和速度依然可以非常高,主要原因包括Redis操作主要访问内存、核心操作简单、单线程避免了线程竞争开销,以及使用了IO多路复用机制epoll。
54 0
Redis单线程模型 redis 为什么是单线程?为什么 redis 单线程效率还能那么高,速度还能特别快
|
2月前
|
安全 调度 C#
STA模型、同步上下文和多线程、异步调度
【10月更文挑战第19天】本文介绍了 STA 模型、同步上下文和多线程、异步调度的概念及其优缺点。STA 模型适用于单线程环境,确保资源访问的顺序性;同步上下文和多线程提高了程序的并发性和响应性,但增加了复杂性;异步调度提升了程序的响应性和资源利用率,但也带来了编程复杂性和错误处理的挑战。选择合适的模型需根据具体应用场景和需求进行权衡。
|
2月前
|
消息中间件 NoSQL 关系型数据库
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
【多线程-从零开始-捌】阻塞队列,消费者生产者模型
29 0
|
2月前
|
安全 Linux
Linux线程(十一)线程互斥锁-条件变量详解
Linux线程(十一)线程互斥锁-条件变量详解
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
58 1
C++ 多线程之初识多线程
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
27 3
|
2月前
|
Java 开发者
在Java多线程编程中,选择合适的线程创建方法至关重要
【10月更文挑战第20天】在Java多线程编程中,选择合适的线程创建方法至关重要。本文通过案例分析,探讨了继承Thread类和实现Runnable接口两种方法的优缺点及适用场景,帮助开发者做出明智的选择。
23 2
下一篇
DataWorks