在操作系统中,进程是分配资源的最小单位,不同的进程各有其独占的资源。如果要在不同进程之间共享资源,需要利用进程间通信(IPC)机制。常见的 IPC 机制包括:

  • 匿名管道(PIPE)
  • 命名管道(FIFO)
  • 共享内存(Shared Memory)
  • 套接字(Socket)
  • 消息队列(Message Queue)

下文将针对每种方式介绍其原理与使用方法,并给出相应的代码示例。

一、匿名管道(PIPE)

匿名管道(PIPE)由内核在内存中分配一个有限大小的环形缓冲区(通常为 64KB)。创建管道后,内核返回两个文件描述符:读端(fds[0])和写端(fds[1])。

PIPE 具有以下特点:

  • 半双工:同一时刻只能一个方向传输数据。
  • 阻塞 / 非阻塞:默认读端在缓冲区为空时阻塞;写端在缓冲区满时阻塞。可通过设置 O_NONBLOCK 改为非阻塞。
  • 局部可见:只能用于有亲缘关系的父子进程或者兄弟进程。
  • 生命周期:PIPE 在创建进程退出后关闭。

PIPE 的使用方法:

  1. 创建:调用pipe(int fds[2])
  2. 分支:通过fork(),子进程继承文件描述符。
  3. 重定向:使用 dup2()fds[0]重定向为标准输入或将 fds[1] 重定向为标准输出。
  4. 读写:父进程或一个子进程写,另一个子进程读;或反之。
  5. 关闭:使用完毕后关闭不需要的端close(),否则可能导致阻塞或死锁。

PIPE 示例:

  • C 端(父进程)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/wait.h>
    int main() {
    int fds[2];
    if (pipe(fds) == -1) { perror("pipe"); exit(1); }
    pid_t pid = fork();
    if (pid == 0) {
    // 子进程:关闭写端,将读端映射到 stdin
    close(fds[1]);
    dup2(fds[0], STDIN_FILENO);
    execlp("python3", "python3", "child.py", NULL);
    perror("execlp");
    exit(1);
    } else {
    // 父进程:关闭读端,写入数据
    close(fds[0]);
    const char *msg = "Hello from C via pipe\n";
    write(fds[1], msg, strlen(msg));
    close(fds[1]);
    wait(NULL);
    }
    return 0;
    }
  • Python 端(child.py)

    1
    2
    3
    4
    import sys

    for line in sys.stdin:
    print(f"Python child got: {line.strip()}")

二、命名管道(FIFO)

命名管道(FIFO, First In First Out)是内核在文件系统下创建的一个特殊文件节点。它与匿名管道类似,但通过路径使任意进程可打开。

FIFO 具有以下特点:

  • 半双工:默认单向传输,双向通信需要使用两个 FIFO。
  • 阻塞:若无写端打开,则读端阻塞;若无读端打开,则写端阻塞。
  • 全局可见:FIFO 通过文件名访问,可以被多个读写进程打开,进程之间无需亲缘关系。
  • 生命周期:FIFO 在创建后将存在于文件系统中,直到显式调用 unlink()os.remove()删除。

FIFO 的使用方法:

  1. 创建:在 C 中使用mkfifo(const char *path, mode_t mode),在 Python 中使用os.mkfifo(path)
  2. 打开:读端用open(path, O_RDONLY),写端用open(path, O_WRONLY)
  3. 读写:在 C 中读写用 read()/write(),在 Python 中open() 后读写。
  4. 关闭 / 删除:close()后可使用 unlink()/os.remove() 删除。

FIFO 示例:

  • Python 端(写入)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import os, time

    fifo = '/tmp/myfifo'
    if not os.path.exists(fifo):
    os.mkfifo(fifo)
    with open(fifo, 'w') as f:
    for i in range(5):
    msg = f"Python->C message {i}\n"
    f.write(msg);
    f.flush()
    print(f"Sent: {msg.strip()}")
    time.sleep(1)
  • C 端(读取)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #include <stdio.h>
    #include <fcntl.h>
    #include <unistd.h>
    int main() {
    const char *fifo = "/tmp/myfifo";
    if (access(fifo, F_OK) == -1) return 1;
    int fd = open(fifo, O_RDONLY);
    char buf[128];
    while (1) {
    ssize_t n = read(fd, buf, sizeof(buf)-1);
    if (n <= 0) break;
    buf[n] = '\0';
    printf("Received: %s", buf);
    }
    close(fd);
    return 0;
    }

三、共享内存(Shared Memory)

共享内存(Shared Memory)允许多个进程将同一物理页映射到它们的虚拟地址空间,实现零拷贝数据交换。

Shared Memory 具有以下特点:

  • 全双工:任意进程可同时读写共享内存,无方向限制。
  • 高性能:不涉及内核空间与用户空间拷贝,适合大数据量传输。
  • 同步要求高:不提供同步机制,必须配合信号量(Semaphore)、互斥锁(Mutex)等原语使用。
  • 生命周期:共享内存在调用 shm_unlink() 前一直存在,适合较长生命周期的数据共享。
  • 一致性机制:现代多核系统依赖缓存一致性协议(如 MESI)确保不同 CPU 核心上可见的共享内存视图一致。

