Observable Utility Operators
本文的主题为处理 Observable 的实用工具类操作符。
这里的 Observable 实质上是可观察的数据流。
公共代码
- RxNET
/// <summary>
/// Subscribes to <paramref name="source"/> and prints every notification
/// (value, error message, completion) to the console, prefixed with <paramref name="name"/>.
/// </summary>
/// <param name="source">The observable sequence to observe.</param>
/// <param name="name">Label placed in front of each printed line.</param>
public static void Dump<T>(this IObservable<T> source, string name) =>
    source.Subscribe(
        value => Console.WriteLine("{0}-->{1}", name, value),
        error => Console.WriteLine("{0} failed-->{1}", name, error.Message),
        () => Console.WriteLine("{0} completed", name));
Delay / DelaySubscription
ReactiveX - Delay operator
Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
Delay 发送源数据流的数据,但是发送时间要延迟一个指定的时间段。
DelaySubscription 发送源数据流的数据,但是订阅时间要延迟一个指定的时间段。
- RxNET
// One tick per second, limited to the first 5 ticks; Timestamp wraps each value
// together with the time it was produced (printed as "value@time").
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Timestamp();
// Same values as source, but each one is delivered 2 seconds later.
// The timestamp is attached before the delay, so the sample output below shows
// delayed values still carrying their original emission time.
var delay = source.Delay(TimeSpan.FromSeconds(2));
// Subscribe to both sequences so the undelayed and delayed output can be compared.
source.Subscribe(
value => Console.WriteLine("source : {0}", value),
() => Console.WriteLine("source Completed"));
delay.Subscribe(
value => Console.WriteLine("delay : {0}", value),
() => Console.WriteLine("delay Completed"));
/*
source : 0@2018/07/26 9:34:09 +00:00
source : 1@2018/07/26 9:34:10 +00:00
source : 2@2018/07/26 9:34:11 +00:00
delay : 0@2018/07/26 9:34:09 +00:00
source : 3@2018/07/26 9:34:12 +00:00
delay : 1@2018/07/26 9:34:10 +00:00
source : 4@2018/07/26 9:34:13 +00:00
source Completed
delay : 2@2018/07/26 9:34:11 +00:00
delay : 3@2018/07/26 9:34:12 +00:00
delay : 4@2018/07/26 9:34:13 +00:00
delay Completed
*/
// One tick per second, limited to the first 5 ticks, each wrapped with its timestamp.
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Timestamp();
// Unlike Delay, DelaySubscription postpones the subscription itself by 2 seconds.
// In the sample output below the timestamps of the "delay" lines are therefore
// 2 seconds later than those of the matching "source" lines.
var delay = source.DelaySubscription(TimeSpan.FromSeconds(2));
// Subscribe to both sequences so the two outputs can be compared.
source.Subscribe(
value => Console.WriteLine("source : {0}", value),
() => Console.WriteLine("source Completed"));
delay.Subscribe(
value => Console.WriteLine("delay : {0}", value),
() => Console.WriteLine("delay Completed"));
/*
source : 0@2018/07/26 11:03:15 +00:00
source : 1@2018/07/26 11:03:16 +00:00
source : 2@2018/07/26 11:03:17 +00:00
delay : 0@2018/07/26 11:03:17 +00:00
source : 3@2018/07/26 11:03:18 +00:00
delay : 1@2018/07/26 11:03:18 +00:00
source : 4@2018/07/26 11:03:19 +00:00
source Completed
delay : 2@2018/07/26 11:03:19 +00:00
delay : 3@2018/07/26 11:03:20 +00:00
delay : 4@2018/07/26 11:03:21 +00:00
delay Completed
*/
- RxJava
// Each value is held back by its own timer of (value * 100) ms;
// timeInterval() then reports the gap between consecutive emissions.
Observable.interval(100, TimeUnit.MILLISECONDS)
    .delay { tick -> Observable.timer(tick * 100, TimeUnit.MILLISECONDS) }
    .timeInterval()
    .take(5)
    .dump()
