【进程通信】利用管道创建进程池(结合代码)

简介: 【进程通信】利用管道创建进程池(结合代码)

什么叫进程池

我们知道,一个进程创建子进程通常是为了让这个子进程去为它完成某个任务。例如我们使用的指令,其实就是bash进程创建子进程让子进程去执行的。但是我们需要考虑这样一个问题:是不是遇到问题之后才创建子进程呢?

频繁的创建和销毁进程都是一项较大的开销,涉及到内存分配、上下文切换等操作。于是我们可以提前创建出一批进程,当有任务要做的时候就从这一批子进程中拿出一个空闲的去执行,一旦某个子进程把任务执行完之后也不立即销毁进程,而是等待下一个任务的来临。

我们把这些预先创建的一批子进程就叫做进程池。我们把进程池中的进程也称为工作进程

进程池的优点

  1. 通过复用已经创建的进程,减少创建和销毁进程的开销,提高了系统资源的利用与率和系统的性能。
  2. 动态调整进程池中工作进程的数量。可以让系统根据实际的负载和任务需求,动态的增加或者减少工作进程的数量,以适应不同的工作负载。提高了系统的工作的灵活性
  3. 简化编程。进程池提供了一种高级抽象,隐藏了底层进程管理的细节,使得并发进程更加简单和可靠。

创建进程池

现在我们尝试自己去实现一个进程池。

创建进程好很简单,如何管理这些进程呢?每创建一个进程我们都用一个结构体(或者类)维护,于是对工作进程的管理就变成了对结构体的管理,整个进程池从代码角度上来说就是一个结构体数组。

那我们如何将任务分配到工作进程呢?这个问题的本质其实是在问进程之间通信的方式。于是我们很容易的想到使用管道。父进程通过管道,将任务信息传递给子进程。

下面是使用进程池的一个简单模型:

代码实现:

Process.cpp文件


#include <iostream>
#include <cerrno>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>
#include <wait.h>
#include <sys/stat.h>
#include "Task.hpp"
using namespace std;

// master
class Channel // 维护管道信息
{
public:
    Channel(int wfd, int subprocessid, string name)
        : _wfd(wfd), _subprocessid(subprocessid), _name(name)
    {
    }

    int GetWfd() { return _wfd; }
    int GetProcessId() { return _subprocessid; }
    string GetName() { return _name; }

    void CloseChannel()
    { // 关闭信道的写端
        close(_wfd);
    }

    void Wait()
    { // 关闭工作进程
        int status = 0;
        pid_t rid = waitpid(_subprocessid, NULL, 0);
        if (rid > 0)
        {
            cout << "wait " << rid << " success" << endl;
        }
    }

    ~Channel()
    {
    }

private:
    int _wfd;          // 信道以写方式打开的文件描述符
    int _subprocessid; // 读端进程的pid
    string _name;      // 信道的命名
};

// 创建信道和子进程
void CreatChannelAndSub(int num, vector<Channel> &Channels, task_t task)
{ // task是一个回调函数
    for (int i = 0; i < num; i++)
    {
        // 1.创建管道
        int pipefd[2];
        int n = pipe(pipefd);
        if (n < 0)
            exit(0);

        // 创建子进程
        pid_t pid = fork();
        if (pid == 0)
        {
            // 关闭写端
            close(pipefd[1]);
            dup2(pipefd[0], 0); // 将管道的读端重定向到标准输入
            task();             // 子进程执行的任务
            close(pipefd[0]);
            exit(0);
        }
        // 父进程关闭读端,只用来输出任务
        close(pipefd[0]);
        string name = "Channel-" + to_string(i);
        // 将创建的信道存入输出型参数中
        Channels.push_back({pipefd[1], pid, name});
    }
}

int NextChannel(int channelnum)
{
    static int next = 0;
    int channel = next;
    next = (next + 1) % channelnum;
    return channel;
}

void SendTask(int taskcommand, Channel &channel)
{
    write(channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}

// 具体任务
void CtrlProcessOnce(vector<Channel> &Channels)
{
    // 1.选择一个任务
    int taskcommand = SelectTask();
    // 2.选择一个空信道
    int taskchannel = NextChannel(Channels.size());

    // 3.发送任务编号给信道
    SendTask(taskcommand, Channels[taskchannel]);

    cout << "taskcommand: " << taskcommand << " channel: " << Channels[taskchannel].GetName() << " sub process: " << Channels[taskchannel].GetProcessId() << endl;
}

// 控制子进程执行任务
void CtrlProcess(vector<Channel> &Channels, int time = -1)
{ // time表示分配多少次任务给子进程去执行,默认是无限
    if (time == -1)
    { // 默认
        while (true)
        {
            CtrlProcessOnce(Channels);
            sleep(1);
        }
    }
    else if (time > 0)
    {
        while (time--)
        {
            CtrlProcessOnce(Channels);
            sleep(1);
        }
    }
}

void CleanUpChannel(vector<Channel> &channels)
{
    for (auto &it : channels)
    {
        it.CloseChannel();
    }

    for (auto &it : channels)
    {
        it.Wait();
    }
}

int main(int argc, char *argv[])
{ // 参数列表,argv[1]表示创建的工作进程的数量,即内存池大小
    if (argc != 2)
    {
        cerr << "Usage: " << argv[0] << " Processnum" << endl;
        return 1;
    }
    int num = stoi(argv[1]);
    // 加载任务
    LoadTask();
    vector<Channel> Channels; // 进程池

    // 创建信道和子进程
    CreatChannelAndSub(num, Channels, work1);

    // 通过Channel控制子进程
    CtrlProcess(Channels, 10);
    // 关闭子进程和信道的写端
    CleanUpChannel(Channels);
    return 0;
}

Task.hpp文件


#include <iostream>
#include <ctime>
#include <sys/stat.h>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#define TASKNUM 3
using namespace std;

typedef void (*task_t)(); // task_t函数指针类型

void Print()
{ // 任务1
    cout << "Printf task" << endl;
}

void DownLoad()
{ // 任务2
    cout << "DownLoad task" << endl;
}

void Flush()
{ // 任务3
    cout << "Flush task" << endl;
}

task_t tasks[TASKNUM]; // 用来存放函数指针

void LoadTask()
{
    srand(time(nullptr));
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int n)
{
    if (n < 0 || n > 2)
        return;

    tasks[n]();
}

int SelectTask()
{
    int n = rand() % TASKNUM;
    return n;
}

void work1()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof command); // 读取管道中的任务编号(int)
        if (n == sizeof(int))
        {
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            cout << "sub process: " << getpid() << " quit " << endl;
            break;
        }
    }
}

