Linux IPC实践(2) --匿名PIPE

简介: 管道概念   管道是Unix中最古老的进程间通信的形式,我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”, 管道的本质是固定大小的内核缓冲区;   如:ps aux | gre...

管道概念

   管道是Unix中最古老的进程间通信的形式,我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”, 管道的本质是固定大小的内核缓冲区;

   如:ps aux | grep httpd | awk '{print $2}' 

管道限制

   1)管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道;

   2)匿名管道只能用于具有共同祖先的进程(如父进程与fork出的子进程)之间进行通信, 原因是pipe创建的是两个文件描述符, 不同进程直接无法直接获得;[通常,一个管道由一个进程创建,然后该进程调用fork,此后父子进程共享该管道]

匿名管道pipe

#include <unistd.h>
int pipe(int pipefd[2]);

创建一无名管道

参数

   Pipefd:文件描述符数组,其中pipefd[0]表示读端,pipefd[1]表示写端

 

管道创建示意图



/**示例: 从子进程向父进程发送数据
管道示意图如上面第二副图
**/
int main()
{
    int fd[2];
    if (pipe(fd) == -1)
        err_exit("pipe error");

    pid_t pid = fork();
    if (pid == -1)
        err_exit("fork error");
    if (pid == 0)   //子进程: 向管道中写入数据
    {
        close(fd[0]);   //关闭读端
        string str("message from child process!");
        write(fd[1], str.c_str(), str.size());  //向写端fd[1]写入数据
        close(fd[1]);
        exit(EXIT_SUCCESS);
    }

    //父进程: 从管道中读出数据
    close(fd[1]);   //关闭写端
    char buf[BUFSIZ] = {0};
    read(fd[0], buf, sizeof(buf));
    close(fd[0]);
    cout << buf << endl;
}
/**示例: 用管道模拟: ls | wc -w的运行
1.子进程运行ls
2.父进程运行wc -w
3.通过管道, 将子进程的输出发送到wc的输入
**/
int main()
{
    int pipefd[2];
    if (pipe(pipefd) == -1)
        err_exit("pipe error");

    pid_t pid = fork();
    if (pid == -1)
        err_exit("fork error");
    if (pid == 0)   //子进程
    {
        close(pipefd[0]);   //关闭读端
        //使得STDOUT_FILENO也指向pipefd[1],亦即ls命令的输出将打印到管道中
        dup2(pipefd[1], STDOUT_FILENO); //此时可以关闭管道写端
        close(pipefd[1]);

        execlp("/bin/ls", "ls", NULL);
        //如果进程映像替换失败,则打印下面出错信息
        cerr << "child execlp error" << endl;;
        exit(EXIT_FAILURE);
    }

    //父进程
    close(pipefd[1]);   //关闭写端
    //使得STDIN_FILENO也指向pipefd[2],亦即wc命令将从管道中读取输入
    dup2(pipefd[0], STDIN_FILENO);
    close(pipefd[0]);

    execlp("/usr/bin/wc", "wc", "-w", NULL);
    cerr << "parent execlp error" << endl;
    exit(EXIT_FAILURE);
}

匿名管道读写规则

规则 1)管道空时

   O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。

   O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN

//验证
int main()
{
    int pipefd[2];
    if (pipe(pipefd) != 0)
        err_exit("pipe error");

    pid_t pid = fork();
    if (pid == -1)
        err_exit("fork error");

    if (pid == 0)   //In Child, Write pipe
    {
        sleep(10);
        close(pipefd[0]);   //Close Read pipe
        string str("I Can Write Pipe from Child!");

        write(pipefd[1],str.c_str(),str.size());    //Write to pipe
        close(pipefd[1]);
        exit(EXIT_SUCCESS);
    }

    //In Parent, Read pipe
    close(pipefd[1]);   //Close Write pipe
    char buf[1024] = {0};

    //Set Read pipefd UnBlock! 查看在下面四行语句注释的前后有什么区别
//    int flags = fcntl(pipefd[0],F_GETFL, 0);
//    flags |= O_NONBLOCK;
//    if (fcntl(pipefd[0],F_SETFL,flags) == -1)
//        err_exit("Set UnBlock error");


    int readCount = read(pipefd[0],buf,sizeof(buf));    //Read from pipe
    if (readCount < 0)
        //read立刻返回,不再等待子进程发送数据
        err_exit("read error");

    cout << "Read from pipe: " << buf << endl;
    close(pipefd[0]);
}

规则 2)管道满时

   O_NONBLOCK disable: write调用阻塞,直到有进程读走数据

   O_NONBLOCK enable:调用返回-1,errno值为EAGAIN

