【27】RxJava使用场景

(1)一个人只要自己不放弃自己,整个世界也不会放弃你.
(2)天生我才必有大用
(3)不能忍受学习之苦就一定要忍受生活之苦,这是多么痛苦而深刻的领悟.
(4)做难事必有所得
(5)精神乃真正的刀锋
(6)战胜对手有两次,第一次在内心中.
(7)好好活就是做有意义的事情.
(8)亡羊补牢,为时未晚
(9)科技领域,没有捷径与投机取巧。
(10)有实力,一年365天都是应聘的旺季,没实力,天天都是应聘的淡季。
(11)基础不牢,地动天摇
(12)编写实属不易,若喜欢或者对你有帮助记得点赞+关注或者收藏哦~

RxJava使用场景

文章目录

1.为什么学习RxJava?

(1)改变思维来提升效率
(2)Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。

1.1什么是ReactiveX?

(1)ReactiveX是Reactive Extensions的缩写,一般简写为Rx,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。

(2)微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理。

(3)Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

(4)ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

1.2什么是RxJava?

(1)在RxJava中,一个实现了Observer接口的对象可以订阅(subscribe)一个Observable类的实例。

(2)订阅者(subscriber)对Observable发射(emit)的任何数据或数据序列作出响应。

(3)这种模式简化了并发操作,因为它不需要阻塞等待Observable发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应Observable的通知。

1.3RxJava思维

响应式编程

1.4生活中的例子

(1)起点(分发事件(PATH):我饿了)
(2)下楼
(3)去餐厅
(4)点餐
(5)终点(吃饭 消费事件)

1.5程序中的例子

(1)起点(分发事件:点击登录)
(2)登录API
(3)请求服务器
(4)获取响应码
(5)终点(更新UI登录成功 消费事件)

  • 从起点到终点事件没有断掉,每一个事件依赖于上一事件的完成,环环相扣。

1.6RxJava架构

(1)github:https://github.com/ReactiveX/RxJava
(2)Author:JakeWharton(Android之神)
(3)Rx系统架构官网
http://reactivex.io

1.7RxJava基础学习资料

【27】RxJava使用场景

2.RxJava应用场景

2.1核心思想

从事件起点到终点,每一个事件都紧密依赖于上一事件的完成,环环相扣,没有中断。

2.1.1图片下载案例

2.1.1.1传统思维方式

(1)无法固定每个程序员的思维(会导致接手项目的程序员无法看懂),即每个程序员都有一套自己实现的思维。

  • A程序员:35356453 自己的思维 不同 封装方法…
  • B程序员:46576576 自己的思维 不同 全部写在一起
  • C程序员:43643654 自己的思维 不同 new Thread
  • D程序员:66545655 自己的思维 不同 使用 线程池

(2)代码

public class DownLoadActivity extends AppCompatActivity {

    private static final String         TAG  = "DownLoadActivity";
    /**
     * 1.网络图片的链接地址
     */
    private final static String         PATH = "https://gimg2.baidu.com/image_search/src=http%3A%2F%2F01.minipic.eastday.com%2F20161208%2F20161208151453_9440fdc7642c62ef1c4cb67425b606d0_9.jpeg&refer=http%3A%2F%2F01.minipic.eastday.com&app=2002&size=f9999,10000&q=a80&n=0&g=0n&fmt=jpeg?sec=1620538239&t=64eb953a8e19440231bd0527542de045";
    private              ProgressDialog progressDialog;

    @InjectView(R.id.iv_img)
    private ImageView iv_img;

    @InjectView(R.id.btn_download_image_action)
    private Button btn_download_image_action;

