进程池设计

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 本文介绍了当要求父进程作为写端,需要多个子进程作为读端去读取父进程往匿名管道写入的数据,并拿着数据去完成一些任务,这样一项微进程池设计。

进程池设计

进程池设计代码目的头文件对子进程操作建立子进程对象并把子进程对象放进数组里建立子进程需要执行的任务表创建子进程和父进程通信的管道,并且让子进程阻塞读取对父进程操作回收子进程整体代码子进程具有读端未关闭的bug

代码目的

创建一个父进程和多个子进程,父子进程之间通过匿名管道完成进程间通信。让父进程作为写端,子进程作为读端,父进程随机给任意一个子进程写入数据让子进程完成相应任务。

头文件

#include<iostream>

#include<vector>

#include<unistd.h>

#include<string.h>

#include<cassert>

#include<ctime>

#include<sys/types.h>

#include<sys/wait.h>

对子进程操作

建立子进程对象并把子进程对象放进数组里

//创建子进程对象

class  SubEp//endpoint---子进程对象

{

public:

SubEp(pid_tsubid,intwritefd)//第一个参数是子进程的pid,第二个参数是该子进程读端对于父进程的写端fd

:_subid(subid)

,_writefd(writefd)

{

  charnamebuffer[1024];

  //第一个参数是表示第几号子进程,第二个参数是子进程的pid,第三个参数是该子进程读端对于的父进程的写端

  snprintf(namebuffer,sizeofnamebuffer,"process: %d [pid(%d) - fd(%d)]",num++,_subid,_writefd);

  _name=namebuffer;

}

public:

staticintnum;

string_name;

pid_t_subid;

int_writefd;//该子进程与父进程匿名管道对于的父进程的写端fd

};

intSubEp::num=0;

  • 子进程对象需要传递两个参数来初始化成员变量_subid和 _writefd。一是子进程的pid二是该子进程读端对应父进程写端的文件描述符fd
  • 成员变量num表示是第几个创建出来的子进程,第一个创建出来的子进程为0,使用后++后续子进程的num依次是1,2等等。因此num不能由于出了SubEp对象作用域后被销毁,所以定义为static,变量num生命周期取决于SubEp类的生命周期
  • 成员变量 _name用namebuffer初始化,用来标识该子进程的其他成员变量

建立子进程需要执行的任务表

//创建父进程给子进程派发的任务列表

typedefvoid(*func_t)();//函数指针类型,函数返回值为void

voiddownloadTask()//模拟下载任务

{

   cout<<getpid()<<": 下载任务\n"<<endl;

   sleep(1);

}

voidfflushTask()//模拟刷新任务

{

   cout<<getpid()<<": 刷新任务\n"<<endl;

   sleep(1);

}

voidsubscribeTask()//模拟订阅任务

{

   cout<<getpid()<<": 订阅任务\n"<<endl;

   sleep(1);

}

//把上面的三种任务load到列表中即让存放函数指针的vector的各个指针能够指向上面的函数,为了后面方便调用

voidloadTaskFunc(vector<func_t>*out)

{

   assert(out);//vector创建成功

   out->push_back(downloadTask);

   out->push_back(fflushTask);

   out->push_back(subscribeTask);

}

  • 子进程需要执行的任务都是函数对象,建立一个对象是函数指针的数组out,通过loadTaskFunc函数把任务函数尾插到数组out里,然后通过输出型参数返回。

创建子进程和父进程通信的管道,并且让子进程阻塞读取

voidCreateSubProcesses( vector<SubEp>*subs,vector<func_t>&funcMap)

