进程间通信机制
在操作系统中,进程是分配资源的最小单位,不同的进程各有其独占的资源。如果要在不同进程之间共享资源,需要利用进程间通信(IPC)机制。常见的 IPC 机制包括:
- 匿名管道(PIPE)
- 命名管道(FIFO)
- 共享内存(Shared Memory)
- 套接字(Socket)
- 消息队列(Message Queue)
下文将针对每种方式介绍其原理与使用方法,并给出相应的代码示例。
一、匿名管道(PIPE)
匿名管道(PIPE)由内核在内存中分配一个有限大小的环形缓冲区(通常为 64KB)。创建管道后,内核返回两个文件描述符:读端(fds[0])和写端(fds[1])。
PIPE 具有以下特点:
- 半双工:同一时刻只能一个方向传输数据。
- 阻塞 / 非阻塞:默认读端在缓冲区为空时阻塞;写端在缓冲区满时阻塞。可通过设置
O_NONBLOCK
改为非阻塞。 - 局部可见:只能用于有亲缘关系的父子进程或者兄弟进程。
- 生命周期:PIPE 在创建进程退出后关闭。
PIPE 的使用方法:
- 创建:调用
pipe(int fds[2])
。 - 分支:通过
fork()
,子进程继承文件描述符。 - 重定向:使用
dup2()
将fds[0]
重定向为标准输入或将fds[1]
重定向为标准输出。 - 读写:父进程或一个子进程写,另一个子进程读;或反之。
- 关闭:使用完毕后关闭不需要的端
close()
,否则可能导致阻塞或死锁。
PIPE 示例:
C 端(父进程)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int main() {
int fds[2];
if (pipe(fds) == -1) { perror("pipe"); exit(1); }
pid_t pid = fork();
if (pid == 0) {
// 子进程:关闭写端,将读端映射到 stdin
close(fds[1]);
dup2(fds[0], STDIN_FILENO);
execlp("python3", "python3", "child.py", NULL);
perror("execlp");
exit(1);
} else {
// 父进程:关闭读端,写入数据
close(fds[0]);
const char *msg = "Hello from C via pipe\n";
write(fds[1], msg, strlen(msg));
close(fds[1]);
wait(NULL);
}
return 0;
}Python 端(child.py)
1
2
3
4import sys
for line in sys.stdin:
print(f"Python child got: {line.strip()}")
二、命名管道(FIFO)
命名管道(FIFO, First In First Out)是内核在文件系统下创建的一个特殊文件节点。它与匿名管道类似,但通过路径使任意进程可打开。
FIFO 具有以下特点:
- 半双工:默认单向传输,双向通信需要使用两个 FIFO。
- 阻塞:若无写端打开,则读端阻塞;若无读端打开,则写端阻塞。
- 全局可见:FIFO 通过文件名访问,可以被多个读写进程打开,进程之间无需亲缘关系。
- 生命周期:FIFO 在创建后将存在于文件系统中,直到显式调用
unlink()
或os.remove()
删除。
FIFO 的使用方法:
- 创建:在 C 中使用
mkfifo(const char *path, mode_t mode)
,在 Python 中使用os.mkfifo(path)
。 - 打开:读端用
open(path, O_RDONLY)
,写端用open(path, O_WRONLY)
。 - 读写:在 C 中读写用
read()/write()
,在 Python 中open()
后读写。 - 关闭 / 删除:
close()
后可使用unlink()/os.remove()
删除。
FIFO 示例:
Python 端(写入)
1
2
3
4
5
6
7
8
9
10
11
12import os, time
fifo = '/tmp/myfifo'
if not os.path.exists(fifo):
os.mkfifo(fifo)
with open(fifo, 'w') as f:
for i in range(5):
msg = f"Python->C message {i}\n"
f.write(msg);
f.flush()
print(f"Sent: {msg.strip()}")
time.sleep(1)C 端(读取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {
const char *fifo = "/tmp/myfifo";
if (access(fifo, F_OK) == -1) return 1;
int fd = open(fifo, O_RDONLY);
char buf[128];
while (1) {
ssize_t n = read(fd, buf, sizeof(buf)-1);
if (n <= 0) break;
buf[n] = '\0';
printf("Received: %s", buf);
}
close(fd);
return 0;
}
三、共享内存(Shared Memory)
共享内存(Shared Memory)允许多个进程将同一物理页映射到它们的虚拟地址空间,实现零拷贝数据交换。
Shared Memory 具有以下特点:
- 全双工:任意进程可同时读写共享内存,无方向限制。
- 高性能:不涉及内核空间与用户空间拷贝,适合大数据量传输。
- 同步要求高:不提供同步机制,必须配合信号量(Semaphore)、互斥锁(Mutex)等原语使用。
- 生命周期:共享内存在调用
shm_unlink()
前一直存在,适合较长生命周期的数据共享。 - 一致性机制:现代多核系统依赖缓存一致性协议(如 MESI)确保不同 CPU 核心上可见的共享内存视图一致。
Shared Memory 的使用方法:
- 创建:在 C 中使用
shm_open(name, flags, mode)
,在 Python 中使用SharedMemory(name, create, size)
。 - 设置大小:在 C 中使用
ftruncate()
, 在 Python 中设置 size 参数。 - 映射:在 C 中使用
mmap()
,在 Python 中通过 buf 访问。 - 删除:在 C 中使用
munmap()
解出映射,并使用shm_unlink()
删除。
Shared Memory 示例:
Python 端(写入)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import posix_ipc
import mmap
import time
# 1. 创建共享内存和信号量
shm = posix_ipc.SharedMemory("/test", flags=posix_ipc.O_CREAT, size=128)
mem_map = mmap.mmap(shm.fd, shm.size)
sem_write = posix_ipc.Semaphore("/sem_write", flags=posix_ipc.O_CREAT, initial_value=1)
sem_read = posix_ipc.Semaphore("/sem_read", flags=posix_ipc.O_CREAT, initial_value=0)
# 2. 写入共享内存
for k in range(5):
sem_write.acquire() # 获取写锁
mem_map.seek(0)
mem_map.write(f"Hello from Python {k}\0".encode())
mem_map.flush()
sem_read.release() # 释放读锁
time.sleep(1)
# 3. 发送退出信号
sem_write.acquire()
mem_map.seek(0)
mem_map.write(b"EXIT\0")
mem_map.flush()
sem_read.release()
# 4. 清理资源
mem_map.close()
shm.close_fd()
shm.unlink()
sem_write.unlink()
sem_read.unlink()C 端(读取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
int main() {
// 1. 打开信号量和共享内存
sem_t *sem_write = sem_open(SEM_W_NAME, 0);
sem_t *sem_read = sem_open(SEM_R_NAME, 0);
if (sem_write==SEM_FAILED || sem_read==SEM_FAILED) {
perror("sem_open");
return 1;
}
int fd = shm_open(SHM_NAME, O_RDONLY, 0);
if (fd == -1) {
perror("shm_open");
return 1;
}
void *addr = mmap(NULL, BUF_SIZE, PROT_READ, MAP_SHARED, fd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
return 1;
}
char buffer[BUF_SIZE];
while (1) {
// 2. 等待写端写好
sem_wait(sem_read);
// 3. 读取内容
memcpy(buffer, addr, BUF_SIZE-1);
buffer[BUF_SIZE-1] = '\0';
// 4. 检查退出标志
if (strcmp(buffer, "EXIT") == 0) {
printf("[C] 收到退出信号,结束读取。\n");
break;
}
// 5. 打印并允许写端写下一条
printf("Read: %s\n", buffer);
sem_post(sem_write);
}
// 6. 清理(写端负责 unlink,此处无需再 unlink)
munmap(addr, BUF_SIZE);
close(fd);
sem_close(sem_write);
sem_close(sem_read);
return 0;
}
四、套接字(Socket)
套接字(Socket)是操作系统提供的网络接口,支持多协议族(如 AF_UNIX、AF_INET),可实现全双工字节流或数据报交换。
Socket 具有以下特点:
- 全双工:同一套接字描述符可以同时发送和接收数据,读写操作互不干扰。
- 阻塞 / 非阻塞:套接字默认阻塞,可通过设置
O_NONBLOCK
改为非阻塞。 - 跨主机:网络模式支持跨主机通信。
- 多路复用:与
select()
、poll()
、epoll()
结合使用,在单个线程或进程中监控多个套接字的可读、可写或异常事件,高效处理并发连接。 - 生命周期:套接字从调用
socket()
创建开始,到调用close()
关闭结束。
Socket 的使用方法:
- 创建:
socket(domain, type, protocol)
- 绑定 / 监听 / 接受(服务器):
bind()
、listen()
、accept()
- 连接(客户端):
connect()
- 读写:
send()/recv()
或read()/write()
- 关闭:
close()
Socket 示例:
Python 端(写入)
1
2
3
4
5
6
7
8
9
10
11
12
13import socket
SOCKET_PATH = "/tmp/ipc_socket"
try:
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(SOCKET_PATH)
client.sendall(b"Hello from Python via UNIX socket")
print("Python sent data")
except FileNotFoundError:
print("Error: Server socket not found. Is the server running?")
except Exception as e:
print(f"Unexpected error: {e}")C 端(读取)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void die(const char *msg) {
perror(msg);
exit(EXIT_FAILURE);
}
int main() {
int sock, conn;
struct sockaddr_un addr;
if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
die("socket");
unlink(SOCKET_PATH); // 删除已有的 socket 文件
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, SOCKET_PATH, sizeof(addr.sun_path) - 1);
addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
close(sock);
die("bind");
}
if (listen(sock, BACKLOG) == -1) {
close(sock);
die("listen");
}
printf("Server listening on %s...\n", SOCKET_PATH);
if ((conn = accept(sock, NULL, NULL)) == -1) {
close(sock);
die("accept");
}
char buf[BUFFER_SIZE];
ssize_t n = recv(conn, buf, sizeof(buf) - 1, 0);
if (n > 0) {
buf[n] = '\0';
printf("Server received: %s\n", buf);
}
close(conn);
close(sock);
unlink(SOCKET_PATH);
return 0;
}
五、消息队列(Message Queue)
消息队列(Message Queue)在内核中维护一个有界队列,用于传递具有某种数据结构的消息,而不是简单的字节流,支持异步发送和接收,并可设置阻塞或非阻塞模式。
Message Queue 具有以下特点:
- 全双工:同一消息队列可用于发送和接收,读写互不干扰。
- 消息单元:每条消息为一个独立单元,并可设置不同优先级,优先级高的消息先被接收。
- 异步收发:发送方和接收方无需同时在线或保持连接,消息在内核队列中排队等待处理。
- 阻塞 / 超时:默认阻塞等待消息,可通过设置
O_NONBLOCK
实现非阻塞。 - 生命周期:在执行
unlink
后取消引用,实际删除发生在所有引用关闭后。
Message Queue 的使用方法:
- 创建:在 C 中使用
mq_open()
,在 Python 中使用posix_ipc.MessageQueue
。 - 发送 / 接受:用
mq_send()
发送,用mq_receive()
接收。 - 删除:使用
mq_close()
关闭,再用mq_unlink()
清理。
Message Queue 示例:
Python 端(发送)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import posix_ipc
# 创建或打开消息队列
mq = posix_ipc.MessageQueue(
'/mq',
posix_ipc.O_CREAT,
max_messages=10,
max_message_size=64
)
# 发送消息
for i in range(3):
msg = f'msg{i}'.encode()
mq.send(msg)
print(f"Python sent: {msg.decode()}")
# 关闭并删除队列
mq.close()
mq.unlink()C 端(接收)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int main() {
// 打开消息队列
mqd_t mq = mq_open("/mq", O_RDONLY);
if (mq == (mqd_t)-1) {
perror("mq_open");
return 1;
}
char buf[64];
unsigned int prio;
// 确定消息数量
struct mq_attr attr;
mq_getattr(mq, &attr);
// 接收消息
for (unsigned int i = 0; i < attr.mq_curmsgs; i++) {
ssize_t n = mq_receive(mq, buf, sizeof(buf), &prio);
if (n >= 0) {
buf[n] = '\0';
printf("C received: %s (priority: %u)\n", buf, prio);
}
}
// 关闭并删除队列
mq_close(mq);
mq_unlink("/mq");
return 0;
}
六、IPC 机制对比
IPC 机制 | 延迟 | 吞吐 | 同步需求 | 跨主机 | 典型场景 |
---|---|---|---|---|---|
匿名管道(PIPE) | 低 | 低 | 阻塞 | 否 | 父子进程通信 |
命名管道(FIFO) | 中等 | 中等 | 阻塞 | 否 | 脚本管道、日志传输 |
共享内存(Share Memory) | 极低 | 极高 | 手动同步 | 否 | 大数据共享、实时处理 |
套接字(Unix) | 中等 | 中等 | 阻塞 / 多路复用 | 否 | 本地服务、守护进程 |
套接字(TCP) | 较高 | 中等 | 阻塞 / 多路复用 | 是 | 分布式服务、微服务 |
消息队列(Message Queue) | 中等 | 中等 | 异步 | 否 | 异步任务、事件通知 |
说明:
- 延迟 表示通信响应速度,越低越实时。
- 吞吐 衡量单位时间内数据处理能力。
- 同步需求 体现是否需要调用方协调读写时机。
- 跨主机 说明机制是否支持分布式部署。
建议:
- 小数据、结构简单:使用 PIPE 或 FIFO。
- 大数据、性能要求高:首选共享内存,搭配同步机制。
- 本地复杂通信或服务守护:采用 Unix 套接字。
- 分布式通信:使用 TCP 套接字,配合多路复用。
- 异步事件通知 / 解耦逻辑:消息队列更合适。