进程间通信(一)

简介: 进程间通信

进程间通信


概念


在之前的学习中,我们知道进程是具有独立性的,现在却需要让进程之间进行通信,代价肯定是比较大的,而且为什么要进行通信呢?这就是通信的目的;通信该如何完成呢?


既然进程需要将数据传递给双方,肯定需要一块内存空间用来存放数据,这块空间不能由进程其中一方提供,因为进程是具有独立性的;进行通信时,除了进程双方,操作系统也参与进来,所以需要操作系统直接或间接给进程进程双方提供内存空间,让进程双方都能“看到”这一资源;不同的通信,所需要的空间也不同


目的


数据传输:一个进程需要将自己的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源

通知事件:一个进程需要向另一个或一组进程发送消息,通知

进程控制:有些进程希望完全控制另一个进程的执行,此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变

这里介绍一个简单的场景:cat file|grep 'hello'该指令是,将文件file的内容打印到显示器上,不过呢,先需要经过grep进行过滤,才会将内容进行打印;两个进程之间通过|完成了某种意义上的联系,这里也就是通信


方式


System V:

可以使进程跨主机进行通信


POSIX IPC:

将进程聚焦在本地进行通信


管道:


匿名管道

命名管道

前两种较为复杂,这里就学习以管道的方式进行通信


管道


概念


管道是unix中最古老的进程间通信的方式;将一个进程连接到另一个进程的一个数据流称作一个管道


父进程通过调用管道特定的系统调用,以读方式和写方式打开一个内存级文件,并通过创建子进程的方式,被字进程继承之后,关闭各自的读写端,进而形成一个通信信道


匿名管道


在上一章节中学习到,当在磁盘上打开文件时,操作系统会生成对应的struct file进行管理;通过管道进行通信时,也存在文件,称作管道文件,属于内存级文件,没有名称,称作匿名管道


图解:

ce2f83a08e3cb486ffb4119c82067e37_2584a14f9b0d4581854fe2098e539b79.png


父进程通过文件描述符表,映射到对应的文件结构体;子进程继承父进程的文件描述表,指向同一个结构体;父进程可以通过结构体将内容写到磁盘中,子进程再到磁盘上进行读,不过这样做,效率太低,毕竟是通信;所以父进程所打开的是管道文件,也就是在内存中;结构体提供了父子进程都能够使用的内存空间(缓冲区)


内存空间已经提供,接下来就是父子进程的通信过程

图解:


a2857e0fea1df5da9908d0b2bb1e0575_c10a197fe5b84681998cf2bee32df09b.png


父进程之所以需要以读写两中方式打开管道文件,是因为如果只以一种方式打开文件的话,子进程也就继承父进程打开文件的方式,并且两者之间不能进行通信

f64a1a87e7f6c9453716b21d64a016cd_745eceaaf70d4f4a95556f9025d885db.png

9bd5c830b622f3805fdb251030c9ab84_e39bb41a094a4814a5b9cb4452bb2c66.png


匿名管道目前用来进行父子进程之间的通信


实例代码


先介绍创建管道的函数

int pipe(int pipefd[2]);

数组中第一个元素表示读端

pipefd[0] refers to the read end of the pipe

数组中第二个元素表示写端

pipefd[0] refers to the read end of the pipe


返回值

On success, zero is returned. On error, -1 is returned, and errno is set appropriately


代码演示


using namespace std;
int main()
{
    int fds[2];
    int n=pipe(fds);
    assert(n==0);
    cout<<"fds[0]"<<fds[0]<<endl;
    cout<<"fds[1]"<<fds[1]<<endl;
    return 0;
}


5e2d5a90ebdb0a5d46d76091d5dff31f_cec20dcbe8584b77bf304903b31bed33.png


由结果可知,读写端分别对应文件描述符表下标的3,4


makefile


mypipe:mypipe.cpp
  g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
  rm -f mypipe


mypipe.cpp

头文件


#include<iostream>
#include<cstdio>
#include<cstring>
#include<string>
#include<cassert>
#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>


子进程每隔五秒写一次数据


