【Linux】匿名管道实现简单进程池

简介: 【Linux】匿名管道实现简单进程池

一、匿名管道通信的四种情况和五种特性

1.1、四种情况

  1. 管道内部没有数据且子进程不关闭自己的写端文件fd,读端(父进程)就要阻塞等待,直到管道里有数据。
  2. 管道内部被写满了且父进程(读端)不关闭自己的读端fd,写端(子进程)写满以后就要阻塞等待。
  3. 对于写端而言,如果写端不写了且关闭了写端fd,读端就会将管道中的数据读完,最后会读到返回值为0,表示读结束,类似于读到了文件的结尾。
  4. 读端关闭了,操作系统就会发送信号直接杀死进行写入的进程,因为没有读端写入也就没有了意义。

1.2、五种特性

  1. 管道自带同步机制,参照上面四种情况中的1,2,3。
  2. 具有血缘关系的进程进行通信,常见于父子。
  3. 管道是面向字节流的。
  4. 父子进程退出,管道自动释放,因为内存中的文件的生命周期是随进程的
  5. 管道只能进行单向通信。

二、匿名管道实现简单的进程池

       这个进程池可以分配我们想要的进程的个数,用命令行的方式来控制进程的个数,任务由我们自己定好,每次随机选择一个任务指派给一个进程去完成,进程的选派采用轮询的方式按顺序指派,这其中还有一些实现的细节,会在代码中以注释的方式给出。

头文件

#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <vector>
#include <sys/wait.h>
#include <vector>
using namespace std;
 
typedef void (*work_t)(int pipefd);//pipefd为写端文件描述符
typedef void (*task_t)(int pipefd, pid_t fd);//fd为子进程pid
 
//三个任务
void PrintLog(int pipefd, pid_t fd)
{
    cout <<"pipefd:" << pipefd <<" fd:" << fd <<  " task:printf log task" << endl;
}
 
void ReloadConf(int pipefd, pid_t fd)
{
     cout <<"pipefd:" << pipefd <<" fd:" << fd <<  " task:Reload Conf" << endl;
}
 
void ConnectMysql(int pipefd, pid_t fd)
{
     cout <<"pipefd:" << pipefd <<" fd:" << fd <<  " task:Connect Mysql" << endl;
}
 
task_t task[3] = {PrintLog,ReloadConf,ConnectMysql };
 
//随机在三个任务中选择一个
uint32_t nextTask()
{
    return rand() % 3;
}
 
 
void worker(int pipefd)
{
    while(true)
    {
        uint32_t code = 0;
        //子进程会阻塞在此读,读到零就证明写端已经关闭了
        int n = read(0, &code, sizeof(code));
 
        if(n == sizeof(code))
        {
            if(code >= 3) continue;
            task[code](pipefd, getpid());
            sleep(1);
        }
        else if(n == 0)
        {
            cout << "child process quit now!" << endl;
            break;
        }
    }
}

源文件

#include "processpool.hpp"
 
enum
{
    UsageError = 1,
    ArgError,
    PipeError
};
 
void Usage(char *proc)
{
    cout << "Usage:" << proc << " num" << endl;
}
 
class Channel
{
private:
    //管道保存写端文件描述符,管道名和子进程pid。
    int _wfd;
    string _name;
    pid_t _processid;
 
public:
    Channel(int wfd, string name, pid_t id)
        : _wfd(wfd), _name(name), _processid(id)
    {
    }
 
    void print()
    {
        cout << "_wfd:" << _wfd << " _name:" << _name << " _processid:" << _processid << endl;
    }
 
    const string name() const
    {
        return _name;
    }
 
    const int wfd() const
    {
        return _wfd;
    }
 
    const pid_t processid() const
    {
        return _processid;
    }
 
    void Close()
    {
        close(_wfd);
    }
 
    ~Channel()
    {
    }
};
 
//进程池
class processPool
{
private:
    int _sum_child_process;//进程池中进程的个数
    vector<Channel> _channelVect;
 
public:
    processPool(int sum_child_process)
        : _sum_child_process(sum_child_process)
    {
    }
 
