【Linux】进程间通信概念 | 匿名管道

[!Abstract] 进程间通信重点

  • 进程间通信介绍
  • 管道
  • 消息队列
  • 共享内存
  • 信号量

一、什么是进程间通信

进程间通信的概念

进程间通信简称IPC(Interprocess communication),是操作系统中的一个重要概念,它允许不同的进程在执行过程中交换数据、共享资源、协调行为等。在多道程序设计环境下,多个进程可能需要相互通信以完成复杂的任务,而进程间通信提供了各种机制来实现这种交互

进程间通信的目的

  • 数据传输:一个进程需要将它的数据发送给另一个进程
  • 资源共享:多个进程之间共享同样的资源。
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

进程间通信的分类

  1. 管道:管道是最早的进程间通信机制之一,最早出现在UNIX系统中。它是一种简单而有效的通信方式,适用于具有父子关系的进程。管道只能用于具有共同祖先的进程之间的通信,通常用于父进程与子进程之间。管道分为:

    • 匿名管道pipe
    • 命名管道
  2. System V IPC:是一套在UNIX系统中引入的标准,包括:

    • System V 消息队列
    • System V 共享内存
    • System V 信号量
      System V IPC 提供了更为灵活和通用的进程间通信机制,使得不同进程之间能够更灵活地交换信息和共享资源。
  3. POSIX IPC:是为UNIX-like系统定义的一套标准。POSIX 进程间通信机制是在System V IPC的基础上进行改进和扩展的,以提供更简单和一致的接口。POSIX IPC包括:

    • 消息队列
    • 共享内存
    • 信号量
    • 互斥量
    • 条件变量
    • 读写锁

进程间通信的本质

进程通信的本质是,让不同的进程看到同一份资源
这种资源通常由操作系统提供。


二、什么是管道

管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”

例如,统计我们当前使用云服务器上的登录用户个数。可以在bash下输入命令 who | wc -l 执行一条简单的管道操作,这条命令的作用是将两个命令连接起来,将第一个命令的输出作为第二个命令的输入。
请添加图片描述

  1. who:这个命令通常用于显示当前登录系统的用户信息,包括用户名、登录时间等。执行 who 会输出一些用户信息的列表。

  2. |:这是管道符号,它将第一个命令的输出传递给第二个命令的输入。在这个例子中,它将 who 命令的输出传递给下一个命令。

  3. wc -lwc 是用于统计文件中行数、字数和字符数的命令,而 -l 参数表示只统计行数。因此,wc -l 会对输入的文本进行行数统计。

请添加图片描述

三、匿名管道

匿名管道的原理

匿名管道用于进程间通信,且仅限于本地父子进程之间的通信。
使用匿名管道实现父子进程间通信的原理就是,让两个父子进程先看到同一份被打开(内存中)的文件资源,然后父子进程就可以对该文件进行写入或是读取操作,进而实现父子进程间通信。

一旦数据被读取,数据就不再存在于管道中,管道的容量被释放,可以用于接收新的数据。这确保了管道中的数据总是按照它们被写入的顺序被读取。可以理解为栈的先进先出,写入时入栈,读取时出栈。

✨站在内核角度理解管道

看待管道,就如同看待文件一样,管道的使用和文件一致,迎合了“Linux一切皆文件思想”。

请添加图片描述

注意:

  1. 为什么父进程对匿名管道文件进行写操作的时候,不会发生写时拷贝?
    匿名管道的数据传递是通过内核缓冲区进行的,而不是直接访问用户空间的内存。当父进程写入数据时,数据首先被复制到内核缓冲区,然后再由内核传递给子进程。这种传递方式不涉及用户空间的共享,因此不会引发写时拷贝。记住,写时拷贝发生在用户空间!

  1. 管道用的是文件的方案,那操作系统为什么不把进程进行通信的数据刷新到磁盘当中?
    因为这样做有IO参与会降低效率,而且也没有必要。也就是说,这种文件是一批不会把数据写到磁盘当中的文件,换句话说,磁盘文件和内存文件不一定是一一对应的,有些文件只会在内存当中存在,而不会在磁盘当中存在。
✨站在文件描述符角度理解管道

请添加图片描述

pipe系统调用

pipe函数用于创建匿名管道,pipe函数的函数原型和需要包含的头文件如下:

   #include <unistd.h>
   int pipe(int pipefd[2]);
   #include <fcntl.h>              /* Obtain O_* constant definitions */
   #include <unistd.h>
   int pipe2(int pipefd[2], int flags);