/*
onNext: Timed[time=105, unit=MILLISECONDS, value=0]
onNext: Timed[time=196, unit=MILLISECONDS, value=1]
onNext: Timed[time=201, unit=MILLISECONDS, value=2]
onNext: Timed[time=207, unit=MILLISECONDS, value=3]
onNext: Timed[time=196, unit=MILLISECONDS, value=4]
onComplete
*/
// Postpone subscribing to the interval by 1000 ms. In the sample output below only
// the first measured gap is long (about 1100 ms); the rest stay near 100 ms.
Observable.interval(100, TimeUnit.MILLISECONDS)
.delaySubscription(1000, TimeUnit.MILLISECONDS)
.timeInterval()
.take(5)
.dump()
/*
onNext: Timed[time=1105, unit=MILLISECONDS, value=0]
onNext: Timed[time=102, unit=MILLISECONDS, value=1]
onNext: Timed[time=98, unit=MILLISECONDS, value=2]
onNext: Timed[time=99, unit=MILLISECONDS, value=3]
onNext: Timed[time=99, unit=MILLISECONDS, value=4]
onComplete
*/
// Same idea as delaySubscription(time, unit), but the delay is expressed by another
// Observable: here a timer that fires after 1000 ms.
Observable.interval(100, TimeUnit.MILLISECONDS)
.delaySubscription(Observable.timer(1000, TimeUnit.MILLISECONDS))
.timeInterval()
.take(5)
.dump()
/*
onNext: Timed[time=1110, unit=MILLISECONDS, value=0]
onNext: Timed[time=96, unit=MILLISECONDS, value=1]
onNext: Timed[time=99, unit=MILLISECONDS, value=2]
onNext: Timed[time=100, unit=MILLISECONDS, value=3]
onNext: Timed[time=103, unit=MILLISECONDS, value=4]
onComplete
*/
- RxJS
// emit one item
const example = of(null);
// Build four branches from the same one-item source: each maps the item to a fixed
// string, and each later branch is delayed by one more second (0s, 1s, 2s, 3s).
// merge combines the branches into a single stream.
const message = merge(
example.pipe(mapTo('Hello')),
example.pipe(
mapTo('World!'),
delay(1000)
),
example.pipe(
mapTo('Goodbye'),
delay(2000)
),
example.pipe(
mapTo('World!'),
delay(3000)
)
);
// output: 'Hello'...'World!'...'Goodbye'...'World!'
const subscribe = message.subscribe(val => console.log(val));
// emit value every second
const message = interval(1000);
// factory returning an observable that emits after five seconds;
// it is used below as the delay selector
const delayForFiveSeconds = () => timer(5000);
// after 5 seconds, start emitting delayed interval values
const delayWhenExample = message.pipe(delayWhen(delayForFiveSeconds));
// log values, delayed for 5 seconds
// ex. output: 5s....1...2...3
const subscribe = delayWhenExample.subscribe(val => console.log(val));
Do / Finally
ReactiveX - Do operator
Reactive Extensions再入門 その10「Doメソッド」
Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
Do / Finally 在源数据流的生命周期内的指定时间点注册一个需要被执行的回调函数。
Do 所注册的回调函数在源数据流调用 OnNext, OnError 以及 OnComplete 之前被调用。Do 最多可以注册 3 个回调函数。
Finally 所注册的回调函数在源数据流调用 OnError 或 OnComplete 之后被调用;订阅被取消(Dispose)时该回调函数同样会被调用。Finally 只能注册 1 个回调函数。
- RxNET
/// <summary>Prints an OnNext notification together with the current local time.</summary>
/// <param name="onNextValue">The value carried by the notification.</param>
private static void Log(object onNextValue) =>
    Console.WriteLine("Logging OnNext({0}) @ {1}", onNextValue, DateTime.Now);
/// <summary>Prints an OnError notification together with the current local time.</summary>
/// <param name="onErrorValue">The exception carried by the notification.</param>
private static void Log(Exception onErrorValue) =>
    Console.WriteLine("Logging OnError({0}) @ {1}", onErrorValue, DateTime.Now);
/// <summary>Prints an OnCompleted notification together with the current local time.</summary>
private static void Log() =>
    Console.WriteLine("Logging OnCompleted()@ {0}", DateTime.Now);
