之前一些简单功能用 Async await 的写法用的风生水起, 感觉还不错, 不过它的停止功能就比较尴尬, 一个 Task 没有办法去停止, 使用一个 CancelToken 的话还要自己去写里面的逻辑, 非常坑. 有个日本人写了一个整合到 Unity 的叫啥忘了.
UniRx 其实很早就有人写了, 貌似也是个日本人, 参照的就是微软的 Reactive Extensions, 它也好在异步逻辑可以写成像同步的一样, 不停拼接起来就行了, 比如下面的例子 :
var parallel = Observable.WhenAll( ObservableWWW.Get("http://google.com/"), ObservableWWW.Get("http://bing.com/"), ObservableWWW.Get("http://unity3d.com/")); parallel.Subscribe(xs => { Debug.Log(xs[0].Substring(0, 100)); // google Debug.Log(xs[1].Substring(0, 100)); // bing Debug.Log(xs[2].Substring(0, 100)); // unity });
或者实际使用中的例子, 删掉主要代码后的 :
var clearStep = Observable.FromCoroutine(() => ClearIteratively(800)); List<IObservable<Unit>> allSteps = new List<IObservable<Unit>>() { clearStep }; foreach(var uploadData in m_points) { var download = Download(uploadData); allSteps.Add(download); } var _Rx = Observable.WhenAll(allSteps.ToArray()).SubscribeOn(Scheduler.ThreadPool).Do<Unit>((_) => { // ... }).SubscribeOnMainThread().Do<Unit>((_) => { onReceived.Invoke(); }).SelectMany(() => ShowVisualisation(localDatas, Color.red, 100)).Subscribe();
先做了一个协程的Clear, 然后进行多个数据的下载, 然后所有数据现在完成后在多线程中组装, 之后回到主线程通知, 然后进行协程的显示数据...
然后最大的好处是我可以在任何时候进行停止逻辑 _Rx.Dispose(); 就行了, 它的执行点在协程的各个 yield 和每个拼接处, 随时都可以进行停止, 或是不再往下执行.
唯一的问题是一旦进行 Subscribe() 之后, 就无法再在此流程上添加后续处理了, 有点尴尬...
例子:
var ob1 = ObservableWWW.Get("https://fanyi.baidu.com/"); var ob2 = Observable.Timer(new System.TimeSpan(0, 0, 5)); var ob3 = ObservableWWW.Get("https://www.baidu.com/"); ob1.Do<string>(x => Debug.Log(x)).ContinueWith(ob2).ContinueWith(ob3).Do<string>(x => Debug.Log(x)).Subscribe();
如果我是已经开始的一段逻辑:
var disposable = ob1.Do<string>(x => Debug.Log(x)).ContinueWith(ob2).ContinueWith(ob3).Subscribe(); // disposable ......
这时候想要打印 ob3 的内容就没办法了.......
虽然不完美不过对于异步流水线化的过程来说使用非常方便