使用 Reactor 进行反应式编程进行数据分批批量处理

一、前言

最近在做一个项目,获取JDK8 Stream对象后,想要批量消费,不想自己写个集合来做批量处理。而反应式编程实现比如rxjava或者reactor是有丰富的流操作符,所以调研了下如何把JDK8 Stream转换为反应式流。

二、批量消费

有时候场景需要我们批量消费以便提高执行效率,比如对应同一个表的插入操作,批量插入的效率比单条逐个插入效率要好很多。那么对应给定的一个数据源,如何聚合数据为批量那?当数据源是一个内存list时候,最简单方法如下:

	public static void main(String[] args) {
        // 模拟数据, 创建list
        List<Integer> personList = new ArrayList<>();
        for (int i = 0; i < 98; ++i) {
            personList.add(i);
        }

        // 切分处理
        List<List<Integer>> list = Lists.partition(personList, 20);
        list.stream().forEach(
            tempList -> System.out.println(JSON.toJSONString(tempList))
        );
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

使用Google guava包里面的Lists.partition函数把list切分为一个个最多包含20个元素的list列表,并打印输出。

如果我们想要的是从这些流中每次读取limit条记录,然后批量处理这limit条记录,这样内存中每次只会存在limit条记录。这时由于JDK Stream不支持Buffer操作,我们需要自己实现,实现代码大概如下:

    public static void main(String[] args) {
        // 模拟数据, 创建list
        List<Integer> personList = new ArrayList<>();
        for (int i = 0; i < 98; ++i) {
            personList.add(i);
        }

        // 缓存列表
        List<Integer> mergeList = new ArrayList<>();

        int limit = 20;
        // 循环获取元素并缓存
        personList.stream().forEach(e -> {
            if (mergeList.size() >= limit) {
                System.out.println(JSON.toJSONString(mergeList));
                mergeList.clear();
            }
            mergeList.add(e);
        });
        // 退出后,补漏处理
        if (mergeList.size() > 0) {
            System.out.println(JSON.toJSONString(mergeList));
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

如上代码在Stream中迭代元素时,我们把元素缓存到mergeList列表,每当mergeList有了20个元素,则处理一次。最后等流结束后,如果mergeList还有元素则需要补漏处理下。

如果不想实现上面繁琐代码,我们可以考虑吧JDK8 Stream切换到反应式实现框架比如Reactor或者Rxjava,因为后者有丰富的流操作符。其中Reactor的一个实现是:

	public static void main(String[] args) {
        // 模拟数据, 创建list
        List<Integer> personList = new ArrayList<>();
        for (int i = 0; i < 98; ++i) {
            personList.add(i);
        }

        // 为了使用buffer功能,转换为Reactor的流对象Flux
        Flux flux = Flux.fromStream(personList.stream());
        // 聚合消费
        flux.buffer(20).subscribe(e -> System.out.println(JSON.toJSONString(e)));
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如上代码,我们使用Reactor框架的Flux.fromStream方法把JDKStream转换为Flux流对象,然后调用其buffer方法设置缓存20个元素消费一次,然后调用subscribe订阅缓存流,并打印。

上一篇:「阿里Android面试解析」20道面试帮助灵魂拷问,androidjni开发MK


下一篇:P568坦克大战P569java绘图坐标P571Java绘图技术P574Java事件处理机制P578绘制敌人的坦克(集合