Linux进程通信——管道(上)

简介: Linux进程通信——管道(上)

进程通信概念

什么是进程通信

首先我们清楚,进程是具有独立性的,如果想让进程通信,那么成本一定不低。

数据传输:一个进程需要将它的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源。

通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。

进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

为什么要有进程通信

因为有时候我们是需要多进程协同去完成某种任务的。

怎么进行通信

目前通信有两套标准:

POSIX——让通信过程可以跨主机

System V——聚焦在本地通信(比较陈旧的标准)

重点:共享内存

管道

管道是Unix中最古老的进程间通信的形式。

我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”。

管道是基于文件系统的。

那么两个进程通信:

第一个条件就是操作系统需要给双方进程提供内存空间。

第二个条件是要通信的进程看到同一份资源。(一般都是由操作系统直接或间接提供的)

不同的通信种类本质是:

上面所说的资源是操作系统的哪一个模块提供的。

匿名管道

父进程指向了一份文件,然后创建了子进程,子进程拷贝了父进程的代码和文件表,但是文件没被拷贝,这个时候父子进程看到的就是同一份文件,也就是同一份资源。

这一步才是通信的前提。

父进程往文件的缓冲区写数据,子进程从缓冲区读数据,这个就是进程之前的通信,这个方法及操作系统提供的内核文件,称为管道文件。(管道本质上就是文件)

那么需不需要将文件缓冲区的内容经过磁盘呢?之前说过打开一个文件是通过磁盘,但是因为磁盘是外设,效率很慢,所以我们实际上并不会这样。

就算不是在磁盘当中打开的文件,操作系统也能创建一个struct file对象,申请一个缓冲区。

所以这里说的管道其实是内存级文件,他不关心在磁盘的哪个路径下,要不要被写到磁盘,只要创建对象和缓冲区然后将地址添加到对应的文件描述符表就可以了。到时候操作系统会将这个文件变成管道文件。

那么我们如何让两个进程看到同一个管道文件呢?

首先清除fork是创建子进程的,子进程会继承父进程的文件地址,这样就能看到同一份管道文件了,但是这个文件并没有名字,所以叫做匿名管道。

创建匿名管道的过程

首先是父进程创建一个匿名管道。

分别以读和写的方式打开同一个文件。

然后是创建子进程,子进程会继承父进程对于这个文件的读写方式。

最后一部就是让父进程关闭读端,子进程关闭写端,这样就能让父进程给子进程读取数据了。

一般而言,我们管道只能用来单项数据通信。

管道就是输送资源的,就是数据。

这里我们来实现一下父子进程之间的通信

这里说一下:CXX,CPP,CC都是C++源文件的后缀。

首先来了解一下创建管道的函数;

这个函数的参数是一个输出型参数,储存的是读端和写端,比如说文件描述符中,3和4是在读端和写端,那么就把3和4储存到这个数组当中。(pipefd[0]是读端,pipefd[1]是写端)这就相当于用读端和写端同时打开了一个文件。

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) buffer[s] = 0;
            cout << buffer << " | my pid:" << getpid() << endl;
            //这里并没有休眠
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        sleep(1);//每隔1s写一次
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

通过这个可以发现父进程确实在给子进程发消息。

这种通信,称之为管道通信。

这个过程其实就相当于父进程通过操作系统写给管道,也就是相当于写给操作系统,然后子进程通过操作系统从管道当中读取内容。

管道读写的特性

读写特征

1.上面的代码是隔着1s才会写入一次,如果改成sleep(5)就是5s写入一次。

这说明如果管道没有数据了,读端在读,默认会直接阻塞当前正在读取的进程,只有管道有数据,操作系统识别到,读端才会去读取数据。

2. 管道是一个固定大小的缓冲区。

如果将读端一直不读,写端一直写,那么缓冲区是会被写满的。

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            sleep(1000);
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) buffer[s] = 0;
            cout << buffer << " | my pid:" << getpid() << endl;
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        //sleep(1);//每隔1s写一次
        cout << "count:" << cnt << endl;
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

这里就是管道被写满了,如果再写就会把原来的数据覆盖。

写端写满的时候,写端会阻塞,等待读端读取。

如果写端一直写,读端一直读呢?

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            sleep(2);
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) buffer[s] = 0;
            cout << buffer << " | my pid:" << getpid() << endl;
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        //sleep(1);//每隔1s写一次
        cout << "count:" << cnt << endl;
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

管道是按照指定大小读取的,而不是一条一条读。

3.如果写端关闭,读端会发生什么呢?