    @InjectView(R.id.btn_rx_java_download_image_action)
    private Button btn_rx_java_download_image_action;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_down_load); InjectUtils.injectView(this);
        InjectUtils.injectViewAndEvent(this);
    }

    @OnClick({ R.id.btn_download_image_action, R.id.btn_rx_java_download_image_action })
    public void click(View view) {
        if (view == btn_download_image_action) {
            downloadImageAction();
        }
    }

    private final Handler handler = new Handler(new Handler.Callback() {

        @Override
        public boolean handleMessage(@NonNull Message msg) {
            Bitmap bitmap = (Bitmap) msg.obj;
            iv_img.setImageBitmap(bitmap);

            if (progressDialog != null) {
                progressDialog.dismiss();
            }
            return false;
        }
    });

    public void downloadImageAction() {
        progressDialog = new ProgressDialog(this);
        progressDialog.setTitle("下载图片中..."); progressDialog.show();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    URL url = new URL(PATH);
                    HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                    httpURLConnection.setConnectTimeout(5000);
                    int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
                    if (responseCode == HttpURLConnection.HTTP_OK) {
                        InputStream inputStream = httpURLConnection.getInputStream();
                        Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                        Message message = handler.obtainMessage();
                        message.obj = bitmap; handler.sendMessage(message);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

}

2.1.1.2RxJava思维方式

(1)角色:起点与终点

  • 起点是被观察者Observable
  • 终点代表观察者Observer

(2)为什么插入了观察者设计模式?
因为被观察者发生了改变,观察者就会同样作出相应改变。

(3)案例

    /**
     * RxJava思维
     */
    public void rxJavaDownloadImageAction() {


        /**
         * TODO 2.第二步内部分发PATH,
         */
        Observable.just(PATH)

                /**
                 * TODO 3.第三步
                 * 1.下载图片事件
                 */
                .map(new DownloadImageFunction())

                /**
                 * 2.为图片添加水印事件
                 */
                .map(new AddWaterMarkFunction())

                /**
                 * 3.添加下载日志记录事件
                 */
                .map(new AddDownloadLogRecordFunction())

                /**
                 * 4.为上下游事件线程分配执行线程,以上事件操作在io线程中执行,以下
                 * 操作在Android主线程中执行,详见UpDownObservableTransformer设置
                 */
                .compose(new UpDownObservableTransformer<>())

                /**
                 * 5.订阅 将起点和终点订阅起来
                 */
                .subscribe(
                        new DownloadImgObserver<>()
                );

    }

    /**
     * 1.下载图片功能
     */
    class DownloadImageFunction implements Function<String, Bitmap>{
        @Override
        public Bitmap apply(String s) throws Exception {
            URL url = new URL(PATH);
            HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
            httpURLConnection.setConnectTimeout(5000);
            int responseCode = httpURLConnection.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_OK) {
                InputStream inputStream = httpURLConnection.getInputStream();
                Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                return bitmap;
            }
            return null;
        }
    }


    /**
     * 2.为图片添加水印功能
     */
    class AddWaterMarkFunction implements Function<Bitmap,Bitmap>{

        @Override
        public Bitmap apply(Bitmap bitmap) throws Exception {
            Paint paint = new Paint();
            paint.setTextSize(20);
            paint.setColor(Color.RED);
            return BitmapUtil.drawTextToBitmap(bitmap, "这是图片水印文字",paint, 88 , 88);
        }
    }


    /**
     * 3.添加下载图片日志功能
     */
    class AddDownloadLogRecordFunction implements Function<Bitmap,Bitmap>{
        @Override
        public Bitmap apply(Bitmap bitmap) throws Exception {
            Log.d(TAG, "apply: 下载图片日志" + System.currentTimeMillis());
            return bitmap;
        }
    }

    /**
     * 5.终点(观察者)
     * 5.1对上游事件的观察者
     * 5.2上游事件指的是下载图片功能、为图片添加水印功能、添加下载图片日志功能
     * 5.3订阅了上游事件之后,首先会进入onSubscribe()开始订阅此为第1步
     * 5.4just操作符分配图片加载路径事件为第2步
     * 5.5开始执行下载图片功能事件及其后绪事件为第3步
     * 5.6当由io线程切换到主线程之后,进入onNext()执行为第4步
     * 5.7当所有事件执行完成,则进入onComplete()执行为第5步
     * @param <Bitmap>
     */
    class DownloadImgObserver<Bitmap> implements Observer<Bitmap>{

        /**
         * 订阅开始
         * 预备 开始 要分发
         * TODO 1.第一步
         */
        @Override
        public void onSubscribe(Disposable d) {
            progressDialog = new ProgressDialog(DownLoadActivity.this);
            progressDialog.setTitle("download run");
            progressDialog.show();
        }

        /**
         * TODO 4.第四步
         * 拿到事件
         * @param bitmap
         */
        @Override
        public void onNext(Bitmap bitmap) {
            iv_img.setImageBitmap((android.graphics.Bitmap) bitmap);
        }

        @Override
        public void one rror(Throwable e) {

        }

        /**
         * TODO 5.第五步
         * 完成事件
         */
        @Override
        public void onComplete() {
            if (null!= progressDialog){
                progressDialog.dismiss();
            }
        }
    }
