IPCChannel的设计与实现
这是一个专门用于在两个进程之间发送消息,使用专用的管道/文件描述符来传输二进制消息的Channel。
实现收消息的能力
需求:我们希望调用方能够使用channel.onMessage((readBufferProvider) => {...})
就能够接收到对端发送来的消息。
对于所有的Channel
,都有着一个属性onMessage Emitter
,它是实现收消息能力的关键。在Channel
的外部使用onMessage
就能够接收到对端发送来的消息,而在Channel
的内部使用onMessageEmitter.fire(msg)
在合适的时机将对端消息派发出去。
在IPCChannel
中,这个时机就是binaryMessagePipe.onMessage(msg => ....)
,我们监听binaryMessagePipe
的onMessage
,它接收一个消息处理函数(msg:string) => void
。在这里,我们将消息通过onMessageEmitter.fire(msg)
派发出去,完成IPCChannel
收消息的能力。我们的实现如下:
constructor() {
this.messagePipe.onMessage(message => {
this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message));
});
}
实现发消息的能力
需求:我们希望调用方能够通过下面代码来发消息给对端。
const writeBuffer = channel.getWriteBuffer();
msgDecoder.request(writeBuffer, method, args);
writeBuffer.commit();
对于所有的Channel
,都有着一个方法getWriteBuffer
,它是实现发消息能力的关键。当调用方使用writeBuffer.commit
的时候,消息就能够进入传输通道了。在IPCChannel
的内部,为了让commit
生效,我们在提供writeBuffer
的时候,让其通过writeBuffer.onCommit(msg => {...})
监听外部调用commit
的操作。一旦writeBuffer.commit
,就会触发writeBuffer.onCommit
事件,这就是发消息的时机,我们接收到了即将发送给对端的消息,这这个时候通过binaryMessagePipe.send(msg)
将消息发送给对端。我们的实现如下:
getWriteBuffer(): WriteBuffer {
const result = new Uint8ArrayWriteBuffer();
result.onCommit(buffer => {
this.messagePipe.send(buffer);
});
return result;
}
BinaryMessagePipe
无论在收消息和发消息的实现当中,我们发现都用到了同一个组件BinaryMessagePipe
。在实现IPCChannel
的收消息能力时,用到了binaryMessagePipe.onMessage
来辅助实现IPCChannel.onMessage
收消息。在实现IPCChannel
的发消息能力时,用到了binaryMessagePipe.send
来辅助实现wirteBuffer.onCommit(msg => ...)
在接收到本地要发给对端的消息后,将消息发送给对端。
我们在构造IPCChannel
的时候,对BinaryMessagePipe
进行了实例化:
class IPCChannel extends AbstractChannel {
constructor(childProcess?: cp.ChildProcess) {
super();
if (childProcess) {
this.setupChildProcess(childProcess); // 父进程视角:实例化到子进程的管道 (父进程 → 子进程 的管道)
} else {
this.setupProcess(); // 子进程视角:实例化到父进程的管道 (子进程 → 父进程 的管道)
}
}
protected setupChildProcess(childProcess: cp.ChildProcess): void {
this.messagePipe = new BinaryMessagePipe(childProcess.stdio[4] as Duplex);
}
protected setupProcess(): void {
this.messagePipe = new BinaryMessagePipe(new Socket({ fd: 4 }));
}
}
上面这段代码的意思是:根据childProcess
是否存在,选择setupChildProcess
还是setupProcess
来完成BinaryMessagePipe
的构造。无论是在父进程中创建到子进程的BinaryMessagePipe
时的实例化参数childProcess.stdio[4]
,还是子进程中创建到父进程的BinaryMessagePipe
时的实例化参数new Socket({ fd: 4 })
,都是选用了5号管道作为通信管道。为啥选用5号管道的理由是,前4个管道都被标准用途使用,设计第5个管道就是专门用于二进制消息的传输的。
// 前4个管道被标准用途占用
stdio[0] // stdin - 标准输入
stdio[1] // stdout - 标准输出
stdio[2] // stderr - 标准错误
stdio[3] // IPC - 进程间通信
// 第5个管道专门用于二进制消息
stdio[4] // 二进制消息管道 - 不与其他功能冲突
将5号管道用来进行二进制消息的传输,能够让前4个管道用在标准用途,不会阻塞标准IO,让二进制消息和标准IO完全分离开来。
从数据结构设计角度看,BinaryMessagePipe
无非是将底层管道childProcess.stdio[4]
和new Socket({ fd: 4 })
统一包装,屏蔽了底层管道的差异性,对上层使用提供了一致的接口。
没有BinaryMessagePipe
,在实现IPCChannel
我们就需要每一处需要使用的地方进行条件判断,比如初始化和监听消息时:
if (childProcess) {
// 父进程:使用 childProcess.stdio[4]
this.underlyingPipe = childProcess.stdio[4] as Duplex;
} else {
// 子进程:使用 process.stdin 或其他方式
this.underlyingPipe = new Socket({ fd: 4 });
}
this.underlyingPipe.on('data', this.handleData.bind(this));
不仅如此,我们还需要BinaryMessagePipe
在内部对消息进行相关复杂处理(消息边界处理、编码解码、错误处理)。这就是设计BinaryMessagePipe
这个结构的基本意图:屏蔽底层管道差异性,分离职责封装复杂的消息处理操作,对外提供统一使用接口。