    // 创建信道和子进程
    int CreateProcess(work_t work)//任务以函数指针的方式传入
    {
        vector<int> fds;
        for (int i = 0; i < _sum_child_process; i++)
        {
            //创建匿名管道
            int pipefd[2];
            int n = pipe(pipefd);
            if (n == -1)
                return PipeError;
            pid_t id = fork();
            if (id == 0)
            {
                //关闭多余的写端描述符,因为父进程在创建子进程的同时会将父进程的文件描述符表也给子进程拷贝一份,
                //这样子进程的文件描述符表就会保存了之前的子进程的写端文件描述符,必须要把之前的子进程的写端文件描述符关闭,
                //否则子进程在退出的时候会出异常
                if (!fds.empty())
                {
                    for (auto e : fds)
                        close(e);
                }
                close(pipefd[1]);
                dup2(pipefd[0], 0);//输入重定向
                work(pipefd[0]);
                exit(0);
            }
            string name = "Channel" + to_string(i);
            close(pipefd[0]);
            _channelVect.push_back(Channel(pipefd[1], name, id));
            fds.push_back(pipefd[1]);
        }
        return 0;
    }
    //发送某个任务给某个管道
    void SendTaskCode(const Channel *channel, uint32_t code)
    {
        cout << "send code: " << code << " to " << channel->name() << " sub prorcess id: " << channel->processid() << endl;
        write(channel->wfd(), &code, sizeof(code));
    }
 
    // 控制子进程
    void CtrlProcessPool(processPool *processPoolPtr, int cnt)//cnt表示任务的个数
    {
        while (cnt)
        {
            // 1、选择一个进程和信道
            const Channel *cur = processPoolPtr->nextChannel();
            cout << cur->name() << endl;
 
            // 2、选择一个任务
            uint32_t n = nextTask();
 
            // 派送任务
            processPoolPtr->SendTaskCode(cur, n);
            sleep(1);
            cnt--;
        }
    }
 
    //按顺序选择一个管道
    const Channel *nextChannel()
    {
        static int n = 0;
        n %= _sum_child_process;
        return &_channelVect[n++];
    }
 
    void Print()
    {
        for (auto &channel : _channelVect)
        {
            channel.print();
        }
    }
 
    // 退出所有子进程
    void killAll()
    {
        for (auto &Channel : _channelVect)
        {
            Channel.Close();
            pid_t rid = Channel.processid();
            pid_t quitid = waitpid(rid, nullptr, 0);
            if (rid == quitid)
                cout << rid << " quit succeed!" << endl;
        }
    }   
    ~processPool()
    {
    }
};
 
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        return ArgError;
    }
 
    // 创建信道和子进程
    int sum_child_process = stoi(argv[1]);
    processPool *processPoolPtr = new processPool(sum_child_process);
    // vector<Channel> channelVect;
    processPoolPtr->CreateProcess(worker);
 
    srand(time(nullptr));
 
    // 控制子进程
    int cnt = 10;//10个任务
    processPoolPtr->CtrlProcessPool(processPoolPtr, cnt);
 
    // 回收子进程
    processPoolPtr->killAll();
    //processPoolPtr->Wait();
 
    delete processPoolPtr;
    return 0;
}

makefile

myprocesspool:processpool.cc
  g++ -o $@ $^
.PHONY:clean
clean:
  rm -f myprocesspool