public class BitmapUtil {
    /**
     * 图片上绘制文字 加水印
     * @param bitmap
     * @param text
     * @param paint
     * @param paddingLeft
     * @param paddingTop
     * @return
     */
    public static final Bitmap drawTextToBitmap(Bitmap bitmap, String text,
            Paint paint, int paddingLeft, int paddingTop) {
        Bitmap.Config bitmapConfig = bitmap.getConfig();

        paint.setDither(true); //1.获取跟清晰的图像采样
        paint.setFilterBitmap(true);
        if (bitmapConfig == null) {
            bitmapConfig = Bitmap.Config.ARGB_8888;
        }
        bitmap = bitmap.copy(bitmapConfig, true);
        Canvas canvas = new Canvas(bitmap);

        canvas.drawText(text, paddingLeft, paddingTop, paint);
        return bitmap;
    }
}

(4)数据由起点流向终点的过程中,上一级事件的数据类型决定了下一级事件的初始类型

  • 起点PATH是个String类型的数据,它的下一级事件为下载图片事件
  • 下载图片事件的初始数据类型即为String,而下载图片事件执行之后数据变换为Bitmap类型
  • 即决定了为图片添加水印事件的初始类型为Bitmap.
  • 当在上一级事件中将数据类型做出变更,其流转到下一事件时,下一事件的初始数据类型即为上一事件变更后的数据类型。

(5)在上一级事件流向下一级事件的过程中,可以指定上一级事件(功能)在什么线程中执行。

  • compose()中指定
  • 例如:可指定上一级事件在io线程中执行,而下一级事件在主线程中执行。
/**
 * 为上下游事件线程分配执行线程
 * 1.2指定上游事件在io线程中执行
 * 1.3指定下游事件在主线程中完成
 * 1.4ObservableTransformer<Upstream, Downstream>泛型接口
 * (1)Upstream:the upstream value type,上游数据类型
 * (2)Downstream:the downstream value type,下游数据类型
 */
public class UpDownObservableTransformer<UD> implements ObservableTransformer<UD, UD> {

    @Override
    public ObservableSource<UD> apply(Observable<UD> upstream) {

        return upstream.subscribeOn(Schedulers.io())//1.指定上游事件在io线程中执行
                .observeOn(AndroidSchedulers.mainThread())//2.指定下游事件在主线程中完成
                .map(new OtherFunction<>());//3.指定其他的下游事件
    }

    /**
     * 可以通过map操作符插入其他下游事件
     * @param <UD>
     */
    class OtherFunction<UD> implements Function<UD,UD> {

        private static final String         TAG  = "UDFunction";

        @Override
        public UD apply(UD ud) throws Exception {
            Log.d(TAG, "apply:新的下游事件");
            return ud;
        }
    }
}

2.2RxJava配合Retrofit

(1)主流框架都是Retrofit+OkHttp+RxJava
(2)Retrofit请求服务器通过OkHttp拿到响应数据之后给到RxJava,RxJava处理响应数据。
(3)请求是通过OkHttp,Retrofit只是一个管理者,它控制着OkHttp去请求网络,响应的结果丢给RxJava来处理。

2.2.1案例

(1)先获取总数据
(2)然后根据总数据API获取Item数据
(3)即需要写两个接口去完成整个功能

public interface WangAndroidApi {
    /**
     * 1.异步线程 耗时操作
     * 1.1总数据
     * 1.2RxJava事件起点
     * @return
     */
    @GET("project/tree/json")
    Observable<ProjectBean> getProject();

    /**
     * ITem数据
     * @param pageIndex
     * @param cid ?cid=294
     * @return
     */
    @GET("project/list/{pageIndex}/json")
    Observable<ProjectItem> getProjectItem(@Path("pageIndex") int pageIndex, @Query("cid") int cid);  // 异步线程 耗时操作
}

2.3防抖

