Dart笔记:stream_channel 包用法

简介: Dart笔记:stream_channel 包用法

标题1标题2


1. 概述

1.1 stream_channel是什么

stream_channel包提供了StreamChannel接口,它代表了一个双向通信通道。每个StreamChannel都提供了一个用于接收数据的Stream和一个用于发送数据的StreamSink。stream_channel包还包含用于处理StreamChannel和双向通信的实用工具。

StreamChannel有助于将通信逻辑与底层协议分离。例如,test 包在浏览器套件的 WebSocket 连接和 VM 测试的隔离连接中都重用了其测试套件通信协议。

stream_channel 库的主要有:

  • 实时通信:在Flutter应用程序的不同部分之间实时传输数据,例如将数据从一个屏幕传递到另一个屏幕或从一个Widget传递到另一个Widget。
  • 异步事件处理:处理异步事件流,例如从网络请求、传感器数据、或者定时器获得的事件流中提取和处理数据。
  • 多通道管理:通过MultiChannel类,可以在单个底层传输层上复用多个虚拟通道,使不同类型的数据可以通过不同的通道进行传输。
  • 与Isolate通信:通过IsolateChannel,在Flutter应用程序的不同Isolate之间进行通信,以实现并发处理和数据传递。

1.2 简单回顾: Stream

Stream<T> 表示一个异步的数据流,它可以产生一系列的数据事件(通常是某种类型的对象)供订阅者处理。流通常用于处理连续的事件或数据,例如读取文件、接收网络请求或监视用户输入。

相比于Future

功能上看
  • Future 是一次性的,它代表了一个异步操作的结果,一旦操作完成,就不能再次使用。用起来就像这样:
Future<int> fetchValue() async {
  // 异步操作
  return 666;
}
fetchValue().then((value) {
  print(value); // 处理结果
}).catchError((error) {
  print(error); // 处理错误
});
  • Stream 是持续的,它可以生成多个事件,而不会销毁。可以订阅一个 Stream 并监听其事件,每次有新事件生成时,会触发订阅者的回调函数。用起来就像这样:
// 创建一个异步生成器函数,它返回一个 Stream<int>,用于生成一个整数计数器的事件流。
Stream<int> createCounterStream() async* {
  // 使用 for 循环生成 0 到 4 的整数。
  for (int i = 0; i < 5; i++) {
    // 在生成每个整数之前,等待 1 秒钟的延迟。
    await Future.delayed(Duration(seconds: 1));
    // 使用 yield 关键字将整数添加到事件流中。
    yield i;
  }
}
// 调用 createCounterStream 函数以获取事件流。
final stream = createCounterStream();
// 使用 stream.listen() 订阅事件流,每当有新事件生成时,将调用回调函数。
stream.listen((value) {
  // 处理每个事件,这里将事件的值打印到控制台。
  print(value); // 处理每个事件
});

关于StreamSink

StreamSink 多是从一个 StreamController 实例的sink属性上获取使用的,就如controller.sink,而不会单独使用。

StreamSink 用于将数据写入一个异步数据流 (Stream)。它是 Dart 异步编程中的一个重要组件,通常用于数据的输出、推送或写入操作。例如:

import 'dart:async';
void main() {
  var controller = StreamController<int>();
  // 获取 StreamSink
  var sink = controller.sink;
  // 向流中添加数据
  sink.add(1);
  sink.add(2);
  sink.add(3);
  // 关闭流
  sink.close();
  // 监听流的数据
  controller.stream.listen((data) {
    print(data); // 打印 1、2、3
  });
}
场景上看
  • Future 适合表示一次性操作,例如发起一个网络请求、读取一个文件、执行一个计算密集型任务等。它通常用于等待某个操作完成并获取其结果;
  • Stream 适合表示连续的事件流,例如实时数据更新、用户输入事件、从多个来源获取数据等。它通常用于监视一系列事件并对它们进行处理。

1.3 加入依赖

flutter pub add stream_channel

2. 双向通信通道 StreamChannel 类

StreamChannel类是一个抽象类,表示一个双向通信通道。它定义了用于与通道进行交互的方法和属性,包括用于从通道读取数据的stream和用于向通道写入数据的sinkStreamChannel用于在Dart中实现双向通信,例如在网络通信中使用。

StreamChannel类的属性和方法

stream属性
  • 语法格式Stream<T> get stream;
  • 功能:获取用于从通道接收数据的单订阅流。
  • 返回值Stream<T> - 用于从通道接收数据的单订阅流。
  • 用法示例
StreamChannel<String> channel = ...; // 创建一个StreamChannel
Stream<String> dataStream = channel.stream; // 获取用于接收数据的流
sink属性
  • 语法格式StreamSink<T> get sink;
  • 功能:获取用于向通道发送数据的流式数据接收器。
  • 返回值StreamSink<T> - 用于向通道发送数据的流式数据接收器。
  • 用法示例
StreamChannel<String> channel = ...; // 创建一个StreamChannel
StreamSink<String> dataSink = channel.sink; // 获取用于发送数据的流式数据接收器
StreamChannel构造函数
  • 语法格式factory StreamChannel(Stream<T> stream, StreamSink<T> sink) => _StreamChannel<T>(stream, sink);
  • 功能:创建一个新的StreamChannel,用于双向通信,基于提供的streamsink
  • 参数
  • stream:用于从通道接收数据的单订阅流。
  • sink:用于向通道发送数据的流式数据接收器。
  • 返回值StreamChannel<T> - 新创建的StreamChannel实例。
  • 用法示例
Stream<String> inputStream = ...; // 创建一个输入流
StreamSink<String> outputStream = ...; // 创建一个输出流
StreamChannel<String> channel = StreamChannel(inputStream, outputStream); // 创建StreamChannel
StreamChannel.withGuarantees构造函数
  • 语法格式factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink, {bool allowSinkErrors = true}) => GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
  • 功能:创建一个新的StreamChannel,用于双向通信,同时强制实施StreamChannel文档中列出的保证。
  • 参数
  • stream:用于从通道接收数据的单订阅流。
  • sink:用于向通道发送数据的流式数据接收器。
  • allowSinkErrors:一个布尔值,指定是否允许将错误传递给sink。默认为true
  • 返回值StreamChannel<T> - 新创建的StreamChannel实例,具有保证。
  • 用法示例
