注:本文的工程是基于 vs2010 的,在 vs2012 中区别不大。
本文的意图是让读者熟悉 Reactive Extension for .net(Rx) 的使用。通过一系列的例子,让读者感受
基于 observable 概念的 Rx 异步编程组合的威力。
准备
为了完成下面的练习需要读者有以下的准备:
1)了解 .net 和 C# 编程语言
2)了解异步编程的概念和相关的组合
3)安装了 vs2010 和 .net 4
4)在 MSDN DevLabs (http://msdn.microsoft.com/en-us/devlabs/default.aspx)安装 Rx for .net 4
Rx 是什么?
Rx 可以被概括为下面的话,你也可以从 DevLabs 的首页看到:
Rx 是一个使用可观察序列编写异步和基于事件编程的类库。
包含下面三个核心的特征,这些都会在后面的实验里涉及:
1)Asynchronous and event-based:像标题中反映的,Rx 的核心功能是简化这些编程模型。大家都知道
无论是 windows 平台还是 web上页面阻塞的体验很差,于是异步操作变得很重要。像 .net 中的事件、异步编程
模型、tasks、AJAX等等通常是很复杂的。
2)Composition:异步组合编程在今天看来是比较难的。但是编写的大部分上层代码与问题的解决关系不大。特别是,操作中数据的流动一点也不清晰,并且代码的执行通常依赖异步回调。
3)Observable collections:通过观察异步运算的数据源,我们可以借鉴现有的 LINQ 编程模型的知识。是的:
你的鼠标的移动和单击是一个 database。在 Rx 的世界里,异步数据源的组合操作是使用 LINQ 式的语句的各种配合使用,看起来像过滤、注入、联合、基于时间的操作。
本文流程
在下面我们用渐进的方法解释 Rx。首先,我们看一下在 .net 4 中 Rx 的核心接口。我们一旦理解了这些
接口(和它们与 IEnumerable<T> 的联系),我们接着演示如何使用 Rx 类库创建简单的可观察序列。
接下来我们介绍如何衔接已有的像 .net 中的事件和异步编程。然后演示更多的异步组合数据源的查询操作,
充分释放 Rx 的真正威力。
练习 1 - 了解 Rx 的接口和程序集
目标:基本了解在 .net 4 BCL 中的 IObservable<T> 和 IObserver<T> 接口 和 System.Reactive 与
System.CoreEx 程序集。
1、打开 vs2010 创建一个 C# 的控制台程序。确保是基于 .net framework 4的项目
2、在 Main 方法中输入下面的代码段,下面显示了 Rx 中两个核心接口。从 .net4 开始,这些接口被编译到了
基本的类库 mscorlib.dll 中。
注意:在其它平台获得 Rx 的支持(包括 .net 3.5 sp1 和 Silverlight),需要添加一个 System.Observable.dll
的程序集以便获得这两个接口。
3、在开始解释这些接口前,我们看一下它们。使用“对象浏览器”(在“视图”菜单项)或者选中代码片段按 F12.
我们可以使用下面的变量命名接口对象:
IObservable<int> source;
IObserver<int> handler;
接口的定义如下:
两个接口的角色是互补的。这 IObserable<T> 接口作为可以被观察(observed)的数据源,意味着
它可以向任何监听它的人发送数据。那些感兴趣的对象就使用 IObserver<T> 代表。
为了从一个 observable 集合接到通知,需要为 Subscribe 方法来传递一个 IObserver<T> 对象。
同时 Subcribe 方法返回一个 IDisposable 对象作为注册结果的引用,调用这个对象的 Dispose 方法会分离
数据源和观察着的联系,从而不会再有通知的传递。这与 .net 中事件的 += 和 -= 类似,但是 Subscribe
方法提供了更多的灵活性。
观察者(Observers)支持三种通知,体现在接口的方法上。当被观察的数据源可用时,ONext 方法可以被调用 0
或多次。例如,一个记录鼠标移动事件的被观察数据源,在每次鼠标移动时会发送一个 Point 对象。另外两个方法
用来标识成功或者异常结束。
背景:这两个接口和 IEnumerable<T>与 IEnumerator<T>相似,但是却有不同。首先可以概括为两个简单
的单词:推 vs 拉。也就是一个 observable 数据源向它的 observers "推"数据,而 enumerable 数据源是通过
enumerator 进行“拉”数据(典型的是使用 foreach 结构)。从这种二元性得出结论,在一个世界里的所有的 LINQ
操作可以相应地运用在另一个世界里。我们可以在下文中看到。
4、下面将解释使用 Rx 库中的方法,如何创建一个 observable 序列。现在还不能运行:
5、上面的代码演示了 observable 序列的典型使用方式。你可能已经注意到关于一个 IObserable<T>对象
的 Subscribe 方法的作用:
6、为了使 observable 序列更有用,Rx 库提供了一整套的方法来对它进行操作,并且可以进行组合使用。
虽然 .net 4 BCL 提供了简单的接口,但是 Rx 为它们提供了丰富的方法。为了添加引用,在解决方案中,
右键单击项目节点,选择“Add Referance...”,你可以找到 System.Reactive 和 System.CoreEx:
把它们添加到你的项目中。在这个试验中,我们没有涉猎到包含 LINQ to Objects 扩展的 Syste.Interactive 程
序集。
7、你会想知道为什么我们添加这两个程序集。System.Reactive 包含了 observable 序列的操作(用扩展方法的
实现)。因为其中很多操作涉及到了并行(异步编程),所以需要引入 scheduler(任务调度器) 的概念。
System.Reactive 和System.Interactive 并不会直接处理并发,而是 System.CoreEx 中的方法提供了相应的任
务调度的实现。稍后我们会看到像 ObserveOn 的操作,来使用 schedulers。
8、通过添加上面的程序集,我们现在可以看到 IObserable<T> 对象的更多方法。因为它们是扩展方法,
一个小试验会揭示两个额外的方法。删除 System 以外的命名空间,观察到智能提示中 IObservable<T>:
可以注意到 Subscribe 扩展方法是通过 System 命名空间添加的。这些重载的方法允许你不必实现
IObserver<T> 接口,因为你可以使用接口指定这三个 handler 方法(OnNext,OnError,OnCompleted)。
例如:
9、为了看到运用到 IObservable<T> 对象的操作,直接添加 System.Linq 命名空间。这次可以看到很多
操作。如果你对 Linq 很熟悉,可以找到你喜欢的操作,比如 Where、Select 等等。
结论:和预想的一样,IObserable<T> 和 IObserver<T> 代表了一个数据源和一个监听者。为了是监听者
监听可观察序列,它提供了一个 Subscribe 方法,并返回一个实现了 IDisposable 的对象用来 unsbscribe。
因为这些接口是在 .net 4 BCL 中,在其他平台它们仅可以在添加 System.Observable 后可用。为了丰富
observable 序列的操作,可以通过导入 System 和 System.Linq 命名空间,它们为 System.Reactive 提供了
一系列的扩展方法。
试验2:创建可观察序列(observable sequences)
目标:可观察序列仅仅直接实现了 IObservable<T> 接口。然而提供了一套创建可观察序列的工厂方法。
这些工厂方法提供了一个最初探索可观察数据源和观察者的核心概念的很好的方式。
1、在试验1 的基础上,也就是添加 System.CoreEx 和 System.Reactive 并且同时倒入了 System 和
System.Linq 命名空间。也确保已有下面的代码架构:
2、在上面的部分,我们用各种数据源替换注释的部分,并且观察它们的行为。我们同时会对比 enumerable
序列。因为可观察序列有时通过并行来 pump out 它们的通知(使用异步),我们需要在使用 Console.ReadLine
来保持主线程:
3、我们使用 Empty 这个工厂方法作为开始:
运行这段代码会输出:
OnCompeted
换句话说,这个空序列是标识完成,通知观察着调用 OnCompleted。这个和 LINQ to Object 中的
Enumerable.Empty 或者一个空 array (比如 new int[0])很相似。对于那些 enumerable 序列,
第一次调用 enumerator 的 MoveNext 方法将会返回 false,标识着结束。
背景:你可能想知道一个 observable 序列什么时候开始运行。特别是,什么触发一个空序列去向观察者激发
OnCompeted 通知? 从不同的序列得出的答案是不同的。在这个试验中看到的大部分序列被称为所谓的
cold observables ,意味着它们在注册时就开始运行了。与之相对的 hot observables ,例如鼠标移动事件
,在 subscription 之前也还是流动的。(没有方法可以阻止鼠标的移动)
4、除了 OnCompleted 消息之外,OnError 是一个结束的通知,意味着后面不会再有通知了。工厂方法 Empty
会创建一个空 observable 序列 会立即出发结束事件,下面的 Throw 方法创建一个会立即向 observers 发送
OnError 消息的一个 observable 序列。
运行代码会输出:
OnError:Oops
背景:一个 可观察序列通常用这个 OnError 消息标识一个失败的运算。相比于 CLR 中的异常机制,在 Rx
中的 errors 通常终结并且快速通过 observer 的事件传递失败。存在的更高级处理 errors 的 handlers 有
Catch,OnErrorResumeNext 和 Finally。在本文中我们并不会讨论它们,它们的名称已经很好的解释了它们。
5、最后一个基本的工厂方法叫做 Return。它的作用是返回一个单元素序列,就像一个单元素的数组。当被观察者
注册观察者时会得到两个消息:为 OnNext 传递一个值,触发 OnCompleted 标识序列的结束:
运行代码输出:
OnNext:42
OnCompeted
背景:Return 扮演了一个在 LINQ 理论中的基本的角色,叫做单体。
6、对应于 Enumerable ,Range 方法传递一个开始值和一个长度,返回一个 32位 int 值的序列:
运行代码输出:
OnNext:5
OnNext:6
OnNext:7
OnCompleted
注意:在这个是试验中的所有序列,Range 是一个 cold observable。也就意味着一旦注册一个 observer 时,
它就会开始输出它的结果。cold observable 序列的另一个属性是每次注册都会引发相同行为。因此,如果两次
调用 Subscribe 方法,两个观察者(observers)都会收到(observable)的相同的消息。它不会因为这个观察者
的 data observation 运行结束,而导致另一个观察者不会运行。并且每个观察者都会得到产生的相同的序列。像
Return 和 Range 这样简单的方法,传递给每个 observer 的信息都是相同的。然而,依靠一些外部条件产生的
的 observable 序列,可能向它们的每个观察者传递不同的信息。
7、在 Rx 中的 Generate 构造方法,它像是 C# 中的迭代器(比较 "yield return" 和 "yield break" 语句)循环
输出序列。它使用几个 委托式的参数来判断 结束、下一步、输出一个结果并且发送到它的 observer:
运行代码输出:
OnNext:0
OnNext:1
OnNext:4
OnNext:9
OnNext:16
OnCompleted
注意:另一个名为 GenerateWithTime 的相似方法,会等待一个计算后的时间之后才会进行到下一次迭代。我们一
会就会看到。
8、另一个很有趣的叫做 Never 的方法。它会产生一个永远不会向它的观察者传递任何通知的可观察序列:
运行这段代码不会有任何形式的输出
背景:这个方法有它的实际用途。比如你可以使用它来产生一个不会触发 OnCompleted 事件的序列。
9、Subcribe 方法有一个异步的特性,以异步的方式观察一个序列。这样的一个方法叫做 Run。
下面的demo 通常作为 直到 observable 序列触发 observer 的 OnError 或者 OnCompleted 后
才会停止阻塞调用线程,这就是 Run 的工作:
在这里,直到这个 sequence 顺利完成或者抛出异常后,才会继续运行。
10、最后,我们在 VS 中看一下下面 observable 序列的行为:
这个示例使用 GenerateWithTime 方法允许指定两次输出结果的时间间隔。在这个示例中,0 会在 subscribe 观察者时立刻输出,接着 1秒后输出 1,2秒后输出 4,3秒后输出 9,4秒后输出 16。
注意:也许你并不会使用 using 块的方式立即 dispose 一个 subscription。在这个示例中因为需要等待
用户的输入而它会很好的执行。在其它时候你可以保持这个 IDisposable 对象的引用,以在稍后释放它,或者不用
担心它的释放。
a、首先添加一个断点:
b、按 F10 运行并调试。你会注意到并没有命中断点,而是执行到 Console.ReadLine 方法:
c、之所以是这样,是因为 SubScribe 方法的调用,开启了依赖一个 timer 的后台线程来 输出这个
observable 序列的值。我们会在稍后讨论 Rx 的并发。现在我们观察一下 debugger 线程窗口(Debug,
Windows,Threads 或者 CTRL + D,T):
注意:为了查看 System.CoreEx 和 System.Reactive 调用堆栈框,你必须禁选 "Just My Code"选项。然后切
换到调用栈窗口,右键单击并选中 Show External Code。
我们可以清楚的看到上面的工作线程被 System.Threading timer 在 System.Concurrency 命名空间里的
ThradPoolScheduler 中运行代码。当必要的时候,Rx 通常使用这个命名空间的基本方法来创建并发。
d、单击 F5继续执行。这次,我们可以在 debugger 中看到在后台线程中命中断点:
注意到主线程中还停留在 Console.ReadLine(灰色的部分显示不同的线程),此时我们的调用
栈显示 这个基于 timer 的后台线程输出了消息 OnNext message for x = 1
e、读者可以多次按 F5 继续执行,可以看到每次 OnNext 的消息输出。在 OnCompleted 设置
断点,可以看到 这个 GenerateWithTime 序列的同样的多线程行为:
一个序列向它的观察者传递消息是否是在一个后台线程取决于很多因素。更多详细内容,我们会在实验3 中遇到。
根据需要,开发者可以手动指定 IScheduler 对象来控制这一方面。
结论:创建一个 observable 序列不需要手动实现 IObservable<T> 接口 和 实现 IObserver<T>使用的
Subscribe 方法。开始介绍的 一系列的方法可以创建 0,1个或者更多元素。后面介绍的扩展方法 Subcribe
可以为 OnNext,OnError 和 OnCompleted 传递各种组合委托。