标题1标题2
1. 概述
1.1 stream_channel是什么
stream_channel包提供了StreamChannel
接口,它代表了一个双向通信通道。每个StreamChannel
都提供了一个用于接收数据的Stream
和一个用于发送数据的StreamSink
。stream_channel包还包含用于处理StreamChannel
和双向通信的实用工具。
StreamChannel
有助于将通信逻辑与底层协议分离。例如,test
包在浏览器套件的 WebSocket 连接和 VM 测试的隔离连接中都重用了其测试套件通信协议。
stream_channel
库的主要有:
- 实时通信:在Flutter应用程序的不同部分之间实时传输数据,例如将数据从一个屏幕传递到另一个屏幕或从一个Widget传递到另一个Widget。
- 异步事件处理:处理异步事件流,例如从网络请求、传感器数据、或者定时器获得的事件流中提取和处理数据。
- 多通道管理:通过MultiChannel类,可以在单个底层传输层上复用多个虚拟通道,使不同类型的数据可以通过不同的通道进行传输。
- 与Isolate通信:通过IsolateChannel,在Flutter应用程序的不同Isolate之间进行通信,以实现并发处理和数据传递。
1.2 简单回顾: Stream
Stream<T> 表示一个异步的数据流,它可以产生一系列的数据事件(通常是某种类型的对象)供订阅者处理。流通常用于处理连续的事件或数据,例如读取文件、接收网络请求或监视用户输入。
相比于Future
功能上看
- Future 是一次性的,它代表了一个异步操作的结果,一旦操作完成,就不能再次使用。用起来就像这样:
Future<int> fetchValue() async { // 异步操作 return 666; } fetchValue().then((value) { print(value); // 处理结果 }).catchError((error) { print(error); // 处理错误 });
- Stream 是持续的,它可以生成多个事件,而不会销毁。可以订阅一个 Stream 并监听其事件,每次有新事件生成时,会触发订阅者的回调函数。用起来就像这样:
// 创建一个异步生成器函数,它返回一个 Stream<int>,用于生成一个整数计数器的事件流。 Stream<int> createCounterStream() async* { // 使用 for 循环生成 0 到 4 的整数。 for (int i = 0; i < 5; i++) { // 在生成每个整数之前,等待 1 秒钟的延迟。 await Future.delayed(Duration(seconds: 1)); // 使用 yield 关键字将整数添加到事件流中。 yield i; } } // 调用 createCounterStream 函数以获取事件流。 final stream = createCounterStream(); // 使用 stream.listen() 订阅事件流,每当有新事件生成时,将调用回调函数。 stream.listen((value) { // 处理每个事件,这里将事件的值打印到控制台。 print(value); // 处理每个事件 });
关于StreamSink
StreamSink 多是从一个 StreamController 实例的sink
属性上获取使用的,就如controller.sink
,而不会单独使用。
StreamSink 用于将数据写入一个异步数据流 (Stream)。它是 Dart 异步编程中的一个重要组件,通常用于数据的输出、推送或写入操作。例如:
import 'dart:async'; void main() { var controller = StreamController<int>(); // 获取 StreamSink var sink = controller.sink; // 向流中添加数据 sink.add(1); sink.add(2); sink.add(3); // 关闭流 sink.close(); // 监听流的数据 controller.stream.listen((data) { print(data); // 打印 1、2、3 }); }
场景上看
- Future 适合表示一次性操作,例如发起一个网络请求、读取一个文件、执行一个计算密集型任务等。它通常用于等待某个操作完成并获取其结果;
- Stream 适合表示连续的事件流,例如实时数据更新、用户输入事件、从多个来源获取数据等。它通常用于监视一系列事件并对它们进行处理。
1.3 加入依赖
flutter pub add stream_channel
2. 双向通信通道 StreamChannel 类
StreamChannel
类是一个抽象类,表示一个双向通信通道。它定义了用于与通道进行交互的方法和属性,包括用于从通道读取数据的stream
和用于向通道写入数据的sink
。StreamChannel
用于在Dart中实现双向通信,例如在网络通信中使用。
StreamChannel类的属性和方法
stream属性
- 语法格式:
Stream<T> get stream;
- 功能:获取用于从通道接收数据的单订阅流。
- 返回值:
Stream<T>
- 用于从通道接收数据的单订阅流。 - 用法示例:
StreamChannel<String> channel = ...; // 创建一个StreamChannel Stream<String> dataStream = channel.stream; // 获取用于接收数据的流
sink属性
- 语法格式:
StreamSink<T> get sink;
- 功能:获取用于向通道发送数据的流式数据接收器。
- 返回值:
StreamSink<T>
- 用于向通道发送数据的流式数据接收器。 - 用法示例:
StreamChannel<String> channel = ...; // 创建一个StreamChannel StreamSink<String> dataSink = channel.sink; // 获取用于发送数据的流式数据接收器
StreamChannel构造函数
- 语法格式:
factory StreamChannel(Stream<T> stream, StreamSink<T> sink) => _StreamChannel<T>(stream, sink);
- 功能:创建一个新的
StreamChannel
,用于双向通信,基于提供的stream
和sink
。 - 参数:
stream
:用于从通道接收数据的单订阅流。sink
:用于向通道发送数据的流式数据接收器。
- 返回值:
StreamChannel<T>
- 新创建的StreamChannel
实例。 - 用法示例:
Stream<String> inputStream = ...; // 创建一个输入流 StreamSink<String> outputStream = ...; // 创建一个输出流 StreamChannel<String> channel = StreamChannel(inputStream, outputStream); // 创建StreamChannel
StreamChannel.withGuarantees构造函数
- 语法格式:
factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink, {bool allowSinkErrors = true}) => GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
- 功能:创建一个新的
StreamChannel
,用于双向通信,同时强制实施StreamChannel
文档中列出的保证。 - 参数:
stream
:用于从通道接收数据的单订阅流。sink
:用于向通道发送数据的流式数据接收器。allowSinkErrors
:一个布尔值,指定是否允许将错误传递给sink
。默认为true
。
- 返回值:
StreamChannel<T>
- 新创建的StreamChannel
实例,具有保证。 - 用法示例:
Stream<String> inputStream = ...; // 创建一个输入流 StreamSink<String> outputStream = ...; // 创建一个输出流 StreamChannel<String> channel = StreamChannel.withGuarantees(inputStream, outputStream, allowSinkErrors: false); // 创建具有保证的StreamChannel
StreamChannel.withCloseGuarantee构造函数
- 语法格式:
factory StreamChannel.withCloseGuarantee(Stream<T> stream, StreamSink<T> sink) => CloseGuaranteeChannel(stream, sink);
- 功能:创建一个新的
StreamChannel
,用于双向通信,特别强制实施通道关闭的保证。 - 参数:
stream
:用于从通道接收数据的单订阅流。sink
:用于向通道发送数据的流式数据接收器。
- 返回值:
StreamChannel<T>
- 新创建的StreamChannel
实例,具有通道关闭的保证。 - 用法示例:
Stream<String> inputStream = ...; // 创建一个输入流 StreamSink<String> outputStream = ...; // 创建一个输出流 StreamChannel<String> channel = StreamChannel.withCloseGuarantee(inputStream, outputStream); // 创建具有通道关闭保证的StreamChannel
pipe方法
- 语法格式:
void pipe(StreamChannel<T> other);
- 功能:连接当前
StreamChannel
到另一个StreamChannel
,使两者之间的数据传输直接相互转发。 - 参数:
other
:另一个StreamChannel
,用于连接到当前通道。
- 返回值:无。
- 用法示例:
StreamChannel<String> channel1 = ...; // 创建第一个StreamChannel StreamChannel<String> channel2 = ...; // 创建第二个StreamChannel channel1.pipe(channel2); // 将第一个通道的数据传输到第二个通道,反之亦然
transform方法
- 语法格式:
StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer);
- 功能:使用指定的
StreamChannelTransformer
对当前StreamChannel
进行转换。 - 参数:
transformer
:要应用于当前通道的转换器。
- 返回值:
StreamChannel<S>
- 转换后的StreamChannel
实例。 - 用法示例:
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel StreamChannel<int> channel2 = channel1.transform(IntToStringTransformer()); // 使用转换器将String类型的通道转换为Int类型的通道
transformStream方法
- 语法格式:
StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
- 功能:仅对当前通道的
stream
部分应用指定的StreamTransformer
。 - 参数:
transformer
:要应用于stream
的转换器。
- 返回值:
StreamChannel<T>
- 转换后的StreamChannel
实例。 - 用法示例:
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel StreamChannel<String> channel2 = channel1.transformStream(StringTransformer()); // 使用转换器仅对stream部分进行转换
transformSink方法
- 语法格式:
StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
- 功能:仅对当前通道的
sink
部分应用指定的StreamSinkTransformer
。 - 参数:
transformer
:要应用于sink
的转换器。
- 返回值:
StreamChannel<T>
- 转换后的StreamChannel
实例。 - 用法示例:
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel StreamChannel<String> channel2 = channel1.transformSink(StringSinkTransformer()); // 使用转换器仅对sink部分进行转换
changeStream方法
- 语法格式:
StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change);
- 功能:返回当前通道的副本,其中
stream
部分被替换为由change
函数返回的值。 - 参数:
change
:一个函数,用于更改stream
部分。
- 返回值:
StreamChannel<T>
- 具有更改后的stream
部分的StreamChannel
实例。 - 用法示例:
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel StreamChannel<String> channel2 = channel1.changeStream((stream) => stream.where((data) => data.isNotEmpty)); // 过滤掉stream中的空数据
changeSink方法
- 语法格式:
StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change);
- 功能:返回当前通道的副本,其中
sink
部分被替换为由change
函数返回的值。 - 参数:
change
:一个函数,用于更改sink
部分。
- 返回值:
StreamChannel<T>
- 具有更改后的sink
部分的StreamChannel
实例。 - 用法示例:
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel StreamChannel<String> channel2 = channel1.changeSink((sink) => MyCustomSink(sink)); // 使用自定义的sink替换原有的sink
cast方法
- 语法格式:
StreamChannel<S> cast<S>();
- 功能:返回当前通道的副本,将通道的泛型类型强制转换为
S
。 - 返回值:
StreamChannel<S>
- 具有泛型类型为S
的StreamChannel
实例。 - 用法示例:
StreamChannel<dynamic> channel1 = ...; // 创建一个通用类型的StreamChannel StreamChannel<int> channel2 = channel1.cast<int>(); // 强制将通道类型转换为int类型
这些方法和属性组合使得StreamChannel
类能够实现双向通信,并提供了多种方法来操作和转换通道的数据流。这些方法使得StreamChannel
在Dart中成为一个有用的工具,用于处理双向数据流通信。
3. IsolateChannel类
IsolateChannel
类是一个实现了StreamChannel
接口的通道,用于在不同的隔离体(isolate)之间进行通信,通常用于与另一个隔离体进行数据交换。它基于两个Isolate之间的ReceivePort
和SendPort
实现,允许双向通信。
3.1 IsolateChannel类的构造函数
IsolateChannel.connectReceive构造函数
- 语法格式:
factory IsolateChannel.connectReceive(ReceivePort receivePort);
- 功能:连接到使用
IsolateChannel.connectSend
创建的远程通道。 - 参数:
receivePort
:用于接收消息的ReceivePort
。
- 返回值:
IsolateChannel<T>
- 已连接到远程通道的IsolateChannel
实例。 - 用法示例:
ReceivePort remoteReceivePort = ...; // 创建远程ReceivePort IsolateChannel<String> channel = IsolateChannel.connectReceive(remoteReceivePort);
IsolateChannel.connectSend构造函数
- 语法格式:
factory IsolateChannel.connectSend(SendPort sendPort);
- 功能:连接到使用
IsolateChannel.connectReceive
创建的远程通道。 - 参数:
sendPort
:用于发送消息的SendPort
。
- 返回值:
IsolateChannel<T>
- 已连接到远程通道的IsolateChannel
实例。 - 用法示例:
SendPort remoteSendPort = ...; // 创建远程SendPort IsolateChannel<String> channel = IsolateChannel.connectSend(remoteSendPort);
IsolateChannel构造函数
- 语法格式:
factory IsolateChannel(ReceivePort receivePort, SendPort sendPort);
- 功能:创建一个
IsolateChannel
,用于在两个隔离体之间传递消息。 - 参数:
receivePort
:用于接收消息的本地ReceivePort
。sendPort
:用于发送消息的远程SendPort
。
- 返回值:
IsolateChannel<T>
- 已创建的IsolateChannel
实例。 - 用法示例:
ReceivePort localReceivePort = ReceivePort(); // 创建本地ReceivePort SendPort remoteSendPort = ...; // 获取远程SendPort IsolateChannel<String> channel = IsolateChannel(localReceivePort, remoteSendPort);
3.2 IsolateChannel类的属性
stream属性
- 语法格式:
Stream<T> get stream;
- 功能:获取从远程通道接收到的消息的输入流。
- 返回值:
Stream<T>
- 输入流,用于接收从远程通道发送的消息。 - 用法示例:
IsolateChannel<String> channel = ...; // 创建一个IsolateChannel Stream<String> inputStream = channel.stream; // 获取输入流
sink属性
- 语法格式:
StreamSink<T> get sink;
- 功能:获取发送消息到远程通道的输出流。
- 返回值:
StreamSink<T>
- 输出流,用于发送消息到远程通道。 - 用法示例:
IsolateChannel<String> channel = ...; // 创建一个IsolateChannel StreamSink<String> outputStream = channel.sink; // 获取输出流
IsolateChannel
类允许在不同的隔离体之间进行通信,通过stream
属性接收远程通道发送的消息,通过sink
属性发送消息到远程通道。它提供了两种构造函数用于建立连接,分别是IsolateChannel.connectReceive
和IsolateChannel.connectSend
,并且可以通过IsolateChannel
构造函数创建一个新的IsolateChannel
来实现双向通信。
4.MultiChannel类
MultiChannel
类是一个抽象类,用于多路复用多个虚拟通道(Virtual Channel)在单一的底层传输层之上。它允许在一个通道上创建多个虚拟通道,每个虚拟通道都可以独立传输数据。虚拟通道可用于在两个端点之间进行双向通信,通过底层通道进行消息传递。
4.1 MultiChannel类的构造函数
MultiChannel构造函数
- 语法格式:
factory MultiChannel(StreamChannel<dynamic> inner) => _MultiChannel<T>(inner);
- 功能:创建一个新的
MultiChannel
,用于在内部传输层上发送和接收消息。 - 参数:
inner
:用于在内部传输层上发送和接收消息的底层通道,必须接受类似JSON的对象。
- 返回值:
MultiChannel<T>
- 新创建的MultiChannel
实例。 - 用法示例:
StreamChannel<dynamic> innerChannel = ...; // 创建一个内部通道 MultiChannel<String> channel = MultiChannel(innerChannel); // 创建MultiChannel
4.2 MultiChannel类的属性
stream属性
- 语法格式:
Stream<T> get stream;
- 功能:获取默认的输入流,连接到远程通道的输出。
- 返回值:
Stream<T>
- 默认的输入流。 - 用法示例:
MultiChannel<String> channel = ...; // 创建一个MultiChannel Stream<String> inputStream = channel.stream; // 获取默认的输入流
sink属性
- 语法格式:
StreamSink<T> get sink;
- 功能:获取默认的输出流,连接到远程通道的输入。如果关闭此输出流,则远程输入流将关闭,但其他虚拟通道将保持打开,并且可以创建新的虚拟通道。
- 返回值:
StreamSink<T>
- 默认的输出流。 - 用法示例:
MultiChannel<String> channel = ...; // 创建一个MultiChannel StreamSink<String> outputStream = channel.sink; // 获取默认的输出流
4.3 MultiChannel类的方法
virtualChannel方法
- 语法格式:
VirtualChannel<T> virtualChannel([int? id]);
- 功能:创建一个新的虚拟通道(Virtual Channel)。
- 参数:
id
(可选):虚拟通道的标识符。如果未提供,将创建一个新的虚拟通道。如果提供,将创建与远程通道上具有相同标识符的虚拟通道。
- 返回值:
VirtualChannel<T>
- 新创建的虚拟通道(Virtual Channel)。 - 用法示例:
MultiChannel<String> multiChannel = ...; // 创建一个MultiChannel VirtualChannel<String> virtual = multiChannel.virtualChannel(); // 创建新的虚拟通道
这些方法和属性使得MultiChannel
类能够实现多路复用多个虚拟通道在单一底层传输层上进行通信,从而实现双向通信,并允许在一个通道上创建多个虚拟通道以独立传输数据。这种模式对于网络通信和消息传递非常有用。
5. Disconnector类
Disconnector
类是一个StreamChannelTransformer
,用于允许调用者强制断开通道连接。通过这个转换器,可以实现对通道的断开操作,导致通道的stream
会发出done事件,而sink
会忽略后续的输入。同时,内部的sink
也会被关闭,以通知远程端断开连接。
5.1 Disconnector类的属性
isDisconnected属性
- 语法格式:
bool get isDisconnected;
- 功能:判断是否已经调用了
disconnect
方法来断开通道连接。 - 返回值:
bool
- 如果已经断开连接,则为true
;否则为false
。 - 用法示例:
Disconnector<String> disconnector = ...; // 创建一个Disconnector bool disconnected = disconnector.isDisconnected; // 判断是否已断开连接
5.2 Disconnector类的方法
disconnect方法
- 语法格式:
Future<void> disconnect();
- 功能:断开所有已经被转换的通道连接。
- 返回值:
Future<void>
- 表示断开连接的未来对象,当所有内部sink
的StreamSink.close
完成后,该未来对象将完成。 - 用法示例:
Disconnector<String> disconnector = ...; // 创建一个Disconnector Future<void> disconnectFuture = disconnector.disconnect(); // 断开连接并获取未来对象
bind方法
- 语法格式:
StreamChannel<T> bind(StreamChannel<T> channel);
- 功能:将
Disconnector
应用于给定的通道,返回一个已经应用了断开连接逻辑的通道。 - 参数:
channel
:要应用Disconnector
的通道。
- 返回值:
StreamChannel<T>
- 已应用了断开连接逻辑的通道。 - 用法示例:
Disconnector<String> disconnector = ...; // 创建一个Disconnector StreamChannel<String> channel = ...; // 创建一个通道 StreamChannel<String> transformedChannel = disconnector.bind(channel); // 应用Disconnector到通道
5.3 _DisconnectorSink类
_DisconnectorSink
类是Disconnector
内部使用的辅助类,它是StreamSink
的包装器,用于实现强制断开连接的功能。
5.3.1 _DisconnectorSink类的属性
done属性
- 语法格式:
Future<void> get done;
- 功能:获取底层
sink
的done属性。 - 返回值:
Future<void>
- 表示底层sink
的done属性。 - 用法示例:
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink Future<void> doneFuture = disconnectorSink.done; // 获取done属性的未来对象
5.3.2 _DisconnectorSink类的方法
add方法
- 语法格式:
void add(T data);
- 功能:向底层
sink
中添加数据,如果已经断开连接则不会添加。 - 参数:
data
:要添加的数据。
- 用法示例:
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink disconnectorSink.add("Hello"); // 向底层sink中添加数据
addError方法
- 语法格式:
void addError(Object error, [StackTrace? stackTrace]);
- 功能:向底层
sink
中添加错误信息,如果已经断开连接则不会添加。 - 参数:
error
:要添加的错误对象。stackTrace
:可选参数,表示错误的堆栈跟踪信息。
- 用法示例:
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink disconnectorSink.addError(Exception("An error occurred")); // 向底层sink中添加错误信息
addStream方法
- 语法格式:
Future<void> addStream(Stream<T> stream);
- 功能:将一个流中的数据添加到底层
sink
中,如果已经断开连接则不会添加。 - 参数:
stream
:要添加的流。
- 返回值:
Future<void>
- 表示添加流的未来对象,当添加完成后,未来对象将完成。 - 用法示例:
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink Stream<String> dataStream = ...; // 创建一个数据流 Future<void> addStreamFuture = disconnectorSink.addStream(dataStream); // 将流中的数据添加到底层sink中
close方法
- 语法格式:
Future<void> close();
- 功能:关闭底层
sink
,同时标记通道已经关闭。 - 返回值:
Future<void>
- 表示关闭底层sink
的未来对象,当关闭完成后,未来对象将完成。 - 用法示例:
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink Future<void> closeFuture = disconnectorSink.close(); // 关闭底层sink
_disconnect方法
- 语法格式:
Future<void> _disconnect();
- 功能:断开底层
sink
,停止转发事件。 - 返回值:
Future<void>
- 表示断开底层sink
的未来对象。 - 用法示例:
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink Future<void> disconnectFuture = disconnectorSink._disconnect(); // 断开底层sink
6. 分析示例代码
以下是官方的讲解示例代码:
import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:isolate'; import 'package:stream_channel/isolate_channel.dart'; import 'package:stream_channel/stream_channel.dart'; Future<void> main() async { // 一个 StreamChannel<T> 在最简单的情况下,是一个包装了 Stream<T> 和 StreamSink<T> 的对象。 // 例如,可以创建一个包装标准输入输出的通道: var stdioChannel = StreamChannel(stdin, stdout); stdioChannel.sink.add('Hello!\n'.codeUnits); // 就像可以使用 StreamTransformer<T> 转换 Stream<T> 一样,可以使用 StreamChannelTransformer<T> 转换 StreamChannel<T>。 // 例如,我们可以将标准输入处理为字符串: var stringChannel = stdioChannel .transform(StreamChannelTransformer.fromCodec(utf8)) .transformStream(LineSplitter()); stringChannel.sink.add('world!\n'); // 可以通过扩展 StreamChannelMixin<T> 来实现 StreamChannel<T>,但使用 StreamChannelController<T> 更加简单。 // 控制器有两个 StreamChannel<T> 成员:'local' 和 'foreign'。 // 控制器的创建者应该使用 'local' 通道,而接收者通常不会直接访问底层控制器,而是使用 'foreign' 通道。 var ctrl = StreamChannelController<String>(); ctrl.local.stream.listen((event) { // 在这里执行有用的操作... }); // 还可以将一个通道的事件传递给另一个通道。 ctrl ..foreign.pipe(stringChannel) ..local.sink.add('Piped!\n'); await ctrl.local.sink.close(); // 通过调用 'StreamChannel<T>.withGuarantees()',可以创建一个提供所有保证的 StreamChannel<T>。 var dummyCtrl0 = StreamChannelController<String>(); var guaranteedChannel = StreamChannel.withGuarantees( dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink); // 要关闭 StreamChannel,使用 'sink.close()'。 await guaranteedChannel.sink.close(); // MultiChannel<T> 可以在单个底层传输层上复用多个虚拟通道。 // 例如,通过某种机制,应用程序可以将来自不同客户端的事件分开处理,即使通过标准 I/O 监听也可以支持多个客户端。 // // MultiChannel<T> 将事件拆分成编号通道,这些通道是 VirtualChannel<T> 的实例。 var dummyCtrl1 = StreamChannelController<String>(); var multiChannel = MultiChannel<String>(dummyCtrl1.foreign); var channel1 = multiChannel.virtualChannel(); await multiChannel.sink.close(); // 客户端/对等方还应该创建自己的 MultiChannel<T>,连接到底层传输,使用相应的 ID 处理其各自通道中的事件。 // 如何在不同端点之间传递通道 ID 取决于。 var dummyCtrl2 = StreamChannelController<String>(); var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign); var channel2 = multiChannel2.virtualChannel(channel1.id); await channel2.sink.close(); await multiChannel2.sink.close(); // 多个 Dart 应用程序的实例可以轻松通过 `SendPort`/`ReceivePort` 对来进行通信,这是通过 `IsolateChannel<T>` 类实现的。 // 通常,一个端点将创建一个 `ReceivePort`,然后调用 `IsolateChannel.connectReceive` 构造函数。 // 另一个端点将获得相应的 `SendPort`,然后调用 `IsolateChannel.connectSend`。 var recv = ReceivePort(); var recvChannel = IsolateChannel.connectReceive(recv); var sendChannel = IsolateChannel.connectSend(recv.sendPort); // 必须手动关闭 `IsolateChannel<T>` 的 sink。 await recvChannel.sink.close(); await sendChannel.sink.close(); // 可以使用 `Disconnector` 转换器使通道在远程传输端断开连接。 var disconnector = Disconnector<String>(); var disconnectable = stringChannel.transform(disconnector); disconnectable.sink.add('Still connected!'); await disconnector.disconnect(); // 此外: // * 'DelegatingStreamController<T>' 类可扩展为构建包装其他 'StreamChannel<T>' 对象的基础。 // * 'jsonDocument' 转换器将事件转换为 JSON 格式,并使用 'dart:convert' 中的 'json' 编解码器。 // * 'package:json_rpc_2' 直接构建在 'package:stream_channel' 之上,因此可以使用任何兼容的传输来创建交互式客户端/服务器或点对点应用程序(例如语言服务器、微服务等)。 }
代码输出结果为:
Hello! world! Piped!
part.1 创建标准输入输出通道
var stdioChannel = StreamChannel(stdin, stdout); stdioChannel.sink.add('Hello!\n'.codeUnits);
在这里,创建了一个StreamChannel对象stdioChannel,将标准输入stdin和标准输出stdout包装在通道中。然后,通过sink将字符串’Hello!\n’转换为UTF-8编码的字节流发送到标准输出。
part.2 转换输入为字符串
var stringChannel = stdioChannel .transform(StreamChannelTransformer.fromCodec(utf8)) .transformStream(LineSplitter()); stringChannel.sink.add('world!\n');
这部分代码将stdioChannel通过transform方法进行了两次转换。首先,使用StreamChannelTransformer.fromCodec(utf8)将输入数据流编码为UTF-8字符串,然后使用transformStream(LineSplitter())将输入数据流按行拆分。最后,通过sink将字符串’world!\n’发送到通道中。
part.3 使用 StreamChannelController
var ctrl = StreamChannelController<String>(); ctrl.local.stream.listen((event) { // 在这里执行有用的操作... });
这段代码创建了一个StreamChannelController对象ctrl,它用于管理通道。StreamChannelController包含两个StreamChannel成员:local和foreign。通常,创建者会使用local通道,而接收者通常不会直接访问底层控制器,而是使用foreign通道。在此示例中,我们订阅了ctrl.local.stream以处理来自通道的事件。
part.4 传递事件到另一个通道
ctrl ..foreign.pipe(stringChannel) ..local.sink.add('Piped!\n'); await ctrl.local.sink.close();
这部分代码将一个通道的事件传递到另一个通道。首先,通过pipe方法将ctrl.foreign通道中的事件传递到stringChannel中。然后,通过local.sink将字符串’Piped!\n’发送到ctrl.local通道中,并最后关闭ctrl.local.sink。
part.5 创建具有保证的 StreamChannel
var dummyCtrl0 = StreamChannelController<String>(); var guaranteedChannel = StreamChannel.withGuarantees( dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink); await guaranteedChannel.sink.close();
这部分代码演示了如何使用StreamChannel.withGuarantees方法创建一个具有所有保证的StreamChannel。它接受一个输入流和一个输出流,然后通过sink.close()方法关闭了通道。
part.6 使用 MultiChannel
var dummyCtrl1 = StreamChannelController<String>(); var multiChannel = MultiChannel<String>(dummyCtrl1.foreign); var channel1 = multiChannel.virtualChannel(); await multiChannel.sink.close();
这段代码演示了如何使用MultiChannel,它可以在单个底层传输层上复用多个虚拟通道。首先,创建了一个MultiChannel对象multiChannel,然后通过virtualChannel()方法创建了一个虚拟通道channel1。最后,通过sink.close()关闭了multiChannel。
part.7 创建另一个 MultiChannel
var dummyCtrl2 = StreamChannelController<String>(); var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign); var channel2 = multiChannel2.virtualChannel(channel1.id); await channel2.sink.close(); await multiChannel2.sink.close();
这段代码创建了另一个MultiChannel对象multiChannel2,并使用virtualChannel(channel1.id)创建了一个虚拟通道channel2,其ID与channel1相同。然后,通过sink.close()分别关闭了channel2和multiChannel2。
part.8 使用 IsolateChannel 进行 Dart 应用程序通信
var recv = ReceivePort(); var recvChannel = IsolateChannel.connectReceive(recv); var sendChannel = IsolateChannel.connectSend(recv.sendPort); await recvChannel.sink.close(); await sendChannel.sink.close();
这部分代码演示了如何使用IsolateChannel进行Dart应用程序的通信。首先,创建了一个ReceivePort对象recv,然后使用IsolateChannel.connectReceive和IsolateChannel.connectSend构造函数创建了两个IsolateChannel通道,用于发送和接收消息。最后,通过sink.close()方法手动关闭了这两个通道。
part.9 使用 Disconnector 断开连接
var disconnector = Disconnector<String>(); var disconnectable = stringChannel.transform(disconnector); disconnectable.sink.add('Still connected!'); await disconnector.disconnect();
这段代码创建了一个Disconnector对象disconnector,然后使用transform方法将stringChannel与disconnector进行转换,使通道可以在远程传输端断开连接。然后,通过sink.add向通道发送消息,最后使用disconnector.disconnect()方法断开连接。
F. 附录
F.1 StreamChannel 接口
表示双向通信通道
用户应该考虑流(stream)发出"done"事件作为通道关闭的规范指示。如果他们希望关闭通道,他们应该关闭sink——取消流订阅是不足够的。协议错误可能通过流或sink.done发出,具体取决于它们的根本原因。请注意,如果在调用sink.close之前通道关闭,sink可能会静默丢弃事件。
强烈建议实现混入或扩展StreamChannelMixin以获取各种实例方法的默认实现。如果同时为StreamChannelMixin添加实现,则不会将新方法视为破坏性更改。
实现必须提供以下保证
- 流是单订阅的,并且必须遵循所有单订阅流的保证。
- 关闭
sink
会导致流在发出更多事件之前关闭。 - 流关闭后,
sink
会自动关闭。如果发生这种情况,sink
方法应该静默丢弃它们的参数,直到调用sink.close为止。 - 如果流在有侦听器之前关闭,如果可能的话,
sink
应该静默丢弃事件。 - 取消流的订阅对
sink
没有影响。即使在取消订阅后,通道仍必须能够响应另一端关闭通道的情况。 - sink要么将错误转发给另一端,要么在添加错误后立即关闭,并将该错误转发给
sink.done
的未来。
这些保证允许用户与所有实现进行统一交互,并确保关闭流的任一端都会产生一致的行为。
源码
/// 一个表示双向通信通道的抽象类。 /// /// 用户应该将 [stream] 发出 "done" 事件视为通道已关闭的标志。如果他们希望关闭通道,他们应该关闭 [sink]——取消流订阅不足够。协议错误可能通过流或 [sink].done 发出,具体取决于其根本原因。请注意,在调用 [sink].close 之前,如果通道在之前关闭,sink 可能会悄悄丢弃事件。 /// /// 强烈建议实现混合或扩展 [StreamChannelMixin],以获取各种实例方法的默认实现。如果还为此接口添加了新的方法,则不会被视为破坏性更改,前提是也将实现添加到 [StreamChannelMixin]。 /// /// 实现必须提供以下保证: /// /// * 该流是单订阅的,并且必须遵循单订阅流的所有保证。 /// /// * 关闭 sink 会导致流在发出更多事件之前关闭。 /// /// * 在流关闭后,sink 会自动关闭。如果发生这种情况,sink 方法应该悄悄地丢弃它们的参数,直到调用 [sink].close。 /// /// * 如果流在有侦听器之前关闭,sink 应尽可能悄悄地丢弃事件。 /// /// * 取消流的订阅对 sink 没有影响。通道必须仍然能够响应另一端关闭通道,即使已取消订阅。 /// /// * sink 要么将错误转发到另一端,要么在添加错误后立即关闭并将该错误转发到 [sink].done 未来。 /// /// 这些保证使用户能够统一地与所有实现交互,并确保关闭流的任一端都会产生一致的行为。 abstract class StreamChannel<T> { /// 从另一端发出值的单订阅流。 Stream<T> get stream; /// 用于将值发送到另一端的 sink。 StreamSink<T> get sink; /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。 /// /// 请注意,此流/接收器对必须提供 [StreamChannel] 文档中列出的保证。如果它们没有本地提供这些保证,则应使用 [StreamChannel.withGuarantees]。 factory StreamChannel(Stream<T> stream, StreamSink<T> sink) => _StreamChannel<T>(stream, sink); /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。 /// /// 与 [StreamChannel.new] 不同,这强制执行 [StreamChannel] 文档中列出的保证。这使其比直接包装流和接收器要低效一些,因此应该在本机提供保证时使用 [StreamChannel.new]。 /// /// 如果 [allowSinkErrors] 为 `false`,则不允许将错误传递给 [sink]。如果有任何错误,连接将关闭,并且错误将转发到 [sink].done。 factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink, {bool allowSinkErrors = true}) => GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors); /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。 /// /// 这特别强调了第二个保证:关闭 sink 会导致流在发出更多事件之前关闭。当在原始流的事件分发和返回流的事件之间添加了异步间隙时,例如通过使用 [StreamTransformer] 进行变换,这个保证将无效。这是保留该特定保证的一种较轻量级方式,比 [StreamChannel.withGuarantees] 要轻。 factory StreamChannel.withCloseGuarantee( Stream<T> stream, StreamSink<T> sink) => CloseGuaranteeChannel(stream, sink); /// 连接此通道到 [other],以便由任何一端发出的值都直接发送到另一端。 void pipe(StreamChannel<T> other); /// 使用 [transformer] 进行转换。 /// /// 这与调用 `transformer.bind(channel)` 相同。 StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer); /// 仅使用 [transformer] 转换此通道的 [stream] 组件。 StreamChannel<T> transformStream(StreamTransformer<T, T> transformer); /// 仅使用 [transformer] 转换此通道的 [sink] 组件。 StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer); /// 返回一个具有 [stream] 替换为 [change] 返回值的副本。 StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change); /// 返回一个具有 [sink] 替换为 [change] 返回值的副本。 StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change); /// 返回一个泛型类型强制转换为 [S] 的副本。 /// /// 如果 [stream] 发出的任何事件不是类型 [S],它们将转换为 [TypeError] 事件(在某些 SDK 版本中为 `CastError`)。类似地,如果向 [sink] 添加任何不是类型 [S] 的事件,将引发 [TypeError]。 StreamChannel<S> cast<S>(); } /// 一个实现 [StreamChannel] 的类,只需将流和接收器作为参数。 /// /// 这与 [StreamChannel] 不同,因此它可以使用 [StreamChannelMixin]。 class _StreamChannel<T> extends StreamChannelMixin<T> { @override final Stream<T> stream; @override final StreamSink<T> sink; _StreamChannel(this.stream, this.sink); } /// [StreamChannelMixin] 是一个混入(mixin),它以 [stream] 和 [sink] 为基础实现了 [StreamChannel] 的实例方法。 abstract class StreamChannelMixin<T> implements StreamChannel<T> { /// 将此通道的输出与另一个通道 [other] 相关联,以便由任一通道发出的值都直接发送到另一通道。 @override void pipe(StreamChannel<T> other) { stream.pipe(other.sink); other.stream.pipe(sink); } /// 使用 [transformer] 转换此通道。 /// /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],将 [transformer] 绑定到当前通道。 @override StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer) => transformer.bind(this); /// 仅使用 [transformer] 转换此通道的 [stream] 组件。 /// /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],只应用于 [stream] 部分。 @override StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) => changeStream(transformer.bind); /// 仅使用 [transformer] 转换此通道的 [sink] 组件。 /// /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],只应用于 [sink] 部分。 @override StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) => changeSink(transformer.bind); /// 使用 [change] 函数的返回值替换此通道的 [stream]。 /// /// 此方法通过传递 [change] 函数来创建一个新的 [StreamChannel],将 [stream] 替换为 [change] 的返回值。 @override StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change) => StreamChannel.withCloseGuarantee(change(stream), sink); /// 使用 [change] 函数的返回值替换此通道的 [sink]。 /// /// 此方法通过传递 [change] 函数来创建一个新的 [StreamChannel],将 [sink] 替换为 [change] 的返回值。 @override StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change) => StreamChannel.withCloseGuarantee(stream, change(sink)); /// 将此通道的泛型类型强制转换为 [S]。 /// /// 此方法将当前通道的 [stream] 组件的类型强制转换为 [S],并创建一个新的通道,其中 [sink] 仍然与原始通道共享。 @override StreamChannel<S> cast<S>() => StreamChannel( stream.cast(), StreamController(sync: true)..stream.cast<T>().pipe(sink)); }
F.2 StreamChannelController
/// 用于公开新 [StreamChannel] 的控制器。 /// /// 这个控制器公开了两个连接的 [StreamChannel],[local] 和 [foreign]。用户的代码应该使用 [local] 来发出和接收事件。然后可以返回 [foreign] 供其他人使用。例如,这是 [new IsolateChannel] 的简化版本的实现: /// /// ```dart /// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) { /// var controller = new StreamChannelController(allowForeignErrors: false); /// /// // 将接收端口的所有事件传输到本地 sink 中... /// receivePort.pipe(controller.local.sink); /// /// // ...将本地流中的所有事件传输到发送端口。 /// controller.local.stream.listen(sendPort.send, onDone: receivePort.close); /// /// // 然后返回外部用户使用的外部控制器。 /// return controller.foreign; /// } /// ``` class StreamChannelController<T> { /// 本地通道。 /// /// 创建此 [StreamChannelController] 的用户应该直接使用此通道来发送和接收事件。 StreamChannel<T> get local => _local; late final StreamChannel<T> _local; /// 外部通道。 /// /// 这个通道应该返回给外部用户,以便他们与 [local] 进行通信。 StreamChannel<T> get foreign => _foreign; late final StreamChannel<T> _foreign; /// 创建一个 [StreamChannelController]。 /// /// 如果 [sync] 为 true,则添加到任一通道的 sink 的事件会同步分派到另一通道的 stream。只有在这些事件的来源已经是异步的情况下才应这样做。 /// /// 如果 [allowForeignErrors] 为 `false`,则不允许将错误传递给外部通道的 sink。如果传递了任何错误,连接将关闭,并且错误将转发到外部通道的 [StreamSink.done] 未来。这确保了本地流永远不会发出错误。 StreamChannelController({bool allowForeignErrors = true, bool sync = false}) { var localToForeignController = StreamController<T>(sync: sync); var foreignToLocalController = StreamController<T>(sync: sync); _local = StreamChannel<T>.withGuarantees( foreignToLocalController.stream, localToForeignController.sink); _foreign = StreamChannel<T>.withGuarantees( localToForeignController.stream, foreignToLocalController.sink, allowSinkErrors: allowForeignErrors); } }
F.3 StreamChannelCompleter
/// [channel],其中源和目标稍后提供。 /// /// [channel] 是一个正常的通道,可以立即监听它,并且可以立即添加事件,但在调用 [setChannel] 之前,它不会发出任何事件,并且添加到它的所有事件都将被缓冲。 class StreamChannelCompleter<T> { /// 此通道流的完成器。 final _streamCompleter = StreamCompleter<T>(); /// 此通道汇的完成器。 final _sinkCompleter = StreamSinkCompleter<T>(); /// 此完成器的通道。 StreamChannel<T> get channel => _channel; late final StreamChannel<T> _channel; /// 是否已调用 [setChannel]。 bool _set = false; /// 将 `Future<StreamChannel>` 转换为 `StreamChannel`。 /// /// 这使用通道完成器创建一个通道,并在未来完成时将源通道设置为未来的结果。 /// /// 如果未来以错误完成,则返回的通道的流将只包含该错误。汇将默默地丢弃所有事件。 static StreamChannel fromFuture(Future<StreamChannel> channelFuture) { var completer = StreamChannelCompleter(); channelFuture.then(completer.setChannel, onError: completer.setError); return completer.channel; } StreamChannelCompleter() { _channel = StreamChannel<T>(_streamCompleter.stream, _sinkCompleter.sink); } /// 设置通道为 [channel] 的源和目标。 /// /// 最多可以设置一次通道。 /// /// 可以最多调用 [setChannel] 或 [setError] 一次。尝试再次调用其中任何一个将失败。 void setChannel(StreamChannel<T> channel) { if (_set) throw StateError('通道已经设置过了。'); _set = true; _streamCompleter.setSourceStream(channel.stream); _sinkCompleter.setDestinationSink(channel.sink); } /// 指示连接通道时发生错误。 /// /// 这使流发出 [error] 并关闭。它使汇丢弃其所有事件。 /// /// 可以最多调用 [setChannel] 或 [setError] 一次。尝试再次调用其中任何一个将失败。 void setError(Object error, [StackTrace? stackTrace]) { if (_set) throw StateError('通道已经设置过了。'); _set = true; _streamCompleter.setError(error, stackTrace); _sinkCompleter.setDestinationSink(NullStreamSink()); } }
F.4 StreamChannelTransformer
/// [StreamChannelTransformer] 转换了传递给 [StreamChannel] 的事件以及由其发出的事件。 /// /// 这与 [StreamTransformer] 和 [StreamSinkTransformer] 的原理相同。 /// 每个转换器定义了一个 [bind] 方法,该方法接受原始的 [StreamChannel] 并返回转换后的版本。 /// /// 转换器必须能够多次调用 [bind]。如果一个子类明确实现了 [bind],它应确保返回的流遵循第二个流通道保证:关闭汇会导致流在发出更多事件之前关闭。当在原始流的事件分发和返回的流之间添加异步间隙时,例如通过 [StreamTransformer] 进行转换时,此保证将失效。可以使用 [StreamChannel.withCloseGuarantee] 轻松保留此保证。 class StreamChannelTransformer<S, T> { /// 在通道的流上使用的转换器。 final StreamTransformer<T, S> _streamTransformer; /// 在通道的汇上使用的转换器。 final StreamSinkTransformer<S, T> _sinkTransformer; /// 从现有的流和汇转换器创建一个 [StreamChannelTransformer]。 const StreamChannelTransformer( this._streamTransformer, this._sinkTransformer); /// 从编解码器的编码器和解码器创建一个 [StreamChannelTransformer]。 /// /// 内部通道汇的所有输入都使用 [Codec.encoder] 进行编码,而其流的所有输出都使用 [Codec.decoder] 进行解码。 StreamChannelTransformer.fromCodec(Codec<S, T> codec) : this(codec.decoder, StreamSinkTransformer.fromStreamTransformer(codec.encoder)); /// 转换发送到和由 [channel] 发出的事件。 /// /// 创建一个新的通道。当事件传递给返回的通道的汇时,转换器将对其进行转换并将转换后的版本传递给 `channel.sink`。当事件从 `channel.stream` 发出时,转换器将对其进行转换并将转换后的版本传递给返回的通道的流。 StreamChannel<S> bind(StreamChannel<T> channel) => StreamChannel<S>.withCloseGuarantee( channel.stream.transform(_streamTransformer), _sinkTransformer.bind(channel.sink)); }