Linux进程通信——管道(上)

简介: Linux进程通信——管道(上)

进程通信概念

什么是进程通信

首先我们清楚,进程是具有独立性的,如果想让进程通信,那么成本一定不低。

数据传输:一个进程需要将它的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源。

通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。

进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

为什么要有进程通信

因为有时候我们是需要多进程协同去完成某种任务的。

怎么进行通信

目前通信有两套标准:

POSIX——让通信过程可以跨主机

System V——聚焦在本地通信(比较陈旧的标准)

重点:共享内存

管道

管道是Unix中最古老的进程间通信的形式。

我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”。

管道是基于文件系统的。

那么两个进程通信:

第一个条件就是操作系统需要给双方进程提供内存空间。

第二个条件是要通信的进程看到同一份资源。(一般都是由操作系统直接或间接提供的)

不同的通信种类本质是:

上面所说的资源是操作系统的哪一个模块提供的。

匿名管道

父进程指向了一份文件,然后创建了子进程,子进程拷贝了父进程的代码和文件表,但是文件没被拷贝,这个时候父子进程看到的就是同一份文件,也就是同一份资源。

这一步才是通信的前提。

父进程往文件的缓冲区写数据,子进程从缓冲区读数据,这个就是进程之前的通信,这个方法及操作系统提供的内核文件,称为管道文件。(管道本质上就是文件)

那么需不需要将文件缓冲区的内容经过磁盘呢?之前说过打开一个文件是通过磁盘,但是因为磁盘是外设,效率很慢,所以我们实际上并不会这样。

就算不是在磁盘当中打开的文件,操作系统也能创建一个struct file对象,申请一个缓冲区。

所以这里说的管道其实是内存级文件,他不关心在磁盘的哪个路径下,要不要被写到磁盘,只要创建对象和缓冲区然后将地址添加到对应的文件描述符表就可以了。到时候操作系统会将这个文件变成管道文件。

那么我们如何让两个进程看到同一个管道文件呢?

首先清除fork是创建子进程的,子进程会继承父进程的文件地址,这样就能看到同一份管道文件了,但是这个文件并没有名字,所以叫做匿名管道。

创建匿名管道的过程

首先是父进程创建一个匿名管道。

分别以读和写的方式打开同一个文件。

然后是创建子进程,子进程会继承父进程对于这个文件的读写方式。

最后一部就是让父进程关闭读端,子进程关闭写端,这样就能让父进程给子进程读取数据了。

一般而言,我们管道只能用来单项数据通信。

管道就是输送资源的,就是数据。

这里我们来实现一下父子进程之间的通信

这里说一下:CXX,CPP,CC都是C++源文件的后缀。

首先来了解一下创建管道的函数;

这个函数的参数是一个输出型参数,储存的是读端和写端,比如说文件描述符中,3和4是在读端和写端,那么就把3和4储存到这个数组当中。(pipefd[0]是读端,pipefd[1]是写端)这就相当于用读端和写端同时打开了一个文件。

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) buffer[s] = 0;
            cout << buffer << " | my pid:" << getpid() << endl;
            //这里并没有休眠
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        sleep(1);//每隔1s写一次
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

通过这个可以发现父进程确实在给子进程发消息。

这种通信,称之为管道通信。

这个过程其实就相当于父进程通过操作系统写给管道,也就是相当于写给操作系统,然后子进程通过操作系统从管道当中读取内容。

管道读写的特性

读写特征

1.上面的代码是隔着1s才会写入一次,如果改成sleep(5)就是5s写入一次。

这说明如果管道没有数据了,读端在读,默认会直接阻塞当前正在读取的进程,只有管道有数据,操作系统识别到,读端才会去读取数据。

2. 管道是一个固定大小的缓冲区。

如果将读端一直不读,写端一直写,那么缓冲区是会被写满的。

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            sleep(1000);
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) buffer[s] = 0;
            cout << buffer << " | my pid:" << getpid() << endl;
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        //sleep(1);//每隔1s写一次
        cout << "count:" << cnt << endl;
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

这里就是管道被写满了,如果再写就会把原来的数据覆盖。

写端写满的时候,写端会阻塞,等待读端读取。

如果写端一直写,读端一直读呢?

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            sleep(2);
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) buffer[s] = 0;
            cout << buffer << " | my pid:" << getpid() << endl;
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        //sleep(1);//每隔1s写一次
        cout << "count:" << cnt << endl;
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

管道是按照指定大小读取的,而不是一条一条读。

3.如果写端关闭,读端会发生什么呢?

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            //sleep(2);
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) 
            {
                buffer[s] = 0;
                cout << buffer << " | my pid:" << getpid() << endl;
            }
            else if(s == 0)
            {
                cout << "end" << endl;
                break;
            }
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        //sleep(1);//每隔1s写一次
        break;
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