Stream<String> inputStream = ...; // 创建一个输入流
StreamSink<String> outputStream = ...; // 创建一个输出流
StreamChannel<String> channel = StreamChannel.withGuarantees(inputStream, outputStream, allowSinkErrors: false); // 创建具有保证的StreamChannel
StreamChannel.withCloseGuarantee构造函数
  • 语法格式factory StreamChannel.withCloseGuarantee(Stream<T> stream, StreamSink<T> sink) => CloseGuaranteeChannel(stream, sink);
  • 功能:创建一个新的StreamChannel,用于双向通信,特别强制实施通道关闭的保证。
  • 参数
  • stream:用于从通道接收数据的单订阅流。
  • sink:用于向通道发送数据的流式数据接收器。
  • 返回值StreamChannel<T> - 新创建的StreamChannel实例,具有通道关闭的保证。
  • 用法示例
Stream<String> inputStream = ...; // 创建一个输入流
StreamSink<String> outputStream = ...; // 创建一个输出流
StreamChannel<String> channel = StreamChannel.withCloseGuarantee(inputStream, outputStream); // 创建具有通道关闭保证的StreamChannel
pipe方法
  • 语法格式void pipe(StreamChannel<T> other);
  • 功能:连接当前StreamChannel到另一个StreamChannel,使两者之间的数据传输直接相互转发。
  • 参数
  • other:另一个StreamChannel,用于连接到当前通道。
  • 返回值:无。
  • 用法示例
StreamChannel<String> channel1 = ...; // 创建第一个StreamChannel
StreamChannel<String> channel2 = ...; // 创建第二个StreamChannel
channel1.pipe(channel2); // 将第一个通道的数据传输到第二个通道,反之亦然
transform方法
  • 语法格式StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer);
  • 功能:使用指定的StreamChannelTransformer对当前StreamChannel进行转换。
  • 参数
  • transformer:要应用于当前通道的转换器。
  • 返回值StreamChannel<S> - 转换后的StreamChannel实例。
  • 用法示例
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
StreamChannel<int> channel2 = channel1.transform(IntToStringTransformer()); // 使用转换器将String类型的通道转换为Int类型的通道
transformStream方法
  • 语法格式StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
  • 功能:仅对当前通道的stream部分应用指定的StreamTransformer
  • 参数
  • transformer:要应用于stream的转换器。
  • 返回值StreamChannel<T> - 转换后的StreamChannel实例。
  • 用法示例
StreamChannel<String>
 channel1 = ...; // 创建一个StreamChannel
StreamChannel<String> channel2 = channel1.transformStream(StringTransformer()); // 使用转换器仅对stream部分进行转换
transformSink方法
  • 语法格式StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
  • 功能:仅对当前通道的sink部分应用指定的StreamSinkTransformer
  • 参数
  • transformer:要应用于sink的转换器。
  • 返回值StreamChannel<T> - 转换后的StreamChannel实例。
  • 用法示例
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
StreamChannel<String> channel2 = channel1.transformSink(StringSinkTransformer()); // 使用转换器仅对sink部分进行转换
changeStream方法
  • 语法格式StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change);
  • 功能:返回当前通道的副本,其中stream部分被替换为由change函数返回的值。
  • 参数
  • change:一个函数,用于更改stream部分。
  • 返回值StreamChannel<T> - 具有更改后的stream部分的StreamChannel实例。
  • 用法示例
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
StreamChannel<String> channel2 = channel1.changeStream((stream) => stream.where((data) => data.isNotEmpty)); // 过滤掉stream中的空数据
changeSink方法
  • 语法格式StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change);
  • 功能:返回当前通道的副本,其中sink部分被替换为由change函数返回的值。
  • 参数
  • change:一个函数,用于更改sink部分。
  • 返回值StreamChannel<T> - 具有更改后的sink部分的StreamChannel实例。
  • 用法示例
StreamChannel<String> channel1 = ...; // 创建一个StreamChannel
StreamChannel<String> channel2 = channel1.changeSink((sink) => MyCustomSink(sink)); // 使用自定义的sink替换原有的sink
cast方法
  • 语法格式StreamChannel<S> cast<S>();
  • 功能:返回当前通道的副本,将通道的泛型类型强制转换为S
  • 返回值StreamChannel<S> - 具有泛型类型为SStreamChannel实例。
  • 用法示例
StreamChannel<dynamic> channel1 = ...; // 创建一个通用类型的StreamChannel
StreamChannel<int> channel2 = channel1.cast<int>(); // 强制将通道类型转换为int类型

这些方法和属性组合使得StreamChannel类能够实现双向通信,并提供了多种方法来操作和转换通道的数据流。这些方法使得StreamChannel在Dart中成为一个有用的工具,用于处理双向数据流通信。

3. IsolateChannel类

IsolateChannel类是一个实现了StreamChannel接口的通道,用于在不同的隔离体(isolate)之间进行通信,通常用于与另一个隔离体进行数据交换。它基于两个Isolate之间的ReceivePortSendPort实现,允许双向通信。

3.1 IsolateChannel类的构造函数

IsolateChannel.connectReceive构造函数
  • 语法格式factory IsolateChannel.connectReceive(ReceivePort receivePort);
  • 功能:连接到使用IsolateChannel.connectSend创建的远程通道。
  • 参数
  • receivePort:用于接收消息的ReceivePort
  • 返回值IsolateChannel<T> - 已连接到远程通道的IsolateChannel实例。
  • 用法示例
ReceivePort remoteReceivePort = ...; // 创建远程ReceivePort
IsolateChannel<String> channel = IsolateChannel.connectReceive(remoteReceivePort);
IsolateChannel.connectSend构造函数
  • 语法格式factory IsolateChannel.connectSend(SendPort sendPort);
  • 功能:连接到使用IsolateChannel.connectReceive创建的远程通道。
  • 参数
  • sendPort:用于发送消息的SendPort
  • 返回值IsolateChannel<T> - 已连接到远程通道的IsolateChannel实例。
  • 用法示例