int main()
{
    //父进程以读写打开文件
    int fds[2];
    int n=pipe(fds);
    assert(n==0);
    //创建子进程
    pid_t id=fork();
    assert(id>=0);
    //子进程
    if(id==0)
    {
        //子进程进行写入,关闭读端
        close(fds[0]);
        const char *s=" 我是子进程,正在给你发消息 ";
        int cnt=0;
        while(true)
        {
            cnt++;
            char buffer[1024];
            snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());
            //写端写满时,继续写入会发生堵塞,只能等待父进程读取
            write(fds[1],buffer,strlen(buffer));
            cout<<"count: "<< cnt <<endl;
            sleep(5);
        }
        //关闭写端
        close(fds[1]);
        cout<<"子进程关闭写端"<<endl;
        exit(0);
    }
    //父进程关闭写端
    close(fds[1]);
    //父进程通信代码
    while(true)
    {
        char buffer[1024];
        cout<<"*************************"<<endl;
        ssize_t s=read(fds[0],buffer,sizeof(buffer)-1);
        cout<<"#########################"<<endl;
        if(s>0)
        {
            buffer[s]=0;
        }
        cout<<"Get Message# "<<buffer<<"|my pid"<<getpid()<<endl;
    }
    //回收进程
    n=waitpid(id,nullptr,0);
    assert(n==id);
    return 0;
}

e15f80b5f285cd029aadc926bbc96b8b_d56d540a7c8e477f8dfe50cc2f3728dc.png


此时如果管道中没有数据,读端,默认会发生堵塞


写端一直写,读端休眠1000秒


int main()
{
    //父进程以读写打开文件
    int fds[2];
    int n=pipe(fds);
    assert(n==0);
    //创建子进程
    pid_t id=fork();
    assert(id>=0);
    //子进程
    if(id==0)
    {
        //子进程进行写入,关闭读端
        close(fds[0]);
        const char *s=" 我是子进程,正在给你发消息 ";
        int cnt=0;
        while(true)
        {
            cnt++;
            char buffer[1024];
            snprintf(buffer,sizeof buffer,"child->parent say: %s[%d][%d]",s,cnt,getpid());
            //写端写满时,继续写入会发生堵塞,只能等待父进程读取
            write(fds[1],buffer,strlen(buffer));
            cout<<"count: "<< cnt <<endl;
        }
        //关闭写端
        close(fds[1]);
        cout<<"子进程关闭写端"<<endl;
        exit(0);
    }
    //父进程关闭写端
    close(fds[1]);
    //父进程通信代码
    while(true)
    {
        sleep(1000);
        char buffer[1024];
        cout<<"*************************"<<endl;
        ssize_t s=read(fds[0],buffer,sizeof(buffer)-1);
        cout<<"#########################"<<endl;
        if(s>0)
        {
            buffer[s]=0;
        }
        cout<<"Get Message# "<<buffer<<"|my pid"<<getpid()<<endl;
    }
    //回收进程
    n=waitpid(id,nullptr,0);
    assert(n==id);
    return 0;
}

09819bba7fcfe479a1e90a61c2a5b16a_d2ba28524db349759181d861889fc666.png


写端将内存空间写满时,再进行写入会发生堵塞,需要等待读端进行读取


子进程写一次直接退出,并且关闭自己的写端


int main()
{
    //父进程以读写打开文件
    int fds[2];
    int n=pipe(fds);
    assert(n==0);
    //创建子进程
    pid_t id=fork();
    assert(id>=0);
    //子进程
    if(id==0)
    {
        //子进程进行写入,关闭读端
        close(fds[0]);
        const char*s="我是子进程,正在给你发消息";
        int cnt=0;
        while(true)
        {
            cnt++;
            char buffer[1024];
            snprintf(buffer,sizeof buffer,"child->parent say:%s[%d][%d]",s,cnt,getpid());
            //写端写满时,继续写入会发生堵塞,只能等待父进程读取
            write(fds[1],buffer,strlen(buffer));
            cout<<"count: "<<cnt<<endl;
            break;
        }
        //关闭写端
        close(fds[1]);
        cout<<"子进程关闭写端"<<endl;
        exit(0);
    }
    //父进程关闭写端
    close(fds[1]);
    //父进程通信代码
    while(true)
    {
        char buffer[1024];
        ssize_t s=read(fds[0],buffer,sizeof(buffer-1));
        if(s>0)
        {
            buffer[s]=0;
            cout<<"Get Message"<<buffer<<"|my pid"<<getpid()<<endl;
        }
        else if(s==0)
        {
            //读取到文件末尾
            cout<<"read:"<<s<<endl;
            break;
        }
    }
    int status=0;
    n=waitpid(id,&status,0);
    assert(n==id);
    cout<<"pid->"<<n<<" "<<(status&0x7F)<<endl;
    return 0;
}