pipe函数的参数是一个输出型参数,数组pipefd用于返回两个指向管道读端和写端的文件描述符:

数组元素含义
pipefd[0]表示管道的读端
pipefd[1]表示管道的写端

返回值:pipe函数调用成功时返回0,调用失败时返回-1,并设置errno来指示错误类型。

fork后在父子进程间使用管道通信

在创建匿名管道实现父子进程间通信的过程中,需要pipe函数和fork函数搭配使用,具体步骤如下:请添加图片描述

代码实现

例子:从键盘读取数据,子进程写入管道,父进程读取管道,写到屏幕

#include <iostream>
#include <cassert>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

#define MAX 1024

using namespace std;

int main()
{
    // 第1步,建立管道
    int pipefd[2] = {0};
    int n = pipe(pipefd);
    assert(n == 0); // 意料之中,用assert,意料之外,用if
    (void)n; // 防止编译器告警
    cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << endl;

    // 第2步,创建子进程
    pid_t id = fork();
    if (id < 0)
    {
        perror("fork");
        return 1;
    }
    
    // 子写,父读
    // 第3步,父子关闭不需要的fd,形成单向通信的管道
    if (id == 0)
    {
        // 子进程 - 关闭读端
        close(pipefd[0]);
        // w - 只向管道写入,没有打印
        int cnt = 0;
        while(true)
        {
            char message[MAX];
            snprintf(message, sizeof(message), "hello father, I am child, pid: %d, cnt: %d", getpid(), cnt);
            cnt++;
            write(pipefd[1], message, strlen(message));
            sleep(1);

            if(cnt > 10) break;
        }
        cout << "child close w piont, quit" << endl;
        close(pipefd[1]);
        exit(0);
    }

    // 父进程 - 关闭写端
    close(pipefd[1]);

    char buffer[MAX];
    while(true)
    {
        ssize_t n = read(pipefd[0], buffer, sizeof(buffer)-1);
        if(n > 0)
        {
            buffer[n] = '\0'; // '\0', 当做字符串尾
            cout << getpid() << ", " << "child say: " << buffer << " to me!" << endl;
        }
        else if(n == 0)
        {
            cout << "child quit, me too !" << endl;
            break;
        }
    }
    cout << "father read point close"<< endl;
    close(pipefd[0]);

    sleep(5);
    int status = 0;
    pid_t rid = waitpid(id, &status, 0);
    if (rid == id)
    {
        cout << "wait success, child exit sig: " << (status&0x7F) << endl;
    }
    return 0;
}

运行结果:
请添加图片描述

匿名管道的读写规则

  1. 当没有数据可读时

    • O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
    • O_NONBLOCK enable:read调用返回-1,errno值为EAGAIN。
  2. 当管道满的时

    • O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
    • O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
  3. 如果所有管道写端对应的文件描述符被关闭,则read返回0

总结一下就是:
在使用管道时,涉及到 openreadwrite 等系统调用时,可能会发生阻塞等待的情况:

  1. open系统调用:

    • 打开管道的读端或写端时,如果另一端尚未被打开,打开调用可能会阻塞等待。
    • 当打开读端时,通常期望有其他进程打开相应的写端,否则读端打开可能会一直阻塞,直到写端被打开。
    • 当打开写端时,通常期望有其他进程打开相应的读端,否则写端打开可能会一直阻塞,直到读端被打开。
  2. read系统调用:

    • 当从管道中读取数据时,如果管道为空,read 调用可能会阻塞等待,直到有数据可读。
    • 如果管道的写端已经关闭,并且没有数据可供读取,read 调用将返回0,表示已经读到了文件末尾(EOF)。
  3. write系统调用:

    • 当向管道中写入数据时,如果管道满了,write 调用可能会阻塞等待,直到有空间可写。
    • 如果管道的读端已经关闭,而且没有进程读取数据,write 调用可能会导致信号 SIGPIPE 被发送给写入进程。
  1. 如果所有管道读端对应的文件描述符被关闭,则write操作会产生信号SIGPIPE,进而可能导致正在write的进程退出。

验证一下:

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>

