Pipe IPC 在 Go 中的使用与实现。
管道(Pipe)
多个进程在协作完成同一任务时,通常彼此要传输数据,共享资源。
在 shell 中常常会用到管道符,如查看占用 80 端口的进程:netstat -an | grep :80
,在 bash 中每个命令在执行时都是独立的进程,netstat
父进程通过管道将数据传输给 fork 出的 grep
子进程处理。这就是最简单的 IPC 管道通信。
分类
- 匿名管道:shell 中的 pipe 就是匿名管道,只能在父子进程 / 有亲缘关系的进程之间使用。
原因:管道在 Linux 中是文件,想要通过匿名管道来读写数据,必须拥有相同的文件描述符,而拥有相同 fd 的两个进程需有亲缘关系。 - 命名管道:允许无亲缘关系的进程间传输数据。
特点
- 半双工:数据只能是单向流动的(优点:简单,缺点:单向)
- 面向字节流:管道中的数据是原生的字节流(优点:职责单一,缺点:相比消息队列实现的 IPC,无法选择接收或丢弃发来的数据)
匿名管道
os/exec
包支持在执行的系统命令上建立匿名管道:
1
|
func stdoutPipe() {
|
此外,模拟 2 个系统命令,可手动设置 cmd1.Stdout = &buf
和 cmd2.Stdin = &buf
来模拟匿名管道。匿名管道由于交换双方受限的特点,使用场景不多。
命名管道
os.Pipe
os
包中支持操作系统级别的命名管道生成与操作:
1
|
// os.Pipe 是阻塞的
|
io.Pipe
io
包中支持线程安全的命名管道生成与原子性操作:
由于命名管道提供多路复用,即多个进程都可向 Pipe 中写入数据,此时需要保证操作互斥,io.Pipe
提供了更为安全的原子性操作管道。不过注意它的阻塞操作:
1
|
// io.Pipe 命名管道是阻塞的
|
命名管道在一端未就绪的情况下,会阻塞另一端的进程。
上边 writer 写入数据后会一直阻塞直到有进程从 pipe 读取数据,不过其后顺序执行的 reader 读取不可能执行到,才造成死锁
稍微改造下,对命名管道的读写都改成 并发调用 即可:
1
|
func atomicPipe() {
|
从上可总结出 Golang 中命名管道的 2 个特点:
- os.Pipe 比较底层,不保证多次写入的原子操作,阻塞。而 io.Pipe 读写操作有独占锁限制,是线程安全的.
- io.Pipe 读写也都是阻塞的,应并发读写而非顺序读写。
io.Pipe 源码分析
源码 $GOROOT/src/io/pipe.go
对 pipe 定义如下:
1
|
type pipe struct {
|
PipeReader
从管道内读取数据的操作如下:
1
|
// 注意返回值 n 和 err,当错误发生时依旧会把成功读取的字节数 n 给返回来
|
PipeWriter
将数据写入管道的操作如下:
1
|
// 哪怕发生错误也会把成功写入的字节数返回来
|
注意观察会发现 read 和 write 用到了同一把锁 p.l
,那先写数据时在死循环内一直死等,l
锁也不释放,那 read 获取不到锁怎么读?
实际上是可以读的,关键在于 p.wwait.Wait()
中 Wait
的实现:
1
|
func (c *Cond) Wait() {
|
可看到,在等待时会在内部释放锁,让被通知的 PipeReader 获取锁,才能读取数据。
看其他源码会发现同一个 Pipe 的任意 reader 或 writer 主动 Close
掉后,其他端操作时会得到 ErrClosedPipe
的错误,但返回值里边待有成功操作的字节数,保证了已操作的数据不丢失。
应用场景
分析 io.Pipe 源码可知,PipeWriter 和 PipeReader 通过 pipe 的 data []byte
来进行数据传输,其中 lock 机制保证了 Pipe 在同一时刻只能有一个操作,并且 writer 主动写入必须阻塞到有 reader 读取,reader 主动读取必须阻塞到有 writer 写入。
Pipe 给人的感觉就是 2 个 goroutine 一个写一个读,但完全可以用在多写对多读的场景,不过我没使用过。下边的示例将 5 个字符放到 5 个 writer 分别写入到 Pipe,另一端的 reader 一直在等待读取直到最后一个 writer 手动 Close:
1
|
// 多个 writer 将数据写入 pipe,一个 reader 读取数据
|
运行:
总结
进程间通信方式包括管道,信号,消息队列,共享内存,信号量和 socket 等方式,对应到 Go 实现是 io.Pipe,os.Signal,sync.Mutex,net pkg 等。本文简要解析了 Go 中 pipe 的实现,实际开发中它的场景比较冷门,比如像数据流的实时处理可以考虑使用。