// Three ticks, one per second.
var source = Observable
.Interval(TimeSpan.FromSeconds(1))
.Take(3);
// Do attaches one side-effect callback per notification kind (value, error,
// completion); here each one forwards to the matching Log overload.
// The sample output below shows each log line printed before the subscriber's own line.
var result = source.Do(
i => Log(i),
ex => Log(ex),
() => Log());
// The subscriber itself just prints each value and a completion marker.
result.Subscribe(
Console.WriteLine,
() => Console.WriteLine("completed"));
/*
Logging OnNext(0) @ 2018/07/26 18:58:32
0
Logging OnNext(1) @ 2018/07/26 18:58:33
1
Logging OnNext(2) @ 2018/07/26 18:58:34
2
Logging OnCompleted()@ 2018/07/26 18:58:34
completed
*/
/// <summary>
/// Builds a sequence that ticks every 250 ms and prints a "pushing ..." line
/// for each value as a side effect.
/// </summary>
/// <returns>The interval sequence with the logging side effect attached.</returns>
private static IObservable<long> GetNumbers() =>
    Observable
        .Interval(TimeSpan.FromMilliseconds(250))
        .Do(n => Console.WriteLine("pushing {0} from GetNumbers", n));
var source = GetNumbers();
// Keep multiples of 3, stop after three of them, and map each number to a letter
// by offsetting from 65 (0 -> 'A', 3 -> 'D', 6 -> 'G' in the sample output below).
var result = source.Where(i => i % 3 == 0)
.Take(3)
.Select(i => (char)(i + 65));
// The "pushing ..." lines in the sample output come from the Do inside GetNumbers,
// which also sees the values that Where filters out.
result.Subscribe(
Console.WriteLine,
() => Console.WriteLine("completed"));
/*
pushing 0 from GetNumbers
A
pushing 1 from GetNumbers
pushing 2 from GetNumbers
pushing 3 from GetNumbers
D
pushing 4 from GetNumbers
pushing 5 from GetNumbers
pushing 6 from GetNumbers
G
completed
*/
// A Subject lets the sample push notifications by hand.
var source = new Subject<int>();
// Register an action with Finally; the sample output below shows it running
// after the completion notification has been printed.
var result = source.Finally(() => Console.WriteLine("Finally action ran"));
// Dump prints every notification prefixed with "Finally".
result.Dump("Finally");
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
source.OnCompleted();
/*
Finally-->1
Finally-->2
Finally-->3
Finally completed
Finally action ran
*/
var source = new Subject<int>();
var result = source.Finally(() => Console.WriteLine("Finally"));
// Keep the subscription handle so it can be disposed explicitly below.
var subscription = result.Subscribe(
Console.WriteLine,
Console.WriteLine,
() => Console.WriteLine("Completed"));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
// The source never completes; the subscription is disposed instead. The sample
// output below shows "Finally" still being printed, while "Completed" is not.
subscription.Dispose();
/*
1
2
3
Finally
*/
- RxJava
// doOnEach logs every notification before it reaches the downstream operators.
val values = Observable.just("side", "effects")
values
    .doOnEach(PrintSubscriber("Log"))
    .map { it.toUpperCase() }
    .dump("Process")
/*
Log: onNext: side
Process: onNext: SIDE
Log: onNext: effects
Process: onNext: EFFECTS
Log: onComplete
Process: onComplete
*/
// Factory lambda: the logging side effect is attached where the observable is built,
// so it also reports the items that the consumer's filter drops further down.
val service = {
    Observable
        .just("First", "Second", "Third")
        .doOnEach(PrintSubscriber("Log"))
}
service()
    .map { it.toUpperCase() }
    .filter { it.length > 5 }
    .dump("Process")
/*
Log: onNext: First
Log: onNext: Second
Process: onNext: SECOND
Log: onNext: Third
Log: onComplete
Process: onComplete
*/
// Hooks that report when a subscription starts and when one is disposed.
val subject = ReplaySubject.create<Int>()
val values = subject
    .doOnSubscribe { println("New subscription") }
    .doOnDispose { println("Subscription over") }
val firstSubscription = values.dump("1st")
subject.onNext(0)
values.dump("2st")
subject.onNext(1)
// Only the first subscription is disposed; the second keeps receiving values.
firstSubscription.dispose()
subject.onNext(2)
subject.onNext(3)
subject.onComplete()
/*
New subscription
1st: onNext: 0
New subscription
2st: onNext: 0
1st: onNext: 1
2st: onNext: 1
Subscription over
2st: onNext: 2
2st: onNext: 3
2st: onComplete
*/
- RxSwift
let disposeBag = DisposeBag()
Observable.of("