VS Code 技术揭秘系列 — IPC 通信

简介: 前言我们知道 VS Code 基于 Electron,在 Electron 中,主进程和渲染进程之间是通过 IPC 进行进程之间的通信的,当然 VS Code 中还有其他一些进程(比如:Shared Process/Extension Host Process/File Watcher Process/Terminal Host Process/Terminal Process/Search Pr

前言

我们知道 VS Code 基于 Electron,在 Electron 中,主进程和渲染进程之间是通过 IPC 进行进程之间的通信的,当然 VS Code 中还有其他一些进程(比如:Shared Process/Extension Host Process/File Watcher Process/Terminal Host Process/Terminal Process/Search Process 等),这些进程之间也都是基于 IPC 方式进行通信的。

针对不同的进程类型,甚至在 Remote 部署形式下的 VS Code,进程之间是物理隔离的,主进程可能在另一台远程机器上运行,而渲染进程又是运行在用户本地浏览器上,VS Code 针对如此复杂的进程之间通信设计了一套 IPC 通信解决方案,从而实现了针对不同的运行环境(例如本地进程、remote 形式下的跨进程、web worker)都能使用同一套调用方式。本文将会探讨 VS Code 中 IPC 模块的设计和原理。

VS Code IPC 机制

在 VS Code 中,IPC 分为两种实现:基于 Channel 和基于 RPCProtocol:

一、基于 Channel 的实现

1.1 概念

概念名词

概念解释

作用

Protocol

通信协议

通信的基础协议规范

Channel

客户端频道

端与端之间进行信息传输的通道,类似于电台频道

ServerChannel

服务端频道

端与端之间进行信息传输的通道,类似于电台频道

ChannelClient

频道的客户端

客户端频道的管理

ChannelServer

频道的服务端

服务端频道的管理

Connection

连接

端与端之间的连接对应关系

IPCClient

IPC 客户端

负责连接的建立以及 Channel 的注册和获取

IPCServer

IPC 服务端

负责连接的建立以及 Channel 的注册和获取

上述概念概述比较简单 & 晦涩,下面做一下概念的通俗解释:

频道

狭义定义上,频道又叫信道,信道是信号在通信系统重传输的通道,是信号从发射端传输到接收端所经过的传输煤质。在电台领域,我们经常可以听到各种频道,比如:交通之声频道、音乐之声频道。

在 VS Code 中,频道是一组可供其他端进行调用的服务集合。一个标准的频道有两个功能:

  • 点播:call
  • 收听:listen

客户端 & 服务端

客户端 & 服务端是频道的承载主体。一般客户端是指发起连接的一端,服务端是被连接的一端。

在 VS Code 中,主进程是服务端,提供一系列服务的频道;渲染进程是客户端,调用服务端频道中的服务或者收听服务端消息。不管是服务端还是客户端,都需要具备发送和接受消息的能力,才能实现正常的通信。

连接

客户端 & 服务端之间进行通信依赖的连接。

在 VS Code 中一个连接其实是一对客户端与服务端的对应关系。

协议

两个端之间进行消息通信的约定,比如我们是通过语言还是手语比划进行通信,需要通过协议进行约定。

在 VS Code 中,约定了最基础的协议范围包括发送和接收消息两个方法:

  • 发送:send
  • 接收:onMessage

接口定义

Channel(客户端频道)

/**
 * An `IChannel` is an abstraction over a collection of commands.
 * You can `call` several commands on a channel, each taking at
 * most one single argument. A `call` always returns a promise
 * with at most one single return value.
 */
export interface IChannel {
	call<T>(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
	listen<T>(event: string, arg?: any): Event<T>;
}

ServerChannel(服务端频道)

/**
 * An `IServerChannel` is the counter part to `IChannel`,
 * on the server-side. You should implement this interface
 * if you'd like to handle remote promises or events.
 */
export interface IServerChannel<TContext = string> {
	call<T>(ctx: TContext, command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T>;
	listen<T>(ctx: TContext, event: string, arg?: any): Event<T>;
}

ChannelClient(客户端)

/**
 * An `IChannelClient` has access to a collection of channels. You
 * are able to get those channels, given their channel name.
 */
export interface IChannelClient {
	getChannel<T extends IChannel>(channelName: string): T;
}

ChannelServer(服务端)

/**
 * An `IChannelServer` hosts a collection of channels. You are
 * able to register channels onto it, provided a channel name.
 */
export interface IChannelServer<TContext = string> {
	registerChannel(channelName: string, channel: IServerChannel<TContext>): void;
}

IPCClient(客户端)

/**
 * An `IPCClient` is both a channel client and a channel server.
 *
 * As the owner of a protocol, you should extend both this
 * and the `IPCServer` classes to get IPC implementations
 * for your protocol.
 */
export class IPCClient<TContext = string> implements IChannelClient, IChannelServer<TContext>, IDisposable {
	private channelClient: ChannelClient;
	private channelServer: ChannelServer<TContext>;

	constructor(protocol: IMessagePassingProtocol, ctx: TContext, ipcLogger: IIPCLogger | null = null) {}

	getChannel<T extends IChannel>(channelName: string): T {}

	registerChannel(channelName: string, channel: IServerChannel<TContext>): void {}

	dispose(): void {}
}

IPCServer(服务端)

/**
 * An `IPCServer` is both a channel server and a routing channel
 * client.
 *
 * As the owner of a protocol, you should extend both this
 * and the `IPCClient` classes to get IPC implementations
 * for your protocol.
 */
export class IPCServer<TContext = string> implements IChannelServer<TContext>, IRoutingChannelClient<TContext>, IConnectionHub<TContext>, IDisposable {
	private channels = new Map<string, IServerChannel<TContext>>();
	private _connections = new Set<Connection<TContext>>();

	get connections(): Connection<TContext>[] {}

	constructor(onDidClientConnect: Event<ClientConnectionEvent>) {}

	/**
	 * Get a channel from a remote client. When passed a router,
	 * one can specify which client it wants to call and listen to/from.
	 * Otherwise, when calling without a router, a random client will
	 * be selected and when listening without a router, every client
	 * will be listened to.
	 */
	getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T;
	getChannel<T extends IChannel>(channelName: string, clientFilter: (client: Client<TContext>) => boolean): T;
	getChannel<T extends IChannel>(channelName: string, routerOrClientFilter: IClientRouter<TContext> | ((client: Client<TContext>) => boolean)): T {}

	registerChannel(channelName: string, channel: IServerChannel<TContext>): void {}

	dispose(): void {}
}

Connection(连接)

export interface Client<TContext> {
	readonly ctx: TContext;
}

interface Connection<TContext> extends Client<TContext> {
	readonly channelServer: ChannelServer<TContext>;
	readonly channelClient: ChannelClient;
}

所有上述接口、类定义对应文件为:src/vs/base/parts/ipc/common/ipc.ts

1.2 使用案例

上面讲解了 VS Code 中 IPC 模块整体的概念以及 IPC 模块的核心单元,下面我们先通过一些案例简单的了解下 IPC 模块的使用方式及其基本流程。

  1. 请求实现(基于 IMessagePassingProtocol 协议)
class QueueProtocol implements IMessagePassingProtocol {
	private buffering = true;
	private buffers: VSBuffer[] = [];

	private readonly _onMessage = new Emitter<VSBuffer>({
		onDidAddFirstListener: () => {
			for (const buffer of this.buffers) {
				this._onMessage.fire(buffer);
			}

			this.buffers = [];
			this.buffering = false;
		},
		onDidRemoveLastListener: () => {
			this.buffering = true;
		}
	});

	readonly onMessage = this._onMessage.event;
	other!: QueueProtocol;

	send(buffer: VSBuffer): void {
		this.other.receive(buffer);
	}

	protected receive(buffer: VSBuffer): void {
		if (this.buffering) {
			this.buffers.push(buffer);
		} else {
			this._onMessage.fire(buffer);
		}
	}
}
  1. 客户端(基于 IPCClient)
class TestIPCClient extends IPCClient<string> {
  private readonly _onDidDisconnect = new Emitter<void>();
	readonly onDidDisconnect = this._onDidDisconnect.event;

	constructor(protocol: IMessagePassingProtocol, id: string) {
		super(protocol, id);
	}

	override dispose(): void {
		this._onDidDisconnect.fire();
		super.dispose();
	}
}
  1. 服务端(基于 IPCServer)
class TestIPCServer extends IPCServer<string> {
	private readonly onDidClientConnect: Emitter<ClientConnectionEvent>;

	constructor() {
		const onDidClientConnect = new Emitter<ClientConnectionEvent>();
		super(onDidClientConnect.event);
		this.onDidClientConnect = onDidClientConnect;
	}

  // 创建一个客户端 & 服务端的连接
	createConnection(id: string): IPCClient<string> {
		const [pc, ps] = createProtocolPair();
    const pc = new QueueProtocol();
    const ps = new QueueProtocol();
    pc.other = ps;
    ps.other = pc;
		const client = new TestIPCClient(pc, id);

		this.onDidClientConnect.fire({
			protocol: ps,
			onDidClientDisconnect: client.onDidDisconnect
		});

		return client;
	}
}
  1. 服务端频道及其对应的服务
// 服务接口
interface ITestService {
	marco(): Promise<string>;
	onPong: Event<string>;
}

// 服务
class TestService implements ITestService {
	private readonly _onPong = new Emitter<string>();
	readonly onPong = this._onPong.event;

  marco(): Promise<string> {
		return Promise.resolve('polo');
	}

	ping(msg: string): void {
		this._onPong.fire(msg);
	}
}

// 服务频道
class TestChannel implements IServerChannel {
	constructor(private service: ITestService) { }

	call(_: unknown, command: string, arg: any, cancellationToken: CancellationToken): Promise<any> {
		switch (command) {
			case 'marco': return this.service.marco();
			default: return Promise.reject(new Error('not implemented'));
		}
	}

	listen(_: unknown, event: string, arg?: any): Event<any> {
		switch (event) {
			case 'onPong': return this.service.onPong;
			default: throw new Error('not implemented');
		}
	}
}
  1. 客户端频道服务
class TestChannelClient implements ITestService {
	get onPong(): Event<string> {
		return this.channel.listen('onPong');
	}

	constructor(private channel: IChannel) { }

	marco(): Promise<string> {
		return this.channel.call('marco');
	}
}

1.2.1 一对一 IPC

创建好上述各个对象:

// 创建服务
const service = new TestService();
// 创建服务端
const server = new TestIPCServer();
// 服务端:注册一个服务频道
server.registerChannel('testChannel', new TestChannel(service));
// 创建一个到客户端的连接
const client = server.createConnection('client1');
// 创建一个客户端频道服务
const ipcService = new TestChannelClient(client.getChannel('testChannel'));

客户端调用服务端:

const r = await ipcService.marco(); // 'polo'

客户端监听服务端:

const messages: string[] = [];

// 客户端监听
ipcService.onPong(msg => messages.push(msg));

// 服务端主动发送消息
service.ping('hello');
service.ping('world');
// messages: ['hello', 'world']

1.2.2 一对多 IPC

往往服务端需要对应多个客户端的连接

创建好上述各个对象:

// 创建服务
const service = new TestService();
// 创建服务端
const server = new TestIPCServer();
// 创建服务端频道
const channel = new TestChannel(service);
// 服务端注册服务
server.registerChannel('channel', channel);

// 创建客户端-1
const client1 = server.createConnection('client1');
// 创建客户端-1对应的频道服务
const ipcService1 = new TestChannelClient(client1.getChannel('channel'));
// 创建客户端-2
const client2 = server.createConnection('client2');
// 创建客户端-2对应的频道服务
const ipcService2 = new TestChannelClient(client2.getChannel('channel'));

客户端监听服务端:

ipcService1.onPong(() => console.log('Receive ping message on service1'));
ipcService2.onPong(() => console.log('Receive ping message on service2'));

服务端通知:

service.ping('hello world');

上述两个客户端都会收到一条服务端发送的 'hello world' 消息

1.3 实现原理

类图结构

重点讲解下 IPCServer / IPCClient,ChannelServer / ChannelClient

1.4 IPC 协议的多态实现

在上述概念中的“协议”,是一个端与端之间的 IPC 通信协议,任何只需要满足协议接口的实现都可以用于 IPC 通信:

interface IMessagePassingProtocol {
  send(buffer: VSBuffer): void;
  onMessage: Event<VSBuffer>;
  drain?(): Promise<void>;
}

在 VS Code 存在多种端与端之间的 IPC 通信:

  • Electron 主进程 & 渲染进程:通过 ipcMain 和 ipcRenderer 实现
  • 浏览器中运行的 VS Code 主进程与远程 Node.js 进程:通过 WebSocket 实现
  • 浏览器中运行的 VS Code 主进程与 Web Worker 中的插件进程:通过 postMessage

二、基于 RPCProtocol 的实现

基于 RPCProtocol 的实现,主要应用于 Electron 渲染进程和 extension host 进程之间通信(如果是 Web 部署形态,则是浏览器主进程和 extension host Web Worker 之间的通信),主要场景在于 Extension 中对于 VS Code API 的调用需要转发到Electron 渲染进程(或者是浏览器主进程)中执行。

在 VS Code 的不同部署形态中,Extension 是运行在不同环境中的:

  • 桌面端:Extension 运行在 Nodejs 进程中
  • Web 端:Extension 运行在浏览器 Web Worker 中

所以在这两种不同的部署形态之下,VS Code 需要实现不同的通信方案,下面我们先通过 Web 端的部署形态入手,因为这两种形态之下的通信机制是相同的,只是在具体的信息传递实现方案上会有所不同。

在 VS Çode 中,所有提供给 Extension 使用的 api 由位于 src/vs/workbench/api/common/extHost.api.impl.ts文件中的 createApiFactoryAndRegisterActors函数创建,下面以 commands.registerCommand API 为例:

export function createApiFactoryAndRegisterActors(accessor: ServicesAccessor): IExtensionApiFactory {
  const rpcProtocol = accessor.get(IExtHostRpcService);
  const extHostCommands = rpcProtocol.set(ExtHostContext.ExtHostCommands, accessor.get(IExtHostCommands));

  const commands: typeof vscode.commands = {
    registerCommand(id: string, command: <T>(...args: any[]) => T | Thenable<T>, thisArgs?: any): vscode.Disposable {
      return extHostCommands.registerCommand(true, id, command, thisArgs, undefined, extension);
    },
  };

  return {
    // ...
    commands,
  };
}

可以看到执行 commands.registerCommand的时候其实是调用了 rpcProtocol.set(ExtHostContext.ExtHostCommands, accessor.get(IExtHostCommands))返回对象的registerCommand方法,下面重点看下 rpcProtocol这个服务,其实现了 IRPCProtocol这个接口:

export interface IRPCProtocol {
	/**
	 * Returns a proxy to an object addressable/named in the extension host process or in the renderer process.
	 */
	getProxy<T>(identifier: ProxyIdentifier<T>): Proxied<T>;

	/**
	 * Register manually created instance.
	 */
	set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R;

	/**
	 * Assert these identifiers are already registered via `.set`.
	 */
	assertRegistered(identifiers: ProxyIdentifier<any>[]): void;

	/**
	 * Wait for the write buffer (if applicable) to become empty.
	 */
	drain(): Promise<void>;

	dispose(): void;
}

重点关注其中的两个方法及其实现:

  • set:注册一个由 identifier 标识符指定的服务实例
  • getProxy:返回一个代理对象,该代理对象指向到由 identifier 标识符创建的服务实例

还是以上述注册 commands 为例讲解其中过程:

  1. 通过  rpcPtotocol.set 注册一个代理对象:
const extHostCommands = rpcProtocol.set(ExtHostContext.ExtHostCommands, accessor.get(IExtHostCommands));

注册的标识符(identifier)由 ExtHostContext.ExtHostCommands指定,accessor.get(IExtHostCommands)返回的即是对应的服务实例

  1. 服务实例(实现了 IExtHostCommands 接口)实现:
export class ExtHostCommands implements ExtHostCommandsShape {
  #proxy: MainThreadCommandsShape;

  constructor(
		@IExtHostRpcService extHostRpc: IExtHostRpcService,
		@ILogService logService: ILogService,
		@IExtHostTelemetry extHostTelemetry: IExtHostTelemetry
	) {
		this.#proxy = extHostRpc.getProxy(MainContext.MainThreadCommands);
		//...
	}

  registerCommand(global: boolean, id: string, callback: <T>(...args: any[]) => T | Thenable<T>, thisArg?: any, description?: ICommandHandlerDescription, extension?: IExtensionDescription): extHostTypes.Disposable {
		this._logService.trace('ExtHostCommands#registerCommand', id);

		if (!id.trim().length) {
			throw new Error('invalid id');
		}

		if (this._commands.has(id)) {
			throw new Error(`command '${id}' already exists`);
		}

		this._commands.set(id, { callback, thisArg, description, extension });
		if (global) {
      // 最终执行的是 #proxy 代理对象中的 $registerCommand 方法
			this.#proxy.$registerCommand(id);
		}

		return new extHostTypes.Disposable(() => {
			if (this._commands.delete(id)) {
				if (global) {
					this.#proxy.$unregisterCommand(id);
				}
			}
		});
	}
}
  1. rpcProtocol.getProxy(identifier) 方法返回代理对象:
export class RPCProtocol extends Disposable implements IRPCProtocol {
  private readonly _protocol: IMessagePassingProtocol;
  
  constructor(protocol: IMessagePassingProtocol, logger: IRPCProtocolLogger | null = null, transformer: IURITransformer | null = null) {
    super();
		this._protocol = protocol;
    //...
    this._protocol.onMessage((msg) => this._receiveOneMessage(msg));
  }

  public getProxy<T>(identifier: ProxyIdentifier<T>): Proxied<T> {
		const { nid: rpcId, sid } = identifier;
		if (!this._proxies[rpcId]) {
			this._proxies[rpcId] = this._createProxy(rpcId, sid);
		}
		return this._proxies[rpcId];
	}

	private _createProxy<T>(rpcId: number, debugName: string): T {
		const handler = {
			get: (target: any, name: PropertyKey) => {
				if (typeof name === 'string' && !target[name] && name.charCodeAt(0) === CharCode.DollarSign) {
					target[name] = (...myArgs: any[]) => {
						return this._remoteCall(rpcId, name, myArgs);
					};
				}
				if (name === _RPCProxySymbol) {
					return debugName;
				}
				return target[name];
			}
		};
		return new Proxy(Object.create(null), handler);
	}
}

其中_createProxy方法利用了Proxy对象创建了一个代理对象,劫持其 get 方法,将以 '$' 开头的方法代理到了this._remoteCall方法中:

private _remoteCall(rpcId: number, methodName: string, args: any[]): Promise<any> {
  if (this._isDisposed) {
    return new CanceledLazyPromise();
  }
  let cancellationToken: CancellationToken | null = null;
  if (args.length > 0 && CancellationToken.isCancellationToken(args[args.length - 1])) {
    cancellationToken = args.pop();
  }

  if (cancellationToken && cancellationToken.isCancellationRequested) {
    // No need to do anything...
    return Promise.reject<any>(errors.canceled());
  }

  const serializedRequestArguments = MessageIO.serializeRequestArguments(args, this._uriReplacer);

  const req = ++this._lastMessageId;
  const callId = String(req);
  const result = new LazyPromise();

  if (cancellationToken) {
    cancellationToken.onCancellationRequested(() => {
      const msg = MessageIO.serializeCancel(req);
      this._logger?.logOutgoing(msg.byteLength, req, RequestInitiator.LocalSide, `cancel`);
      this._protocol.send(MessageIO.serializeCancel(req));
    });
  }

  this._pendingRPCReplies[callId] = result;
  this._onWillSendRequest(req);
  const msg = MessageIO.serializeRequest(req, rpcId, methodName, serializedRequestArguments, !!cancellationToken);
  this._logger?.logOutgoing(msg.byteLength, req, RequestInitiator.LocalSide, `request: ${getStringIdentifierForProxy(rpcId)}.${methodName}(`, args);
  this._protocol.send(msg);
  return result;
}

最终使用了this._protocol.send方法将方法调用封装成一条 msg 发送到浏览器主进程中,下面我们最后来看下this._protocol对象:

class ExtensionWorker {
	// protocol
	readonly protocol: IMessagePassingProtocol;

	constructor() {
		const channel = new MessageChannel();
		const emitter = new Emitter<VSBuffer>();
		let terminating = false;

		// send over port2, keep port1
		nativePostMessage(channel.port2, [channel.port2]);

		channel.port1.onmessage = event => {
			const { data } = event;
			if (!(data instanceof ArrayBuffer)) {
				console.warn('UNKNOWN data received', data);
				return;
			}

			const msg = VSBuffer.wrap(new Uint8Array(data, 0, data.byteLength));
			if (isMessageOfType(msg, MessageType.Terminate)) {
				// handle terminate-message right here
				terminating = true;
				onTerminate('received terminate message from renderer');
				return;
			}

			// emit non-terminate messages to the outside
			emitter.fire(msg);
		};

		this.protocol = {
			onMessage: emitter.event,
			send: vsbuf => {
				if (!terminating) {
					const data = vsbuf.buffer.buffer.slice(vsbuf.buffer.byteOffset, vsbuf.buffer.byteOffset + vsbuf.buffer.byteLength);
					channel.port1.postMessage(data, [data]);
				}
			}
		};
	}
}

上文中提到了protocol对象实现了IMessagePassingProtocol接口:

export interface IMessagePassingProtocol {
	send(buffer: VSBuffer): void;
	onMessage: Event<VSBuffer>;
	/**
	 * Wait for the write buffer (if applicable) to become empty.
	 */
	drain?(): Promise<void>;
}

protocol对象的 send 方法主要是利用了MessageChannelAPI 创建了一条与浏览器主进程之间的双向通信通道,将对于 VS Code API 方法的调用封装成一条 msg 消息发送至主进程,而在主进程中会使用rpcProtocol.set注册方法中指定的 identifier(同时也是rpcProtocol.getProxy(identifier)方法对应的 identifier)标识符创建一个对应的消费服务:

@extHostNamedCustomer(MainContext.MainThreadCommands)
export class MainThreadCommands implements MainThreadCommandsShape {
  private readonly _proxy: ExtHostCommandsShape;

	constructor(
		extHostContext: IExtHostContext,
		@ICommandService private readonly _commandService: ICommandService,
		@IExtensionService private readonly _extensionService: IExtensionService,
	) {
		this._proxy = extHostContext.getProxy(ExtHostContext.ExtHostCommands);
    //...
	}

  $registerCommand(id: string): void {
		this._commandRegistrations.set(
			id,
			CommandsRegistry.registerCommand(id, (accessor, ...args) => {
				return this._proxy.$executeContributedCommand(id, ...args).then(result => {
					return revive(result);
				}, onUnexpectedExternalError);
			})
		);
	}
}

同样需要注意的是,这套机制同样可以进行反向调用,即主进程调用 Extension host Web Worker 中注册的服务。 

目录
相关文章
|
6月前
|
传感器 安全 物联网
深入理解 Franca IDL 在 IPC 通信中的应用
深入理解 Franca IDL 在 IPC 通信中的应用
201 1
|
6月前
【进程通信】用命名管道模拟server和client之间的通信
【进程通信】用命名管道模拟server和client之间的通信
|
6月前
[UDS] --- UDS概述
[UDS] --- UDS概述
695 0
|
SQL Java 关系型数据库
数据交换(client)
数据交换(client)
202 0
[ROS通信机制] ---话题通信之自定义msg类型
[ROS通信机制] ---话题通信之自定义msg类型
148 0
|
网络协议
[ROS通信机制] --- 服务通信
[ROS通信机制] --- 服务通信
180 0
|
测试技术 网络架构
CAN与CAN FD通信之间存在的问题
因为受制于产品的稳定性考验,改造成本等问题,没法快速全面普及CAN FD。另外,在2012年底提出CAN FD到2015年中成为ISO CAN FD。 也就是说目前市场上大部分都还是在用传统的CAN2.0,有一小部分用非ISO标准的CAN FD,一部分用ISO标准的CAN FD。