cf6ec37ef99d3fd03f7d6379aed3a9f6_11788d7283aa4d0199a81ddfd4dab566.png


父进程进行读取,将内容读取完之后,直接退出进程


子进程一直写,父进程读取一次直接退出


int main()
{
    //父进程以读写打开文件
    int fds[2];
    int n=pipe(fds);
    assert(n==0);
    //创建子进程
    pid_t id=fork();
    assert(id>=0);
    //子进程
    if(id==0)
    {
        //子进程进行写入,关闭读端
        close(fds[0]);
        const char*s="我是子进程,正在给你发消息";
        int cnt=0;
        while(true)
        {
            cnt++;
            char buffer[1024];
            snprintf(buffer,sizeof buffer,"child->parent say:%s[%d][%d]",s,cnt,getpid());
            //写端写满时,继续写入会发生堵塞,只能等待父进程读取
            write(fds[1],buffer,strlen(buffer));
            cout<<"count: "<<cnt<<endl;
        }
        //关闭写端
        close(fds[1]);
        cout<<"子进程关闭写端"<<endl;
        exit(0);
    }
    //父进程关闭写端
    close(fds[1]);
    //父进程通信代码
    while(true)
    {
        char buffer[1024];
        ssize_t s=read(fds[0],buffer,sizeof(buffer-1));
        if(s>0)
        {
            buffer[s]=0;
            cout<<"Get Message"<<buffer<<"|my pid"<<getpid()<<endl;
        }
        else if(s==0)
        {
            //读取到文件末尾
            cout<<"read:"<<s<<endl;
            break;
        }
        break;
    }
    close(fds[0]);
    cout<<"父进程关闭读端"<<endl;
    int status=0;
    n=waitpid(id,&status,0);
    assert(n==id);
    cout<<"pid->"<<n<<" "<<(status&0x7F)<<endl;
    return 0;
}


ba64c697fdeda3556c11942ec7e4c458_9c7d95e944b24b2db1551e7703637865.png


操作系统会直接终止写段,杀死进程


特点


只能用于具有公共祖先的进程之间的进程;通常,一个管道由一个进程创建,然后该进程调用fork,此后父子进程之间就可使用该管道

管道提供流式服务

进程退出,管道释放,管道生命周期随进程

内核会对管道操作进行同步和互斥

管道是半双工,数据只能向一个方向流动;如果需要双方通信,需要建立两个管道


进程池


父进程通过管道创建多个子进程:向每个管道中都写入不同的任务,每个子进程都读入到相应的任务并执行


图解:

f9323cf772be33475e23dc218fcf2b66_4fea85d00588406693ed4efdec6b7fc1.png


