【linux】线程同步和生产消费者模型

简介: 【linux】线程同步和生产消费者模型

线程同步

当我们多线程访问同一个临界资源时,会造成并发访问一个临界资源,使得临界资源数据不安全,我们引入了锁的概念,解决了临界资源访问不安全的情况,对于线程而言竞争锁的能力有强有弱,对于之前就抢到锁的线程,当他释放锁后,由于不用做什么准备工作,他竞争锁的能力很强,导致这个线程反复的争夺锁,来访问临界资源,导致其他线程处于饥饿状态

同步:同步问题是保证数据安全的情况下,让我们的线程具有一定的顺序性

解决方案

条件变量的引入

当多线程来访问临界资源时,首先不会让他去访问临界资源,而是将这个线程放入条件变量维护的队列中去,等待临界资源就绪,举个例子,一个幼儿园里面,到了饭点,而小朋友们是每一个线程,而饭就是临界资源,每次只能有一个孩子在餐厅里面打饭,这就是锁,每个孩子跑步速度不一样,竞争锁的能力不一样,为了避免一个孩子一直在餐厅不走,一直吃饭,幼儿园老师做了这个规定,每个小朋友吃饭必须去排队,刚吃完的孩子还想吃的话,必须排队在队的后面。而条件变量维护的队列类比与排队,每个线程访问完临界资源之后必须在条件变量维护的队列后面排队

条件变量接口介绍

1、主要应用函数:

pthread_cond_init()函数

功能:初始化一个条件变量

pthread_cond_wait()函数

功能:阻塞等待一个条件变量

pthread_cond_signal()函数

功能:唤醒至少一个阻塞在条件变量上的线程

pthread_cond_broadcast()函数

功能:唤醒全部阻塞在条件变量上的线程

pthread_cond_destroy()函数

功能:销毁一个条件变量

函数分析

1.初始化一个条件变量

int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr); 

参2:attr表条件变量属性,通常为默认值,传NULL即可

也可以使用静态初始化的方法,初始化条件变量:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

2.阻塞等待一个条件变量

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex); 

函数作用:

阻塞等待条件变量cond(参1)满足

释放已掌握的互斥锁(解锁互斥量)相当于pthread_mutex_unlock(&mutex);

当被唤醒,pthread_cond_wait函数返回时,解除阻塞并重新申请获取互斥锁pthread_mutex_lock(&mutex);

3.唤醒至少一个阻塞在条件变量上的线程

int pthread_cond_signal(pthread_cond_t *cond);

4.唤醒全部阻塞在条件变量上的线程

int pthread_cond_broadcast(pthread_cond_t *cond);

5.销毁一个条件变量

int pthread_cond_destroy(pthread_cond_t *cond);

代码实现线程同步

1.makefile编写

mycond:mycond.cc
  g++ -o mycond mycond.cc -std=c++11 -lpthread
.PHONY:clean
clean:
  rm -f mycond

2.mycond.cc

#include<iostream>
#include<unistd.h>
#include<pthread.h>
using namespace std;
int cnt=0;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
void*fun(void*args)
{
  pthread_detach(pthread_self());
  uint64_t num=(uint64_t)args;
  cout<<"pthread:"<<num<<"create success"<<endl;
  while(1)
  { pthread_mutex_lock(&mutex);
   cout<<"pthread:"<<num<<",cnt:"<<cnt++<<endl;
    pthread_mutex_unlock(&mutex);
  }
}
int main()
{
for( uint64_t i=0;i<5;i++)
{
   pthread_t tid;
   pthread_create(&tid,nullptr,fun,(void*)i);
}
while(1)
{
    
}
}

代码解释:

初始化一个锁,一个条件变量,临界资源cnt,每个线程在自己要执行的fun函数内,需要访问临界资源cnt,对cnt++,在主函数中创建5个线程,为了防止主线程退出,所有线程都退出,所以让主线程死循环,在每个fun函数里面实现线程分离,不需要主线程来等待回收其他线程,让操作系统自己回收

