大致草稿
思维导图
学习目标
一、进程池的代码编写顺序
在进程池中,我们要创建多个子进程,并且对多个子进程和父进程建立管道的关系,确保父进程和子进程之间可以进行相互通信。
父进程就是master,而子进程就是work/slaver。大致的工作安排就是:管道中没有数据,worker进程就在阻塞等待任务的到来,而master向哪个管道写入数据,就是唤醒哪个子进程来处理任务,但是父进程要进行后端任务划分的均衡管理。
对于每一个父进程与子进程之间的联系,我们可以建立一个类来进行描述他们之间的关系,代码如下:
class Channel { public: Channel(int wfd, int id, std::string name) // 初始化列表 :_wfd(wfd) ,_subprocessid(id) ,_name(name) {} ~Channel() {} private: int _wfd; // 写入的端口 pid_t _subprocessid; // 指向的子进程的编号 std::string _name; // 进行连接的名字 };
1.1 创建信道和子进程
我们可以通过使用主函数的参数来进行获取想要创建的管道数,就是在在主函数中第一个参数表示的是命令中有几个字符串,第二个参数就是存放这些字符串的数组。大致格式就是:
int main(int argc, char* argv[]) { if(argc != 2) { std::cerr << "Usage:" << argv[0] << "processnum" << std::endl; return 1; } ...... }
由于,在系统中,这种创建的子进程不为1,所有父进程要进行管理起来(先描述,在组织)。在前言,我们可以利用数组来进行管理这个有关管道的类。
std::vector<Channel> Channels;
因此,现在,让我们开始进行创建信道和子进程,我们先来创建管道,定义一个文件描述符数组,我们将这个文件描述符数组传入管道函数中进行获取读写文件的端口。
int pipefd[2]; // 读端和写端的文件描述符 int n = pipe(pipefd); // 创建管道 if (n < 0) // 如果返回值小于0,管道创建失败 { return; }
在创建完管道之后,我们开始创建子进程,但是要注意:在子进程中,不能让子进程不退出,从而导致子进程有进入循环,开始进行创建进程,这样,进程的创建就乱了套。所以,我们要在子进程中进行退出代码的编写。
在介绍完创建管道和子进程之后,我们要来进行关闭读端或者写端,保证父进程和子进程可以通过管道实现半双工通信。代码如下:
pid_t id = fork(); // 创建子进程 if (id == 0) { close(pipefd[1]); // 关闭相应的端口 //dup2(pipefd[0], 0); work(pipefd[0]); close(pipefd[0]); exit(0); } // 3.构建一个名字 std::string channelname = "channel_" + std::to_string(i); // 父进程 close(pipefd[0]); // 关闭父进程 // 1.子进程的pid 2.父进程的写端 Channels->push_back(Channel(pipefd[1], id, channelname));
1.2 控制子进程,通过Channel
在这一部分中,我们将要执行的函数建立一张函数指针数组,我们可以通过函数指针数组的下标来指向一个函数,通过数组的下标来实现函数的功能。
#define TaskNum 3 typedef void (* test_t)(); // 函数指针类型 test_t Tasks[TaskNum]; // 函数指针数组 void Loadtask() // 加载函数,将函数指针存放在数组中 { srand(time(nullptr) ^ getpid() ^ 177777); // 获取当前时间作为随机数种子 Tasks[0] = print; Tasks[1] = download; Tasks[2] = flush; } void ExcuteTask(int num) // 启动函数 { if(num < 0 || num > 2) { return; } Tasks[num](); } int slectTask() // 随机发送一个码 { return rand() % TaskNum; }
因此,我们在利用管道进行控制子进程中,我们可以使用传递数组下标的方式来进行使用相应的子进程完成派遣得到的一些任务。
在控制子进程这一部分,我们可以分为:选择一个任务,选择一个进程,发送任务。
int selectChannel(int channelnum) // 利用循环的方式进行任务的分配 { static int next = 0; int channel = next; next++; next %= channelnum; return channel; } void sendTask(Channel &channel, int taskcommend) { // 向写端发送任务的编号 // sleep(1); write(channel.getwid(), &taskcommend, sizeof taskcommend); } // 1 选择一个任务, int taskcommend = slectTask(); // 2 选择一个进程, int channelindex = selectChannel(Channels.size()); // 3 发送任务 sendTask(Channels[channelindex], taskcommend);
这一部分还可以通过控制派遣任务的数量来给子进程分配任务,我们可以将代码进行改编,创建一个新的函数,我们可以在添加一个新的参数来指明派遣任务的数量。
void ctrlProcessOnce(std::vector<Channel> Channels) { sleep(1); // 1 选择一个任务, int taskcommend = slectTask(); // 2 选择一个进程, int channelindex = selectChannel(Channels.size()); // 3 发送任务 sendTask(Channels[channelindex], taskcommend); std::cout << "taskcommend:" << taskcommend << ' ' << "channel:" << Channels[channelindex].getname() << ' ' << "to send subprocess:" << Channels[channelindex].getid() << std::endl; } void ctrlProcess(std::vector<Channel> Channels, int times = -1) { if (times == -1) { while (true) { ctrlProcessOnce(Channels); } } else { while (times--) { ctrlProcessOnce(Channels); } } }
1.3 回收所有的管道和子进程
这一部分有一个错误,
void cleanChannel(std::vector<Channel> Channels) { for (auto k : Channels) { k.CloseChannel(); } for (auto k : Channels) { k.waitChannel(); } }