#include <iostream>
#include <ctime>
#include <sys/types.h>
#include <unistd.h>
#include <vector>
#include <cassert>
#include <string>
#include <cstdio>
#include <sys/types.h>
#include <sys/wait.h>
#define MakeSeed() srand((unsigned long)time(nullptr)^getpid())
#define PROCESS_NUM 8
using namespace std;
typedef void(*func_t)();
void DownLoadTask()
{
    cout<<getpid()<<": 下载任务\n"<<endl;
    sleep(1);
}
void IOTask()
{
    cout<<getpid()<<": IO任务\n"<<endl;
    sleep(1);
}
void FlushTask()
{
    cout<<getpid()<<": 刷新任务\n"<<endl;
    sleep(1);
}
void LoadTaskFunc(vector<func_t>* func_t)
{
    assert(func_t);
    func_t->push_back(DownLoadTask);
    func_t->push_back(IOTask);
    func_t->push_back(FlushTask);
}
class subEP
{
public:
    subEP(pid_t subid,int writefd)
        :_subid(subid)
        ,_writefd(writefd)
    {
        char namebuffer[1024];
        snprintf(namebuffer,sizeof namebuffer,"process-%d[pid(%d)-fd(%d)]",_num++,_subid,_writefd);
        _name=namebuffer;
    }
public:
    static int _num;
    string _name;
    pid_t _subid;
    int _writefd;
};
int subEP::_num=0;
void SentTask(const subEP &process,int tasknum)
{
    cout<<"send task num: "<<tasknum<<" to -> "<<process._name<<endl;
    int n=write(process._writefd,&tasknum,sizeof tasknum);
    assert(n==sizeof(int));
    (void)n;
}
int recvTask(int readfd)
{
    int code=0;
    ssize_t s=read(readfd,&code,sizeof code);
    if(s==4)
        return code;
    else if(s<=0)
        return -1;
    else
        return 0;
}
void CreateSubProcess(vector<subEP>* subs,vector<func_t>& funcmap)
{
    vector<int> deletefd;
    for(int i=0;i<PROCESS_NUM;i++)
    {
        int fds[2];
        int n=pipe(fds);
        assert(n==0);
        (void)n;
        pid_t id=fork();
        //子进程,处理任务
        if(id==0)
        {
            for(int i=0;i<deletefd.size();i++)
                close(deletefd[i]);
            //子进程关闭写端
            close(fds[1]);
            while(true)
            {
                //1.获取命令码,如果没有发送,子进程应该阻塞
                int CommandCode=recvTask(fds[0]);
                if(CommandCode>=0 && CommandCode<funcmap.size())
                {
                    funcmap[CommandCode]();
                }
                else if(CommandCode==-1)
                    break;
            }
            exit(0);
        }
        //父进程关闭读端
        close(fds[0]);
        subEP sub(id,fds[1]);
        subs->push_back(sub);
        deletefd.push_back(fds[1]);
    }
}
void LoadBlanceContrl(vector<subEP>& subs,const vector<func_t>&funcmap,int count)
{
    int processnum=subs.size();
    int tasknum=funcmap.size();
    bool forever=(count==0?true:false);
    while(true)
    {
        //1.选择一个子进程
        int subIndex=rand()%processnum;
        //2.选择一个任务
        int taskIndex=rand()%tasknum;
        //3.任务发送给选择的进程
        SentTask(subs[subIndex],taskIndex);
        sleep(1);
        if(!forever)
        {
            count--;
            if(count==0)
                break;
        }
    }
    //写端停止,读端读取到结束
    for(int i=0;i<processnum;i++)
    {
        close(subs[i]._writefd);
    }
}
void WaitProcess(vector<subEP> processes)
{
    int processnum=processes.size();
    for(int i=0;i<processnum;i++)
    {
        waitpid(processes[i]._subid,NULL,0);
        cout<<"wait process success ... "<<processes[i]._subid<<endl;
    }
}
int main()
{
    //时间种子
    MakeSeed();
    //1.建立子进程并创建子进程通信管道
    //1.1.加载方法表
    vector<func_t> funcmap;
    LoadTaskFunc(&funcmap);
    //1.2.创建子进程,维护父子通信信道(管道)
    vector<subEP> subs;
    //1.3.给子进程分配函数
    CreateSubProcess(&subs,funcmap);
    //2.父进程,控制子进程,负载均衡地向子进程发送命令码
    int TaskCnt=3;//0,永远进行
    LoadBlanceContrl(subs,funcmap,TaskCnt);
    //3.回收子进程信息
    WaitProcess(subs);
    return 0;
}

b8872136d927383d290c5851c25ba693_eced8f98411f4012a7c81540e6011d10.png

20b53500d04e3cdefd1069706775b5f8_1648cea5c6bb4ceb9ed3d07204ec8a12.png


目录
相关文章
|
2天前
|
消息中间件 缓存 Shell
|
6月前
|
存储 消息中间件 Linux
IPC(进程间通信)(上)
IPC(进程间通信)
34 0
|
6月前
|
API
IPC(进程间通信)(下)
IPC(进程间通信)(下)
22 0
|
10月前
进程间通信(二)
进程间通信
56 0
|
11月前
|
消息中间件
进程间通信(非常实用)
进程间通信(非常实用)
|
开发者
进程间通信 | 学习笔记
快速学习进程间通信,介绍了进程间通信系统机制, 以及在实际应用过程中如何使用。
72 0
进程间通信——共享内存
进程间通信——共享内存
186 0
进程间通信——共享内存
|
C++ Windows Unix
C++进程间通信的十一种方法
转载:https://www.cnblogs.com/swunield/articles/3893250.html 进程通常被定义为一个正在运行的程序的实例,它由两个部分组成: 一个是操作系统用来管理进程的内核对象。
2745 0