/** 验证规则2)
同时测试管道的容量
**/
int main()
{
    if (signal(SIGPIPE, handler) == SIG_ERR)
        err_exit("signal error");

    int pipefd[2];
    if (pipe(pipefd) != 0)
        err_exit("pipe error");

    // 将管道的写端设置成为非阻塞模式
    // 将下面三行注释之后查看效果
    int flags = fcntl(pipefd[1], F_GETFL, 0);
    if (fcntl(pipefd[1], F_SETFL, flags|O_NONBLOCK) == -1)
        err_exit("fcntl set error");

    int count = 0;
    while (true)
    {
        if (write(pipefd[1], "A", 1) == -1)
        {
            cerr << "write pipe error: " << strerror(errno) << endl;
            break;
        }
        ++ count;
    }

    cout << "pipe size = " << count << endl;
}

3)如果所有管道写端对应的文件描述符被关闭,则read返回0

//验证规则3)
int main()
{
    int pipefd[2];
    if (pipe(pipefd) != 0)
        err_exit("pipe error");

    pid_t pid = fork();
    if (pid == -1)
        err_exit("fork error");
    else if (pid == 0)
    {
        close(pipefd[1]);
        exit(EXIT_SUCCESS);
    }

    close(pipefd[1]);
    sleep(2);
    char buf[2];
    if (read(pipefd[0], buf, sizeof(buf)) == 0)
        cout << "sure" << endl;
}

4)如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE

//验证规则4)
int main()
{
    if (signal(SIGPIPE, handler) == SIG_ERR)
        err_exit("signal error");

    int pipefd[2];
    if (pipe(pipefd) != 0)
        err_exit("pipe error");

    pid_t pid = fork();
    if (pid == -1)
        err_exit("fork error");
    else if (pid == 0)
    {
        close(pipefd[0]);
        exit(EXIT_SUCCESS);
    }

    close(pipefd[0]);
    sleep(2);
    char test;
    if (write(pipefd[1], &test, sizeof(test)) < 0)
        err_exit("write error");
}

Linux PIPE特征

   1)当要写入的数据量不大于PIPE_BUF时,Linux将保证写入的原子性

   2)当要写入的数据量大于PIPE_BUF时,Linux将不再保证写入的原子性。


man说明:

POSIX.1-2001 says that write(2)s of less than PIPE_BUF bytes must be atomic:  

the  output data is written to the pipe as a contiguous sequence.  

Writes of more than PIPE_BUF bytes may be nonatomic: 

the kernel may interleave the data with  data  written  by  other  processes.  

POSIX.1-2001 requires PIPE_BUF to be at least 512 bytes.  

(On Linux, PIPE_BUF is 4096 bytes. 在Linux当中, PIPE_BUF为4字节). 

The precise semantics depend on whether the file descriptor is  non-blocking(O_NONBLOCK),  

whether  there  are  multiple writers to the pipe, and on n, the number of bytes to be written:

O_NONBLOCK disabled(阻塞), n <= PIPE_BUF

   All n bytes are written atomically; write(2) may block if there is not room for  n bytes to be written immediately

O_NONBLOCK enabled(非阻塞), n <= PIPE_BUF

   If there is room to write n bytes to the pipe, then write(2) succeeds immediately, writing all n bytes; 

otherwise write(2) fails, with errno set to EAGAIN(注意: 如果空间不足以写入数据, 则一个字节也不写入, 直接出错返回).

 

O_NONBLOCK disabled, n > PIPE_BUF

   The write is nonatomic: the  data  given  to  write(2)  may  be  interleaved  with write(2)s by other process; 

the write(2) blocks until n bytes have been written.

O_NONBLOCK enabled, n > PIPE_BUF

   If  the  pipe  is full, then write(2) fails, with errno set to EAGAIN(此时也是没有一个字符写入管道).  

Otherwise, from 1 to n bytes may be written (i.e., a "partial write" may  occur;  

the  caller should  check  the  return value from write(2) to see how many bytes were actually written), 

and these bytes may be interleaved with writes by other processes.

/** 验证: 
已知管道的PIPE_BUF为4K, 我们启动两个进程A, B向管道中各自写入68K的内容, 然后我们以4K为一组, 查看管道最后一个字节的内容, 多运行该程序几次, 就会发现这68K的数据会有交叉写入的情况
**/
int main()
{
    const int TEST_BUF = 68 * 1024; //设置写入的数据量为68K
    char bufA[TEST_BUF];
    char bufB[TEST_BUF];
    memset(bufA, 'A', sizeof(bufA));
    memset(bufB, 'B', sizeof(bufB));

    int pipefd[2];
    if (pipe(pipefd) != 0)
        err_exit("pipe error");

    pid_t pid;
    if ((pid = fork()) == -1)
        err_exit("first fork error");
    else if (pid == 0)  //第一个子进程A, 向管道写入bufA
    {
        close(pipefd[0]);
        int writeBytes = write(pipefd[1], bufA, sizeof(bufA));
        cout << "A Process " << getpid() << ", write "
             << writeBytes << " bytes to pipe" << endl;
        exit(EXIT_SUCCESS);
    }

    if ((pid = fork()) == -1)
        err_exit("second fork error");
    else if (pid == 0)  //第二个子进程B, 向管道写入bufB
    {
        close(pipefd[0]);
        int writeBytes = write(pipefd[1], bufB, sizeof(bufB));
        cout << "B Process " << getpid() << ", write "
             << writeBytes << " bytes to pipe" << endl;
        exit(EXIT_SUCCESS);
    }

    // 父进程
    close(pipefd[1]);
    sleep(2);   //等待两个子进程写完
    char buf[4 * 1024]; //申请一个4K的buf
    int fd = open("save.txt", O_WRONLY|O_TRUNC|O_CREAT, 0666);
    if (fd == -1)
        err_exit("file open error");

    while (true)
    {
        int readBytes = read(pipefd[0], buf, sizeof(buf));
        if (readBytes == 0)
            break;
        if (write(fd, buf, readBytes) == -1)
            err_exit("write file error");
        cout << "Parent Process " << getpid() << " read " << readBytes
             << " bytes from pipe, buf[4095] = " << buf[4095] << endl;
    }
}


