Flutter 80: 初识 Flutter Stream (一)

简介: 0 基础学习 Flutter,第八十步:学习一下基础 Flutter Stream!

      小菜在之前尝试 EventChannel 时曾经用到过 Stream 流数据,现在准备学习一下 flutter_bloc 时也主要用到 Stream 来做异步处理,于是简单学习一下何为 Stream

A source of asynchronous data events.

      Stream 主要应用于 Flutter 的异步操作,在其他编程语言中也存在;Stream 提供了一种接受事件队列的方法,可通过 listen 进行数据监听,通过 error 接收失败状态,通过 done 来接收结束状态;

1. Stream 创建

      Flutter 提供了多种创建 Stream 的方式;

1.1 Stream.fromFuture(Future future)

      Stream 通过 Future 创建新的单订阅流,当 Future 完成时会触发 data / error,然后以 done 事件结束;

Future<String> getData() async {
  await Future.delayed(Duration(seconds: 3));
  return '当前时间为:${DateTime.now()}';
}

_streamFromFuture() {
  Stream.fromFuture(getData())
      .listen((event) => print('Stream.fromFuture -> $event'))
      .onDone(() => print('Stream.fromFuture -> done 结束'));
}

1.2 Stream.fromFutures(Iterable> futures)

      Stream 通过一系列的 Future 创建新的单订阅流,每个 Future 都会有自身的 data / error 事件,当这一系列的 Future 均完成时,Streamdone 事件结束;若 Futures 为空,则 Stream 会立刻关闭;其分析源码,很直接的看到是将每一个 Future 事件监听完之后才会执行的微事件结束;

====================================== 源码 ======================================
factory Stream.fromFutures(Iterable<Future<T>> futures) {
    _StreamController<T> controller =
        new _SyncStreamController<T>(null, null, null, null);
    int count = 0;
    var onValue = (T value) {
      if (!controller.isClosed) {
        controller._add(value);
        if (--count == 0) controller._closeUnchecked();
      }
    };
    var onError = (error, StackTrace stack) {
      if (!controller.isClosed) {
        controller._addError(error, stack);
        if (--count == 0) controller._closeUnchecked();
      }
    };
    for (var future in futures) {
      count++;
      future.then(onValue, onError: onError);
    }
    // Use schedule microtask since controller is sync.
    if (count == 0) scheduleMicrotask(controller.close);
    return controller.stream;
}
====================================== 测试 ======================================
_streamFromFutures() {
  var data = [getData(), getData(), getData()];
  Stream.fromFutures(data)
      .listen((event) => print('Stream.fromFutures -> $event'))
      .onDone(() => print('Stream.fromFutures -> done 结束'));
}

1.3 Stream.fromIterable(Iterable elements)

      Stream 通过数据集合中获取并创建单订阅流,通过 listen 监听迭代器中每一个子 element,当 Stream 监听到取消订阅或 Iterator.moveNext 返回 false / throw 异常 时停止迭代;

_streamFromIterable() {
  var data = [1, 2, '3.toString()', true, false, 6];
  Stream.fromIterable(data)
      .listen((event) => print('Stream.fromIterable -> $event'))
      .onDone(() => print('Stream.fromIterable -> done 结束'));
}

1.4 Stream.periodic(Duration period, [T computation(int computationCount)])

      Stream 通过 Duration 对象作为参数创建一个周期性事件流,其中若不设置 computationonData 获取数据为 null;若没有事件结束则会一直周期性执行;

_streamFromPeriodic() {
  Duration interval = Duration(seconds: 1);
  // onData 获取数据为 null
  Stream<int> stream = Stream<int>.periodic(interval);
  stream.listen((event) {
  print('Stream.periodic -> $event');
  }).onDone(() {
  print('Stream.periodic -> done 结束');
  });

  // onData 获取数据为 int 类型 data
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data);
  streamData.listen((event) {
    print('Stream.periodic -> $event');
    if (event >= 10) {}
  }).onDone(() {
    print('Stream.periodic -> done 结束');
  });
}


2. Stream 基本操作