由于线程2竞争锁的能力强,每次都是线程2来访问临界资源。

为了解决一个线程竞争锁的能力强,使用线程同步,先实现代码在来解释

#include<iostream>
#include<unistd.h>
#include<pthread.h>
using namespace std;
int cnt=0;
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
void*fun(void*args)
{
  pthread_detach(pthread_self());
  uint64_t num=(uint64_t)args;
  cout<<"pthread:"<<num<<"create success"<<endl;
  while(1)
  { pthread_mutex_lock(&mutex);
   pthread_cond_wait(&cond,&mutex);//新增行
   cout<<"pthread:"<<num<<",cnt:"<<cnt++<<endl;
    pthread_mutex_unlock(&mutex);
    
  }
}
int main()
{
for( uint64_t i=0;i<5;i++)
{
   pthread_t tid;
   pthread_create(&tid,nullptr,fun,(void*)i);
}
while(1)
{
sleep(1);//新增行
pthread_cond_signal(&cond);//新增行
cout<<"signal one thread..."<<endl;    //新增行
}
}

pthread_cond_wait函数可以将刚申请锁的线程让其加入队列,让其休眠,该函数还会让对应的线程释放锁,这样一轮下来,所有想访问临界资源的线程都会出现在条件变量维护的队列中,等待唤醒,一次唤醒一个,让他们去访问临界资源,在主线程中来唤醒在维护条件变量对应的队列中其他线程,去访问临界资源,sleep作用防止打印太快看不到效果。

五个线程按照顺序去访问临界资源


cp问题(生产消费者模型)

此时有三种关系:

生产者和生产者:互斥

消费者和生产者:互斥,同步(一个放,一个拿,肯定要保证顺序问题)

消费者和消费者:互斥

3种关系:生产者和生产者,消费者和消费者,生产者和消费者

2种角色:生产者和消费者

1个交易场所:特定结构的内存空间

特点:支持忙闲不均(对比冯诺依曼结构)

生产和消费进行解耦

代码实现生产消费者模型

makefile实现

cp:cp.cc
  g++ -o cp cp.cc -std=c++11 -lpthread
.PHONY:clean
clean:
  rm -f cp

cp.cc(未完成)

#include<iostream>
#include<pthread.h>
#include"blockqueue.hpp"
#include<unistd.h>
using namespace std;
void*consumer(void*args)
{
Blockqueue<int>*cq=static_cast<Blockqueue<int>*>(args);
while(1)
{
//消费数据,将队列中的数据pop
int data=cq->pop();
cout<<"消费了一个数据:"<<data<<endl;
}
}
void* productor(void*args)
{
 int data=0;
Blockqueue<int>*pq=static_cast<Blockqueue<int>*>(args);
while(1)
{ sleep(1);
 //生产数据放到队列中去
  pq->push(data);
  cout<<"生产了一个数据:"<< data++ <<endl;
}
}
int main()
{
pthread_t p,c;
Blockqueue<int>*st=new Blockqueue<int>();
 pthread_create(&p,nullptr,productor,st);
 pthread_create(&c,nullptr,consumer,st);
pthread_join(p,nullptr);
pthread_join(c,nullptr);//线程等待回收
delete st;
return 0;
}

主线程创建两个线程,一个是生产者,一个是消费者,生产者将数据插入在阻塞队列中,消费者将数据取出阻塞队列中的数据,阻塞队列相当于临界资源,主线程中new一个阻塞队列对象作为共享的资源,在各个线程要执行的函数里面,pthread_create最后一个参数可以是资源的起始地址,我们可以传阻塞队列类对象过去,让生产者线程和消费者线程都可以访问到

blockqueue.hpp

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
template<class T>
class Blockqueue
{
static const int defalutnum=5;
public:
Blockqueue(int maxcap=defalutnum)
:maxcap_(maxcap)
 {
  pthread_mutex_init(&mutex_,nullptr);
  pthread_cond_init(&p_cond_,nullptr);
    pthread_cond_init(&c_cond_,nullptr);
 }
T pop()
{
  T out=q_.front();
  q_.pop();
  return out;
}
void push(const T&in)
{
q_.push(in);
}
~Blockqueue()
 {
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(& p_cond_);
    pthread_cond_destroy(& c_cond_);
    
 }
private:
queue<T>q_;
int maxcap_;
pthread_mutex_t mutex_;
pthread_cond_t p_cond_;
pthread_cond_t c_cond_;
};

