声明:本文是《C#并发编程经典实例》的样章,感谢图灵授权并发编程网站发布样章,禁止以任何形式转载此文。
问题
有一系列事件,需要在它们到达时进行分组。举个例子,需要对一些成对的输入作出响
应。第二个例子,需要在 2 秒钟的窗口期内,对所有输入进行响应。
解决方案
Rx 提 供 了 两 个 对 到 达 的 序 列 进 行 分 组 的 操 作:Buffer 和 Window。Buffer 会 留 住 到 达 的 事 件, 直 到 收 完 一 组 事 件, 然 后 会 把 这 一 组 事 件 以 一 个 集 合 的 形 式 一 次 性 地 转 送 过 去。 Window 会在逻辑上对到达的事件进行分组,但会在每个事件到达时立即传递过去。Buffer 的返回类型是 IObservable<IList<T>(由若干个集合组成的事件流);Window 的返回类型 是 IObservable<IObservable<T>(由若干个事件流组成的事件流)。
下面的例子使用 Interval,每秒创建 1 个 OnNext 通知,然后每 2 个通知做一次缓冲:
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.Interval(TimeSpan.FromSeconds(1))
.Buffer(2)
.Subscribe(x => Trace.WriteLine(
DateTime.Now.Second + ": Got " + x[0] + " and " + x[1]));
}
用我的电脑测试,每 2 秒产生 2 个输出:
13: Got 0 and 1
15: Got 2 and 3
17: Got 4 and 5
19: Got 6 and 7
21: Got 8 and 9
下面的例子有些类似,使用 Window 创建一些事件组,每组包含 2 个事件:
1
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.Interval(TimeSpan.FromSeconds(1))
.Window(2)
.Subscribe(group =>
{
});
}
Trace.WriteLine(DateTime.Now.Second + ": Starting new group");
group.Subscribe(
x => Trace.WriteLine(DateTime.Now.Second + ": Saw " + x),
() => Trace.WriteLine(DateTime.Now.Second + ": Ending group"));
用我的电脑测试,输出的结果就是这样:
17: Starting new group
18: Saw 0
19: Saw 1
19: Ending group
19: Starting new group
20: Saw 2
21: Saw 3
21: Ending group
21: Starting new group
22: Saw 4
23: Saw 5
23: Ending group
23: Starting new group
这几个例子说明了 Buffer 和 Window 的区别。Buffer 等待组内的所有事件,然后把所有事 件作为一个集合发布。Window 用同样的方法进行分组,但它是在每个事件到达时就发布。
Buffer 和 Window 都可以使用时间段作为参数。在下面的例子中,所有的鼠标移动事件被 收集进窗口,每秒一个窗口:
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.FromEventPattern<;MouseEventHandler, MouseEventArgs>;(
handler => (s, a) => handler(s, a), handler => MouseMove += handler, handler => MouseMove -= handler)
.Buffer(TimeSpan.FromSeconds(1))
.Subscribe(x => Trace.WriteLine(
DateTime.Now.Second + ": Saw " + x.Count + " items."));
}
输出的结果依赖于怎么移动鼠标,类似于这样:
49: Saw 93 items.
50: Saw 98 items.
51: Saw 39 items.
52: Saw 0 items.
53: Saw 4 items.
54: Saw 0 items.
55: Saw 58 items.
讨论
Buffer 和 Window 可用来抑制输入信息,并把输入塑造成我们想要的样子。另一个实用技 术是限流(throttling),将在 5.4 节介绍。
Buffer 和 Windows 都有其他重载,可用在更高级的场合。参数为 skip 和 timeShift 的重载 能创建互相重合的组,还可跳过组之间的元素。还有一些重载可使用委托,可对组的边界 进行动态定义。