2.1 Stream take(int count)

      take() 对于单订阅方式,可以提供 take 设置之前的 Stream 订阅数据,例如设置中断 Stream.periodic 周期展示次数;小菜粗略理解为 take 可以作为中断订阅,如果 take 设置次数大于 onDone 之前的订阅数据次数,Stream 依旧获取所有 onDone 之前的订阅数据;

_streamFromPeriodic() {
  Duration interval = Duration(seconds: 1);
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
  streamData.take(5).listen((event) {
    print('Stream.periodic -> $event');
  }).onDone(() {
    print('Stream.periodic -> done 结束');
  });
}

_streamFromIterable() {
  var data = [1, 2, '3.toString()', true, false, 6];
  Stream.fromIterable(data)
      .take(8)
      .listen((event) => print('Stream.fromIterable -> $event'))
      .onDone(() => print('Stream.fromIterable -> done 结束'));
}

2.2 Stream takeWhile(bool test(T element))

      takeWhile 也可以实现上述相同效果,通过 test 返回一个 boolean 类型,如果为 false 则中断订阅;

_streamFromPeriodic() {
  Duration interval = Duration(seconds: 1);
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
  streamData.takeWhile((element) {
    print('Stream.periodic.takeWhile -> $element');
    return element <= 5;
  }).listen((event) {
    print('Stream.periodic -> $event');
  }).onDone(() {
    print('Stream.periodic -> done 结束');
  });
}

2.3 Stream where(bool test(T event))

      where 用于在当前 Stream 中创建一个新的 Stream 用来丢弃不符合 test 的数据;小菜简单理解为类似数据库查询一样,仅过滤符合需求的数据流;且 where 可以设置多次;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 5;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 3;
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done 结束');
});

2.4 Stream distinct([bool equals(T previous, T next)])

      distinct 小菜理解为相邻两个数据滤重;

var data = [1, 2, '3.toString()', true, true, false, true, 6];
Stream.fromIterable(data)
    .distinct()
    .listen((event) => print('Stream.fromIterable -> $event'))
    .onDone(() => print('Stream.fromIterable -> done 结束'));

2.5 Stream skip(int count)

      skip 用于跳过符合条件的订阅数据次数;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
      print('Stream.periodic.takeWhile -> $element');
      return element <= 6;
    }).where((event) {
      print('Stream.periodic.where -> $event');
      return event > 2;
    })
    .skip(2).listen((event) {
      print('Stream.periodic -> $event');
    }).onDone(() {
      print('Stream.periodic -> done 结束');
    });

2.6 Stream skipWhile(bool test(T element))

      skipWhile 用于跳过在 where 符合条件下满足设置 test 条件的订阅数据;即当 testtrue 时跳过当前订阅数据监听;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 6;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 2;
}).skipWhile((element) {
  print('Stream.periodic.skipWhile -> $element');
  return element <= 4;
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done 结束');
});

2.7 Stream map(S convert(T event))

      在当前 Stream 基础上创建一个新的 Stream 并对当前 Stream 进行数据操作,onData 监听到的是 map 变更后的新的数据流;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 6;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 2;
}).skipWhile((element) {
  print('Stream.periodic.skipWhile -> $element');
  return element <= 4;
}).map((event) {
  print('Stream.periodic.map -> $event -> ${event * 100}');
  return event * 100;
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done 结束');
});

2.8 Stream expand(Iterable convert(T element))

      在当前 Stream 基础上创建新的 Stream 并将当前订阅数据转为新的订阅数据组onData 监听 数据组 中每个新的订阅数据元素;

Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
streamData.takeWhile((element) {
  print('Stream.periodic.takeWhile -> $element');
  return element <= 6;
}).where((event) {
  print('Stream.periodic.where -> $event');
  return event > 2;
}).skipWhile((element) {
  print('Stream.periodic.skipWhile -> $element');
  return element <= 4;
}).expand((element) {
  print('Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');
  return [element, element * 10, element * 100];
}).listen((event) {
  print('Stream.periodic -> $event');
}).onDone(() {
  print('Stream.periodic -> done 结束');
});

2.9 Future get length

      Stream 监听订阅事件结束后,符合 where 条件的数量;

