Java 8 Stream API 入门者教程
过往记忆大数据 过往记忆大数据
Java 8 给我们带来了一个新功能,也就是本文要介绍的 Stream API,它可以让我们以一种声明的方式处理数据。Stream 使用一种类似于 SQL 的语法来提供一种对 Java 集合运算和表达的高阶抽象。极大提高 Java 程序员的生产力,让程序员写出高效率、干净、简洁的代码。
Stream 创建
有很多方法可以创建不同源的 stream 实例,stream 实例一旦创建,将不会修改其源,因此我们从单个源创建多个 stream 实例。
Empty Stream
如果我们想创建一个空的 Stream,可以使用 empty() 方法,具体如下:
Stream<String> iteblogEmptyStream = Stream.empty();
通常在 streams 没有元素然后不想返回 null 的情况下使用:
public Stream<String> streamOf(List<String> list) {
return list == null || list.isEmpty() ? Stream.empty() : list.stream();
}
通过集合(Collection)创建 Stream
Java 中的任何继承 Collection 接口的类都可以创建 Stream
List<String> list = Lists.newArrayList("iteblog", "iteblog_hadoop");
Stream<String> listStream = list.stream();
Set<String> set = Sets.newHashSet();
Stream<String> setStream = set.stream();
通过数组(Array)创建 Stream
数组也可以创建 Stream
Stream<String> streamOfArray = Stream.of("a", "b", "c");
当然,我们也可以通过已有的数组来创建 Stream
String[] iteblogArr = new String[]{"iteblog", "iteblog_hadoop", "java 8"};
Stream<String> streamOfArrayFull = Arrays.stream(iteblogArr);
Stream<String> streamOfArrayPart = Arrays.stream(iteblogArr, 1, 3);
通过 Stream.builder() 创建 Stream Stream 提供了 builder 方法来创建 Stream:
Stream streamBuilder = Stream.builder().add("iteblog").add("iteblog_hadoop").add("java").build();
Stream<Object> streamBuilder = Stream.builder().add("iteblog").add("iteblog_hadoop").add("java").build();
上面创建的 Stream 类型是 Stream,如果我们想创建指定类型的 Stream,需要显示地指定类型
Stream<String> streamBuilder = Stream.<String>builder().add("iteblog").add("iteblog_hadoop").add("java").build();
通过 Stream.generate() 创建 Stream Stream.generate() 方法接收一个 Supplier 类型的参数来生成元素,生成的 stream 大小是无限的,所以我们需要指定 stream 生成的大小,以免出现内存不够的问题:
Stream<String> streamGenerated = Stream.generate(() -> "iteblog").limit(88);
通过 Stream.iterate() 创建 Stream 我们也可以通过 Stream.iterate() 来创建 Stream
Stream<Integer> streamIterated = Stream.iterate(2, n -> n * 2).limit(88);
Stream.iterate 方法的第一个参数将是这个 Stream 的第一个值,第二个元素将是前一个元素乘以 2。和 Stream.generate() 方法一样,我们也需要指定 stream 生成的大小,以免出现内存不够的问题。
通过原子类型创建 Stream
Java 8 中的 int, long 和 double 三个原子类型可以用来创建 streams,对应的接口分别是 IntStream, LongStream, DoubleStream
IntStream intStream = IntStream.range(0, 10);
LongStream longStream = LongStream.rangeClosed(0, 10);
DoubleStream doubleStream = DoubleStream.of(1.0, 2.0);
range(int startInclusive, int endExclusive)
相当于下面的代码:
for (long i = startInclusive; i < endExclusive ; i++) { ... }
rangeClosed(int startInclusive, int endInclusive)
相当于下面的代码:
for (long i = startInclusive; i <= endInclusive ; i++) { ... }
区别大家应该看出来了:rangeClosed 生成的 Stream 包含最后一个元素,而 range 缺不是。当然,Java 8 中的 Random 类也为我们添加了生成上面三个原子类型对应的 Stream:
Random random = new Random();
IntStream intStream = random.ints(10);
LongStream longs = random.longs(10);
DoubleStream doubleStream = random.doubles(10);
通过字符串创建 Stream
Java 8 中的 String 类提供了 chars() 方法来创建 Stream:
IntStream streamOfChars = "abc".chars();
我们也可以通过下面方法来创建 Stream:
Stream<String> streamOfString = Pattern.compile(", ").splitAsStream("a, b, c");
通过文件创建 Stream
Java 8 的 Java NIO 类中 Files 允许我们通过 lines() 方法创建 Stream,文件中的每一行数据将变成 stream 中的一个元素:
Path path = Paths.get("/user/iteblog/test.txt");
Stream<String> streamOfStrings = Files.lines(path);
Stream<String> streamWithCharset = Files.lines(path, Charset.forName("UTF-8"));
Stream 的引用
下面的代码是允许的:
Stream<String> stream = Stream.of("iteblog", "iteblog_hadoop", "spark")
.filter(element -> element.contains("iteblog"));
Optional<String> anyElement = stream.findAny();
我们使用 stream 变量来引用一个定义好的 Stream,这个是允许的,接下来我们使用 findAny() 来操作这个 Stream,也是可以运行的。但是如果我们复用 stream 变量,这样执行的时候就会出现 IllegalStateException 异常:
Optional<String> firstElement = stream.findFirst();
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
at com.java.iteblog.Java8Test.main(Java8Test.java:19)
上面的实例说明 Java 8 streams 的引用是不可复用的。之所以这样是因为 Java 的 streams 设计目的是提供一种能力,以函数的方式将有限的操作序列应用于元素的源,而不是存储元素。如果我们这样写是可以的
List<String> iteblogList = Stream.of("iteblog", "iteblog_hadoop", "spark")
.filter(element -> element.contains("iteblog")).collect(Collectors.toList());
Optional<String> anyElement = iteblogList.stream().findAny();
Optional<String> firstElement = iteblogList.stream().findFirst();
Stream 管道(stream pipeline)
要对数据源的元素执行一系列操作并聚合它们的结果,需要三个部分:数据源(source)、中间操作(intermediate operations)和终端操作(terminal operation)。
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众帐号:iteblog_hadoop
中间操作返回一个新的修改过的 Stream。比如下面的例子我们使用 skip() 方法跳过了旧 Stream 的第一个元素,并返回一个名为 iteblogSkip 的新 Stream:
Stream<String> iteblogSkip = Stream.of("iteblog", "iteblog_hadoop", "spark").skip(1);
如果需要多个修改操作,可以链接多个中间操作:
Stream<String> iteblogSkip = Stream.of("iteblog", "iteblog_hadoop", "spark")
.skip(1).map(element -> element.substring(0, 3));
上面例子我们同时使用了 skip() 和 map() 方法,并得到了有新的 Stream 引用。
stream 本身是没有价值的,用户真正感兴趣的是终端操作的结果,它可以是某种类型的值,也可以是应用于流的每个元素的操作。每个流只能使用一个终端操作。使用 stream 的正确和最方便的方式是通过 stream 管道,它是一个数据源、中间操作和终端操作的链(chain),例如:
long count = Stream.of("iteblog", "iteblog_hadoop", "spark")
.skip(1).map(element -> element.substring(0, 3))
.count();
延迟调用(Lazy Invocation)
如果想及时了解Spark、Hadoop或者HBase相关的文章,欢迎关注微信公众帐号:iteblog_hadoop
中间操作是惰性(Lazy)的,这意味着仅在终端操作执行需要时才调用它们。为了说明这个,假设我们有一个名为 wasCalled() 的方法,每次被调用其内部的计数器都会递增:
private static long counter;
private static void wasCalled() {
counter++;
}
现在我们在 filter() 中间操作中调用 wasCalled() 方法:
List<String> list = Arrays.asList("iteblog", "iteblog_hadoop", "spark");
counter = 0;
Stream<String> stream = list.stream().filter(element -> {
wasCalled();
return element.contains("iteblog");
});
System.out.println(counter);
我们的数据源 list 里面有三个元素,然后我们在这个数据源上调用了 filter() 方法,按道理应该会对每个元素调用一次 filter() 方法,这样 counter 比变量的值应该是 3。但是如果我们运行上面的代码你会发现 counter 的值还是 0!也就是说 filter() 方法根本就没调用,原因就是中间操作是惰性(Lazy)的,只有加上终端操作才会执行这个 filter() 方法。
我们把上面的代码修改成下面的代码:
List<String> list = Arrays.asList("iteblog", "iteblog_hadoop", "spark");
list.stream().filter(element -> {
System.out.println("filter() was called");
return element.contains("hadoop");
}).map(element -> {
System.out.println("map() was called");
return element.toUpperCase();
}).findFirst();
输出:
filter() was called
filter() was called
map() was called
可以看出,在终端输出了两次 filter() was called,一次 map() was called。也就是说 filter() 函数被调用了两次;map() 函数被调用了一次。Stream 管道是垂直执行的,在我们的例子里面,先运行 filter() 再运行 map(),只有 filter() 返回为 true 才会调用 map(),然后 findFirst() 只需要找到第一个满足的元素就可以终止程序的运行。
Stream 的运行顺序
从性能的角度来看,Stream 管道中不同操作的链接顺序是很重要的。下面两段代码片段的运行结果是一样的,但是下面的代码是推荐使用的。
long size = list.stream().map(element -> {
wasCalled();
return element.substring(0, 3);
}).skip(2).count();
long size = list.stream().skip(2).map(element -> {
wasCalled();
return element.substring(0, 3);
}).count();
因为第一个代码片段运行了三次 map(),而第二个代码片段只运行了一次 map()。所以在写 Java Stream 程序的时候,推荐 Stream 管道的顺序是:skip() -> filter() -> distinct()。
Stream 聚合
Stream API 有许多终端操作,它们将 Stream 聚合为一个原子类型的数据,例如 count()、max()、min()、sum() 等,但是这些操作根据预定义的实现进行工作的。如果开发人员需要定制 Stream 的聚合逻辑,该怎么办?这就是本小结要介绍的 reduce() 和 collect() 方法。
reduce() 方法介绍
reduce() 有三个重载的方法,但是都是接收以下几种类型的参数:
•identity:累加器的初始值,如果 stream 为空且没有要累加的内容,则为默认值;
•accumulator:指定元素聚合逻辑的函数。accumulator 在对数据源里面的每个元素进行聚合时,都会生成一个新的临时对象,生成的对象数等于数据源里面的元素个数,但是只有最后一个值是有用的,这个对性能不是很好的。
•combiner:聚合累加器结果的函数。只有在并行模式下才会调用 Combiner,以减少来自不同线程的累加器的结果。现在让我们来看看如何来使用 reduce() 三个方法:实例一
OptionalInt sum = IntStream.range(1, 4).reduce((a, b) -> a + b);
System.out.println(sum);
输出:
OptionalInt[6](也就是 1 + 2 + 3)
实例二
int reducedTwoParams = IntStream.range(1, 4).reduce(10, (a, b) -> a + b);
System.out.println(reducedTwoParams);
输出:
16(也就是 10 + 1 + 2 + 3)
实例三
int reducedParams = Stream.of(1, 2, 3)
.reduce(10, (a, b) -> a + b, (a, b) -> {
System.out.println("combiner was called");
return a + b;
});
System.out.println(reducedParams);
输出:
16(也就是 10 + 1 + 2 + 3)
可以看出,实例三虽然我们指定了 combiner 但是控制台并没有输出 combiner was called,也就是说上面的 combiner 其实并没有调用。如果我们想调用 combiner,可以修改如下:
int reducedParallel = Arrays.asList(1, 2, 3).parallelStream()
.reduce(10, (a, b) -> a + b, (a, b) -> {
System.out.println("combiner was called");
return a + b;
});
System.out.println(reducedParallel);
输出:
combiner was called
combiner was called
36
可以看出这次的输出结果是 36,并且输出两次 combiner was called。之所以是 36,是因为上面程序先对每个元素调用 accumulator,也就是调用了三次 accumulator,然后其和累加器的初始值进行相加,因为这个 actions 是并行执行的,所以调用三次 accumulator 得到的结果为 (10 + 1 = 11; 10 + 2 = 12; 10 + 3 = 13)。现在我们调用 combiner 把上面三次结果相加 (12 + 13 = 25; 25 + 11 = 36) 所以得到了 36。
collect() 方法介绍
collect() 方法也提供了聚合相关的逻辑实现,其函数签名为 R collect(Collector< ? super T, A, R> collector),Java 8 中提供了大多数常用的 collectors 逻辑实现,我们可以直接使用。为了说明如何使用,我们还是提供一些例子:
static class Product {
private int price;
private String name;
Product(int price, String name) {
this.price = price;
this.name = name;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
List<Product> productList = Arrays.asList(new Product(23, "potatoes"),
new Product(14, "orange"), new Product(13, "lemon"),
new Product(23, "bread"), new Product(13, "sugar"));
将 productList 里面的商品名称全部拿出来,并转换成 list:
List<String> collectorCollection = productList.stream().map(Product::getName).collect(Collectors.toList());
将 productList 里面的商品名称全部拿出来,并组合成 string:
String listToString = productList.stream().map(Product::getName)
.collect(Collectors.joining(", ", "[", "]"));
计算 productList 里面所有商品的平均价格:
double averagePrice = productList.stream().collect(Collectors.averagingInt(Product::getPrice));
计算 productList 里面所有商品的总价:
int summingPrice = productList.stream().collect(Collectors.summingInt(Product::getPrice));
计算 productList 里面所有商品统计信息:
IntSummaryStatistics statistics = productList.stream().collect(Collectors.summarizingInt(Product::getPrice));
System.out.println(statistics);
输出:
IntSummaryStatistics{count=5, sum=86, min=13, average=17.200000, max=23}
按照商品价格对商品进行归类
Map<Integer, List<Product>> collectorMapOfLists = productList.stream()
.collect(Collectors.groupingBy(Product::getPrice));
上面的结果是价格相等的商品都放到同一个 List 中。按照相关逻辑对商品进行分组
Map<Boolean, List<Product>> mapPartioned = productList.stream()
.collect(Collectors.partitioningBy(element -> element.getPrice() > 15));
上面程序的运行结果是价格大于15的放到一个 List 中。将 list 转换成 set
Set<Product> unmodifiableSet = productList.stream()
.collect(Collectors.collectingAndThen(Collectors.toSet(),
Collections::unmodifiableSet));
自定义 collector
总有些原因系统自带的 API 无法满足我们的需求,这时候我们就可以自定义 collector。比如下面我们自定义了一个 collector,把所有的商品放到 LinkedList 里面:
Collector<Product, ?, LinkedList<Product>> toLinkedList =
Collector.of(LinkedList::new, LinkedList::add,
(first, second) -> {
first.addAll(second);
return first;
});
LinkedList<Product> linkedListOfPersons = productList.stream().collect(toLinkedList);
总结
Stream API 是一套功能强大但易于理解的用于处理元素序列的工具。它允许我们减少大量的代码,创建更多可读的程序,并可以提高应用程序的生产率。