(1)一个功能疯狂的点击,几秒之内点击了20以上,要控制它只响应1次,这即为防抖。
(2)使用rxbinding2库防止控件抖动

        //1.对bt_anti_shake控件进行防抖处理
        RxView.clicks(bt_anti_shake)
        //2.设置间隔2秒钟响应一次
                .throttleFirst(2000, TimeUnit.MILLISECONDS)
                .subscribe(new MyConsumer<Object>());

2.4网络嵌套

(1)先查主数据,然后再根据主数据查询item数据,即为网络嵌套。
(2)因嵌套层级过多引起的代码可读性差的问题

   /**
     * 1.防抖功能 + 网络嵌套(这种是负面教程,嵌套的太厉害了)
     * 1.1嵌套层数太多,代码可阅读性太差
     */
    private void antiShakeActon() {
        //1.对bt_anti_shake控件进行防抖处理
        RxView.clicks(bt_anti_shake)
        //2.设置间隔2秒钟响应一次
                .throttleFirst(2000, TimeUnit.MILLISECONDS)
                .subscribe(new MyConsumer<Object>());
    }

    /**
     * 1.先查询主数据,再根据主数据查item数据
     * 2.自定义泛型类实现Consumer<T>泛型接口
     * @param <T>
     */
    class MyConsumer<T> implements Consumer<T>{
        @Override
        public void accept(T t) throws Exception {
            //1.先查询主数据
            api.getProject().
                    /**
                     * 1.为上游事件与下游事件分配执行线程
                     * 1.1此案例上游事件在io线程中执行
                     * 1.2下游事件在主线程中执行
                     */
                    compose(new UpDownObservableTransformer())
                    /**
                     * 2.设置下游事件订阅者
                     */
                    .subscribe(new QueryConsumer());
        }
    }


    /**
     * 1.主数据查询事件
     * 2.自定义泛型类实现Consumer<T>泛型接品
     * 2.1此例中已经明确指定了Consumer<T>泛型接口中的T为ProjectBean,因此不再需要在类的声明
     * 中限定泛型类型
     * 2.2如果一个类继承自一个泛型类,继承的泛型类中已经明确的说明了泛型中存放的是什么类型,
     * 那么在声明当前类时,可以省略类名后面的<>。
     */
    class QueryConsumer implements Consumer<ProjectBean>{

        @Override
        public void accept(ProjectBean projectBean) throws Exception {
            for(DataBean dataBean : projectBean.getData()){
                //1.根据主数据编号查询item数据
                api.getProjectItem(1, dataBean.getId())
                        /**
                         * 1.为上游事件与下游事件分配执行线程
                         * 1.1此案例上游事件在io线程中执行
                         * 1.2下游事件在主线程中执行
                         */
                .compose(new UpDownObservableTransformer())
                        /**
                         * 2.设置下游事件订阅者
                         */
                .subscribe(new QueryProjectItemConsumer());
            }
        }
    }


    /**
     * 1.item数据查询事件
     */
    class QueryProjectItemConsumer implements Consumer<ProjectItem>{

        @Override
        public void accept(ProjectItem projectItem) throws Exception {
            //可以做UI显示相关操作
            Log.d(TAG, "accept: " + projectItem);
        }
    }

(3)使用flatMap操作符网络嵌套过多代码可读性差的问题

/**
     * 1.使用rxbinding2库防止控件抖动
     * 2.使用flatMap操作符网络请求嵌套过多的问题
     */
    private void antiShakeActonUpdate() {
        RxView.clicks(bt_anti_shake)
                //1.设置控件间隔2秒钟响应一次
                .throttleFirst(2000, TimeUnit.MILLISECONDS)
                //2.指定下游事件在io异步线程中执行
                .observeOn(Schedulers.io())
                .flatMap(
                    //3.插入请求主数据事件
                    new Function<Object, ObservableSource<ProjectBean>>() {
                    @Override
                    public ObservableSource<ProjectBean> apply(Object o) throws Exception {
                        return api.getProject();
                    }
                })
                .flatMap(new Function<ProjectBean,ObservableSource<DataBean>>(){
                    //4.插入遍历事件
                    @Override
                    public ObservableSource<DataBean> apply(ProjectBean projectBean) throws Exception {
                        return Observable.fromIterable(projectBean.getData());
                    }
                })
                .flatMap(new Function<DataBean,ObservableSource<ProjectItem>>(){
                    //5.插入获取item数据事件
                    @Override
                    public ObservableSource<ProjectItem> apply(DataBean dataBean) throws Exception {
                        return api.getProjectItem(1,dataBean.getId());
                    }
                })
                //6.切换回UI线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ProjectItem>() {
                    @Override
                    public void accept(ProjectItem projectItem) throws Exception {
                        //执行相关UI操作
                        Log.d(TAG, "accept: " + projectItem);
                    }
                });
    }