SendPort remoteSendPort = ...; // 创建远程SendPort
IsolateChannel<String> channel = IsolateChannel.connectSend(remoteSendPort);
IsolateChannel构造函数
  • 语法格式factory IsolateChannel(ReceivePort receivePort, SendPort sendPort);
  • 功能:创建一个IsolateChannel,用于在两个隔离体之间传递消息。
  • 参数
  • receivePort:用于接收消息的本地ReceivePort
  • sendPort:用于发送消息的远程SendPort
  • 返回值IsolateChannel<T> - 已创建的IsolateChannel实例。
  • 用法示例
ReceivePort localReceivePort = ReceivePort(); // 创建本地ReceivePort
SendPort remoteSendPort = ...; // 获取远程SendPort
IsolateChannel<String> channel = IsolateChannel(localReceivePort, remoteSendPort);

3.2 IsolateChannel类的属性

stream属性
  • 语法格式Stream<T> get stream;
  • 功能:获取从远程通道接收到的消息的输入流。
  • 返回值Stream<T> - 输入流,用于接收从远程通道发送的消息。
  • 用法示例
IsolateChannel<String> channel = ...; // 创建一个IsolateChannel
Stream<String> inputStream = channel.stream; // 获取输入流
sink属性
  • 语法格式StreamSink<T> get sink;
  • 功能:获取发送消息到远程通道的输出流。
  • 返回值StreamSink<T> - 输出流,用于发送消息到远程通道。
  • 用法示例
IsolateChannel<String> channel = ...; // 创建一个IsolateChannel
StreamSink<String> outputStream = channel.sink; // 获取输出流

IsolateChannel类允许在不同的隔离体之间进行通信,通过stream属性接收远程通道发送的消息,通过sink属性发送消息到远程通道。它提供了两种构造函数用于建立连接,分别是IsolateChannel.connectReceiveIsolateChannel.connectSend,并且可以通过IsolateChannel构造函数创建一个新的IsolateChannel来实现双向通信。

4.MultiChannel类

MultiChannel类是一个抽象类,用于多路复用多个虚拟通道(Virtual Channel)在单一的底层传输层之上。它允许在一个通道上创建多个虚拟通道,每个虚拟通道都可以独立传输数据。虚拟通道可用于在两个端点之间进行双向通信,通过底层通道进行消息传递。

4.1 MultiChannel类的构造函数

MultiChannel构造函数
  • 语法格式factory MultiChannel(StreamChannel<dynamic> inner) => _MultiChannel<T>(inner);
  • 功能:创建一个新的MultiChannel,用于在内部传输层上发送和接收消息。
  • 参数
  • inner:用于在内部传输层上发送和接收消息的底层通道,必须接受类似JSON的对象。
  • 返回值MultiChannel<T> - 新创建的MultiChannel实例。
  • 用法示例
StreamChannel<dynamic> innerChannel = ...; // 创建一个内部通道
MultiChannel<String> channel = MultiChannel(innerChannel); // 创建MultiChannel

4.2 MultiChannel类的属性

stream属性
  • 语法格式Stream<T> get stream;
  • 功能:获取默认的输入流,连接到远程通道的输出。
  • 返回值Stream<T> - 默认的输入流。
  • 用法示例
MultiChannel<String> channel = ...; // 创建一个MultiChannel
Stream<String> inputStream = channel.stream; // 获取默认的输入流
sink属性
  • 语法格式StreamSink<T> get sink;
  • 功能:获取默认的输出流,连接到远程通道的输入。如果关闭此输出流,则远程输入流将关闭,但其他虚拟通道将保持打开,并且可以创建新的虚拟通道。
  • 返回值StreamSink<T> - 默认的输出流。
  • 用法示例
MultiChannel<String> channel = ...; // 创建一个MultiChannel
StreamSink<String> outputStream = channel.sink; // 获取默认的输出流

4.3 MultiChannel类的方法

virtualChannel方法
  • 语法格式VirtualChannel<T> virtualChannel([int? id]);
  • 功能:创建一个新的虚拟通道(Virtual Channel)。
  • 参数
  • id(可选):虚拟通道的标识符。如果未提供,将创建一个新的虚拟通道。如果提供,将创建与远程通道上具有相同标识符的虚拟通道。
  • 返回值VirtualChannel<T> - 新创建的虚拟通道(Virtual Channel)。
  • 用法示例
MultiChannel<String> multiChannel = ...; // 创建一个MultiChannel
VirtualChannel<String> virtual = multiChannel.virtualChannel(); // 创建新的虚拟通道

这些方法和属性使得MultiChannel类能够实现多路复用多个虚拟通道在单一底层传输层上进行通信,从而实现双向通信,并允许在一个通道上创建多个虚拟通道以独立传输数据。这种模式对于网络通信和消息传递非常有用。

5. Disconnector类

Disconnector类是一个StreamChannelTransformer,用于允许调用者强制断开通道连接。通过这个转换器,可以实现对通道的断开操作,导致通道的stream会发出done事件,而sink会忽略后续的输入。同时,内部的sink也会被关闭,以通知远程端断开连接。

5.1 Disconnector类的属性

isDisconnected属性
  • 语法格式bool get isDisconnected;
  • 功能:判断是否已经调用了disconnect方法来断开通道连接。
  • 返回值bool - 如果已经断开连接,则为true;否则为false
  • 用法示例
Disconnector<String> disconnector = ...; // 创建一个Disconnector
bool disconnected = disconnector.isDisconnected; // 判断是否已断开连接

5.2 Disconnector类的方法

disconnect方法
  • 语法格式Future<void> disconnect();
  • 功能:断开所有已经被转换的通道连接。
  • 返回值Future<void> - 表示断开连接的未来对象,当所有内部sinkStreamSink.close完成后,该未来对象将完成。
  • 用法示例