附-管道容量查询

man 7 pipe

 

注意: 管道的容量不一定就等于PIPE_BUF, 如在Ubuntu中, 管道容量为64K, 而PIPE_BUF为4K.

目录
相关文章
|
2月前
|
Ubuntu Linux vr&ar
IM跨平台技术学习(十二):万字长文详解QQ Linux端实时音视频背后的跨平台实践
本文详细记录了新版QQ音视频通话在 Linux 平台适配开发过程中的技术方案与实现细节,希望能帮助大家理解在 Linux 平台从 0 到 1 实现音视频通话能力的过程。
123 2
|
22天前
|
存储 人工智能 数据管理
深入理解Linux操作系统之文件系统管理探索人工智能:从理论到实践的旅程
【8月更文挑战第30天】在探索Linux的无限可能时,我们不可避免地会遇到文件系统管理这一核心话题。本文将深入浅出地介绍Linux文件系统的基础知识、操作命令及高级技巧,帮助你更有效地管理和维护你的系统。从基础概念到实践应用,我们将一步步揭开Linux文件系统的神秘面纱。
|
1月前
|
存储 Linux 数据处理
在Linux中,管道(pipe)和重定向(redirection)的是什么?
在Linux中,管道(pipe)和重定向(redirection)的是什么?
|
1月前
|
存储 安全 Linux
Linux存储安全:系统更新和补丁管理的策略与实践
【8月更文挑战第19天】安全是一个持续的过程,需要不断地评估、更新和改进策略。
33 0
|
1月前
|
存储 安全 Linux
Linux存储安全:数据加密的实践与策略
【8月更文挑战第19天】数据加密是Linux存储安全的基石之一。通过使用LUKS进行磁盘加密和使用GnuPG进行文件加密,可以显著提高数据的安全性。
43 0
|
1月前
|
存储 监控 安全
Linux存储安全:访问控制的实践与策略
【8月更文挑战第18天】Linux存储安全:访问控制的实践与策略
39 0
|
1月前
|
存储 安全 Linux
Linux存储安全:深入实践与案例分析
【8月更文挑战第18天】Linux存储安全是一个多层次、多维度的问题,需要从物理安全、访问控制、数据加密、审计监控、系统更新、备份策略等多个方面综合考虑。通过本文介绍的具体措施和案例代码,读者可以更好地理解如何在Linux系统中实施存储安全措施。安全是一个持续的过程,需要不断地评估、更新和改进策略。
60 0
|
2月前
|
监控 安全 Linux
Linux命令ssltap的深入解析与应用实践
`ssltap`是一个假想的Linux命令,用于模拟SSL/TLS流量分析。它捕获、解密(如果有密钥)并分析加密流量,提供实时监控、协议解析和安全审计。特点包括实时性、灵活性、可扩展性和安全性。示例用法包括捕获特定端口流量和实时监控会话状态。在实际操作中应注意私钥安全、性能影响及合规性,建议定期审计和自动化监控。
|
2月前
|
数据可视化 安全 Linux
探索Linux命令repo-graph:深入解析与应用实践
`repo-graph`是Linux的Yum-utils工具,用于可视化仓库中软件包的依赖关系,简化复杂网络管理。它通过分析元数据生成图形,支持自定义输出格式和特定包分析。例如,`repo-graph --repoid=updates`显示更新仓库的依赖,而`--packages=httpd`则专注httpd包。注意权限、复杂性和选择合适输出格式。定期分析和图形化展示是最佳实践。
|
3月前
|
消息中间件 运维 监控
Linux命令ipcs详解:IPC对象的全面洞察
`ipcs`命令详解:Linux下用于洞察IPC(消息队列、信号量、共享内存)对象的工具。它列出系统中的IPC资源,显示详细信息,如ID、所有者、权限等。参数如`-m`、`-q`、`-s`分别显示共享内存、消息队列和信号量信息。结合`-l`或`-c`可调整输出格式。定期检查IPC状态有助于系统管理和性能优化。需注意权限和谨慎操作。