声明:本文是《C#并发编程经典实例》的样章,感谢图灵授权并发编程网站发布样章,禁止以任何形式转载此文。
问题
把一个事件作为 Rx 输入流,每次事件发生时通过 OnNext 生成数据。
解决方案
Observable 类 定 义 了 一 些 事 件 转 换 器。 大 部 分 .NET 框 架 事 件 与 FromEventPattern 兼 容, 对于不遵循通用模式的事件,需要改用 FromEvent。
FromEventPattern 最适合使用委托类型为 EventHandler 的事件。很多较新框架类的事 件都采用了这种委托类型。例如,Progress 类定义了事件 ProgressChanged,这个事件 的委托类型就是 EventHandler,因此,它就很容易被封装到 FromEventPattern:
var progress = new Progress<int>();
var progressReports = Observable.FromEventPattern<int>(
handler => progress.ProgressChanged += handler,
handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext:" + data.EventArgs));
请 注 意,data.EventArgs 是 强 类 型 的 int。FromEventPattern 的 类 型 参 数( 上 例 中 为 int) 与 EventHandler 的 T 相同。Rx 用 FromEventPattern 中的两个 Lambda 参数来实现订阅 和退订事件。
较新的 UI 框架采用 EventHandler,可以很方便地应用在 FromEventPattern 中。但是有 些较旧的类常为每个事件定义不同的委托类型。这些事件也能在 FromEventPattern 中使用, 但需要做一些额外的工作。例如,System.Timers.Timer 类有一个事件 Elapsed,它的类型是 ElapsedEventHandler。对此旧类事件,可以用下面的方法封装进 FromEventPattern:
var timer = new System.Timers.Timer(interval: 1000)
{
Enabled = true
};
var ticks = Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
handler => (s, a) => handler(s, a),
handler => timer.Elapsed += handler,
handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));
注意,data.EventArgs 仍然是强类型的。现在 FromEventPattern 的类型参数是对应的事件 处理程序和 EventArgs 的派生类。FromEventPattern 的第一个 Lambda 参数是一个转换器, 它将 EventHandler 转换成 ElapsedEventHandler。除了传递事件,这个 转换器不应该做其他处理。
上面代码的语法明显有些别扭。另一个方法是使用反射机制:
var timer = new System.Timers.Timer(interval: 1000) {
Enabled = true
}; var ticks = Observable.FromEventPattern(timer, "Elapsed");
ticks.Subscribe(data => Trace.WriteLine("OnNext: "
+ ((ElapsedEventArgs)data.EventArgs).SignalTime));
采用这种方法后,调用 FromEventPattern 就简单多了。但是这种方法也有缺点:出现了 一个怪异的字符串(”Elapsed”),并且消息的使用者不是强类型了。就是说,这时 data. EventArgs 是 object 类型,需要人为地转换成 ElapsedEventArgs。
讨论
事件是 Rx 流数据的主要来源。本节介绍如何封装遵循标准模式的事件(标准事件模式: 第一个参数是事件发送者,第二个参数是事件的类型参数)。对于不标准的事件类型,可 以用重载 Observable.FromEvent 的办法,把事件封装进 Observable 对象。
把 事 件 封 装 进 Observable 对 象 后, 每 次 引 发 该 事 件 都 会 调 用 OnNext。 在 处 理 AsyncCompletedEventArgs 时 会 发 生 令 人 奇 怪 的 现 象, 所 有 的 异 常 信 息 都 是 通 过 数 据 形 式 传 递 的(OnNext), 而 不 是 通 过 错 误 传 递(OnError)。 看 一 个 封 装 WebClient. DownloadStringCompleted 的例子:
var client = new WebClient();
var downloadedStrings = Observable.FromEventPattern(client, "DownloadStringCompleted");
downloadedStrings.Subscribe(
data =>
{
var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;
if (eventArgs.Error != null)
Trace.WriteLine("OnNext: (Error) " + eventArgs.Error);
else
Trace.WriteLine("OnNext: " + eventArgs.Result);
},
ex => Trace.WriteLine("OnError: " + ex.ToString()), () => Trace.WriteLine("OnCompleted"));
client.DownloadStringAsync(new Uri("http://invalid.example.com/"));
WebClient.DownloadStringAsync 出错并结束时,引发带有异常 AsyncCompletedEventArgs.Error的事件。可惜 Rx 会把这作为一个数据事件,因此这个程序的结果是显示“OnNext:(Error)”,
而不是“OnError:”。
有些事件的订阅和退订必须在特定的上下文中进行。例如,很多 UI 控件的事件必须在 UI 线程中订阅。Rx 提供了一个操作符 SubscribeOn,可以控制订阅和退订的上下文。大多数 情况下没必要使用这个操作符,因为基于 UI 的事件订阅通常就是在 UI 线程中进行的。