{

   vector<int>deleteFd;

//创建子进程并且创建好父进程与各个子进程通信的管道

intfds[2];

for(size_ti=0;i<PROCESS_NUM;i++)//创建子进程

{

       intn=pipe(fds);//建立父子间进程的匿名管道--建立成功返回0,建立失败返回-1

       assert(n==0);//判断管道是否建立成功

       (void)n;

       pid_tid=fork();//创建子进程

       if(id==0)

       {

            for(size_ti=0;i<deleteFd.size();i++) close(deleteFd[i]);//因为有写实拷贝,所以这里关闭不会影响父进程

//因为子进程会继承父进程文件描述符表,所以上一个子进程的读端对应的父进程的写端这个进程也会继承下来,即当前子进程和上一个子进程之间也有匿名管道

//可能会导致上一个子进程的父进程读端关闭,而此时还有当前这个子进程的读端连接着上一个子进程,使得上一个子进程不能正常关闭读端而造成bug

//所以要手动关闭当前子进程对应上一个子进程的读端的写端。

           close(fds[1]);//关闭子进程的写端-保留读端负责读

          //对子进程操作

    while(true)

    {

       //1. 获取任务码,让子进程阻塞等待父进程写写入的任务码,

       inttaskcode=receiveTask(fds[0]);

       //2.完成任务--调用对应任务码的函数

       if(taskcode>=0&&taskcode<funcMap.size())

        funcMap[taskcode]();//调用函数完成任务

       elseif(taskcode==-1)  break;

    }

           exit(0);//子进程退出

       }

   //这里往后是父进程语句

 //写端关闭,读端读到0然后读端自己关闭

       close(fds[0]);//关闭当前子进程与父进程相联系的匿名管道的父进程的读端

       SubEpsub(id,fds[1]);//第一个参数传的是子进程的pid,第二个参数传的是此时子进程读端对于的父进程的写端

       subs->push_back(sub);

       deleteFd.push_back(fds[1]);//记录当前的写端供下个子进程用

}

  • 在函数CreateSubProcesses内,先建立父进程相连的匿名管道,然后创建子进程,子进程也拷贝了一份父进程的文件描述符表,能通过文件描述符连接到匿名管道,因此父子进程通信的管道建立完成。
  • 在父进程语句中,需要注意的是,通过传参数子进程的pid和此时子进程读端对于的父进程的写端fd给SubEP类构建子进程对象,并且将对象放进数组subs里。
  • 在子进程的语句中,通过receiveTask函数获取任务码

intreceiveTask(intreadfd)

{

   intretcode=0;//返回任务码

   ssize_ts=read(readfd,&retcode,sizeof(retcode));//从读端读出来的任务码放到retcode里

   cout<<"process has read the TaskCode: "<<retcode<<endl;

   if(s==sizeof(int)) returnretcode;

   elseif(s<=0)return-1;

   elsereturn0;

}

  • 让子进程在receiveTask函数中阻塞读取管道里的数据。前提已知父进程往匿名管道写入整数数据,数据范围为[0,任务个数-1]即任务数组对应的下标范围,子进程把读取到的数据存到变量retcode里,然后判断retcode是否是整数数据大小,如果是就返回数据给上层CreateSubProcesses函数,如果不是就返回-1。
  • 当变量taskcode接收到receiveTask函数返回的任务码时,如果任务码符合范围[0,任务个数-1]即父子进程按照我们的意愿通信正常,然后子进程拿着任务码调用funcMap数组执行任务;但如果接收的返回值是-1,则是父子进程间通信不正常,直接退出判断语句。

这里提到的子进程操作主要是子进程阻塞读取父进程写入的数据,还有子进程拿到数据执行任务。

对父进程操作

voidloadBalanceContrl(constvector<SubEp>&subs,constvector<func_t>&funcMap,intcomcode)

{

   intprocessnum=subs.size();//子进程的个数

   inttasknum=funcMap.size();//任务的个数

   boolnumoftime=(comcode==0?true:false);//若命令码是0则一直运行,若命令码为正数x,则允许x次后退出

   while(true)

   {

  //rand()为伪随机数

  //1.找到哪一个子进程

  intsubIndex=rand()%processnum;

   //2.找到哪一个执行哪一个任务

  inttaskIndex=rand()%tasknum;

   //3.任务发送给选择的进程

    sendTask(subs[subIndex],taskIndex);//第一个参数传第几个子进程,第二个参数传第几个任务

    sleep(1);

 if(!numoftime)

 {

   comcode--;

   if(comcode==0)

   break;

 }

   }

   //走到这里则是父进程给子进程通信完了,需要逐个关闭子进程读端对于的写端--倒退关解决bug

   for(size_ti=0;i<subs.size();i++)

   {

     close(subs[i]._writefd);

     cout<<"close process: [ "<<i<<" ]'s writeeop"<<endl;

   }

}

  • loadBalanceContrl函数需要main函数传入子进程数组subs,任务数组funcMap和命令码comcode。comcode用来指定父进程发送多少次数据给子进程即子进程需要执行多少次任务
  • numoftime用来鉴别父进程需要写入多少次数据,当comcode为0时则numoftime为真,则父进程死循环往匿名管道里写数据;若命令码为正数x为非0,则numoftime为假,则父进程往匿名管道里写x次数据。
  • 通过sendTask函数让父进程选择指定的子进程,写入指定的任务码到匿名管道中

voidsendTask(constSubEp&process, inttasknum)

{

   cout<<"send Task num: "<<tasknum<<" to the process: "<<process._name<<endl;//打印日志:任务几发送给几号子进程

   ssize_tn=write(process._writefd,&tasknum,sizeof(tasknum));//该子进程读端对于的写端往管道里写入任务几-4个字节的数据

   assert(n==sizeof(int));//判断写入的数据是否是4个字节

   (void)n;

}

  • 父子间进程通信完之后,按照子进程创建时间从先往后依次关闭子进程读端对应的父进程的写端。

回收子进程

voidwaitProcess(constvector<SubEp>&processes)

{

   for(size_ti=0;i<processes.size();i++)

   {

       waitpid(processes[i]._subid,nullptr,0);

       cout<<"wait success for process: "<<processes[i]._subid<<endl;

   }

}

  • 按照子进程创建时间从先往后依次回收子进程。

整体代码

#include<iostream>

#include<vector>

#include<unistd.h>

#include<string.h>

#include<cassert>

#include<ctime>

#include<sys/types.h>

#include<sys/wait.h>

using namespace std;

#define PROCESS_NUM 3

#define MakeSeed() srand((unsigned long)time(nullptr)^getpid()^rand()%1234)//建立伪随机数种子

//创建父进程给子进程派发的任务列表

typedef void(*func_t)();//函数指针类型,函数返回值为void


void downloadTask()//模拟下载任务

{

   cout<<getpid()<<": 下载任务\n"<<endl;

   sleep(1);

}


void fflushTask()//模拟刷新任务

{

   cout<<getpid()<<": 刷新任务\n"<<endl;

   sleep(1);

}


void subscribeTask()//模拟订阅任务

{

   cout<<getpid()<<": 订阅任务\n"<<endl;

   sleep(1);

}

//把上面的三种任务load到列表中即让存放函数指针的vector的各个指针能够指向上面的函数,为了后面方便调用

void loadTaskFunc(vector<func_t>*out)

{

   assert(out);//vector创建成功

   out->push_back(downloadTask);

   out->push_back(fflushTask);

   out->push_back(subscribeTask);


}


//创建子进程对象

class  SubEp//endpoint---子进程对象

{


public:

SubEp(pid_t subid,int writefd)//第一个参数是子进程的pid,第二个参数是该子进程读端对于父进程的写端fd

:_subid(subid)

,_writefd(writefd)

{

  char namebuffer[1024];

  //第一个参数是表示第几号子进程,第二个参数是子进程的pid,第三个参数是该子进程读端对于的父进程的写端

  snprintf(namebuffer,sizeof namebuffer,"process: %d [pid(%d) - fd(%d)]",num++,_subid,_writefd);

  _name=namebuffer;

}

public:

static int num;

string _name;

pid_t _subid;

int _writefd;//该子进程与父进程匿名管道对于的父进程的写端fd

};

int SubEp::num=0;


int receiveTask(int readfd)

{

   int retcode=0;//返回任务码

   ssize_t s= read(readfd,&retcode,sizeof(retcode));//从读端读出来的任务码放到retcode里

   cout<<"process has read the TaskCode: "<<retcode<<endl;

   if(s==sizeof(int)) return retcode;

   else if(s<=0)return -1;

   else return 0;

}


void CreateSubProcesses( vector<SubEp>*subs,vector<func_t>& funcMap)

{

   vector<int> deleteFd;

//创建子进程并且创建好父进程与各个子进程通信的管道

int fds[2];

for(size_t i=0;i<PROCESS_NUM;i++)//创建子进程

{

       int n=pipe(fds);//建立父子间进程的匿名管道--建立成功返回0,建立失败返回-1

       assert(n==0);//判断管道是否建立成功

       (void)n;


       pid_t id=fork();//创建子进程

       if(id==0)//子进程进入判断语句

       {

           for(size_t i=0;i<deleteFd.size();i++) close(deleteFd[i]);//因为有写实拷贝,所以这里关闭不会影响父进程

//因为子进程会继承父进程文件描述符表,所以上一个子进程的读端对应的父进程的写端这个进程也会继承下来,即当前子进程和上一个子进程之间也有匿名管道

//可能会导致上一个子进程的父进程读端关闭,而此时还有当前这个子进程的读端连接着上一个子进程,使得上一个子进程不能正常关闭读端而造成bug

//所以要手动关闭当前子进程对应上一个子进程的读端的写端。

           close(fds[1]);//关闭子进程的写端-保留读端负责读

          //对子进程操作

    while(true)

    {

       //1. 获取任务码,让子进程阻塞等待父进程写写入的任务码,

       int taskcode=receiveTask(fds[0]);

       //2.完成任务--调用对应任务码的函数

       if(taskcode>=0 && taskcode<funcMap.size())

        funcMap[taskcode]();//调用函数完成任务

       else if(taskcode==-1)  break;

    }

           exit(0);

       }

 //写端关闭,读端读到0然后读端自己关闭

       close(fds[0]);//关闭当前子进程与父进程相联系的匿名管道的父进程的读端

       SubEp sub(id,fds[1]);//第一个参数传的是子进程的pid,第二个参数传的是此时子进程读端对于的父进程的写端

       subs->push_back(sub);

      deleteFd.push_back(fds[1]);//记录当前的写端供下个子进程用


}

}

void sendTask(const SubEp& process, int tasknum)

{

   cout<<"send Task num: "<<tasknum<<" to the process: "<<process._name<<endl;//打印日志:任务几发送给几号子进程

   ssize_t n=write(process._writefd,&tasknum,sizeof(tasknum));//该子进程读端对于的写端往管道里写入任务几-4个字节的数据

   assert(n==sizeof(int));//判断写入的数据是否是4个字节

   (void)n;

}


void loadBalanceContrl(const vector<SubEp>& subs,const vector<func_t> &funcMap,int comcode)

{

   int processnum=subs.size();//子进程的个数

   int tasknum=funcMap.size();//任务的个数

   bool numoftime=(comcode==0?true:false);//若命令码是0则一直运行,若命令码为正数x,则允许x次后退出

   while(true)

   {

       //rand()为伪随机数

  //1.找到哪一个子进程

  int subIndex=rand()%processnum;

   //2.找到哪一个执行哪一个任务

  int taskIndex=rand()%tasknum;

   //3.任务发送给选择的进程

    sendTask(subs[subIndex],taskIndex);//第一个参数传第几个子进程,第二个参数传第几个任务

    sleep(1);

 if(!numoftime)

 {

   comcode--;

   if(comcode==0)

   break;

 }

   }

   //走到这里则是父进程给子进程通信完了,需要逐个关闭子进程读端对于的父进程写端

   for(size_t i=0;i<subs.size();i++)

   {

     close(subs[i]._writefd);

     cout<<"close process: [ "<<i<<" ]'s writeeop"<<endl;

   //    waitpid(subs[i]._subid,nullptr,0);

   //     cout<<"wait success for process: "<<subs[i]._subid<<endl;

   }

}


void waitProcess(const vector<SubEp>& processes)

{

   for(size_t i=0;i<processes.size();i++)

   {

       waitpid(processes[i]._subid,nullptr,0);

       cout<<"wait success for process: "<<processes[i]._subid<<endl;

   }

}


int main()

{

   MakeSeed();//建立伪随机数种子


vector<SubEp> subs;//创建子进程对象并将子进程对象放进数组里

vector<func_t> funcMap;//建立一个任务表:父进程写入管道,子进程在管道读取,读取到的数据引导子进程去完成一些任务

loadTaskFunc(&funcMap);


//1.创建子进程并且创建好父进程与各个子进程通信的管道,并且让子进程阻塞等待父进程写入

CreateSubProcesses(&subs,funcMap);


//2.对父进程操作


//父进程给子进程发送命令码,为0则一直运行,为正数x则运行x次后退出

int Runcount=0;

cout<<"请输入需要执行几次任务,输入0则为一直循环执行任务,请输入: ";

cin>>Runcount;

cout<<endl;

//这个函数负责让父进程给子进程发送命令码,让子进程去执行任务,要求子进程做到负载均衡    

loadBalanceContrl(subs,funcMap,Runcount);//第一个参数是子进程列表,第二个参数任务列表,第三个参数是父进程给子进程发送的命令码


//3.回收子进程

waitProcess(subs);

   return 0;

}

子进程具有读端未关闭的bug

下面通过画图来模拟父进程fork出多个子进程的流程

  • 通过上面的图例可以看到,2号子进程有一个写端与1号子进程的读端通信着。

说明:假设父进程创建管道时文件描述符fd[3]是读端,fd[4]是写端。那么在创建1号子进程时子进程拷贝父进程的文件描述符表,然后再关闭父进程的读端fd[3],关闭子进程的写端fd[4],这样父进程(写端fd[4])和1号子进程(读端fd[3])就构成了进程间通信的管道。

  1. 当父进程在创建2号子进程时,2号子进程也拷贝了一份父进程的文件描述表,此时表上fd[4]写端连着1号子进程fd[3]的匿名管道,那么2号子进程也会继承下来。因为具有写实拷贝,子进程2在fd[3]打开读端并不会影响父进程。此时再次关闭父进程读端fd[3],子进程关闭写端fd[5],因为2号子进程的写端fd[4]存在不会影响与父进程进行通信,所以不会关闭fd[4]。
  2. 那么在后续关闭父进程的写端时,想要的效果是两个子进程的读端都读到0,然后子进程自动关闭读端。然而现实是父进程关闭写端,2号子进程的读端只对应父进程1个写端,那么2号子进程的读端会关闭。而1号子进程的读端对应父进程的写端和2号子进程的写端,当父进程的写端关闭时,匿名管道还与2号子进程写端相连,导致1号子进程的读端不会读到0所以1号子进程的读端不能正常关闭!,也就导致子进程无法正常退出,父进程无法正常回收子进程

得出结论:当父进程创建多个子进程,并且父进程作为写端而多个子进程作为读端从而进行进程间通信时,需要单独把子进程的所有写端都关闭。

这里提供两种方法关闭子进程的所有写端

  1. 方案一:在父进程创建子进程前构建一个vector对象,父进程创建子进程后,把父进程的写端放进vector里。等到父进程创建完下一个子进程时,vector里的写端即是当前子进程读端对应的上一个子进程的写端(有可能不只是一个写端),再把vector里的所有写端关闭即可。

  • 这次实现就是用的这个方案,其实不用也可以,因为当父进程往匿名管道里写完数据时,先把父进程对应各个子进程的写端全部关闭,然后再将全部子进程进行回收,这种顺序不会出现bug;但如果是按照创建子进程时间从旧往新关掉一个父进程的写端,然后立刻等待回收一个相应的子进程的话,会导致出现该子进程读端还有其他子进程的写端通信着,该子进程读端没有读到0导致子进程没有正常退出,那么父进程也就回收不到子进程。
  1. 方案二:在父进程关闭写端,需要所有子进程关闭读端时,依次按照创建的时间从新往旧(从后往前)关闭父进程的写端。由于最后创建的子进程的读端只对应父进程的写端,那么父进程关闭写端时,最后一个子进程的读端读到0正常关闭读端,那么该子进程的文件描述符表也会被关闭,进而该子进程正常退出,从而该子进程连接着前一个子进程的写端也会被关闭;那么轮到下一个子进程时,该子进程的读端也只会对应父进程的写端,父进程关闭写端,子进程读端读到0正常关闭读端,子进程正常退出。


相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
8天前
|
Java
直接拿来用:进程&进程池&线程&线程池
直接拿来用:进程&进程池&线程&线程池
|
3天前
|
数据采集 消息中间件 并行计算
进程、线程与协程:并发执行的三种重要概念与应用
进程、线程与协程:并发执行的三种重要概念与应用
14 0
|
5月前
|
Shell
【进程通信】利用管道创建进程池(结合代码)
【进程通信】利用管道创建进程池(结合代码)
|
5月前
|
存储 设计模式 算法
【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用
【C++ 并发 线程池设计】深入理解C++线程池:设计、实现与应用
188 0
|
缓存 前端开发 Java
线程池中线程重用导致的问题
之前在公司做的一个项目中,有一个 core 的公共依赖包,那个依赖里面简单封装了用户的信息。
|
调度 Python
并发编程实践:进程、线程和threading 模块的全面解析
并发编程实践:进程、线程和threading 模块的全面解析
|
监控 Java 数据库连接
【多线程】基础 | 你到底懂不懂线程池?
线程池(Thread Pool)是一种基于池化思想管理线程的工具,预先创建一些对象放入池中,使用的时候直接取出使用,用完归还接下去复用,通过一定的策略调整 池中参数,实现池的动态伸缩。
|
存储 Linux 调度
Linux多线程:线程概念、线程间的独有与共享、多线程VS多进程,线程控制:线程创建、线程终止、线程等待、线程分离
Linux多线程:线程概念、线程间的独有与共享、多线程VS多进程,线程控制:线程创建、线程终止、线程等待、线程分离
222 0
|
开发者
进程池的使用 | 学习笔记
快速学习进程池的使用,介绍了进程池的使用系统机制, 以及在实际应用过程中如何使用。
多线程之线程池及总整体总结
多线程之线程池及总整体总结
多线程之线程池及总整体总结