事件总线RxBus,替代EventBus和otto
1)创建RxBus
public class RxBus{
private static volatile RxBus rxBus;
private final Subject<Object,Object> subject = new SerializedSubject<>(PublishSubject.create());
private RxBus(){
}
public static RxBus getInstance(){
if(rxBus == null){
synchronized(RxBus.class){
if(rxBus == null){
rxBus = new RxBus();
}
}
}
return rxBus;
}
public void post(Object ob){
subject.onNext(ob);
}
public <T> Observalbe<T> toObservable(Class<T> eventType){
return subject.ofType(eventType);
}
}
public final <R> Observable<R> ofType(final Class<R> klass){
return filter(InternalObservableUtils.isInstanceOf(klass)).cast(klass);
}
2)发送事件
public class RxBusActivity extends AppComaptActivity{
private Button bt_post;
@Override
protected void onCreate(Bundle savedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rx_bus);
bt_post = (Button)this.findViewById(R.id.bt_post);
bt_post.setOnClickListener(new View.OnClickListener(){
@Override
public void onClick(View v){
RxBus.getInstance().post(new MessageEvent("用RxJava实现RxBus"));
}
});
}
}
3)接收事件
public class RxBusFragment extends Fragment{
...
@Override
public void onActivityCreated(@Nullable Bundle savedInstanceState){
super.onActivityCreated(savedInstanceState);
subscription = RxBus.getInstance().toObservable(MessageEvent.class).subscribe(new Action1<MessageEvent>(){
@Override
public void call(MessageEvent messageEvent){
if(messageEvent!=null){
tv_text.setText(messageEvent.getMessage());
}
}
});
}
}
4)取消订阅事件
@Override
public void onDestroy(){
super.onDestroy();
if(subscription!=null&&!subscription.isUnsubscribed()){
subscription.unsubscribe();
}
}