由于该阻塞队列是两者共享的,临界资源,防止并发访问,要实现安全保护,所以要使用锁,因为生产者和消费者都会对临界资源访问,两者对锁的竞争能力不一样,可能会导致饥饿问题,使用线程同步,所以要用条件变量

maxcap为队列最大可以放几个值,这是我们规定的,为了解决忙而不均,同步问题,达到最大,就让生产者线程休眠,如果阻塞队列的个数为0,就让消费者线程休眠,所以这里需要两个条件变量分别维护。


现在要解决的是临界资源的保护,保证多线程并发安全,使用锁,还要就是两个线程竞争锁的能力不同,可能导致另一个线程饥饿问题,使用线程同步

blockqueue.hpp

#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
template<class T>
class Blockqueue
{
static const int defalutnum=5;
public:
Blockqueue(int maxcap=defalutnum)
:maxcap_(maxcap)
 {
  pthread_mutex_init(&mutex_,nullptr);
  pthread_cond_init(&p_cond_,nullptr);
   pthread_cond_init(&c_cond_,nullptr);
   max_water=(maxcap*2)/3;
   min_water=maxcap/3;
 }
T pop()
{ pthread_mutex_lock(&mutex_);
 while(q_.size()==0)
    {
    pthread_cond_wait(&c_cond_,&mutex_);
    }
  T out=q_.front();
  q_.pop();
  if(q_.size()<min_water)
  pthread_cond_signal(&p_cond_);
  pthread_mutex_unlock(&mutex_);
  return out;
}
void push(const T&in)
{
pthread_mutex_lock(&mutex_);
while(q_.size()==maxcap_)
{pthread_cond_wait(&p_cond_,&mutex_);}
q_.push(in);
if(q_.size()>max_water)
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);
}
~Blockqueue()
 {
  pthread_mutex_destroy(&mutex_);
  pthread_cond_destroy(&p_cond_);
   pthread_cond_destroy(&c_cond_);
    
 }
private:
queue<T>q_;
int maxcap_;
pthread_mutex_t mutex_;
pthread_cond_t p_cond_;
pthread_cond_t c_cond_;
int max_water;
int min_water;
};


目录
相关文章
|
1月前
|
缓存 安全 Linux
Linux 五种IO模型
Linux 五种IO模型
|
1月前
|
网络协议 Linux 数据安全/隐私保护
在Linux中,TCP/IP 的七层模型有哪些?
在Linux中,TCP/IP 的七层模型有哪些?
|
1月前
|
Linux 数据安全/隐私保护
在Linux中,什么是文件权限?什么是rwx权限模型?
在Linux中,什么是文件权限?什么是rwx权限模型?
|
1月前
|
Linux 开发者
Linux源码阅读笔记18-插入模型及删除模块操作
Linux源码阅读笔记18-插入模型及删除模块操作
|
1月前
|
存储 设计模式 NoSQL
Linux线程详解
Linux线程详解
|
1月前
|
Kubernetes Linux API
在Linux中,LVS-DR模型的特性是什么?
在Linux中,LVS-DR模型的特性是什么?
|
1月前
|
负载均衡 算法 Linux
在Linux中,LVS-NAT模型的特性是什么?
在Linux中,LVS-NAT模型的特性是什么?
|
1月前
|
负载均衡 Linux 调度
在Linux中,进程和线程有何作用?
在Linux中,进程和线程有何作用?
|
1天前
|
Linux Docker 容器
9. 同步执行Linux多条命令
9. 同步执行Linux多条命令
|
1天前
|
Linux Shell
10-10|linux命令查询 关键字在文本中出现的行数
10-10|linux命令查询 关键字在文本中出现的行数