Exploring The Major Interfaces in Rx-阿里云开发者社区

开发者社区> 开发与运维> 正文

Exploring The Major Interfaces in Rx


本文描述了主要的Reactive Extensions (Rx)接口,其用于表示observable序列,并subscribe它们。

IObservable<T>/IObserver<T>在.NET 4.0基础类库中出现,它们也可作为.NET 3.5, Silverlight 3 and 4以及Javascript的一个包安装。


Rx exposes asynchronous and event-based data sources as push-based, observable sequences abstracted by the new IObservable<T> interface in .NET Framework 4.0. This IObservable<T> interface is a dual of the familiar IEnumerable<T> interface used for pull-based, enumerable collections. It represents a data source that can be observed, meaning that it can send data to anyone who is interested. It maintains a list of dependent IObserver<T> implementations representing such interested listeners, and notifies them automatically of any state changes.

Rx公开异步和基于事件的数据源,作为push模式的基础,observable序列由.NET Framework 4.0中的IObservable<T>接口抽象。IObservable<T>接口非常类似于IEnumerable<T>接口,只是IEnumerable<T>接口用于pull-based,可枚举集合。IObservable<T>接口代表可被观察的数据源——即它可以发送数据给任何关注他的对象。它维护了一个依赖于IObserver<T>实现的对象的列表,诸如那些关注数据源的listeners,并自动通知它们任何状态更改。

An implementation of the IObservable<T> interface can be viewed as a collection of elements of type T. Therefore, an IObservable<int> can be viewed as a collection of integers, in which integers will be pushed to the subscribed observers.


As described in What is Rx, the other half of the push model is represented by the IObserver<T> interface, which represents an observer who registers an interest through a subscription. Items are subsequently handed to the observer from the observable sequence to which it subscribes.

What is Rx所述,push模式的另一半由IObserver<T>接口表示,它代表一个observer,其通过subscription注册关注点(interest)。项随后从observable序列被移交给订阅了它的observer。

In order to receive notifications from an observable collection, you use the Subscribe method of IObservable to hand it an IObserver<T> object. In return for this observer, the Subscribe method returns an IDisposable object that acts as a handle for the subscription. This allows you to clean up the subscription after you are done.  Calling Dispose on this object detaches the observer from the source so that notifications are no longer delivered. As you can infer, in Rx you do not need to explicitly unsubscribe from an event as in the .NET event model.

为了从observable集合接收到通知,需使用IObservable 接口的Subscribe方法,以将IObservable移交给IObserver<T>对象。此observer的返回中,Subscribe方法返回了一个IDisposable对象,它代表这个subscription的handle。这允许你完成处理后清除subscription。调用此对象上的Dispose方法将分离此observer与源,这样,通知将不再传输给它。如你所推断,在Rx中,你无须像在.NET 事件模型中那样显式的unsubscribe event。

Observers support three publication events, reflected by the interface’s methods. OnNext can be called zero or more times, when the observable data source has data available. For example, an observable data source used for mouse move events can send out a Point object every time the mouse has moved. The other two methods are used to indicate completion or errors.

Observers支持3个公开的事件,都由接口的方法所反映。OnNext可以被调用0次或多次,当observable数据源具有有效数据时。例如,用于mouse move事件的observable数据源可以在每次mouse移动时发送一个Point对象。另2个方法用于指示完成或错误状态。

The following lists the IObservable<T>/IObserver<T> interfaces.


public interface IObservable<out T> 


IDisposable Subscribe(IObserver<T> observer); 


public interface IObserver<in T> 


void OnCompleted(); // Notifies the observer that the source has finished sending messages.

void OnError(Exception error); // Notifies the observer about any exception or error.

void OnNext(T value); // Pushes the next data value from the source to the observer.


Rx also provides Subscribe extension methods so that you can avoid implementing the IObserver<T> interface yourself. For each publication event (OnNext, OnError, OnCompleted) of an observable sequence, you can specify a delegate that will be invoked, as shown in the following example. If you do not specify an action for an event, the default behavior will occur.

Rx也提供了Subscribe扩展方法,因此你可以避免自己去实现IObserver<T>接口。对于observable序列的每个公开事件(OnNext, OnError, OnCompleted),你可以指定一个委托,其再合适的时候被调用,如下例所示。如果你没有为事件指定一个action,默认的行为将发生。

IObservable<int> source = Observable.Range(1, 5); //creates an observable sequence of 5 integers, starting from 1

IDisposable subscription = source.Subscribe(

x => Console.WriteLine("OnNext: {0}", x), //prints out the value being pushed

ex => Console.WriteLine("OnError: {0}", ex.Message),

() => Console.WriteLine("OnCompleted"));

You can treat the observable sequence (such as a sequence of mouse-over events) as if it were a normal collection. Thus you can write LINQ queries over the collection to do things like filtering, grouping, composing, etc. To make observable sequences more useful, the Rx assemblies provide many factory LINQ operators so that you do not need to implement any of these on your own. This will be covered in the Querying Observable Sequences using LINQ Operators topic.

你可以像普通集合那样对待observable sequence(如:mouse over事件序列)。因此你可以在这个集合上编写编写LINQ查询,执行诸如过滤,分组,组合等等。为了使observable sequences更有用,Rx程序集提供了很多工厂LINQ操作,请参考Querying Observable Sequences using LINQ Operators


You do not need to implement the IObservable<T>/IObserver<T> interfaces yourself.  Rx provides internal implementations of these interfaces for you and exposes them through various extension methods provided by the Observable and Observer types.  See the Creating and Querying Observable Sequences topic for more information.

注意:你无须自行实现IObservable<T>/IObserver<T>接口。Rx提供了这些接口的internal实现,并通过Observable and Observer类型的各种扩展方法公开它们,参考Creating and Querying Observable Sequences


版权声明:本文首发在云栖社区,遵循云栖社区版权声明:本文内容由互联网用户自发贡献,版权归用户作者所有,云栖社区不为本文内容承担相关法律责任。云栖社区已升级为阿里云开发者社区。如果您发现本文中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,阿里云开发者社区将协助删除涉嫌侵权内容。

+ 订阅