#include <iostream>
#include <unistd.h>
#include <cassert>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
using namespace std;
int main()
{
    //第一步创建管道
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);
    //第二部创建子进程
    pid_t id = fork();
    assert(id >= 0);
    if(id == 0)//子进程
    {
        close(fds[1]);//子进程关闭写入
        while(true)
        {
            //sleep(2);
            char buffer[1024];
            ssize_t s = read(fds[0],buffer,sizeof(buffer)+1);//这里读取的内容永远少1是防止超过这个内容没有位置放\0
            if(s > 0) 
            {
                buffer[s] = 0;
                cout << buffer << " | my pid:" << getpid() << endl;
            }
            else if(s == 0)
            {
                cout << "end" << endl;
                break;
            }
        }
        close(fds[0]);
        exit(0);
    }
    close(fds[0]);//父进程关闭读取
    const char* s = "我是父进程,我在给你发消息";
    int cnt = 0;
    while(true)
    {
        cnt++;
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"parent -> child say:%s[%d][%d]",s,cnt,getpid());
        write(fds[1],buffer,strlen(buffer));//这里传入的长度不用+1,因为系统不是通过\0来判定字符串结尾的
        //sleep(1);//每隔1s写一次
        break;
    }
    close(fds[1]);
    n = waitpid(id,nullptr,0);
    assert(n == id);
    return 0;
}

写端关闭,读端返回0。

4.读关闭会发生什么呢?

操作系统当然会自己判断,如果都不读了,就没必要去写了,会发送给进程一个信号,终止写端。

这里操作系统会发送13信号。

管道本身的特征

1.管道的生命周期是进程的生命周期

2.管道可以用来进行具有血缘关系的进程之间进行通信,常用于父子通信。

3.管道是面向字节流的(网络)

4.半双工——单向通信(别名)

5.互斥同步机制——对共享资源进行保护的方案。(读写的特性)

在平时使用 | 这种的,比如:

sleep 10000 | sleep 200

这就是匿名管道,操作系统会创建父子进程,然后通过管道连接起来,其实命令行解释器就是会去寻找 | 然后进行一系列操作。

基于管道的进程池设计

思路

这是用一个进程去控制多个进程,这个进程收到某个指令,然后发给这些进程的其中之一,假如说要发1就是帮忙打印东西,2就是帮忙计算等等功能(具体功能不重要,这里不实现),发的是几,功能就是对应的。

这里要注意,不能总让一个子进程工作,要分工做。

#include <iostream>
#include <unistd.h>
#include <cstring>
#include <fcntl.h>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <vector>
#include <string>
using namespace std;
#define PROCSS_NUM 5//创建子进程的数量
#define MakeSeed() srand((unsigned long)time(nullptr)^getpid()^rand()%1234)//生成伪随机数
typedef void(*func_t)();//重命名函数指针,其实就是命名为func_t
//处理条件
void downLoadTask()
{
    cout << getpid() << ":下载任务" << endl;
    sleep(1);
}
void ioTask()
{
    cout << getpid() << ":IO任务" << endl;
    sleep(1);
}
void flushTask()
{
    cout << getpid() << ":刷新任务" << endl;
    sleep(1);
}
void loadTaskFunc(vector<func_t>& func_t)
{
    func_t.push_back(downLoadTask);
    func_t.push_back(ioTask);
    func_t.push_back(flushTask);
}
class subEd//子进程的信息
{
public:
    subEd(pid_t id,int fd)
        :_subId(id)
        ,_writeFd(fd)
    {
        char nameBuffer[1024];
        snprintf(nameBuffer,sizeof nameBuffer,"process-%d[pid(%d)-fd(%d)]",num++,_subId,_writeFd);//进程的名字是进程编号+pid+写端口
        _name = nameBuffer;
    }
public:
    static int num;//进程的编号
    string _name;//进程名字
    pid_t _subId;//进程pid
    int _writeFd;//进程写端
};
int subEd::num = 0;
int recvTask(int readFd)
{
    int code = 0;
    ssize_t s = read(readFd,&code,sizeof(int));
    if(s==4) return code;
    else return -1;
}
void createSubProcess(vector<subEd>&arr,vector<func_t>&funcMap)
{
    for(int i = 0;i < PROCSS_NUM; i++)
    {
        int fds[2];
        int n = pipe(fds);
        assert(n==0);
        (void)n;//这里防止Release之后上面的assert直接被忽略,导致n被判断没被使用
        pid_t id = fork();
        if(id == 0)
        {
            //子进程处理任务
            close(fds[1]);//关闭写端
            while(1)
            {
                //获取命令码,如果没发送,子进程阻塞
                int commandCode = recvTask(fds[0]);//读取父进程发来的信息
                //完成任务
                if(commandCode >= 0 && commandCode < funcMap.size()) funcMap[commandCode]();//调用处理条件的信息表
                else break;//如果没有可读取的内容就跳出循环,就等于写端关闭,读端也关闭
            }
            exit(0);
        }
        close(fds[0]);//父进程关闭读端
        subEd sub(id,fds[1]);//创建子进程id和父进程写fd储存的对象,这里要注意,走到这里的是父进程,父进程拿到的返回值是子进程的pid
        arr.push_back(sub);//放入子进程信息的数组中
    }
}
void sendTask(subEd& s,int taskIdx)
{
    cout << "send task num:" << taskIdx << "send to ->" << s._name << endl;
    int n = write(s._writeFd,&taskIdx,sizeof(taskIdx));//这里为了安全必须发送的是四个字节
    assert(n == sizeof(int));
    (void)n;
}
void loadBlanceContrl(vector<subEd>&arr,vector<func_t>&funcMap,int count)
{
    int processsum = arr.size();//一共多少个子进程
    int tasksum = funcMap.size();//一共多少种方案
    int flag = 0;
    if(count > 0) flag = count;
    while(1)
    {
        //随机选择一个子进程
        int idx = rand() % processsum;
        //选择一个任务
        int taskIdx = rand() % tasksum; 
        //任务发送给选择的进程
        sendTask(arr[idx], taskIdx);
        sleep(1);
        if(flag)
        {
            count--;
            if(count == 0) break;
        }
    }
    //这里关闭父进程的写端
    for(int i = 0; i < processsum; i++) close(arr[i]._writeFd);
}
void waitProcess(vector<subEd>&arr)
{
    for(int i = 0; i < arr.size();i++)
    {
        waitpid(arr[i]._subId,nullptr,0);
        cout << "wait sub process success ...:" << arr[i]._subId << endl;
    }
}
int main()
{
    MakeSeed();//随机数的开始
    //创建子进程和父进程之前的通信
    vector<func_t>funcMap;//储存子进程处理的条件
    vector<subEd>arr;//储存子进程的pid和要处理的信息
    loadTaskFunc(funcMap);
    createSubProcess(arr,funcMap);
    //父进程
    int taskcnt = 5;//假设等于0就是永远进行
    loadBlanceContrl(arr,funcMap,taskcnt);
    //回收子进程
    waitProcess(arr);
    return 0;
}

