(1)一个人只要自己不放弃自己,整个世界也不会放弃你.
(2)天生我才必有大用
(3)不能忍受学习之苦就一定要忍受生活之苦,这是多么痛苦而深刻的领悟.
(4)做难事必有所得
(5)精神乃真正的刀锋
(6)战胜对手有两次,第一次在内心中.
(7)好好活就是做有意义的事情.
(8)亡羊补牢,为时未晚
(9)科技领域,没有捷径与投机取巧。
(10)有实力,一年365天都是应聘的旺季,没实力,天天都是应聘的淡季。
(11)基础不牢,地动天摇
(12)编写实属不易,若喜欢或者对你有帮助记得点赞+关注或者收藏哦~
RxJava模式与原理
文章目录
1.标准观察者与RxJava观察者
1.1标准的观察者设计模式
1.1.1生活中案例
(1)微信公众号与关注公众号的用户
(2)是一个被观察者有多个观察者的情况
1.1.2生活中案例代码实现
(1)抽象被观察者角色
public interface Observable {
//关注 添加观察者
void addObServer(Observer observer);
//取消关注 删除观察者
void removeObserver(Observer observer);
//被观察者发出了改变通知观察者
void notifyObservers();
//被观察者发布一条消息的功能
void pushMessage(String message);
}
(2)抽象观察者角色
public interface Observer {
//被观察者改变了,收到改变通知,观察者做出相应响应
void update(String message);
}
(3)具体被观察者角色
public class WechatServerObservable implements Observable{
/**
* 容器管理观察者
*/
private List<Observer> observerList = new ArrayList<>();
private String message;
@Override
public void addObServer(Observer observer) {
observerList.add(observer);
}
@Override
public void removeObserver(Observer observer) {
if(null != observerList){
observerList.remove(observer);
}
}
@Override
public void notifyObservers() {
for(Observer observer : observerList){
observer.update(message);
}
}
@Override
public void pushMessage(String message) {
this.message = message;
notifyObservers();
}
}
(4)具体观察者角色
public class Person implements Observer{
private String name;
private String message;
public Person(String name) {
this.name = name;
}
@Override
public void update(String message) {
this.message = message;
readMessage();
}
private void readMessage(){
System.out.println(String.format("%s收到了推送消息:%s",name,message));
}
}
(5)客户端
public class ObserverModelActivity extends AppCompatActivity {
@InjectView(R.id.btn_observer_test)
private Button btn_observer_test;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_observer_model);
InjectUtils.injectViewAndEvent(this);
}
@OnClick(R.id.btn_observer_test)
public void onViewClick(View view){
testObserverModel();
}
public void testObserverModel(){
String msg = "重大消息:国家改革委发布智慧农业转型号召";
//1.被观察者
Observable observable = new WechatServerObservable();
//2.观察者
Observer observer1 = new Person("张三");
Observer observer2 = new Person("张化");
Observer observer3 = new Person("张丽");
Observer observer4 = new Person("张雪");
observable.addObServer(observer1);
observable.addObServer(observer2);
observable.addObServer(observer3);
observable.addObServer(observer4);
observable.pushMessage(msg);
}
}
1.2扩展的RxJava观察者设计模式
1.2.1RxJava Hook点
(1)什么时候用到Hook?
- 整个项目都在使用RxJava,想对RxJava做监听,此时就会使用到Hook技术
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
- RxJavaPlugins.onAssembly为全局RxJava的Hook,create与map还有其他操作符都有这样一个方法。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
-
可以让程序员插入自己的Hook,即先执行程序员自己写的Function,然后再执行RxJava自身的Hook.
-
如何让onObservableAssembly不为空,满足程序员Hook先执行的条件呢?可以查看此值唯一的赋值处.
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
- 即直接调用setOnObservableAssembly函数设置一个值就可以了。
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_source1);
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
Log.d(Flag.TAG, "apply: 整个项目 全局 监听 到底有多少地方使用 RxJava:" + observable);
//不破坏人家的功能
return observable;
}
});
}
1.2.2RxJava Hook机制
(1)Hook即钩子
(2)程序在执行过程中,想个办法,先让程序执行自己写的一部分功能,然后再执行正常的程序。
1.2.3RxJava观察者模式
1.2.3.1 Observer源码看看
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void one rror(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
(1)抽象观察者Observer为一个泛型,即在构建具体的观察者时传什么类型就是什么类型
(2)onSubscribe为订阅函数,即在subscribe执行的时候就立即得到执行
(3)onNext拿到上一个事件(卡片或功能)流下来的数据.
(4)onError拿到上一个事件(卡片或功能)流下来的错误数据.
(5)onComplete事件结束
1.2.3.2 Observable创建过程源码分析
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
(1)ObservableOnSubscribe:自定义source
- io.reactivex.internal.operators.observable.ObservableCreate
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
(2)create创建的过程即将自定义的source赋给了io.reactivex.internal.operators.observable.ObservableCreate#source成员变量。
1.2.3.3 subscribe订阅过程源码分析
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call one rror because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
(1)实际上是ObservableCreate.subscribe
(2)将自定义观察者传给了它。
(3)进入到被观察者的io.reactivex.Observable#subscribeActual
protected abstract void subscribeActual(Observer<? super T> observer);
(4)这个方法执行之后,会直接回调到
io.reactivex.internal.operators.observable.ObservableCreate#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
(5)将自定义观察者丢进来,并创建发射器,并且传入自定义观察者。
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
(6)然后执行onSubscribe方法,这也就是为什么执行了subscribe方法之后,这个方法马上会得到执行的原因。
observer.onSubscribe(parent);
(7)自定义source将发射器传入进去
source.subscribe(parent);
(8)调发射器的onNext
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//2.2发射器.onNext
e.onNext("A");
}
}
(9)再由发射器调用自定义观察者onNext
(10)整体调用图,即为一个U型结构。
1.2.3.3Observable创建过程时序图
1.2.3.4Observable与Observer订阅过程时序图
1.2.4.标准观察者设计模式与RxJava观察者设计模式对比
(1)在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所有的“观察者”才能观察者。
(2)在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要事件起点与终点在“订阅”一次之后,才发出改变通知,终点(观察者)才能观察到。
- 为什么是多个“被观察者”?
因为可以有多个操作符如flatMap,map操作符,这就意味着有多个观察者
严格意义上来讲RxJava应用的是发布订阅模式。
(3)RxJava观察者设计模式,没有容器的概念。
2.map变换操作符原理
(1)对数据进行变换
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
(2)订阅触发之后,调用终点,它是通过ObservableMap.subscribe
io.reactivex.internal.operators.observable.ObservableMap#subscribeActual
- source是上层事件
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
(3)添加一个MapObserver(终点)包裹
(4)调用了subscribe后
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
(5)io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#onNext
(6)io.reactivex.internal.operators.observable.ObservableMap.MapObserver#onNext
(7)io.reactivex.functions.Function#apply
(8)封装包裹拆包裹的过程
这即为卡片式思维
(13)map流程分析
3.装饰模型
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Derry");
e.onComplete();
}
})
// ↓ObservableCreate.map 包裹模型中 最里面
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return 45454;
}
})
// ObservableMap.map
.map(new Function<Integer, Boolean>() {
@Override
public Boolean apply(Integer integer) throws Exception {
return true;
}
})
// ↓包裹模型中最外面 往上走↑的时候在一层 层的剥开
// ObservableMap.subscribe
.subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable d) { }
@Override
public void onNext(Boolean bool) {
Log.d(Flag.TAG, "onNext bool:" + bool);
}
@Override
public void one rror(Throwable e) { }
@Override
public void onComplete() { }
});
4.背压
(1)起点到终点发射10000个事件
(2)终点处理不过来,这时候采取背压策略。
(3)即不停的生产产品,生产的速度远远的超过消费的速度。内存会不停的消耗增长。
(4)使用Flowable解决背压问题。
(5)map操作符只能发射一次事件,flapMap可以发射多次事件。