Rx.NET 简介

简介: 官网: http://reactivex.io/ 它支持基本所有的主流语言. 这里我简单介绍一下Rx.NET. 之前我写了几篇关于RxJS的文章, 概念性的东西推荐看这些: http://www.

官网: http://reactivex.io/

它支持基本所有的主流语言.

这里我简单介绍一下Rx.NET.

之前我写了几篇关于RxJS的文章, 概念性的东西推荐看这些:

http://www.cnblogs.com/cgzl/p/8641738.html

http://www.cnblogs.com/cgzl/p/8649477.html

http://www.cnblogs.com/cgzl/p/8662625.html

基本概念和RxJS是一样的.

下面开始切入正题.

Rx.NET总览

Rx.NET总体上看可以分为三个部分:

  • 核心部分: Observables, Observers和Subjects
  • LINQ和扩展, 用于查询和过滤Observables
  • 并发和调度的支持

.NET Core的Events

.net core里面的event是通过委托对观察者模式的实现.

但是event在.net core里面并不是头等公民:

  • 人们对它的语法+=评价是褒贬不一的.
  • 很难进行传递和组合
  • 很难进行event的连串(chaining)和错误处理(尤其是同一个event有多个handler的时候)
  • event并没有历史记录

举个例子:

鼠标移动这个事件(event), 鼠标移动的时候会触发该事件, 这些事件会进入某个管道并记录该鼠标的坐标, 这样就会产生一个数据的集合/序列/流.

这里我们就是构建了一个基于时间线的鼠标坐标的序列, 每一次触发事件就会在这个管道上产生一个新的值. 在另一端, 一旦管道上有了新的值, 那么管道的观察者就会得到通知, 这些观察者通过提供回调函数的方式来注册到该管道上. 管道每次更新的时候, 这些回调函数就会被调用, 从而刷新了观察者的数据.

这个例子里, Observable就是管道, 一系列的值在这里被生成. Observer(观察者)在Observable有新的值的时候会被通知.

核心接口

IObservable:

  • Subscribe(IObserver<T> observer)

IObserver

  • void OnNext<T>(T value), 序列里有新的值的时候会调用这个
  • void OnCompleted(), 序列结束的时候调用这个
  • void OnError(Exception ex), 发生错误的时候调用这个

这个和RxJS基本是一样的.

Marble图

可以通过marble图来理解Rx

这图表示的是IObserver, 每当有新的值在Observable出现的时候, 传递到IObservable的Subscribe方法的参数IObserver的OnNext方法就会调用. 发生错误的话 OnError方法就会调用, 整个流也就结束了. 没有错误的话, 走到结束就会调用OnComplete方法. 不过有些Observable是不会结束的.

Observable.Subscribe()返回的Subscription对象被Dispose后, Observer就无法收到新的数据了.

 

创建Observable流/序列

 创建流/序列的方式:

  • 返回简单的值
  • 包装现有的值
  • 写一个生成函数

简单的Observables

  • Observable.Empty 返回一个直接结束的Obsevable序列
  • Observable.Never 返回一个没有值, 且永远不会结束的序列
  • Observable.Throw(exception), 返回一个带有错误的序列
  • Observable.Return(xxx) 返回单值的序列

包装Observables

可以包装下面这些来返回Observable:

  • Action
    • Observable.Start(() => 42) 返回一个含有42的序列, 并在Action结束的时候, OnComplete方法被调用.
  • Task
    • Task.ToObservable() 使用这个扩展方法进行包装, 当Task结束的时候, Observable推送新的数据, 然后结束
  • IEnumerable
    • ienumerable.ToObservable() 也是扩展方法, ienumerable的每个值都会作为新的值被推送到Observable上, 最后结束OnComplete
  • Event
    • Observable.FromEventPattern(obj, "xxChanged") 这是个工厂方法, 需要提供触发event的对象和event的名字.

生成函数

  • Range
  • Interval, Timer
  • Create(低级), Generate

看图解释:

Observable.Range(1, 4):

Observable.Interval(200):

Observable.Timer(200, () => 42):

            Observable.Create<int>(o =>
            {
                o.OnNext(42);
                o.OnComplete();
                return Disposable.Empty;
            });

 

Observable.Generate(1,
value => value < 5,
value => value + 1,
value => value);

 

例子

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

namespace Test
{
    class Program
    {
        static void Main(string[] args)
        {
            var sequence = GetTaskObservable();
            sequence.Subscribe
            (
                x => Console.WriteLine($"OnNext: {x}"),
                ex => Console.WriteLine($"OnError: {ex}"),
                () => Console.WriteLine("OnCompleted")
            );
            Console.ReadKey();
        }

        private static IObservable<int> GetSimpleObservable()
        {
            return Observable.Return(42);
        }

        private static IObservable<int> GetThrowObservable()
        {
            return Observable.Throw<int>(new ArgumentException("Error in observable"));
        }

        private static IObservable<int> GetEmptyObservable()
        {
            return Observable.Empty<int>();
        }

        private static IObservable<int> GetTaskObservable()
        {
            return GetTask().ToObservable();
        }

        private static async Task<int> GetTask()
        {
            return 42;
        }

        private static IObservable<int> GetRangeObservable()
        {
            return Observable.Range(2, 10);
        }

        private static IObservable<long> GetIntervalObservable()
        {
            return Observable.Interval(TimeSpan.FromMilliseconds(200));
        }

        private static IObservable<int> GetCreateObservable()
        {
            return Observable.Create<int>(observer =>
            {
                observer.OnNext(1);
                observer.OnNext(2);
                observer.OnNext(3);
                observer.OnNext(4);
                observer.OnCompleted();
                return Disposable.Empty;
            });
        }

        private static IObservable<int> GetGenerateObservable()
        {
            return Observable.Generate(
                1,
                x => x < 5,
                x => x + 1,
                x => x
            );
        }
    }
}

 

请自行运行查看结果.

Cold 和 Hot Observable

Cold: Observable可以为每个Subscriber创建新的数据生产者

Hot: 每个Subscriber从订阅的时候开始在同一个数据生产者那里共享其余的数据.

从原理来说是这样的: Cold内部会创建一个新的数据生产者, 而Hot则会一直使用外部的数据生产者.

举个例子:

Cold: 就相当于我在腾讯视频买体育视频会员, 可以从头看里面的足球比赛.

Hot: 就相当于看足球比赛的现场直播, 如果来晚了, 那么前面就看不到了.

把Cold 变 Hot, 使用.Publish()方法.

把Hot 变 Cold, 使用.Subscribe()方法把它变成Subject即可.

 

过滤和控制序列

LINQ操作符

操作符的类型:

  • 过滤
  • 合并
  • 聚合
  • 工具

过滤

sequence.Where(x => x % 2 == 0):

.OfType<Square>():

移除重复的:

.Distinct():

.DistinctUntilChanged():

过滤头尾元素:

.Take(2)  .Skip(2):

.SkipLast(2)     .TakeLast(2):

序列的阀:

a.TakeUnit(b)l   a.SkipUntil(b):

实际例子: 把鼠标移动和点击转化为拖拽:

代码非常的简单:

var mouseDrags = mouseMoves.SkipUntil(mouseDowns).TakeUnit(mouseUps);

合并

a.Merge(b)

a.Amb(b), 其中的amb是ambiguous的缩写:

a.Concat(b):

为序列配对:

a.CombineLatest(b, (x, y) => x + y):

a.Zip(b, (x, y) => x +  y):

序列的序列:

Merge()是可以达到这种效果的:

.Switch():

聚合

聚合就是指把序列聚合成一个值, 在序列结束后才能返回值

Count() Sum():

Aggregate():

Scan():

其他工具操作符

会有一些副作用

 .Do(x => Log(x)): 但是记住不要改变序列的元素

.TimeStamp():

