前言
我们知道 VS Code 基于 Electron,在 Electron 中,主进程和渲染进程之间是通过 IPC 进行进程之间的通信的,当然 VS Code 中还有其他一些进程(比如:Shared Process/Extension Host Process/File Watcher Process/Terminal Host Process/Terminal Process/Search Process 等),这些进程之间也都是基于 IPC 方式进行通信的。
针对不同的进程类型,甚至在 Remote 部署形式下的 VS Code,进程之间是物理隔离的,主进程可能在另一台远程机器上运行,而渲染进程又是运行在用户本地浏览器上,VS Code 针对如此复杂的进程之间通信设计了一套 IPC 通信解决方案,从而实现了针对不同的运行环境(例如本地进程、remote 形式下的跨进程、web worker)都能使用同一套调用方式。本文将会探讨 VS Code 中 IPC 模块的设计和原理。
VS Code IPC 机制
在 VS Code 中,IPC 分为两种实现:基于 Channel 和基于 RPCProtocol:
一、基于 Channel 的实现
1.1 概念
上述概念概述比较简单 & 晦涩,下面做一下概念的通俗解释:
频道
狭义定义上,频道又叫信道,信道是信号在通信系统重传输的通道,是信号从发射端传输到接收端所经过的传输煤质。在电台领域,我们经常可以听到各种频道,比如:交通之声频道、音乐之声频道。
在 VS Code 中,频道是一组可供其他端进行调用的服务集合。一个标准的频道有两个功能:
-
点播:call
-
收听:listen
客户端 & 服务端
客户端 & 服务端是频道的承载主体。一般客户端是指发起连接的一端,服务端是被连接的一端。
在 VS Code 中,主进程是服务端,提供一系列服务的频道;渲染进程是客户端,调用服务端频道中的服务或者收听服务端消息。不管是服务端还是客户端,都需要具备发送和接受消息的能力,才能实现正常的通信。
连接
客户端 & 服务端之间进行通信依赖的连接。
在 VS Code 中一个连接其实是一对客户端与服务端的对应关系。
协议
两个端之间进行消息通信的约定,比如我们是通过语言还是手语比划进行通信,需要通过协议进行约定。
在 VS Code 中,约定了最基础的协议范围包括发送和接收消息两个方法:
-
发送:send
-
接收:onMessage
接口定义
Channel(客户端频道)
/**
* An `IChannel` is an abstraction over a collection of commands.
* You can `call` several commands on a channel, each taking at
* most one single argument. A `call` always returns a promise
* with at most one single return value.
*/
export interface IChannel {
call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
listen<T>(event: string, arg?: any): Event<T>;
}
ServerChannel(服务端频道)
/**
* An `IServerChannel` is the counter part to `IChannel`,
* on the server-side. You should implement this interface
* if you'd like to handle remote promises or events.
*/
export interface IServerChannel<TContext = string> {
call<T>(ctx: TContext, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
listen<T>(ctx: TContext, event: string, arg?: any): Event<T>;
}
ChannelClient(客户端)
/**
* An `IChannelClient` has access to a collection of channels. You
* are able to get those channels, given their channel name.
*/
export interface IChannelClient {
getChannel<T extends IChannel>(channelName: string): T;
}
ChannelServer(服务端)
/**
* An `IChannelServer` hosts a collection of channels. You are
* able to register channels onto it, provided a channel name.
*/
export interface IChannelServer<TContext = string> {
registerChannel(channelName: string, channel: IServerChannel<TContext>): void;
}
IPCClient(客户端)
/**
* An `IPCClient` is both a channel client and a channel server.
*
* As the owner of a protocol, you should extend both this
* and the `IPCServer` classes to get IPC implementations
* for your protocol.
*/
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable {
private channelClient: ChannelClient;
private channelServer: ChannelServer<TContext>;
constructor(protocol: IMessagePassingProtocol, ctx: TContext, ipcLogger: IIPCLogger | null = null) {}
getChannel<T extends IChannel>(channelName: string): T {}
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {}
dispose(): void {}
}
IPCServer(服务端)
/**
* An `IPCServer` is both a channel server and a routing channel
* client.
*
* As the owner of a protocol, you should extend both this
* and the `IPCClient` classes to get IPC implementations
* for your protocol.
*/
export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable {
private channels = new Map<string, IServerChannel<TContext>>();
private _connections = new Set<Connection<TContext>>();
get connections(): Connection<TContext>[] {}
constructor(onDidClientConnect: Event<ClientConnectionEvent>) {}
/**
* Get a channel from a remote client. When passed a router,
* one can specify which client it wants to call and listen to/from.
* Otherwise, when calling without a router, a random client will
* be selected and when listening without a router, every client
* will be listened to.
*/
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
getChannel<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean): T;
getChannel<T extends IChannel>(channelName: string, routerOrClientFilter: IClientRouter<TContext> | ((client: Client<TContext>) => boolean)): T {}
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {}
dispose(): void {}
}
Connection(连接)
export interface Client<TContext> {
readonly ctx: TContext;
}
interface Connection<TContext> extends Client<TContext> {
readonly channelServer: ChannelServer<TContext>;
readonly channelClient: ChannelClient;
}
所有上述接口、类定义对应文件为:src/vs/base/parts/ipc/common/ipc.ts
1.2 使用案例
上面讲解了 VS Code 中 IPC 模块整体的概念以及 IPC 模块的核心单元,下面我们先通过一些案例简单的了解下 IPC 模块的使用方式及其基本流程。
-
请求实现(基于 IMessagePassingProtocol 协议)
class QueueProtocol implements IMessagePassingProtocol {
private buffering = true;
private buffers: VSBuffer[] = [];
private readonly _onMessage = new Emitter<VSBuffer>({
onDidAddFirstListener: () => {
for (const buffer of this.buffers) {
this._onMessage.fire(buffer);
}
this.buffers = [];
this.buffering = false;
},
onDidRemoveLastListener: () => {
this.buffering = true;
}
});
readonly onMessage = this._onMessage.event;
other!: QueueProtocol;
send(buffer: VSBuffer): void {
this.other.receive(buffer);
}
protected receive(buffer: VSBuffer): void {
if (this.buffering) {
this.buffers.push(buffer);
} else {
this._onMessage.fire(buffer);
}
}
}
-
客户端(基于 IPCClient)
class TestIPCClient extends IPCClient<string> {
private readonly _onDidDisconnect = new Emitter<void>();
readonly onDidDisconnect = this._onDidDisconnect.event;
constructor(protocol: IMessagePassingProtocol, id: string) {
super(protocol, id);
}
override dispose(): void {
this._onDidDisconnect.fire();
super.dispose();
}
}
-
服务端(基于 IPCServer)
class TestIPCServer extends IPCServer<string> {
private readonly onDidClientConnect: Emitter<ClientConnectionEvent>;
constructor() {
const onDidClientConnect = new Emitter<ClientConnectionEvent>();
super(onDidClientConnect.event);
this.onDidClientConnect = onDidClientConnect;
}
// 创建一个客户端 & 服务端的连接
createConnection(id: string): IPCClient<string> {
const [pc, ps] = createProtocolPair();
const pc = new QueueProtocol();
const ps = new QueueProtocol();
pc.other = ps;
ps.other = pc;
const client = new TestIPCClient(pc, id);
this.onDidClientConnect.fire({
protocol: ps,
onDidClientDisconnect: client.onDidDisconnect
});
return client;
}
}
-
服务端频道及其对应的服务
// 服务接口
interface ITestService {
marco(): Promise<string>;
onPong: Event<string>;
}
// 服务
class TestService implements ITestService {
private readonly _onPong = new Emitter<string>();
readonly onPong = this._onPong.event;
marco(): Promise<string> {
return Promise.resolve('polo');
}
ping(msg: string): void {
this._onPong.fire(msg);
}
}
// 服务频道
class TestChannel implements IServerChannel {
constructor(private service: ITestService) { }
call(_: unknown, command: string, arg: any, cancellationToken: CancellationToken): Promise<any> {
switch (command) {
case 'marco': return this.service.marco();
default: return Promise.reject(new Error('not implemented'));
}
}
listen(_: unknown, event: string, arg?: any): Event<any> {
switch (event) {
case 'onPong': return this.service.onPong;
default: throw new Error('not implemented');
}
}
}
-
客户端频道服务
class TestChannelClient implements ITestService {
get onPong(): Event<string> {
return this.channel.listen('onPong');
}
constructor(private channel: IChannel) { }
marco(): Promise<string> {
return this.channel.call('marco');
}
}
1.2.1 一对一 IPC
创建好上述各个对象:
// 创建服务
const service = new TestService();
// 创建服务端
const server = new TestIPCServer();
// 服务端:注册一个服务频道
server.registerChannel('testChannel', new TestChannel(service));
// 创建一个到客户端的连接
const client = server.createConnection('client1');
// 创建一个客户端频道服务
const ipcService = new TestChannelClient(client.getChannel('testChannel'));
客户端调用服务端:
const r = await ipcService.marco(); // 'polo'
客户端监听服务端:
const messages: string[] = [];
// 客户端监听
ipcService.onPong(msg => messages.push(msg));
// 服务端主动发送消息
service.ping('hello');
service.ping('world');
// messages: ['hello', 'world']
1.2.2 一对多 IPC
往往服务端需要对应多个客户端的连接
创建好上述各个对象:
// 创建服务
const service = new TestService();
// 创建服务端
const server = new TestIPCServer();
// 创建服务端频道
const channel = new TestChannel(service);
// 服务端注册服务
server.registerChannel('channel', channel);
// 创建客户端-1
const client1 = server.createConnection('client1');
// 创建客户端-1对应的频道服务
const ipcService1 = new TestChannelClient(client1.getChannel('channel'));
// 创建客户端-2
const client2 = server.createConnection('client2');
// 创建客户端-2对应的频道服务
const ipcService2 = new TestChannelClient(client2.getChannel('channel'));
客户端监听服务端:
ipcService1.onPong(() => console.log('Receive ping message on service1'));
ipcService2.onPong(() => console.log('Receive ping message on service2'));
服务端通知:
service.ping('hello world');
上述两个客户端都会收到一条服务端发送的 'hello world' 消息
1.3 实现原理
类图结构
重点讲解下 IPCServer / IPCClient,ChannelServer / ChannelClient
1.4 IPC 协议的多态实现
在上述概念中的“协议”,是一个端与端之间的 IPC 通信协议,任何只需要满足协议接口的实现都可以用于 IPC 通信:
interface IMessagePassingProtocol {
send(buffer: VSBuffer): void;
onMessage: Event<VSBuffer>;
drain?(): Promise<void>;
}
在 VS Code 存在多种端与端之间的 IPC 通信:
-
Electron 主进程 & 渲染进程:通过 ipcMain 和 ipcRenderer 实现
-
浏览器中运行的 VS Code 主进程与远程 Node.js 进程:通过 WebSocket 实现
-
浏览器中运行的 VS Code 主进程与 Web Worker 中的插件进程:通过 postMessage
二、基于 RPCProtocol 的实现
基于 RPCProtocol 的实现,主要应用于 Electron 渲染进程和 extension host 进程之间通信(如果是 Web 部署形态,则是浏览器主进程和 extension host Web Worker 之间的通信),主要场景在于 Extension 中对于 VS Code API 的调用需要转发到Electron 渲染进程(或者是浏览器主进程)中执行。
在 VS Code 的不同部署形态中,Extension 是运行在不同环境中的:
-
桌面端:Extension 运行在 Nodejs 进程中
-
Web 端:Extension 运行在浏览器 Web Worker 中
所以在这两种不同的部署形态之下,VS Code 需要实现不同的通信方案,下面我们先通过 Web 端的部署形态入手,因为这两种形态之下的通信机制是相同的,只是在具体的信息传递实现方案上会有所不同。
在 VS Çode 中,所有提供给 Extension 使用的 api 由位于 src/vs/workbench/api/common/extHost.api.impl.ts文件中的 createApiFactoryAndRegisterActors函数创建,下面以 commands.registerCommand API 为例:
export function createApiFactoryAndRegisterActors(accessor: ServicesAccessor): IExtensionApiFactory {
const rpcProtocol = accessor.get(IExtHostRpcService);
const extHostCommands = rpcProtocol.set(ExtHostContext.ExtHostCommands, accessor.get(IExtHostCommands));
const commands: typeof vscode.commands = {
registerCommand(id: string, command: <T>(...args: any[]) => T | Thenable<T>, thisArgs?: any): vscode.Disposable {
return extHostCommands.registerCommand(true, id, command, thisArgs, undefined, extension);
},
};
return {
// ...
commands,
};
}
可以看到执行 commands.registerCommand的时候其实是调用了 rpcProtocol.set(ExtHostContext.ExtHostCommands, accessor.get(IExtHostCommands))返回对象的registerCommand方法,下面重点看下 rpcProtocol这个服务,其实现了 IRPCProtocol这个接口:
export interface IRPCProtocol {
/**
* Returns a proxy to an object addressable/named in the extension host process or in the renderer process.
*/
getProxy<T>(identifier: ProxyIdentifier<T>): Proxied<T>;
/**
* Register manually created instance.
*/
set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R;
/**
* Assert these identifiers are already registered via `.set`.
*/
assertRegistered(identifiers: ProxyIdentifier<any>[]): void;
/**
* Wait for the write buffer (if applicable) to become empty.
*/
drain(): Promise<void>;
dispose(): void;
}
重点关注其中的两个方法及其实现:
-
set:注册一个由 identifier 标识符指定的服务实例
-
getProxy:返回一个代理对象,该代理对象指向到由 identifier 标识符创建的服务实例
还是以上述注册 commands 为例讲解其中过程:
-
通过 rpcPtotocol.set 注册一个代理对象:
const extHostCommands = rpcProtocol.set(ExtHostContext.ExtHostCommands, accessor.get(IExtHostCommands));
注册的标识符(identifier)由 ExtHostContext.ExtHostCommands指定,accessor.get(IExtHostCommands)返回的即是对应的服务实例
-
服务实例(实现了 IExtHostCommands 接口)实现:
export class ExtHostCommands implements ExtHostCommandsShape {
#proxy: MainThreadCommandsShape;
constructor(
@IExtHostRpcService extHostRpc: IExtHostRpcService,
@ILogService logService: ILogService,
@IExtHostTelemetry extHostTelemetry: IExtHostTelemetry
) {
this.#proxy = extHostRpc.getProxy(MainContext.MainThreadCommands);
//...
}
registerCommand(global: boolean, id: string, callback: <T>(...args: any[]) => T | Thenable<T>, thisArg?: any, description?: ICommandHandlerDescription, extension?: IExtensionDescription): extHostTypes.Disposable {
this._logService.trace('ExtHostCommands#registerCommand', id);
if (!id.trim().length) {
throw new Error('invalid id');
}
if (this._commands.has(id)) {
throw new Error(`command '${id}' already exists`);
}
this._commands.set(id, { callback, thisArg, description, extension });
if (global) {
// 最终执行的是 #proxy 代理对象中的 $registerCommand 方法
this.#proxy.$registerCommand(id);
}
return new extHostTypes.Disposable(() => {
if (this._commands.delete(id)) {
if (global) {
this.#proxy.$unregisterCommand(id);
}
}
});
}
}
-
rpcProtocol.getProxy(identifier) 方法返回代理对象:
export class RPCProtocol extends Disposable implements IRPCProtocol {
private readonly _protocol: IMessagePassingProtocol;
constructor(protocol: IMessagePassingProtocol, logger: IRPCProtocolLogger | null = null, transformer: IURITransformer | null = null) {
super();
this._protocol = protocol;
//...
this._protocol.onMessage((msg) => this._receiveOneMessage(msg));
}
public getProxy<T>(identifier: ProxyIdentifier<T>): Proxied<T> {
const { nid: rpcId, sid } = identifier;
if (!this._proxies[rpcId]) {
this._proxies[rpcId] = this._createProxy(rpcId, sid);
}
return this._proxies[rpcId];
}
private _createProxy<T>(rpcId: number, debugName: string): T {
const handler = {
get: (target: any, name: PropertyKey) => {
if (typeof name === 'string' && !target[name] && name.charCodeAt(0) === CharCode.DollarSign) {
target[name] = (...myArgs: any[]) => {
return this._remoteCall(rpcId, name, myArgs);
};
}
if (name === _RPCProxySymbol) {
return debugName;
}
return target[name];
}
};
return new Proxy(Object.create(null), handler);
}
}
其中_createProxy方法利用了Proxy对象创建了一个代理对象,劫持其 get 方法,将以 '$' 开头的方法代理到了this._remoteCall方法中:
private _remoteCall(rpcId: number, methodName: string, args: any[]): Promise<any> {
if (this._isDisposed) {
return new CanceledLazyPromise();
}
let cancellationToken: CancellationToken | null = null;
if (args.length > 0 && CancellationToken.isCancellationToken(args[args.length - 1])) {
cancellationToken = args.pop();
}
if (cancellationToken && cancellationToken.isCancellationRequested) {
// No need to do anything...
return Promise.reject<any>(errors.canceled());
}
const serializedRequestArguments = MessageIO.serializeRequestArguments(args, this._uriReplacer);
const req = ++this._lastMessageId;
const callId = String(req);
const result = new LazyPromise();
if (cancellationToken) {
cancellationToken.onCancellationRequested(() => {
const msg = MessageIO.serializeCancel(req);
this._logger?.logOutgoing(msg.byteLength, req, RequestInitiator.LocalSide, `cancel`);
this._protocol.send(MessageIO.serializeCancel(req));
});
}
this._pendingRPCReplies[callId] = result;
this._onWillSendRequest(req);
const msg = MessageIO.serializeRequest(req, rpcId, methodName, serializedRequestArguments, !!cancellationToken);
this._logger?.logOutgoing(msg.byteLength, req, RequestInitiator.LocalSide, `request: ${getStringIdentifierForProxy(rpcId)}.${methodName}(`, args);
this._protocol.send(msg);
return result;
}
最终使用了this._protocol.send方法将方法调用封装成一条 msg 发送到浏览器主进程中,下面我们最后来看下this._protocol对象:
class ExtensionWorker {
// protocol
readonly protocol: IMessagePassingProtocol;
constructor() {
const channel = new MessageChannel();
const emitter = new Emitter<VSBuffer>();
let terminating = false;
// send over port2, keep port1
nativePostMessage(channel.port2, [channel.port2]);
channel.port1.onmessage = event => {
const { data } = event;
if (!(data instanceof ArrayBuffer)) {
console.warn('UNKNOWN data received', data);
return;
}
const msg = VSBuffer.wrap(new Uint8Array(data, 0, data.byteLength));
if (isMessageOfType(msg, MessageType.Terminate)) {
// handle terminate-message right here
terminating = true;
onTerminate('received terminate message from renderer');
return;
}
// emit non-terminate messages to the outside
emitter.fire(msg);
};
this.protocol = {
onMessage: emitter.event,
send: vsbuf => {
if (!terminating) {
const data = vsbuf.buffer.buffer.slice(vsbuf.buffer.byteOffset, vsbuf.buffer.byteOffset + vsbuf.buffer.byteLength);
channel.port1.postMessage(data, [data]);
}
}
};
}
}
上文中提到了protocol对象实现了IMessagePassingProtocol接口:
export interface IMessagePassingProtocol {
send(buffer: VSBuffer): void;
onMessage: Event<VSBuffer>;
/**
* Wait for the write buffer (if applicable) to become empty.
*/
drain?(): Promise<void>;
}
而protocol对象的 send 方法主要是利用了MessageChannelAPI 创建了一条与浏览器主进程之间的双向通信通道,将对于 VS Code API 方法的调用封装成一条 msg 消息发送至主进程,而在主进程中会使用rpcProtocol.set注册方法中指定的 identifier(同时也是rpcProtocol.getProxy(identifier)方法对应的 identifier)标识符创建一个对应的消费服务:
@extHostNamedCustomer(MainContext.MainThreadCommands)
export class MainThreadCommands implements MainThreadCommandsShape {
private readonly _proxy: ExtHostCommandsShape;
constructor(
extHostContext: IExtHostContext,
@ICommandService private readonly _commandService: ICommandService,
@IExtensionService private readonly _extensionService: IExtensionService,
) {
this._proxy = extHostContext.getProxy(ExtHostContext.ExtHostCommands);
//...
}
$registerCommand(id: string): void {
this._commandRegistrations.set(
id,
CommandsRegistry.registerCommand(id, (accessor, ...args) => {
return this._proxy.$executeContributedCommand(id, ...args).then(result => {
return revive(result);
}, onUnexpectedExternalError);
})
);
}
}
同样需要注意的是,这套机制同样可以进行反向调用,即主进程调用 Extension host Web Worker 中注册的服务。