【Kotlin 反应式编程】第1讲 你好,Reactive Programming
1.创建 Gradle Kotlin 项目
2.传统的命令式编程风格
添加 rxkotlin 依赖
compile group: 'io.reactivex.rxjava2', name: 'rxkotlin', version: '2.2.0'
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
fun main(args: Array<String>) {
val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
list.toObservable() // extension function for Iterables
.filter { it.length >= 5 }
.subscribeBy( // named arguments for lambda Subscribers
onNext = { println(it) },
onError = { it.printStackTrace() },
onComplete = { println("Done!") }
)
}
3.传统的命令式编程思维
//命令式编程( Imperative)
fun testImperative1() {
val x1 = 7
val flag1 = isOdd(x1)
println("flagA1:$flag1")
val x2 = 10
val flag2 = isOdd(x2)
println("flagA2:$flag2")
}
fun testImperative2() {
var x = 7
val flag = isOdd(x)
println("flagB1:$flag")
x = 10
println("flagB2:$flag")
}
4.函数式编程思维
fun testImperative3() {
val s = System.currentTimeMillis()
val flag = { x: Int -> isOdd(x) }
val t1 = Thread {
println("flagC1:${flag(7)}")
}
val t2 = Thread {
println("flagC2:${flag(10)}")
}
t1.start()
t2.start()
t1.join()
t2.join()
val t = System.currentTimeMillis()
println("testImperative3=${t - s}ms")
}
5.使用 RxJava 类库 API
//函数式编程( Functional)
fun testReactive() {
val s = System.currentTimeMillis()
val subject: Subject<Int> = PublishSubject.create()
subject.map { isOdd(it) }.subscribeBy(
onNext = { println("flagD:$it") },
onError = { it.printStackTrace() },
onComplete = { println("Done!") }
)
// 一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
val t1 = Thread {
subject.onNext(7)
}
val t2 = Thread {
subject.onNext(10)
}
t1.start()
t2.start()
t1.join()
t2.join()
val t = System.currentTimeMillis()
println("testReactive=${t - s}ms")
}
三个重要的对象
Observable-数据发送者
Subscriber-订阅者
OnSubscribe-事件
(1)create 函数
val subject: Subject<Int> = PublishSubject.create()
create 操作符应该是最常见的操作符了,主要用于产生一个 Obserable 被观察者对象,为了方便大家的认知,以后的教程中统一把被观察者 Observable 称为发射器(上游事件),观察者 Observer 称为接收器(下游事件)。
(2)map 函数
subject.map { isOdd(it) }.subscribeBy(
onNext = { println("flagD:$it") },
onError = { it.printStackTrace() },
onComplete = { println("Done!") }
)
一个Observable可能被多个subscriber订阅,而不同的订阅所需要的最终数据不同,但事件的操作逻辑是相同的,就可以利用map来满足不同的数据需求。