Disconnector<String> disconnector = ...; // 创建一个Disconnector
Future<void> disconnectFuture = disconnector.disconnect(); // 断开连接并获取未来对象
bind方法
  • 语法格式StreamChannel<T> bind(StreamChannel<T> channel);
  • 功能:将Disconnector应用于给定的通道,返回一个已经应用了断开连接逻辑的通道。
  • 参数
  • channel:要应用Disconnector的通道。
  • 返回值StreamChannel<T> - 已应用了断开连接逻辑的通道。
  • 用法示例
Disconnector<String> disconnector = ...; // 创建一个Disconnector
StreamChannel<String> channel = ...; // 创建一个通道
StreamChannel<String> transformedChannel = disconnector.bind(channel); // 应用Disconnector到通道

5.3 _DisconnectorSink类

_DisconnectorSink类是Disconnector内部使用的辅助类,它是StreamSink的包装器,用于实现强制断开连接的功能。

5.3.1 _DisconnectorSink类的属性

done属性
  • 语法格式Future<void> get done;
  • 功能:获取底层sink的done属性。
  • 返回值Future<void> - 表示底层sink的done属性。
  • 用法示例
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
Future<void> doneFuture = disconnectorSink.done; // 获取done属性的未来对象

5.3.2 _DisconnectorSink类的方法

add方法
  • 语法格式void add(T data);
  • 功能:向底层sink中添加数据,如果已经断开连接则不会添加。
  • 参数
  • data:要添加的数据。
  • 用法示例
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
disconnectorSink.add("Hello"); // 向底层sink中添加数据
addError方法
  • 语法格式void addError(Object error, [StackTrace? stackTrace]);
  • 功能:向底层sink中添加错误信息,如果已经断开连接则不会添加。
  • 参数
  • error:要添加的错误对象。
  • stackTrace:可选参数,表示错误的堆栈跟踪信息。
  • 用法示例
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
disconnectorSink.addError(Exception("An error occurred")); // 向底层sink中添加错误信息
addStream方法
  • 语法格式Future<void> addStream(Stream<T> stream);
  • 功能:将一个流中的数据添加到底层sink中,如果已经断开连接则不会添加。
  • 参数
  • stream:要添加的流。
  • 返回值Future<void> - 表示添加流的未来对象,当添加完成后,未来对象将完成。
  • 用法示例
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
Stream<String> dataStream = ...; // 创建一个数据流
Future<void> addStreamFuture = disconnectorSink.addStream(dataStream); // 将流中的数据添加到底层sink中
close方法
  • 语法格式Future<void> close();
  • 功能:关闭底层sink,同时标记通道已经关闭。
  • 返回值Future<void> - 表示关闭底层sink的未来对象,当关闭完成后,未来对象将完成。
  • 用法示例
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
Future<void> closeFuture = disconnectorSink.close(); // 关闭底层sink
_disconnect方法
  • 语法格式Future<void> _disconnect();
  • 功能:断开底层sink,停止转发事件。
  • 返回值Future<void> - 表示断开底层sink的未来对象。
  • 用法示例
_DisconnectorSink<String> disconnectorSink = ...; // 创建一个_DisconnectorSink
Future<void> disconnectFuture = disconnectorSink._disconnect(); // 断开底层sink

6. 分析示例代码

以下是官方的讲解示例代码:

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'package:stream_channel/isolate_channel.dart';
import 'package:stream_channel/stream_channel.dart';
Future<void> main() async {
  // 一个 StreamChannel<T> 在最简单的情况下,是一个包装了 Stream<T> 和 StreamSink<T> 的对象。
  // 例如,可以创建一个包装标准输入输出的通道:
  var stdioChannel = StreamChannel(stdin, stdout);
  stdioChannel.sink.add('Hello!\n'.codeUnits);
  // 就像可以使用 StreamTransformer<T> 转换 Stream<T> 一样,可以使用 StreamChannelTransformer<T> 转换 StreamChannel<T>。
  // 例如,我们可以将标准输入处理为字符串:
  var stringChannel = stdioChannel
      .transform(StreamChannelTransformer.fromCodec(utf8))
      .transformStream(LineSplitter());
  stringChannel.sink.add('world!\n');
  // 可以通过扩展 StreamChannelMixin<T> 来实现 StreamChannel<T>,但使用 StreamChannelController<T> 更加简单。
  // 控制器有两个 StreamChannel<T> 成员:'local' 和 'foreign'。
  // 控制器的创建者应该使用 'local' 通道,而接收者通常不会直接访问底层控制器,而是使用 'foreign' 通道。
  var ctrl = StreamChannelController<String>();
  ctrl.local.stream.listen((event) {
    // 在这里执行有用的操作...
  });
  // 还可以将一个通道的事件传递给另一个通道。
  ctrl
    ..foreign.pipe(stringChannel)
    ..local.sink.add('Piped!\n');
  await ctrl.local.sink.close();
  // 通过调用 'StreamChannel<T>.withGuarantees()',可以创建一个提供所有保证的 StreamChannel<T>。
  var dummyCtrl0 = StreamChannelController<String>();
  var guaranteedChannel = StreamChannel.withGuarantees(
      dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink);
  // 要关闭 StreamChannel,使用 'sink.close()'。
  await guaranteedChannel.sink.close();
  // MultiChannel<T> 可以在单个底层传输层上复用多个虚拟通道。
  // 例如,通过某种机制,应用程序可以将来自不同客户端的事件分开处理,即使通过标准 I/O 监听也可以支持多个客户端。
  //
  // MultiChannel<T> 将事件拆分成编号通道,这些通道是 VirtualChannel<T> 的实例。
  var dummyCtrl1 = StreamChannelController<String>();
  var multiChannel = MultiChannel<String>(dummyCtrl1.foreign);
  var channel1 = multiChannel.virtualChannel();
  await multiChannel.sink.close();
  // 客户端/对等方还应该创建自己的 MultiChannel<T>,连接到底层传输,使用相应的 ID 处理其各自通道中的事件。
  // 如何在不同端点之间传递通道 ID 取决于。
  var dummyCtrl2 = StreamChannelController<String>();
  var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign);
  var channel2 = multiChannel2.virtualChannel(channel1.id);
  await channel2.sink.close();
  await multiChannel2.sink.close();
  // 多个 Dart 应用程序的实例可以轻松通过 `SendPort`/`ReceivePort` 对来进行通信,这是通过 `IsolateChannel<T>` 类实现的。
  // 通常,一个端点将创建一个 `ReceivePort`,然后调用 `IsolateChannel.connectReceive` 构造函数。
  // 另一个端点将获得相应的 `SendPort`,然后调用 `IsolateChannel.connectSend`。
  var recv = ReceivePort();
  var recvChannel = IsolateChannel.connectReceive(recv);
  var sendChannel = IsolateChannel.connectSend(recv.sendPort);
  // 必须手动关闭 `IsolateChannel<T>` 的 sink。
  await recvChannel.sink.close();
  await sendChannel.sink.close();
  // 可以使用 `Disconnector` 转换器使通道在远程传输端断开连接。
  var disconnector = Disconnector<String>();
  var disconnectable = stringChannel.transform(disconnector);
  disconnectable.sink.add('Still connected!');
  await disconnector.disconnect();
  // 此外:
  //   * 'DelegatingStreamController<T>' 类可扩展为构建包装其他 'StreamChannel<T>' 对象的基础。
  //   * 'jsonDocument' 转换器将事件转换为 JSON 格式,并使用 'dart:convert' 中的 'json' 编解码器。
  //   * 'package:json_rpc_2' 直接构建在 'package:stream_channel' 之上,因此可以使用任何兼容的传输来创建交互式客户端/服务器或点对点应用程序(例如语言服务器、微服务等)。
}