相关文章
|
5月前
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
221 67
|
4月前
|
Web App开发 Linux 程序员
获取和理解Linux进程以及其PID的基础知识。
总的来说,理解Linux进程及其PID需要我们明白,进程就如同汽车,负责执行任务,而PID则是独特的车牌号,为我们提供了管理的便利。知道这个,我们就可以更好地理解和操作Linux系统,甚至通过对进程的有效管理,让系统运行得更加顺畅。
102 16
|
4月前
|
Unix Linux
对于Linux的进程概念以及进程状态的理解和解析
现在,我们已经了解了Linux进程的基础知识和进程状态的理解了。这就像我们理解了城市中行人的行走和行为模式!希望这个形象的例子能帮助我们更好地理解这个重要的概念,并在实际应用中发挥作用。
89 20
|
3月前
|
监控 Shell Linux
Linux进程控制(详细讲解)
进程等待是系统通过调用特定的接口(如waitwaitpid)来实现的。来进行对子进程状态检测与回收的功能。
68 0
|
3月前
|
存储 负载均衡 算法
Linux2.6内核进程调度队列
本篇文章是Linux进程系列中的最后一篇文章,本来是想放在上一篇文章的结尾的,但是想了想还是单独写一篇文章吧,虽然说这部分内容是比较难的,所有一般来说是简单的提及带过的,但是为了让大家对进程有更深的理解与认识,还是看了一些别人的文章,然后学习了学习,然后对此做了总结,尽可能详细的介绍明白。最后推荐一篇文章Linux的进程优先级 NI 和 PR - 简书。
90 0
|
3月前
|
存储 Linux Shell
Linux进程概念-详细版(二)
在Linux进程概念-详细版(一)中我们解释了什么是进程,以及进程的各种状态,已经对进程有了一定的认识,那么这篇文章将会继续补全上篇文章剩余没有说到的,进程优先级,环境变量,程序地址空间,进程地址空间,以及调度队列。
59 0
|
3月前
|
Linux 调度 C语言
Linux进程概念-详细版(一)
子进程与父进程代码共享,其子进程直接用父进程的代码,其自己本身无代码,所以子进程无法改动代码,平时所说的修改是修改的数据。为什么要创建子进程:为了让其父子进程执行不同的代码块。子进程的数据相对于父进程是会进行写时拷贝(COW)。
62 0
|
7月前
|
存储 Linux API
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
在计算机系统的底层架构中,操作系统肩负着资源管理与任务调度的重任。当我们启动各类应用程序时,其背后复杂的运作机制便悄然展开。程序,作为静态的指令集合,如何在系统中实现动态执行?本文带你一探究竟!
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
|
6月前
|
存储 Linux 调度
【Linux】进程概念和进程状态
本文详细介绍了Linux系统中进程的核心概念与管理机制。从进程的定义出发,阐述了其作为操作系统资源管理的基本单位的重要性,并深入解析了task_struct结构体的内容及其在进程管理中的作用。同时,文章讲解了进程的基本操作(如获取PID、查看进程信息等)、父进程与子进程的关系(重点分析fork函数)、以及进程的三种主要状态(运行、阻塞、挂起)。此外,还探讨了Linux特有的进程状态表示和孤儿进程的处理方式。通过学习这些内容,读者可以更好地理解Linux进程的运行原理并优化系统性能。
203 4
|
存储 缓存 Linux
【Linux】进程概念(冯诺依曼体系结构、操作系统、进程)-- 详解
【Linux】进程概念(冯诺依曼体系结构、操作系统、进程)-- 详解

热门文章

最新文章