Shared Memory 的使用方法:

  1. 创建:在 C 中使用shm_open(name, flags, mode),在 Python 中使用SharedMemory(name, create, size)
  2. 设置大小:在 C 中使用ftruncate(), 在 Python 中设置 size 参数。
  3. 映射:在 C 中使用mmap(),在 Python 中通过 buf 访问。
  4. 删除:在 C 中使用 munmap() 解出映射,并使用 shm_unlink() 删除。

Shared Memory 示例:

  • Python 端(写入)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import posix_ipc
    import mmap
    import time

    # 1. 创建共享内存和信号量
    shm = posix_ipc.SharedMemory("/test", flags=posix_ipc.O_CREAT, size=128)
    mem_map = mmap.mmap(shm.fd, shm.size)
    sem_write = posix_ipc.Semaphore("/sem_write", flags=posix_ipc.O_CREAT, initial_value=1)
    sem_read = posix_ipc.Semaphore("/sem_read", flags=posix_ipc.O_CREAT, initial_value=0)

    # 2. 写入共享内存
    for k in range(5):
    sem_write.acquire() # 获取写锁
    mem_map.seek(0)
    mem_map.write(f"Hello from Python {k}\0".encode())
    mem_map.flush()
    sem_read.release() # 释放读锁
    time.sleep(1)

    # 3. 发送退出信号
    sem_write.acquire()
    mem_map.seek(0)
    mem_map.write(b"EXIT\0")
    mem_map.flush()
    sem_read.release()

    # 4. 清理资源
    mem_map.close()
    shm.close_fd()
    shm.unlink()
    sem_write.unlink()
    sem_read.unlink()
  • C 端(读取)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    #include <fcntl.h>
    #include <sys/mman.h>
    #include <semaphore.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>

    #define SHM_NAME "/test"
    #define SEM_W_NAME "/sem_write"
    #define SEM_R_NAME "/sem_read"
    #define BUF_SIZE 256

    int main() {
    // 1. 打开信号量和共享内存
    sem_t *sem_write = sem_open(SEM_W_NAME, 0);
    sem_t *sem_read = sem_open(SEM_R_NAME, 0);
    if (sem_write==SEM_FAILED || sem_read==SEM_FAILED) {
    perror("sem_open");
    return 1;
    }

    int fd = shm_open(SHM_NAME, O_RDONLY, 0);
    if (fd == -1) {
    perror("shm_open");
    return 1;
    }

    void *addr = mmap(NULL, BUF_SIZE, PROT_READ, MAP_SHARED, fd, 0);
    if (addr == MAP_FAILED) {
    perror("mmap");
    return 1;
    }

    char buffer[BUF_SIZE];
    while (1) {
    // 2. 等待写端写好
    sem_wait(sem_read);

    // 3. 读取内容
    memcpy(buffer, addr, BUF_SIZE-1);
    buffer[BUF_SIZE-1] = '\0';

    // 4. 检查退出标志
    if (strcmp(buffer, "EXIT") == 0) {
    printf("[C] 收到退出信号,结束读取。\n");
    break;
    }

    // 5. 打印并允许写端写下一条
    printf("Read: %s\n", buffer);
    sem_post(sem_write);
    }

    // 6. 清理(写端负责 unlink,此处无需再 unlink)
    munmap(addr, BUF_SIZE);
    close(fd);
    sem_close(sem_write);
    sem_close(sem_read);

    return 0;
    }

四、套接字(Socket)

套接字(Socket)是操作系统提供的网络接口,支持多协议族(如 AF_UNIX、AF_INET),可实现全双工字节流或数据报交换。

Socket 具有以下特点:

  • 全双工:同一套接字描述符可以同时发送和接收数据,读写操作互不干扰。
  • 阻塞 / 非阻塞:套接字默认阻塞,可通过设置 O_NONBLOCK 改为非阻塞。
  • 跨主机:网络模式支持跨主机通信。
  • 多路复用:与 select()poll()epoll() 结合使用,在单个线程或进程中监控多个套接字的可读、可写或异常事件,高效处理并发连接。
  • 生命周期:套接字从调用 socket() 创建开始,到调用 close() 关闭结束。

Socket 的使用方法:

  1. 创建:socket(domain, type, protocol)
  2. 绑定 / 监听 / 接受(服务器):bind()listen()accept()
  3. 连接(客户端):connect()
  4. 读写:send()/recv()read()/write()
  5. 关闭:close()