代码输出结果为:

Hello!
world!
Piped!

part.1 创建标准输入输出通道

var stdioChannel = StreamChannel(stdin, stdout);
stdioChannel.sink.add('Hello!\n'.codeUnits);

在这里,创建了一个StreamChannel对象stdioChannel,将标准输入stdin和标准输出stdout包装在通道中。然后,通过sink将字符串’Hello!\n’转换为UTF-8编码的字节流发送到标准输出。

part.2 转换输入为字符串

var stringChannel = stdioChannel
    .transform(StreamChannelTransformer.fromCodec(utf8))
    .transformStream(LineSplitter());
stringChannel.sink.add('world!\n');

这部分代码将stdioChannel通过transform方法进行了两次转换。首先,使用StreamChannelTransformer.fromCodec(utf8)将输入数据流编码为UTF-8字符串,然后使用transformStream(LineSplitter())将输入数据流按行拆分。最后,通过sink将字符串’world!\n’发送到通道中。

part.3 使用 StreamChannelController

var ctrl = StreamChannelController<String>();
ctrl.local.stream.listen((event) {
  // 在这里执行有用的操作...
});

这段代码创建了一个StreamChannelController对象ctrl,它用于管理通道。StreamChannelController包含两个StreamChannel成员:local和foreign。通常,创建者会使用local通道,而接收者通常不会直接访问底层控制器,而是使用foreign通道。在此示例中,我们订阅了ctrl.local.stream以处理来自通道的事件。

part.4 传递事件到另一个通道

ctrl
  ..foreign.pipe(stringChannel)
  ..local.sink.add('Piped!\n');
await ctrl.local.sink.close();

这部分代码将一个通道的事件传递到另一个通道。首先,通过pipe方法将ctrl.foreign通道中的事件传递到stringChannel中。然后,通过local.sink将字符串’Piped!\n’发送到ctrl.local通道中,并最后关闭ctrl.local.sink。

part.5 创建具有保证的 StreamChannel

var dummyCtrl0 = StreamChannelController<String>();
var guaranteedChannel = StreamChannel.withGuarantees(
    dummyCtrl0.foreign.stream, dummyCtrl0.foreign.sink);
await guaranteedChannel.sink.close();

这部分代码演示了如何使用StreamChannel.withGuarantees方法创建一个具有所有保证的StreamChannel。它接受一个输入流和一个输出流,然后通过sink.close()方法关闭了通道。

part.6 使用 MultiChannel

var dummyCtrl1 = StreamChannelController<String>();
var multiChannel = MultiChannel<String>(dummyCtrl1.foreign);
var channel1 = multiChannel.virtualChannel();
await multiChannel.sink.close();

这段代码演示了如何使用MultiChannel,它可以在单个底层传输层上复用多个虚拟通道。首先,创建了一个MultiChannel对象multiChannel,然后通过virtualChannel()方法创建了一个虚拟通道channel1。最后,通过sink.close()关闭了multiChannel。

part.7 创建另一个 MultiChannel

var dummyCtrl2 = StreamChannelController<String>();
var multiChannel2 = MultiChannel<String>(dummyCtrl2.foreign);
var channel2 = multiChannel2.virtualChannel(channel1.id);
await channel2.sink.close();
await multiChannel2.sink.close();

这段代码创建了另一个MultiChannel对象multiChannel2,并使用virtualChannel(channel1.id)创建了一个虚拟通道channel2,其ID与channel1相同。然后,通过sink.close()分别关闭了channel2和multiChannel2。

part.8 使用 IsolateChannel 进行 Dart 应用程序通信

var recv = ReceivePort();
var recvChannel = IsolateChannel.connectReceive(recv);
var sendChannel = IsolateChannel.connectSend(recv.sendPort);
await recvChannel.sink.close();
await sendChannel.sink.close();

这部分代码演示了如何使用IsolateChannel进行Dart应用程序的通信。首先,创建了一个ReceivePort对象recv,然后使用IsolateChannel.connectReceive和IsolateChannel.connectSend构造函数创建了两个IsolateChannel通道,用于发送和接收消息。最后,通过sink.close()方法手动关闭了这两个通道。

part.9 使用 Disconnector 断开连接