写端关闭,读端返回0。

4.读关闭会发生什么呢?

操作系统当然会自己判断,如果都不读了,就没必要去写了,会发送给进程一个信号,终止写端。

这里操作系统会发送13信号。

管道本身的特征

1.管道的生命周期是进程的生命周期

2.管道可以用来进行具有血缘关系的进程之间进行通信,常用于父子通信。

3.管道是面向字节流的(网络)

4.半双工——单向通信(别名)

5.互斥同步机制——对共享资源进行保护的方案。(读写的特性)

在平时使用 | 这种的,比如:

sleep 10000 | sleep 200

这就是匿名管道,操作系统会创建父子进程,然后通过管道连接起来,其实命令行解释器就是会去寻找 | 然后进行一系列操作。

基于管道的进程池设计

思路

这是用一个进程去控制多个进程,这个进程收到某个指令,然后发给这些进程的其中之一,假如说要发1就是帮忙打印东西,2就是帮忙计算等等功能(具体功能不重要,这里不实现),发的是几,功能就是对应的。

这里要注意,不能总让一个子进程工作,要分工做。

#include <iostream>
#include <unistd.h>
#include <cstring>
#include <fcntl.h>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <vector>
#include <string>
using namespace std;
#define PROCSS_NUM 5//创建子进程的数量
#define MakeSeed() srand((unsigned long)time(nullptr)^getpid()^rand()%1234)//生成伪随机数
typedef void(*func_t)();//重命名函数指针,其实就是命名为func_t
//处理条件
void downLoadTask()
{
    cout << getpid() << ":下载任务" << endl;
    sleep(1);
}
void ioTask()
{
    cout << getpid() << ":IO任务" << endl;
    sleep(1);
}
void flushTask()
{
    cout << getpid() << ":刷新任务" << endl;
    sleep(1);
}
void loadTaskFunc(vector<func_t>& func_t)
{
    func_t.push_back(downLoadTask);
    func_t.push_back(ioTask);
    func_t.push_back(flushTask);
}
class subEd//子进程的信息
{
public:
    subEd(pid_t id,int fd)
        :_subId(id)
        ,_writeFd(fd)
    {
        char nameBuffer[1024];
        snprintf(nameBuffer,sizeof nameBuffer,"process-%d[pid(%d)-fd(%d)]",num++,_subId,_writeFd);//进程的名字是进程编号+pid+写端口
        _name = nameBuffer;
    }
public:
    static int num;//进程的编号
    string _name;//进程名字
    pid_t _subId;//进程pid
    int _writeFd;//进程写端
};
int subEd::num = 0;
int recvTask(int readFd)
{
    int code = 0;
    ssize_t s = read(readFd,&code,sizeof(int));
    if(s==4) return code;
    else return -1;
}
void createSubProcess(vector<subEd>&arr,vector<func_t>&funcMap)
{
    for(int i = 0;i < PROCSS_NUM; i++)
    {
        int fds[2];
        int n = pipe(fds);
        assert(n==0);
        (void)n;//这里防止Release之后上面的assert直接被忽略,导致n被判断没被使用
        pid_t id = fork();
        if(id == 0)
        {
            //子进程处理任务
            close(fds[1]);//关闭写端
            while(1)
            {
                //获取命令码,如果没发送,子进程阻塞
                int commandCode = recvTask(fds[0]);//读取父进程发来的信息
                //完成任务
                if(commandCode >= 0 && commandCode < funcMap.size()) funcMap[commandCode]();//调用处理条件的信息表
                else break;//如果没有可读取的内容就跳出循环,就等于写端关闭,读端也关闭
            }
            exit(0);
        }
        close(fds[0]);//父进程关闭读端
        subEd sub(id,fds[1]);//创建子进程id和父进程写fd储存的对象,这里要注意,走到这里的是父进程,父进程拿到的返回值是子进程的pid
        arr.push_back(sub);//放入子进程信息的数组中
    }
}
void sendTask(subEd& s,int taskIdx)
{
    cout << "send task num:" << taskIdx << "send to ->" << s._name << endl;
    int n = write(s._writeFd,&taskIdx,sizeof(taskIdx));//这里为了安全必须发送的是四个字节
    assert(n == sizeof(int));
    (void)n;
}
void loadBlanceContrl(vector<subEd>&arr,vector<func_t>&funcMap,int count)
{
    int processsum = arr.size();//一共多少个子进程
    int tasksum = funcMap.size();//一共多少种方案
    int flag = 0;
    if(count > 0) flag = count;
    while(1)
    {
        //随机选择一个子进程
        int idx = rand() % processsum;
        //选择一个任务
        int taskIdx = rand() % tasksum; 
        //任务发送给选择的进程
        sendTask(arr[idx], taskIdx);
        sleep(1);
        if(flag)
        {
            count--;
            if(count == 0) break;
        }
    }
    //这里关闭父进程的写端
    for(int i = 0; i < processsum; i++) close(arr[i]._writeFd);
}
void waitProcess(vector<subEd>&arr)
{
    for(int i = 0; i < arr.size();i++)
    {
        waitpid(arr[i]._subId,nullptr,0);
        cout << "wait sub process success ...:" << arr[i]._subId << endl;
    }
}
int main()
{
    MakeSeed();//随机数的开始
    //创建子进程和父进程之前的通信
    vector<func_t>funcMap;//储存子进程处理的条件
    vector<subEd>arr;//储存子进程的pid和要处理的信息
    loadTaskFunc(funcMap);
    createSubProcess(arr,funcMap);
    //父进程
    int taskcnt = 5;//假设等于0就是永远进行
    loadBlanceContrl(arr,funcMap,taskcnt);
    //回收子进程
    waitProcess(arr);
    return 0;
}