观察运行结果:

相关文章
|
1月前
|
存储 Unix Linux
进程间通信方式-----管道通信
【10月更文挑战第29天】管道通信是一种重要的进程间通信机制,它为进程间的数据传输和同步提供了一种简单有效的方法。通过合理地使用管道通信,可以实现不同进程之间的协作,提高系统的整体性能和效率。
|
1月前
|
消息中间件 存储 供应链
进程间通信方式-----消息队列通信
【10月更文挑战第29天】消息队列通信是一种强大而灵活的进程间通信机制,它通过异步通信、解耦和缓冲等特性,为分布式系统和多进程应用提供了高效的通信方式。在实际应用中,需要根据具体的需求和场景,合理地选择和使用消息队列,以充分发挥其优势,同时注意其可能带来的复杂性和性能开销等问题。
|
2月前
|
存储 Python
Python中的多进程通信实践指南
Python中的多进程通信实践指南
29 0
|
3月前
|
Java Android开发 数据安全/隐私保护
Android中多进程通信有几种方式?需要注意哪些问题?
本文介绍了Android中的多进程通信(IPC),探讨了IPC的重要性及其实现方式,如Intent、Binder、AIDL等,并通过一个使用Binder机制的示例详细说明了其实现过程。
389 4
|
3月前
|
消息中间件 Unix Linux
C语言 多进程编程(二)管道
本文详细介绍了Linux下的进程间通信(IPC),重点讨论了管道通信机制。首先,文章概述了进程间通信的基本概念及重要性,并列举了几种常见的IPC方式。接着深入探讨了管道通信,包括无名管道(匿名管道)和有名管道(命名管道)。无名管道主要用于父子进程间的单向通信,有名管道则可用于任意进程间的通信。文中提供了丰富的示例代码,展示了如何使用`pipe()`和`mkfifo()`函数创建管道,并通过实例演示了如何利用管道进行进程间的消息传递。此外,还分析了管道的特点、优缺点以及如何通过`errno`判断管道是否存在,帮助读者更好地理解和应用管道通信技术。
|
3月前
|
SQL 网络协议 数据库连接
已解决:连接SqlServer出现 provider: Shared Memory Provider, error: 0 - 管道的另一端上无任何进程【C#连接SqlServer踩坑记录】
本文介绍了解决连接SqlServer时出现“provider: Shared Memory Provider, error: 0 - 管道的另一端上无任何进程”错误的步骤,包括更改服务器验证模式、修改sa用户设置、启用TCP/IP协议,以及检查数据库连接语句中的实例名是否正确。此外,还解释了实例名mssqlserver和sqlserver之间的区别,包括它们在默认设置、功能和用途上的差异。
|
4月前
|
开发者 API Windows
从怀旧到革新:看WinForms如何在保持向后兼容性的前提下,借助.NET新平台的力量实现自我进化与应用现代化,让经典桌面应用焕发第二春——我们的WinForms应用转型之路深度剖析
【8月更文挑战第31天】在Windows桌面应用开发中,Windows Forms(WinForms)依然是许多开发者的首选。尽管.NET Framework已演进至.NET 5 及更高版本,WinForms 仍作为核心组件保留,支持现有代码库的同时引入新特性。开发者可将项目迁移至.NET Core,享受性能提升和跨平台能力。迁移时需注意API变更,确保应用平稳过渡。通过自定义样式或第三方控件库,还可增强视觉效果。结合.NET新功能,WinForms 应用不仅能延续既有投资,还能焕发新生。 示例代码展示了如何在.NET Core中创建包含按钮和标签的基本窗口,实现简单的用户交互。
78 0
|
5月前
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能
|
5月前
|
弹性计算 Linux 区块链
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
191 4
Linux系统CPU异常占用(minerd 、tplink等挖矿进程)
|
4月前
|
算法 Linux 调度
探索进程调度:Linux内核中的完全公平调度器
【8月更文挑战第2天】在操作系统的心脏——内核中,进程调度算法扮演着至关重要的角色。本文将深入探讨Linux内核中的完全公平调度器(Completely Fair Scheduler, CFS),一个旨在提供公平时间分配给所有进程的调度器。我们将通过代码示例,理解CFS如何管理运行队列、选择下一个运行进程以及如何对实时负载进行响应。文章将揭示CFS的设计哲学,并展示其如何在现代多任务计算环境中实现高效的资源分配。