var disconnector = Disconnector<String>();
var disconnectable = stringChannel.transform(disconnector);
disconnectable.sink.add('Still connected!');
await disconnector.disconnect();

这段代码创建了一个Disconnector对象disconnector,然后使用transform方法将stringChannel与disconnector进行转换,使通道可以在远程传输端断开连接。然后,通过sink.add向通道发送消息,最后使用disconnector.disconnect()方法断开连接。

F. 附录

F.1 StreamChannel 接口

表示双向通信通道

用户应该考虑流(stream)发出"done"事件作为通道关闭的规范指示。如果他们希望关闭通道,他们应该关闭sink——取消流订阅是不足够的。协议错误可能通过流或sink.done发出,具体取决于它们的根本原因。请注意,如果在调用sink.close之前通道关闭,sink可能会静默丢弃事件。

强烈建议实现混入或扩展StreamChannelMixin以获取各种实例方法的默认实现。如果同时为StreamChannelMixin添加实现,则不会将新方法视为破坏性更改。

实现必须提供以下保证
  • 流是单订阅的,并且必须遵循所有单订阅流的保证。
  • 关闭sink 会导致流在发出更多事件之前关闭。
  • 流关闭后,sink 会自动关闭。如果发生这种情况,sink 方法应该静默丢弃它们的参数,直到调用sink.close为止。
  • 如果流在有侦听器之前关闭,如果可能的话,sink 应该静默丢弃事件。
  • 取消流的订阅对 sink 没有影响。即使在取消订阅后,通道仍必须能够响应另一端关闭通道的情况。
  • sink要么将错误转发给另一端,要么在添加错误后立即关闭,并将该错误转发给 sink.done 的未来。

这些保证允许用户与所有实现进行统一交互,并确保关闭流的任一端都会产生一致的行为。

源码

/// 一个表示双向通信通道的抽象类。
///
/// 用户应该将 [stream] 发出 "done" 事件视为通道已关闭的标志。如果他们希望关闭通道,他们应该关闭 [sink]——取消流订阅不足够。协议错误可能通过流或 [sink].done 发出,具体取决于其根本原因。请注意,在调用 [sink].close 之前,如果通道在之前关闭,sink 可能会悄悄丢弃事件。
///
/// 强烈建议实现混合或扩展 [StreamChannelMixin],以获取各种实例方法的默认实现。如果还为此接口添加了新的方法,则不会被视为破坏性更改,前提是也将实现添加到 [StreamChannelMixin]。
///
/// 实现必须提供以下保证:
///
/// * 该流是单订阅的,并且必须遵循单订阅流的所有保证。
///
/// * 关闭 sink 会导致流在发出更多事件之前关闭。
///
/// * 在流关闭后,sink 会自动关闭。如果发生这种情况,sink 方法应该悄悄地丢弃它们的参数,直到调用 [sink].close。
///
/// * 如果流在有侦听器之前关闭,sink 应尽可能悄悄地丢弃事件。
///
/// * 取消流的订阅对 sink 没有影响。通道必须仍然能够响应另一端关闭通道,即使已取消订阅。
///
/// * sink 要么将错误转发到另一端,要么在添加错误后立即关闭并将该错误转发到 [sink].done 未来。
///
/// 这些保证使用户能够统一地与所有实现交互,并确保关闭流的任一端都会产生一致的行为。
abstract class StreamChannel<T> {
  /// 从另一端发出值的单订阅流。
  Stream<T> get stream;
  /// 用于将值发送到另一端的 sink。
  StreamSink<T> get sink;
  /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。
  ///
  /// 请注意,此流/接收器对必须提供 [StreamChannel] 文档中列出的保证。如果它们没有本地提供这些保证,则应使用 [StreamChannel.withGuarantees]。
  factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
      _StreamChannel<T>(stream, sink);
  /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。
  ///
  /// 与 [StreamChannel.new] 不同,这强制执行 [StreamChannel] 文档中列出的保证。这使其比直接包装流和接收器要低效一些,因此应该在本机提供保证时使用 [StreamChannel.new]。
  ///
  /// 如果 [allowSinkErrors] 为 `false`,则不允许将错误传递给 [sink]。如果有任何错误,连接将关闭,并且错误将转发到 [sink].done。
  factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
          {bool allowSinkErrors = true}) =>
      GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
  /// 创建一个通过 [stream] 和 [sink] 进行通信的新 [StreamChannel]。
  ///
  /// 这特别强调了第二个保证:关闭 sink 会导致流在发出更多事件之前关闭。当在原始流的事件分发和返回流的事件之间添加了异步间隙时,例如通过使用 [StreamTransformer] 进行变换,这个保证将无效。这是保留该特定保证的一种较轻量级方式,比 [StreamChannel.withGuarantees] 要轻。
  factory StreamChannel.withCloseGuarantee(
          Stream<T> stream, StreamSink<T> sink) =>
      CloseGuaranteeChannel(stream, sink);
  /// 连接此通道到 [other],以便由任何一端发出的值都直接发送到另一端。
  void pipe(StreamChannel<T> other);
  /// 使用 [transformer] 进行转换。
  ///
  /// 这与调用 `transformer.bind(channel)` 相同。
  StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer);
  /// 仅使用 [transformer] 转换此通道的 [stream] 组件。
  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer);
  /// 仅使用 [transformer] 转换此通道的 [sink] 组件。
  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer);
  /// 返回一个具有 [stream] 替换为 [change] 返回值的副本。
  StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change);
  /// 返回一个具有 [sink] 替换为 [change] 返回值的副本。
  StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change);
  /// 返回一个泛型类型强制转换为 [S] 的副本。
  ///
  /// 如果 [stream] 发出的任何事件不是类型 [S],它们将转换为 [TypeError] 事件(在某些 SDK 版本中为 `CastError`)。类似地,如果向 [sink] 添加任何不是类型 [S] 的事件,将引发 [TypeError]。
  StreamChannel<S> cast<S>();
}
/// 一个实现 [StreamChannel] 的类,只需将流和接收器作为参数。
///
/// 这与 [StreamChannel] 不同,因此它可以使用 [StreamChannelMixin]。
class _StreamChannel<T> extends StreamChannelMixin<T> {
  @override
  final Stream<T> stream;
  @override
  final StreamSink<T> sink;
  _StreamChannel(this.stream, this.sink);
}
/// [StreamChannelMixin] 是一个混入(mixin),它以 [stream] 和 [sink] 为基础实现了 [StreamChannel] 的实例方法。
abstract class StreamChannelMixin<T> implements StreamChannel<T> {
  /// 将此通道的输出与另一个通道 [other] 相关联,以便由任一通道发出的值都直接发送到另一通道。
  @override
  void pipe(StreamChannel<T> other) {
    stream.pipe(other.sink);
    other.stream.pipe(sink);
  }
  /// 使用 [transformer] 转换此通道。
  ///
  /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],将 [transformer] 绑定到当前通道。
  @override
  StreamChannel<S> transform<S>(StreamChannelTransformer<S, T> transformer) =>
      transformer.bind(this);
  /// 仅使用 [transformer] 转换此通道的 [stream] 组件。
  ///
  /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],只应用于 [stream] 部分。
  @override
  StreamChannel<T> transformStream(StreamTransformer<T, T> transformer) =>
      changeStream(transformer.bind);
  /// 仅使用 [transformer] 转换此通道的 [sink] 组件。
  ///
  /// 此方法通过传递 [transformer] 来创建一个新的 [StreamChannel],只应用于 [sink] 部分。
  @override
  StreamChannel<T> transformSink(StreamSinkTransformer<T, T> transformer) =>
      changeSink(transformer.bind);
  /// 使用 [change] 函数的返回值替换此通道的 [stream]。
  ///
  /// 此方法通过传递 [change] 函数来创建一个新的 [StreamChannel],将 [stream] 替换为 [change] 的返回值。
  @override
  StreamChannel<T> changeStream(Stream<T> Function(Stream<T>) change) =>
      StreamChannel.withCloseGuarantee(change(stream), sink);
  /// 使用 [change] 函数的返回值替换此通道的 [sink]。
  ///
  /// 此方法通过传递 [change] 函数来创建一个新的 [StreamChannel],将 [sink] 替换为 [change] 的返回值。
  @override
  StreamChannel<T> changeSink(StreamSink<T> Function(StreamSink<T>) change) =>
      StreamChannel.withCloseGuarantee(stream, change(sink));
  /// 将此通道的泛型类型强制转换为 [S]。
  ///
  /// 此方法将当前通道的 [stream] 组件的类型强制转换为 [S],并创建一个新的通道,其中 [sink] 仍然与原始通道共享。
  @override
  StreamChannel<S> cast<S>() => StreamChannel(
      stream.cast(), StreamController(sync: true)..stream.cast<T>().pipe(sink));
}