Socket 示例:

  • Python 端(写入)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import socket

    SOCKET_PATH = "/tmp/ipc_socket"

    try:
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
    client.connect(SOCKET_PATH)
    client.sendall(b"Hello from Python via UNIX socket")
    print("Python sent data")
    except FileNotFoundError:
    print("Error: Server socket not found. Is the server running?")
    except Exception as e:
    print(f"Unexpected error: {e}")
  • C 端(读取)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    #include <sys/socket.h>
    #include <sys/un.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    #define SOCKET_PATH "/tmp/ipc_socket"
    #define BACKLOG 5
    #define BUFFER_SIZE 128

    void die(const char *msg) {
    perror(msg);
    exit(EXIT_FAILURE);
    }

    int main() {
    int sock, conn;
    struct sockaddr_un addr;

    if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
    die("socket");

    unlink(SOCKET_PATH); // 删除已有的 socket 文件

    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);
    addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';

    if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
    close(sock);
    die("bind");
    }

    if (listen(sock, BACKLOG) == -1) {
    close(sock);
    die("listen");
    }

    printf("Server listening on %s...\n", SOCKET_PATH);

    if ((conn = accept(sock, NULL, NULL)) == -1) {
    close(sock);
    die("accept");
    }

    char buf[BUFFER_SIZE];
    ssize_t n = recv(conn, buf, sizeof(buf) - 1, 0);
    if (n > 0) {
    buf[n] = '\0';
    printf("Server received: %s\n", buf);
    }

    close(conn);
    close(sock);
    unlink(SOCKET_PATH);

    return 0;
    }

五、消息队列(Message Queue)

消息队列(Message Queue)在内核中维护一个有界队列,用于传递具有某种数据结构的消息,而不是简单的字节流,支持异步发送和接收,并可设置阻塞或非阻塞模式。

Message Queue 具有以下特点:

  • 全双工:同一消息队列可用于发送和接收,读写互不干扰。
  • 消息单元:每条消息为一个独立单元,并可设置不同优先级,优先级高的消息先被接收。
  • 异步收发:发送方和接收方无需同时在线或保持连接,消息在内核队列中排队等待处理。
  • 阻塞 / 超时:默认阻塞等待消息,可通过设置 O_NONBLOCK 实现非阻塞。
  • 生命周期:在执行 unlink 后取消引用,实际删除发生在所有引用关闭后。

Message Queue 的使用方法:

  1. 创建:在 C 中使用mq_open(),在 Python 中使用posix_ipc.MessageQueue
  2. 发送 / 接受:用 mq_send() 发送,用 mq_receive() 接收。
  3. 删除:使用 mq_close() 关闭,再用 mq_unlink() 清理。

Message Queue 示例:

  • Python 端(发送)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import posix_ipc

    # 创建或打开消息队列
    mq = posix_ipc.MessageQueue(
    '/mq',
    posix_ipc.O_CREAT,
    max_messages=10,
    max_message_size=64
    )

    # 发送消息
    for i in range(3):
    msg = f'msg{i}'.encode()
    mq.send(msg)
    print(f"Python sent: {msg.decode()}")

    # 关闭并删除队列
    mq.close()
    mq.unlink()
  • C 端(接收)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    #include <mqueue.h>
    #include <stdio.h>

    int main() {
    // 打开消息队列
    mqd_t mq = mq_open("/mq", O_RDONLY);
    if (mq == (mqd_t)-1) {
    perror("mq_open");
    return 1;
    }

    char buf[64];
    unsigned int prio;
    // 确定消息数量
    struct mq_attr attr;
    mq_getattr(mq, &attr);

    // 接收消息
    for (unsigned int i = 0; i < attr.mq_curmsgs; i++) {
    ssize_t n = mq_receive(mq, buf, sizeof(buf), &prio);
    if (n >= 0) {
    buf[n] = '\0';
    printf("C received: %s (priority: %u)\n", buf, prio);
    }
    }

    // 关闭并删除队列
    mq_close(mq);
    mq_unlink("/mq");
    return 0;
    }

六、IPC 机制对比

IPC 机制 延迟 吞吐 同步需求 跨主机 典型场景
匿名管道(PIPE) 阻塞 父子进程通信
命名管道(FIFO) 中等 中等 阻塞 脚本管道、日志传输
共享内存(Share Memory) 极低 极高 手动同步 大数据共享、实时处理
套接字(Unix) 中等 中等 阻塞 / 多路复用 本地服务、守护进程
套接字(TCP) 较高 中等 阻塞 / 多路复用 分布式服务、微服务
消息队列(Message Queue) 中等 中等 异步 异步任务、事件通知

说明

  • 延迟 表示通信响应速度,越低越实时。
  • 吞吐 衡量单位时间内数据处理能力。
  • 同步需求 体现是否需要调用方协调读写时机。
  • 跨主机 说明机制是否支持分布式部署。

建议

  • 小数据、结构简单:使用 PIPE 或 FIFO。
  • 大数据、性能要求高:首选共享内存,搭配同步机制。
  • 本地复杂通信或服务守护:采用 Unix 套接字。
  • 分布式通信:使用 TCP 套接字,配合多路复用。
  • 异步事件通知 / 解耦逻辑:消息队列更合适。