int main()
{
	int fd[2] = { 0 };
	if (pipe(fd) < 0)
	{ //使用pipe创建匿名管道
		perror("pipe");
		return 1;
	}

	pid_t id = fork(); //使用fork创建子进程

	//子写,父读
	if (id == 0)
	{
		//child
		close(fd[0]); //子进程关闭读端
		//子进程向管道写入数据
		const char* msg = "hello father, I am child...";
		int count = 10;
		while (count--)
		{
			write(fd[1], msg, strlen(msg));
			sleep(1);
		}
		close(fd[1]); //子进程写入完毕,关闭文件
		exit(0);
	}

	//father
	close(fd[1]); //父进程关闭写端
	close(fd[0]); //父进程直接关闭读端(导致子进程被操作系统杀掉)

	int status = 0;
	waitpid(id, &status, 0);
	printf("child exit signal:%d\n", status & 0x7F); //打印子进程收到的信号
	return 0;
}

子进程没有正常向管道内写入,而是直接退出,退出信号是13,通过kill -l命令可以查看13对应的具体信号。
请添加图片描述

操作系统向子进程发送的是SIGPIPE信号将子进程终止。

  1. 原子性:
    • 当要写入的数据量不大于PIPE_BUF时,linux将保证写入的原子性。
    • 当要写入的数据量大于PIPE_BUF时,linux将不再保证写入的原子性。

[!Abstract] 关于原子性


在Linux中,当写入的数据量不超过PIPE_BUF时,内核会尽力保证写入的原子性。原子性是指一个操作在执行的过程中不会被中断,要么全部执行成功,要么全部不执行,不存在部分执行的情况。

PIPE_BUF 是一个常量,表示管道缓冲区的原子大小,其值是系统相关的,通常是4096字节。当要写入的数据量小于等于 PIPE_BUF 时,写入操作将被视为原子操作。这意味着,如果有多个进程尝试同时写入不超过 PIPE_BUF 大小的数据到同一个管道,操作系统会保证这些数据不会相互交叉,即写入的数据是完整的。

然而,当要写入的数据量大于 PIPE_BUF 时,Linux不再保证写入的原子性。这是因为在写入大量数据时,内核可能需要多次切换上下文,而这期间其他进程也可能进行写入操作,导致写入的数据不再是原子的。这并不意味着数据写入一定会出现截断或混淆,但是操作系统不再保证原子性。

原子性在并发编程中是一个重要的概念,它确保多个线程或进程在访问共享资源时不会导致数据不一致或损坏。因此,了解在特定情况下操作的原子性是确保并发程序正确执行的重要一步。


管道的5种特性

1. 匿名管道的局限性

匿名管道,可以允许具有血缘关系的进程之间进行进程间通信,常用于父子,也可以用于兄弟爷孙,匿名管道的场景仅限于此

2. 管道内部自带同步与互斥机制

我们将一次只允许一个进程使用的资源,称为临界资源。管道在同一时刻只允许一个进程对其进行写入或是读取操作,因此管道也就是一种临界资源。

临界资源是需要被保护的,若是我们不对管道这种临界资源进行任何保护机制,那么就可能出现同一时刻有多个进程对同一管道进行操作的情况,进而导致同时读写、交叉读写以及读取到的数据不一致等问题。

为了避免这些问题,内核会对管道操作进行同步与互斥:

  • 同步: 两个或两个以上的进程在运行过程中协同步调,按预定的先后次序运行。比如,A任务的运行依赖于B任务产生的数据。
  • 互斥: 一个公共资源同一时刻只能被一个进程使用,多个进程不能同时使用公共资源。

实际上,同步是一种更为复杂的互斥,而互斥是一种特殊的同步。对于管道的场景来说,互斥就是两个进程不可以同时对管道进行操作,它们会相互排斥,必须等一个进程操作完毕,另一个才能操作,而同步也是指这两个不能同时对管道进行操作,但这两个进程必须要按照某种次序来对管道进行操作。

也就是说,互斥具有唯一性和排它性,但互斥并不限制任务的运行顺序,而同步的任务之间则有明确的顺序关系。

3. 管道的生命周期随进程:

管道(通常指的是匿名管道)的生命周期是与创建它的进程相关联的。当一个进程创建了一个管道后,这个管道会一直存在。当进程退出时,操作系统会自动关闭所有打开的文件描述符,包括管道相关的文件描述符。关闭文件描述符会触发相应的资源释放操作,例如,管道中的缓冲区、文件表项等。

4. 管道提供的是面向字节流的流式服务:

