java – Chain Observables

我有一个对象集合,称它们为obj.他们有一个act()方法. act()方法最终会导致o上的event()observable调用onComplete.

链接这些的好方法是什么?

也就是说,调用o.act(),等待o.event().onComplete,然后调用下一个o2.act(),依此类推,以便在集合中获得无限数量的o.

签名是这样的:

public class Item {
    final protected PublishSubject<Object> event = PublishSubject.create();

    public Observable<ReturnType> event() {
        return event;
    }

    public void act() {
        // do a bunch of stuff
        event.onComplete();
    }
}

然后在消费代码中:

Collection<Item> items...
foreach item in items
  item.act -> await item.event().onComplete() -> call next item.act() -> so on

解决方法:

如果我理解正确,你的对象就有这种签名:

public class Item {
    public Observable<ReturnType> event()...
    public ReturnType act()...
}

所以,如果他们这样填写:

public class Item {

    private final String data;
    private final Observable<ReturnType> event;

    public Item(String data) {
        this.data = data;

        event = Observable
                .fromCallable(this::act);
    }

    public Observable<ReturnType> event() {
        return event;
    }

    public ReturnType act() {
        System.out.println("Item.act: " + data);
        return new ReturnType();
    }
}

然后它们可以像这样链接:

Item item1 = new Item("a");
Item item2 = new Item("b");
Item item3 = new Item("c");

item1.event()
        .concatWith(item2.event())
        .concatWith(item3.event())
        .subscribe();

结果:

Item.act: a
Item.act: b
Item.act: c

然后,如果你有一个Iterable集合,你可以使用flatMap:

Iterable<Item> items = Arrays.asList(item1, item2, item3);

Observable.from(items)
        .flatMap(Item::event)
        .subscribe();

替代

一个更像你的情况的替代方案可能是:

public class Item {
    private final PublishSubject<Void> event = PublishSubject.create();

    private final String data;

    public Item(String data) {
        this.data = data;
    }

    public Observable<Void> event() {
        return event;
    }

    public Void act() {
        System.out.println("Item.act: " + data);
        // do a bunch of stuff
        event.onCompleted();
        return null;
    }
}

用法:

Iterable<Item> iterable = Arrays.asList(item2, item3);

item1.event().
        concatWith(Observable.from(iterable)
                .map(Item::act))
        .subscribe();

item1.act();

但是它不会在第2项上使用event().

上一篇:IE Edge 下载文件的时候,文件名不能有windows不支持的特殊字符


下一篇:联邦学习 Federated learning Google I/O‘19 笔记