.Throttle(TimeSpan.FromSeconds(1))

 

异步和多线程

异步就表示不一定按顺序执行, 但是它可以保证非阻塞, 通常会有回调函数(或者委托或者async await).

但是异步对于Rx来说就是它的本性

Rx的同步异步对比:

多线程

Rx不是多线程的, 但是它是线程自由的(就是可以使用多个线程), 它被设计成只是用必须的线程而已.

多线程表示, 同时有多个线程在执行. 也可以称作并发. 它可以分担计算量. 但是据需要考虑线程安全了.

Rx已经做了一些抽象, 所以不必过多的考虑线程安全了.

例如: 

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(xxx):

UI的例子:

Observable.Interval(TimeSpan.FromSeconds(1)).ObserveOn(SynchronizationContext.Current).Subscribe(t => searchBox.Text = t.ToString()):

如果计算量比较大的话:

Observable.Create(大量工作).Subscribe(xxx):

UI假死, 这就不好了.

应该这样:

Observable.Create(大量工作).SubscribeOn(NewThreadScheduler.Default).ObserveOn(SynchronizationContext.Current).Subscribe(xxx):

 

Schedulers

Scheduler可以在Rx里面安排执行动作. 它使用IScheduler接口.

现在就可以把Scheduler理解为是对未来执行的一个抽象.

它同时也负责着Rx所有的并发工作.

Rx提供了很多Scheduler.

下面是.net现有有很多种在未来执行动作的方法:

Rx里面就这个:

IScheduler接口:

基本上不用直接去使用IScheduler, 因为内置了很多现成的Schedulers了:

  • Immediate, 这是唯一一个不是异步的Scheduler
  • CurrentThread
  • EventLoop
  • Dispatcher
  • NewThread 
  • TaskPool, ThreadPool

Schedulers实际上到处都使用着:

应该用哪个Scheduler?

Fake Scheduler:

用于测试

 

下面是我的关于ASP.NET Core Web API相关技术的公众号--草根专栏:

目录
相关文章
|
开发框架 .NET 编译器
C#OOP之十四 .Net Framework简介
C#OOP之十四 .Net Framework简介
96 0
DL之SPP-Net:SPP-Net算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略(二)
DL之SPP-Net:SPP-Net算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略
DL之SPP-Net:SPP-Net算法的简介(论文介绍)、架构详解、案例应用等配图集合之详细攻略(二)
|
XML 开发工具 数据格式
Microsoft .NET:Microsoft .NET之.net4.5.1简介、安装、使用方法之详细攻略
Microsoft .NET:Microsoft .NET之.net4.5.1简介、安装、使用方法之详细攻略
|
C# Windows .NET
带你读《C# 7.0核心技术指南》之一:C#和.NET Framework简介
本书前三章将集中介绍C#语言。首先介绍最基本的语法、类型和变量。而后会介绍一些高级的特性,如不安全代码以及预处理指令。其余各章则涵盖了.NET Framework的核心功能,包括LINQ、XML、集合、并发、I/O和网络、内存管理、反射、动态编程、特性、安全、应用程序域和原生互操作性等主题。第6章和第7章是后续主题的基础,除这两章之外,其余各章可以按照需要以任何顺序阅读。LINQ相关的三个章节最好按顺序阅读。其中的一些章节需要一些并发相关的知识,这些知识将在第14章中介绍。
|
物联网
Microsoft .NET Gadgeteer 简介及其它
Microsoft .NET Gadgeteer 为开发小型电子模块或嵌入式设备的用户,提供一个快速构建原型机的平台。它结合了面向对象编程的优点,提供一系列电子模块,可以快速地用这些模块进行计算机辅助设计。
758 0
|
存储 XML C#
.NET Core/.NET之Stream简介
之前写了一篇C#装饰模式的文章提到了.NET Core的Stream, 所以这里尽量把Stream介绍全点. (都是书上的内容) .NET Core/.NET的Streams 首先需要知道, System.
1507 0