相关文章
|
17天前
|
算法 Linux 调度
深入理解Linux操作系统的进程管理
本文旨在探讨Linux操作系统中的进程管理机制,包括进程的创建、执行、调度和终止等环节。通过对Linux内核中相关模块的分析,揭示其高效的进程管理策略,为开发者提供优化程序性能和资源利用率的参考。
43 1
|
6天前
|
存储 监控 Linux
嵌入式Linux系统编程 — 5.3 times、clock函数获取进程时间
在嵌入式Linux系统编程中,`times`和 `clock`函数是获取进程时间的两个重要工具。`times`函数提供了更详细的进程和子进程时间信息,而 `clock`函数则提供了更简单的处理器时间获取方法。根据具体需求选择合适的函数,可以更有效地进行性能分析和资源管理。通过本文的介绍,希望能帮助您更好地理解和使用这两个函数,提高嵌入式系统编程的效率和效果。
50 13
|
12天前
|
SQL 运维 监控
南大通用GBase 8a MPP Cluster Linux端SQL进程监控工具
南大通用GBase 8a MPP Cluster Linux端SQL进程监控工具
|
20天前
|
运维 监控 Linux
Linux操作系统的守护进程与服务管理深度剖析####
本文作为一篇技术性文章,旨在深入探讨Linux操作系统中守护进程与服务管理的机制、工具及实践策略。不同于传统的摘要概述,本文将以“守护进程的生命周期”为核心线索,串联起Linux服务管理的各个方面,从守护进程的定义与特性出发,逐步深入到Systemd的工作原理、服务单元文件编写、服务状态管理以及故障排查技巧,为读者呈现一幅Linux服务管理的全景图。 ####
|
26天前
|
缓存 算法 Linux
Linux内核的心脏:深入理解进程调度器
本文探讨了Linux操作系统中至关重要的组成部分——进程调度器。通过分析其工作原理、调度算法以及在不同场景下的表现,揭示它是如何高效管理CPU资源,确保系统响应性和公平性的。本文旨在为读者提供一个清晰的视图,了解在多任务环境下,Linux是如何智能地分配处理器时间给各个进程的。
|
1月前
|
网络协议 Linux 虚拟化
如何在 Linux 系统中查看进程的详细信息?
如何在 Linux 系统中查看进程的详细信息?
79 1
|
1月前
|
Linux
如何在 Linux 系统中查看进程占用的内存?
如何在 Linux 系统中查看进程占用的内存?
|
Linux Shell
详解linux进程间通信-管道 popen函数 dup2函数
  前言:进程之间交换信息的唯一方法是经由f o r k或e x e c传送打开文件,或通过文件系统。本章将说明进程之间相互通信的其他技术—I P C(InterProcess Communication)。
1602 0
|
1月前
|
Linux 网络安全 数据安全/隐私保护
Linux 超级强大的十六进制 dump 工具:XXD 命令,我教你应该如何使用!
在 Linux 系统中,xxd 命令是一个强大的十六进制 dump 工具,可以将文件或数据以十六进制和 ASCII 字符形式显示,帮助用户深入了解和分析数据。本文详细介绍了 xxd 命令的基本用法、高级功能及实际应用案例,包括查看文件内容、指定输出格式、写入文件、数据比较、数据提取、数据转换和数据加密解密等。通过掌握这些技巧,用户可以更高效地处理各种数据问题。
97 8
|
1月前
|
监控 Linux
如何检查 Linux 内存使用量是否耗尽?这 5 个命令堪称绝了!
本文介绍了在Linux系统中检查内存使用情况的5个常用命令:`free`、`top`、`vmstat`、`pidstat` 和 `/proc/meminfo` 文件,帮助用户准确监控内存状态,确保系统稳定运行。
291 6