_streamLength(index) async {
  Duration interval = Duration(seconds: 1);
  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);
  streamData = streamData.takeWhile((element) {
    print('Stream.periodic.takeWhile -> $element');
    return element <= 6;
  }).where((event) {
    print('Stream.periodic.where -> $event');
    return event > 2;
  }).skipWhile((element) {
    print('Stream.periodic.skipWhile -> $element');
    return element <= 4;
  });
  switch (index) {
    case 1:
      var length = await streamData.length;
      print('Stream.length -> $length');
      break;
    case 2:
      var isEmpty = await streamData.isEmpty;
      print('Stream.isEmpty -> $isEmpty');
      break;
    case 3:
      var isBroadcast = await streamData.isBroadcast;
      print('Stream.isBroadcast -> $isBroadcast');
      break;
    case 4:
      var first = await streamData.first;
      print('Stream.first -> $first');
      break;
    case 5:
      var last = await streamData.last;
      print('Stream.last -> $last');
      break;
  }
}

2.10 Future get isEmpty

      Stream 监听订阅事件结束后,统计是否符合 where 条件的订阅数据是否为空;

_streamLength(2);

2.11 Future get first

      获取 Stream 符合条件的第一个订阅数据;

_streamLength(4);

2.12 Future get last

      获取 Stream 符合条件的最后一个订阅数据;

_streamLength(5);

2.13 Future> toList()

      在 Stream 监听结束之后,将订阅数据存储在 List 中,该操作为异步操作;

_streamToList() async {
  var data = [1, 2, '3.toString()', true, true, false, true, 6];
  Stream stream = Stream.fromIterable(data).distinct();
  List list = await stream.toList();
  if (list != null) {
    print('Stream.toList -> ${list}');
    for (int i = 0; i < list.length; i++) {
      print('Stream.toList -> ${i + 1} -> ${list[i]}');
    }
  }
}

2.14 Future> toSet()

      在 Stream 监听结束之后,将订阅数据存储在 Set 中,Set 可以过滤重复数据;

_streamToSet() async {
  var data = [1, 2, '3.toString()', true, true, false, true, 6];
  Stream stream = Stream.fromIterable(data);
  Set set = await stream.toSet();
  if (set != null) {
    print('Stream.toSet -> ${set}');
  }
}

2.15 Future forEach(void action(T element))

      监听 Stream 中订阅数据,是对 listen 方式的一种监听;

_streamForEach() {
  var data = [1, 2, '3.toString()', true, true, false, true, 6];
  Stream stream = Stream.fromIterable(data).distinct();
  stream.forEach((element) => print('Stream.forEach -> $element'));
}


      小菜对 Stream 的尝试才刚刚开始,还有众多方法未曾尝试,对 Stream 的理解还很浅显,如有错误请多多指导!

来源: 阿策小和尚