2.4.1 flatMap操作符思想

起点可以分发一个数据
onNext(1);
|
|
|
flatMap 自己分发 10个数据 给下面
1 --> 多发送 10次 1+“DDD”
|
|
|
subscribe{
1+“DDD”
1+“DDD”
1+“DDD”
1+“DDD”
1+“DDD”
1+“DDD”
1+“DDD”
1+“DDD”
1+“DDD”
1+“DDD”
}


(1)起点分发一个数据
(2)flatMap可以往下游事件中分发10个数据。
(3)之所以可以发送多条数据,是由flatMap()方法中接受的参数数据类型ObservableSource<?>决定的。
(4)防抖功能运行在主线程,flatMap操作最好放在异步线程,不然会发生崩溃。
(5)自己创建发射器发多次

.flatMap(new Function<ProjectBean,ObservableSource<DataBean>>(){
    //4.插入遍历事件
    @Override
    public ObservableSource<DataBean> apply(ProjectBean projectBean) throws Exception {
        //自己创建发射器发多次
        return Observable.fromIterable(projectBean.getData());
    }
})

2.5doOnNext运用

【27】RxJava使用场景

(1)先注册
(2)然后切换到UI线程
(3)UI线程登录又切换到子线程
(4)然后再切换到UI线程
(5)即频繁的在主线程与子线程做UI切换的操作
(6)这种业务需求就可以使用doOnNext()完成

2.5.1案例

public void request2() {
        MyRetrofit
                .createRetrofit()
                .create(IReqeustNetwor.class)
                //@TODO 第2步
                // 1.请求服务器注册操作
                .registerAction(new RegisterRequest())
                //2.指定上游事件在io异步线程中执行
                .subscribeOn(Schedulers.io())
                //3.第定下游事件在主线程中执行
                .observeOn(AndroidSchedulers.mainThread())
                //@TODO 第3步
                //4.注册完更新UI
                .doOnNext(new Consumer<RegisterResponse>() {
                    @Override
                    public void accept(RegisterResponse registerResponse) throws Exception {
                        //注册完成之后,更新注册UI
                    }
                })
                //5.指定下游事件在异步线程中执行
                .observeOn(Schedulers.io())
                //@TODO 第4步
                //6.登录事件
                .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() { // todo 4
                    @Override
                    public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                        Observable<LoginResponse> loginResponseObservable = MyRetrofit
                                .createRetrofit()
                                .create(IReqeustNetwor.class)
                                .loginAction(new LoginReqeust());
                        return loginResponseObservable;
                    }
                })
                //7.指定下游事件在主线程中执行
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<LoginResponse>() {
                    /**
                     * 一定是主线程,为什么,因为 subscribe 马上调用onSubscribe
                     * @param d
                     */
                    @Override
                    public void onSubscribe(Disposable d) {
                        //1.第1步
                        progressDialog = new ProgressDialog(RequestActivity.this);
                        progressDialog.show();
                        disposable = d;
                    }

                    @Override
                    public void onNext(LoginResponse loginResponse) {
                        //@TODO 第5步
                        //登录完成之后,更新登录的UI
                    }

                    @Override
                    public void one rror(Throwable e) {

                    }

                    /**
                     * @TODO 第6步
                     */
                    @Override
                    public void onComplete() {
                        if (progressDialog != null) {
                            progressDialog.dismiss();
                        }
                    }
                })
        ;
    }

(1)Disposable 用了之后要消毁,不然会引发内存泄漏。
(2)是因为事件在由上而下分发执行过程中,每次分发都会判断Disposable是否被释放,isDisposed,如果没有中断则继续执行,如果中断了就不执行。
(3)中断的原因,有些时候Activity会被用户给中断掉。

上一篇:Wireshark中PIDs与网络包的关联


下一篇:安卓rxjava合并多个请求,面试要掌握这几个关键点!薪资翻倍