F.2 StreamChannelController

/// 用于公开新 [StreamChannel] 的控制器。
///
/// 这个控制器公开了两个连接的 [StreamChannel],[local] 和 [foreign]。用户的代码应该使用 [local] 来发出和接收事件。然后可以返回 [foreign] 供其他人使用。例如,这是 [new IsolateChannel] 的简化版本的实现:
///
/// ```dart
/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
///   var controller = new StreamChannelController(allowForeignErrors: false);
///
///   // 将接收端口的所有事件传输到本地 sink 中...
///   receivePort.pipe(controller.local.sink);
///
///   // ...将本地流中的所有事件传输到发送端口。
///   controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
///
///   // 然后返回外部用户使用的外部控制器。
///   return controller.foreign;
/// }
/// ```
class StreamChannelController<T> {
  /// 本地通道。
  ///
  /// 创建此 [StreamChannelController] 的用户应该直接使用此通道来发送和接收事件。
  StreamChannel<T> get local => _local;
  late final StreamChannel<T> _local;
  /// 外部通道。
  ///
  /// 这个通道应该返回给外部用户,以便他们与 [local] 进行通信。
  StreamChannel<T> get foreign => _foreign;
  late final StreamChannel<T> _foreign;
  /// 创建一个 [StreamChannelController]。
  ///
  /// 如果 [sync] 为 true,则添加到任一通道的 sink 的事件会同步分派到另一通道的 stream。只有在这些事件的来源已经是异步的情况下才应这样做。
  ///
  /// 如果 [allowForeignErrors] 为 `false`,则不允许将错误传递给外部通道的 sink。如果传递了任何错误,连接将关闭,并且错误将转发到外部通道的 [StreamSink.done] 未来。这确保了本地流永远不会发出错误。
  StreamChannelController({bool allowForeignErrors = true, bool sync = false}) {
    var localToForeignController = StreamController<T>(sync: sync);
    var foreignToLocalController = StreamController<T>(sync: sync);
    _local = StreamChannel<T>.withGuarantees(
        foreignToLocalController.stream, localToForeignController.sink);
    _foreign = StreamChannel<T>.withGuarantees(
        localToForeignController.stream, foreignToLocalController.sink,
        allowSinkErrors: allowForeignErrors);
  }
}

F.3 StreamChannelCompleter