相关文章
|
3月前
|
存储 Linux API
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
在计算机系统的底层架构中,操作系统肩负着资源管理与任务调度的重任。当我们启动各类应用程序时,其背后复杂的运作机制便悄然展开。程序,作为静态的指令集合,如何在系统中实现动态执行?本文带你一探究竟!
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
|
1月前
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
146 67
|
4月前
|
消息中间件 存储 网络协议
从零开始掌握进程间通信:管道、信号、消息队列、共享内存大揭秘
本文详细介绍了进程间通信(IPC)的六种主要方式:管道、信号、消息队列、共享内存、信号量和套接字。每种方式都有其特点和适用场景,如管道适用于父子进程间的通信,消息队列能传递结构化数据,共享内存提供高速数据交换,信号量用于同步控制,套接字支持跨网络通信。通过对比和分析,帮助读者理解并选择合适的IPC机制,以提高系统性能和可靠性。
513 14
|
2月前
|
存储 Linux 调度
【Linux】进程概念和进程状态
本文详细介绍了Linux系统中进程的核心概念与管理机制。从进程的定义出发,阐述了其作为操作系统资源管理的基本单位的重要性,并深入解析了task_struct结构体的内容及其在进程管理中的作用。同时,文章讲解了进程的基本操作(如获取PID、查看进程信息等)、父进程与子进程的关系(重点分析fork函数)、以及进程的三种主要状态(运行、阻塞、挂起)。此外,还探讨了Linux特有的进程状态表示和孤儿进程的处理方式。通过学习这些内容,读者可以更好地理解Linux进程的运行原理并优化系统性能。
74 4
|
3月前
|
存储 网络协议 Linux
【Linux】进程IO|系统调用|open|write|文件描述符fd|封装|理解一切皆文件
本文详细介绍了Linux中的进程IO与系统调用,包括 `open`、`write`、`read`和 `close`函数及其用法,解释了文件描述符(fd)的概念,并深入探讨了Linux中的“一切皆文件”思想。这种设计极大地简化了系统编程,使得处理不同类型的IO设备变得更加一致和简单。通过本文的学习,您应该能够更好地理解和应用Linux中的进程IO操作,提高系统编程的效率和能力。
145 34
|
2月前
|
Linux
Linux:守护进程(进程组、会话和守护进程)
守护进程在 Linux 系统中扮演着重要角色,通过后台执行关键任务和服务,确保系统的稳定运行。理解进程组和会话的概念,是正确创建和管理守护进程的基础。使用现代的 `systemd` 或传统的 `init.d` 方法,可以有效地管理守护进程,提升系统的可靠性和可维护性。希望本文能帮助读者深入理解并掌握 Linux 守护进程的相关知识。
85 7
|
2月前
|
Linux Shell
Linux 进程前台后台切换与作业控制
进程前台/后台切换及作业控制简介: 在 Shell 中,启动的程序默认为前台进程,会占用终端直到执行完毕。例如,执行 `./shella.sh` 时,终端会被占用。为避免不便,可将命令放到后台运行,如 `./shella.sh &`,此时终端命令行立即返回,可继续输入其他命令。 常用作业控制命令: - `fg %1`:将后台作业切换到前台。 - `Ctrl + Z`:暂停前台作业并放到后台。 - `bg %1`:让暂停的后台作业继续执行。 - `kill %1`:终止后台作业。 优先级调整:
118 5
|
2月前
|
Linux 应用服务中间件 nginx
Linux 进程管理基础
Linux 进程是操作系统中运行程序的实例,彼此隔离以确保安全性和稳定性。常用命令查看和管理进程:`ps` 显示当前终端会话相关进程;`ps aux` 和 `ps -ef` 显示所有进程信息;`ps -u username` 查看特定用户进程;`ps -e | grep &lt;进程名&gt;` 查找特定进程;`ps -p &lt;PID&gt;` 查看指定 PID 的进程详情。终止进程可用 `kill &lt;PID&gt;` 或 `pkill &lt;进程名&gt;`,强制终止加 `-9` 选项。
50 3
|
3月前
|
消息中间件 Linux C++
c++ linux通过实现独立进程之间的通信和传递字符串 demo
的进程间通信机制,适用于父子进程之间的数据传输。希望本文能帮助您更好地理解和应用Linux管道,提升开发效率。 在实际开发中,除了管道,还可以根据具体需求选择消息队列、共享内存、套接字等其他进程间通信方
91 16
|
4月前
|
消息中间件 Linux
Linux:进程间通信(共享内存详细讲解以及小项目使用和相关指令、消息队列、信号量)
通过上述讲解和代码示例,您可以理解和实现Linux系统中的进程间通信机制,包括共享内存、消息队列和信号量。这些机制在实际开发中非常重要,能够提高系统的并发处理能力和数据通信效率。希望本文能为您的学习和开发提供实用的指导和帮助。
302 20