RxJava2学习笔记(3)

上回继续,今天来学习下zip(打包)操作

一、zip操作

    @Test
    public void zipTest() {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; i < 10; i++) {
                emitter.onNext(100 + i);
            }
        }), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(new Character((char) (65 + i)));
            }
        }), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s));
    }

zip字面意义,就是打包操作,把多个Obserable合并在一起,形成一个新的Obserable,类似文件1、文件2 ... 文件n,合成一个新文件。上面这段代码的输出:

100A
101B
102C
103D
104E

第1个生产者,发射了10个数字(100~109),第1个生产者发射了5个字符(A~E),合并处理时,会把 “数字+字符",变成一个新字符串,然后继续发射。注意:这里有一个类似"木桶原理",即决定一个木桶能盛多少水的,永远是最短的那块木头。10发A型子弹 + 5发B型子弹,按1:1来合成,最终只有得到5发新型子弹。

 

二、限流

生产者-消费者模型中,有可能会遇到这样一种情况:生产者精力旺盛,狂生产数据,然后消费者力不从心,根本来不及处理,这样上游就堵住了,严重的话,可能导致内存耗尽。最简单的办法,就是把来不及处理的内容给扔掉(即:丢弃策略)。刚刚提到的zip操作中的木桶原理,就可以派上用场了。

    @Test
    public void zipTest1() throws InterruptedException {
        Observable.zip(Observable.create(emitter -> {
            for (int i = 0; ; i++) { //一直不停的发
                emitter.onNext(i);
            }
        }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
            for (int i = 0; i < 5; i++) {
                emitter.onNext(0); //这里技巧性的处理:发1个0过去
            }
        }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (Integer) i1 + (Integer) i2) //1个数字+0,不影响原值
                .subscribe(integer -> System.out.println(integer));

        Thread.sleep(200);
    }

  输出:

0
1
2
3
4

  如果是字符串,可以参考下面这样处理:

        Observable.zip(Observable.create(emitter -> {
                    for (int i = 0; ; i++) {
                        emitter.onNext("A" + i);
                    }
                }).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {
                    for (int i = 0; i < 5; i++) {
                        emitter.onNext("");
                    }
                }).subscribeOn(Schedulers.newThread()),
                (BiFunction<Object, Object, Object>) (i1, i2) -> (String) i1 + (String) i2)
                .subscribe(s -> System.out.println(s));
        Thread.sleep(200);

  输出:

A0
A1
A2
A3
A4

 

三、Flowable

刚才用zip这种"奇淫技巧"实现了限流,但其实rxjava还有更科学的做法(Flowable)。再思考一下“限流”这种场景,生产者太猛,一下喷出来的量太多,而消费者太弱,完全吸收不下。比较温和的方式,最好是生产者喷发前先问下消费者,你1次能接承受多大的量?我根据你的能力来

作者:菩提树下的杨过
出处:http://yjmyzz.cnblogs.com
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
上一篇:RxJava2学习笔记(1)


下一篇:机器学习笔记(6):多类逻辑回归-使用gluon