目录
相关文章
Flutter:Stream.periodic 示例
本文将带您了解在 Flutter中使用Stream.periodic的完整示例 该Stream.periodic构造,顾名思义,是用来创建流,在周期间隔反复广播事件。简单用法:
198 0
Flutter 82: 初识 Flutter Stream (二)
0 基础学习 Flutter,第八十二步:学习基础的 Stream 和 StreamController!
1002 0
|
13天前
|
缓存 监控 前端开发
【Flutter 前端技术开发专栏】Flutter 应用的启动优化策略
【4月更文挑战第30天】本文探讨了Flutter应用启动优化策略,包括理解启动过程、资源加载优化、减少初始化工作、界面布局简化、异步初始化、预加载关键数据、性能监控分析以及案例和未来优化方向。通过这些方法,可以缩短启动时间,提升用户体验。使用Flutter DevTools等工具可助于识别和解决性能瓶颈,实现持续优化。
【Flutter 前端技术开发专栏】Flutter 应用的启动优化策略
|
13天前
|
开发框架 Dart 前端开发
【Flutter前端技术开发专栏】Flutter与React Native的对比与选择
【4月更文挑战第30天】对比 Flutter(Dart,强类型,Google支持,快速热重载,高性能渲染)与 React Native(JavaScript,庞大生态,热重载,依赖原生渲染),文章讨论了开发语言、生态系统、性能、开发体验、学习曲线、社区支持及项目选择因素。两者各有优势,选择取决于项目需求、团队技能和长期维护考虑。参考文献包括官方文档和性能比较文章。
【Flutter前端技术开发专栏】Flutter与React Native的对比与选择
|
13天前
|
前端开发 Android开发 iOS开发
【Flutter前端技术开发专栏】Flutter在Android与iOS上的性能对比
【4月更文挑战第30天】Flutter 框架实现跨平台移动应用,通过一致的 UI 渲染(Skia 引擎)、热重载功能和响应式框架提高开发效率和用户体验。然而,Android 和 iOS 的系统差异、渲染机制及编译过程影响性能。性能对比显示,iOS 可能因硬件优化提供更流畅体验,而 Android 更具灵活性和广泛硬件支持。开发者可采用代码、资源优化和特定平台优化策略,利用性能分析工具提升应用性能。
【Flutter前端技术开发专栏】Flutter在Android与iOS上的性能对比
|
13天前
|
Dart 前端开发 测试技术
【Flutter前端技术开发专栏】Flutter开发中的代码质量与重构实践
【4月更文挑战第30天】随着Flutter在跨平台开发的普及,保证代码质量成为开发者关注的重点。优质代码能确保应用性能与稳定性,提高开发效率。关键策略包括遵循最佳实践,编写可读性强的代码,实施代码审查和自动化测试。重构实践在项目扩展时尤为重要,适时重构能优化结构,降低维护成本。开发者应重视代码质量和重构,以促进项目成功。
【Flutter前端技术开发专栏】Flutter开发中的代码质量与重构实践
|
13天前
|
存储 缓存 监控
【Flutter前端技术开发专栏】Flutter中的列表滚动性能优化
【4月更文挑战第30天】本文探讨了Flutter中优化列表滚动性能的策略。建议使用`ListView.builder`以节省内存,避免一次性渲染所有列表项。为防止列表项重建,可使用`UniqueKey`或`ObjectKey`。缓存已渲染项、减少不必要的重绘和异步加载大数据集也是关键。此外,选择轻量级组件,如`StatelessWidget`,并利用Flutter DevTools监控性能以识别和解决瓶颈。持续测试和调整以提升用户体验。
【Flutter前端技术开发专栏】Flutter中的列表滚动性能优化
|
13天前
|
Dart 前端开发 安全
【Flutter前端技术开发专栏】Flutter中的线程与并发编程实践
【4月更文挑战第30天】本文探讨了Flutter中线程管理和并发编程的关键性,强调其对应用性能和用户体验的影响。Dart语言提供了`async`、`await`、`Stream`和`Future`等原生异步支持。Flutter采用事件驱动的单线程模型,通过`Isolate`实现线程隔离。实践中,可利用`async/await`、`StreamBuilder`和`Isolate`处理异步任务,同时注意线程安全和性能调优。参考文献包括Dart异步编程、Flutter线程模型和DevTools文档。
【Flutter前端技术开发专栏】Flutter中的线程与并发编程实践
|
13天前
|
Dart 前端开发 开发者
【Flutter前端技术开发专栏】Flutter中的性能分析工具Profiler
【4月更文挑战第30天】Flutter Profiler是用于性能优化的关键工具,提供CPU、GPU、内存和网络分析。它帮助开发者识别性能瓶颈,如CPU过度使用、渲染延迟、内存泄漏和网络效率低。通过实时监控和分析,开发者能优化代码、减少内存占用、改善渲染速度和网络请求,从而提升应用性能和用户体验。定期使用并结合实际场景与其它工具进行综合分析,是实现最佳实践的关键。
【Flutter前端技术开发专栏】Flutter中的性能分析工具Profiler
|
13天前
|
前端开发 数据处理 Android开发
【Flutter 前端技术开发专栏】Flutter 中的调试技巧与工具使用
【4月更文挑战第30天】本文探讨了Flutter开发中的调试技巧和工具,强调其在及时发现问题和提高效率上的重要性。介绍了基本的调试方法如打印日志和断点调试,以及Android Studio/VS Code的调试器和Flutter Inspector的使用。文章还涉及调试常见问题的解决、性能和内存分析等高级技巧,并通过实际案例演示调试过程。在团队协作中,有效调试能提升整体开发效率,而随着技术发展,调试工具也将持续进化。
【Flutter 前端技术开发专栏】Flutter 中的调试技巧与工具使用