/// [channel],其中源和目标稍后提供。
///
/// [channel] 是一个正常的通道,可以立即监听它,并且可以立即添加事件,但在调用 [setChannel] 之前,它不会发出任何事件,并且添加到它的所有事件都将被缓冲。
class StreamChannelCompleter<T> {
  /// 此通道流的完成器。
  final _streamCompleter = StreamCompleter<T>();
  /// 此通道汇的完成器。
  final _sinkCompleter = StreamSinkCompleter<T>();
  /// 此完成器的通道。
  StreamChannel<T> get channel => _channel;
  late final StreamChannel<T> _channel;
  /// 是否已调用 [setChannel]。
  bool _set = false;
  /// 将 `Future<StreamChannel>` 转换为 `StreamChannel`。
  ///
  /// 这使用通道完成器创建一个通道,并在未来完成时将源通道设置为未来的结果。
  ///
  /// 如果未来以错误完成,则返回的通道的流将只包含该错误。汇将默默地丢弃所有事件。
  static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
    var completer = StreamChannelCompleter();
    channelFuture.then(completer.setChannel, onError: completer.setError);
    return completer.channel;
  }
  StreamChannelCompleter() {
    _channel = StreamChannel<T>(_streamCompleter.stream, _sinkCompleter.sink);
  }
  /// 设置通道为 [channel] 的源和目标。
  ///
  /// 最多可以设置一次通道。
  ///
  /// 可以最多调用 [setChannel] 或 [setError] 一次。尝试再次调用其中任何一个将失败。
  void setChannel(StreamChannel<T> channel) {
    if (_set) throw StateError('通道已经设置过了。');
    _set = true;
    _streamCompleter.setSourceStream(channel.stream);
    _sinkCompleter.setDestinationSink(channel.sink);
  }
  /// 指示连接通道时发生错误。
  ///
  /// 这使流发出 [error] 并关闭。它使汇丢弃其所有事件。
  ///
  /// 可以最多调用 [setChannel] 或 [setError] 一次。尝试再次调用其中任何一个将失败。
  void setError(Object error, [StackTrace? stackTrace]) {
    if (_set) throw StateError('通道已经设置过了。');
    _set = true;
    _streamCompleter.setError(error, stackTrace);
    _sinkCompleter.setDestinationSink(NullStreamSink());
  }
}

F.4 StreamChannelTransformer

/// [StreamChannelTransformer] 转换了传递给 [StreamChannel] 的事件以及由其发出的事件。
///
/// 这与 [StreamTransformer] 和 [StreamSinkTransformer] 的原理相同。
/// 每个转换器定义了一个 [bind] 方法,该方法接受原始的 [StreamChannel] 并返回转换后的版本。
///
/// 转换器必须能够多次调用 [bind]。如果一个子类明确实现了 [bind],它应确保返回的流遵循第二个流通道保证:关闭汇会导致流在发出更多事件之前关闭。当在原始流的事件分发和返回的流之间添加异步间隙时,例如通过 [StreamTransformer] 进行转换时,此保证将失效。可以使用 [StreamChannel.withCloseGuarantee] 轻松保留此保证。
class StreamChannelTransformer<S, T> {
  /// 在通道的流上使用的转换器。
  final StreamTransformer<T, S> _streamTransformer;
  /// 在通道的汇上使用的转换器。
  final StreamSinkTransformer<S, T> _sinkTransformer;
  /// 从现有的流和汇转换器创建一个 [StreamChannelTransformer]。
  const StreamChannelTransformer(
      this._streamTransformer, this._sinkTransformer);
  /// 从编解码器的编码器和解码器创建一个 [StreamChannelTransformer]。
  ///
  /// 内部通道汇的所有输入都使用 [Codec.encoder] 进行编码,而其流的所有输出都使用 [Codec.decoder] 进行解码。
  StreamChannelTransformer.fromCodec(Codec<S, T> codec)
      : this(codec.decoder,
            StreamSinkTransformer.fromStreamTransformer(codec.encoder));
  /// 转换发送到和由 [channel] 发出的事件。
  ///
  /// 创建一个新的通道。当事件传递给返回的通道的汇时,转换器将对其进行转换并将转换后的版本传递给 `channel.sink`。当事件从 `channel.stream` 发出时,转换器将对其进行转换并将转换后的版本传递给返回的通道的流。
  StreamChannel<S> bind(StreamChannel<T> channel) =>
      StreamChannel<S>.withCloseGuarantee(
          channel.stream.transform(_streamTransformer),
          _sinkTransformer.bind(channel.sink));
}
目录
相关文章
带你读《深入浅出Dart》十七、Dart的Stream(1)
带你读《深入浅出Dart》十七、Dart的Stream(1)
|
Dart 数据处理
带你读《深入浅出Dart》十七、Dart的Stream(3)
带你读《深入浅出Dart》十七、Dart的Stream(3)
|
Dart API
带你读《深入浅出Dart》十七、Dart的Stream(2)
带你读《深入浅出Dart》十七、Dart的Stream(2)
|
安全 Go 索引
Golang 语言中的 channel 实现原理
Golang 语言中的 channel 实现原理
66 0
|
消息中间件 缓存 Go
Golang 语言中 Channel 的使用方式
Golang 语言中 Channel 的使用方式
58 0
|
3月前
|
Java 数据处理
Stream流的简单使用
这篇文章介绍了Java中Stream流的基本概念和使用方法。文章解释了Stream流的三类方法:获取流、中间方法和终结方法。详细讨论了如何生成Stream流,包括从Collection体系集合、Map体系集合、数组和同种数据类型的多个数据中生成流。接着,介绍了Stream流的中间操作方法,如`filter`、`limit`、`skip`、`concat`和`distinct`。文章还讨论了Stream流的终结方法,如`forEach`和`count`,以及收集方法,如`collect`。最后,通过几个例子演示了如何使用Stream流进行数据处理和收集操作。
|
5月前
|
存储 Java BI
Java基础之stream流最新版,stream流的基本操作
Java基础之stream流最新版,stream流的基本操作
73 0
|
7月前
|
数据管理 Go 开发者
Golang深入浅出之-Go语言上下文(context)包:处理取消与超时
【4月更文挑战第25天】Go语言中的`context`包在并发、网络请求和长任务中至关重要,提供取消、截止时间和元数据管理。本文探讨`context`基础,如`Background()`、`TODO()`、`WithCancel()`、`WithDeadline()`和`WithTimeout()`。常见问题包括不当传递、过度使用`Background()`和`TODO()`以及忽略错误处理。通过取消和超时示例,强调正确传递上下文、处理取消错误和设置超时以提高应用健壮性和响应性。正确使用`context`是构建稳定高效Go应用的关键。
129 1
|
7月前
|
SQL Java
【java高级】stream流的基本用法(一)
【java高级】stream流的基本用法(一)
37 0
|
7月前
|
Go
Go-channel的妙用
Go-channel的妙用
61 0