对于进程A写入管道当中的数据,进程B每次从管道读取的数据的多少是任意的,这种被称为流式服务,与之相对应的是数据报服务:

  • 流式服务: 数据没有明确的分割,不分一定的报文段。
  • 数据报服务: 数据有明确的分割,拿数据按报文段拿。

管道提供的是面向字节流的服务,也就是说,它将数据视为一系列的字节,而不考虑字节之间的结构。这与面向消息的通信机制(如消息队列)不同,消息队列更注重消息的边界和结构。

在面向字节流的管道中,数据是连续的流,没有明确的消息边界。这种特性使得管道适用于一些场景,例如通过管道传递文本或二进制数据。但需要注意的是,由于没有消息边界的概念,接收端可能需要额外的协议或标记来解释和处理数据。

5. 管道是单向通信的,半双工通信的一种特殊情况:

在数据通信中,数据在线路上的传送方式可以分为以下三种:

  1. 单工通信(Simplex Communication):单工模式的数据传输是单向的。通信双方中,一方固定为发送端,另一方固定为接收端。
  2. 半双工通信(Half Duplex):半双工数据传输指数据可以在一个信号载体的两个方向上传输,但是不能同时传输。
  3. 全双工通信(Full Duplex):全双工通信允许数据在两个方向上同时传输,它的能力相当于两个单工通信方式的结合。全双工可以同时(瞬时)进行信号的双向传输。

管道是一种单向通信机制,通常是半双工(Half-Duplex)的。半双工通信意味着数据在一个方向上传输,而在另一个方向上传输时需要另外的管道。在典型的匿名管道中,一个进程负责写入数据,而另一个进程负责读取数据。要实现双向通信,需要创建两个独立的管道,或者考虑其他的通信机制(如全双工通信的命名管道或套接字)。

单向通信和半双工通信的特性使得管道更适合一些特定的应用场景,如父子进程之间的通信,或者通过管道将输出从一个进程传递到另一个进程。

[!Improtant] 重新理解命令行中的管道“|”,和pipe系统调用:


Bash中的 | 管道以及通过pipe系统调用创建的管道都是匿名管道
事实上,Bash中的 | 管道符底层调用了一些系统调用,其中就包括 pipe 系统调用。在Linux系统中,pipe 系统调用用于创建管道,而fork 系统调用用于创建子进程。通过这两个系统调用的组合,Bash能够实现进程间通信。具体步骤如下:

  1. Bash 使用 pipe 系统调用创建一个管道,得到两个文件描述符,一个用于管道的写入端,一个用于读取端。
  2. Bash 使用 fork 系统调用创建一个子进程。这个子进程将成为 | 符号左侧命令的进程。
  3. 在父进程(Bash)中,将标准输出(文件描述符1)重定向到管道的写入端。
  4. 在子进程中,将标准输入(文件描述符0)重定向到管道的读取端。
  5. Bash 分别执行 | 符号两侧的命令,它们分别成为父进程和子进程的执行体。

这样,左侧命令的输出就通过管道传递给了右侧命令,实现了进程间通信。所以,Bash中的 | 管道符在底层使用了 pipefork 系统调用来创建管道和子进程。


四、运用匿名管道建立进程池

ProcessPool.cc:

#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"

const int num = 5;
static int number = 1;

class channel
{
public:
    channel(int fd, pid_t id) : ctrlfd(fd), workerid(id)
    {
        name = "channel-" + std::to_string(number++);
    }

public:
    int ctrlfd;
    pid_t workerid;
    std::string name;
};

void Work()
{
    while (true)
    {
        int code = 0;
        ssize_t n = read(0, &code, sizeof(code));
        if (n == sizeof(code))
        {
            if (!init.CheckSafe(code))
                continue;
            init.RunTask(code);
        }
        else if (n == 0)
        {
            break;
        }
        else
        {
            // do nothing
        }
    }

    std::cout << "child quit" << std::endl;
}

void PrintFd(const std::vector<int>& fds)
{
    std::cout << getpid() << " close fds: ";
    for (auto fd : fds)
    {
        std::cout << fd << " ";
    }
    std::cout << std::endl;
}

// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreateChannels(std::vector<channel>* c)
{
    // bug
    std::vector<int> old;
    for (int i = 0; i < num; i++)
    {
        // 1. 定义并创建管道
        int pipefd[2];
        int n = pipe(pipefd);
        assert(n == 0);
        (void)n;

        // 2. 创建进程
        pid_t id = fork();
        assert(id != -1);

        // 3. 构建单向通信信道
        if (id == 0) // child
        {
            if (!old.empty())
            {
                for (auto fd : old)
                {
                    close(fd);
                }
                PrintFd(old);
            }
            close(pipefd[1]);
            dup2(pipefd[0], 0);
            Work();
            exit(0); // 会自动关闭自己打开的所有的fd
        }

        // father
        close(pipefd[0]);
        c->push_back(channel(pipefd[1], id));
        old.push_back(pipefd[1]);
        // childid, pipefd[1]
    }
}

void PrintDebug(const std::vector<channel>& c)
{
    for (const auto& channel : c)
    {
        std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;
    }
}

void SendCommand(const std::vector<channel>& c, bool flag, int num = -1)
{
    int pos = 0;
    while (true)
    {
        // 1. 选择任务
        int command = init.SelectTask();

        // 2. 选择信道(进程)
        const auto& channel = c[pos++];
        pos %= c.size();

        // debug
        std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
            << " in "
            << channel.name << " worker is : " << channel.workerid << std::endl;

        // 3. 发送任务
        write(channel.ctrlfd, &command, sizeof(command));

        // 4. 判断是否要退出
        if (!flag)
        {
            num--;
            if (num <= 0)
                break;
        }
        sleep(1);
    }

    std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(std::vector<channel> c)
{
    // version 2
    // int num = c.size() - 1;

    // for (; num >= 0; num--)
    // {
    //     close(c[num].ctrlfd);
    //     waitpid(c[num].workerid, nullptr, 0);
    // }

    // version 1
    for (const auto& channel : c)
    {
        close(channel.ctrlfd);
        waitpid(channel.workerid, nullptr, 0);
    }
    // for (const auto &channel : c)
    // {
    //     pid_t rid = waitpid(channel.workerid, nullptr, 0);
    //     if (rid == channel.workerid)
    //     {
    //         std::cout << "wait child: " << channel.workerid << " success" << std::endl;
    //     }
    // }
}
int main()
{
    std::vector<channel> channels;
    // 1. 创建信道,创建进程
    CreateChannels(&channels);

    // 2. 开始发送任务
    const bool g_always_loop = true;
    // SendCommand(channels, g_always_loop);
    SendCommand(channels, !g_always_loop, 10);

    // 3. 回收资源,想让子进程退出,并且释放管道,只要关闭写端
    ReleaseChannels(channels);

    return 0;
}

Task.hpp:

#pragma once

#include <iostream>
#include <functional>
#include <vector>
#include <ctime>
#include <unistd.h>

// using task_t = std::function<void()>;
typedef std::function<void()> task_t;

void Download()
{
    std::cout << "我是一个下载任务"
        << " 处理者: " << getpid() << std::endl;
}

void PrintLog()
{
    std::cout << "我是一个打印日志的任务"
        << " 处理者: " << getpid() << std::endl;
}

void PushVideoStream()
{
    std::cout << "这是一个推送视频流的任务"
        << " 处理者: " << getpid() << std::endl;
}

// void ProcessExit()
// {
//     exit(0);
// }

class Init
{
public:
    // 任务码
    const static int g_download_code = 0;
    const static int g_printlog_code = 1;
    const static int g_push_videostream_code = 2;
    // 任务集合
    std::vector<task_t> tasks;

public:
    Init()
    {
        tasks.push_back(Download);
        tasks.push_back(PrintLog);
        tasks.push_back(PushVideoStream);

        srand(time(nullptr) ^ getpid());
    }
    bool CheckSafe(int code)
    {
        if (code >= 0 && code < tasks.size())
            return true;
        else
            return false;
    }
    void RunTask(int code)
    {
        return tasks[code]();
    }
    int SelectTask()
    {
        return rand() % tasks.size();
    }
    std::string ToDesc(int code)
    {
        switch (code)
        {
        case g_download_code:
            return "Download";
        case g_printlog_code:
            return "PrintLog";
        case g_push_videostream_code:
            return "PushVideoStream";
        default:
            return "Unknow";
        }
    }
};

Init init; // 定义对象

Makefile:

proc:ProcessPool.cc
    g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
    rm -f proc

运行:
请添加图片描述


如果涉及到在文件系统中创建一个有名的管道,那么就是在使用命名管道,下一篇文章我们讲命名管道的概念。