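Collectors in Depth, with Multi-Level Grouping and Partitioning
Why is a static nested class defined inside the Collectors class?
static class CollectorImpl<T, A, R> implements Collector<T, A, R>
By design, Collectors is a helper class that acts as a factory: it hands developers ready-made implementations of the most common collectors, and CollectorImpl is the implementation those factory methods return. Every method it offers is static, so it can be called directly.
The defining trait of functional programming is that code states what to do rather than how to do it. The developer concentrates on the what, and the underlying implementation takes care of the how.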
/**
* Implementations of {@link Collector} that implement various useful reduction
* operations, such as accumulating elements into collections, summarizing
* elements according to various criteria, etc.
* (Note: for anything the predefined collectors do not cover, you can write your own collector.)
* <p>The following are examples of using the predefined collectors to perform
* common mutable reduction tasks:
* Examples:
* <pre>{@code
* // Accumulate names into a List
* List<String> list = people.stream().map(Person::getName).collect(Collectors.toList());
*
* // Accumulate names into a TreeSet (a sorted Set)
* Set<String> set = people.stream().map(Person::getName).collect(Collectors.toCollection(TreeSet::new));
*
* // Convert elements to strings and concatenate them, separated by commas
* String joined = things.stream()
* .map(Object::toString)
* .collect(Collectors.joining(", "));
*
* // Compute sum of salaries of employee
* int total = employees.stream()
* .collect(Collectors.summingInt(Employee::getSalary));
*
* // Group employees by department
* Map<Department, List<Employee>> byDept
* = employees.stream()
* .collect(Collectors.groupingBy(Employee::getDepartment));
*
* // Compute sum of salaries by department
* Map<Department, Integer> totalByDept
* = employees.stream()
* .collect(Collectors.groupingBy(Employee::getDepartment,
* Collectors.summingInt(Employee::getSalary)));
*
* // Partition students into passing and failing
* Map<Boolean, List<Student>> passingFailing =
* students.stream()
* .collect(Collectors.partitioningBy(s -> s.getGrade() >= PASS_THRESHOLD));
*
* }</pre>
*
* @since 1.8 (covers the common cases; for anything else, define your own collector)
*/
public final class Collectors {
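Example: applying the methods of Collectors. The unqualified calls below (minBy, groupingBy, and so on) assume import static java.util.stream.Collectors.*;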
public static void main(String[] args) {
Student student1 = new Student("zhangsan", 80);
Student student2 = new Student("lisi", 90);
Student student3 = new Student("wangwu", 100);
Student student4 = new Student("zhaoliu", 90);
Student student5 = new Student("zhaoliu", 90);
List<Student> students = Arrays.asList(student1, student2, student3, student4, student5);
// Turn the list into a stream, then collect it back into a list.
List<Student> students1 = students.stream().collect(Collectors.toList());
students1.forEach(System.out::println);
System.out.println("- - - - - - -");
// A first look at collect(); its internals are traced later in these notes.
// Several approaches give the same result. Which is better? Prefer the most specific method: it avoids needless boxing and unboxing.
System.out.println("count:" + students.stream().collect(Collectors.counting()));
System.out.println("count:" + students.stream().count());
System.out.println("- - - - - - - -");
// Practice examples
// Find the student with the lowest score and print it.
students.stream().collect(minBy(Comparator.comparingInt(Student::getScore))).ifPresent(System.out::println);
// Find the student with the highest score.
students.stream().collect(maxBy(Comparator.comparingInt(Student::getScore))).ifPresent(System.out::println);
// Average score
System.out.println(students.stream().collect(averagingInt(Student::getScore)));
// Sum of the scores
System.out.println(students.stream().collect(summingInt(Student::getScore)));
// Summary statistics. Result: IntSummaryStatistics{count=5, sum=450, min=80, average=90.000000, max=100}
System.out.println(students.stream().collect(summarizingInt(Student::getScore)));
System.out.println(" - - - - - ");
// String concatenation. Result: zhangsanlisiwangwuzhaoliuzhaoliu
System.out.println(students.stream().map(Student::getName).collect(joining()));
// Concatenation with a delimiter. Result: zhangsan,lisi,wangwu,zhaoliu,zhaoliu
System.out.println(students.stream().map(Student::getName).collect(joining(",")));
// Concatenation with a delimiter, a prefix and a suffix. Result: hello zhangsan,lisi,wangwu,zhaoliu,zhaoliu world
System.out.println(students.stream().map(Student::getName).collect(joining(",", "hello ", " world")));
System.out.println("- - - - - - ");
// groupingBy: multi-level grouping
// Group by score, then by name. Output:
// {80={zhangsan=[Student{name='zhangsan', score=80}]},
// 100={wangwu=[Student{name='wangwu', score=100}]},
// 90={lisi=[Student{name='lisi', score=90}], zhaoliu=[Student{name='zhaoliu', score=90}, Student{name='zhaoliu', score=90}]}}
Map<Integer, Map<String, List<Student>>> collect = students.stream().collect(groupingBy(Student::getScore, groupingBy(Student::getName)));
System.out.println(collect);
System.out.println("- - - - - - - ");
// partitioningBy: a single-level partition first. Output: {false=[Student{name='zhangsan', score=80}], true=[Student{name='lisi', score=90}, Student{name='wangwu', score=100}, Student{name='zhaoliu', score=90}, Student{name='zhaoliu', score=90}]}
Map<Boolean, List<Student>> collect1 = students.stream().collect(partitioningBy(student -> student.getScore() > 80));
System.out.println(collect1);
// Multi-level partition: split by score > 80, then split each half by score > 90.
// Output: {false={false=[Student{name='zhangsan', score=80}], true=[]}, true={false=[Student{name='lisi', score=90}, Student{name='zhaoliu', score=90}, Student{name='zhaoliu', score=90}], true=[Student{name='wangwu', score=100}]}}
Map<Boolean, Map<Boolean, List<Student>>> collect2 = students.stream().collect(partitioningBy(student -> student.getScore() > 80, partitioningBy(student -> student.getScore() > 90)));
System.out.println(collect2);
// Partition, then count the elements in each partition. Result: {false=1, true=4}
Map<Boolean, Long> collect3 = students.stream().collect(partitioningBy(student -> student.getScore() > 80, counting()));
System.out.println(collect3);
System.out.println("- - - - - - - ");
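// Group by name and keep the lowest-scoring student of each group: collectingAndThen takes the minBy result and unwraps it. Optional.get() is safe here because every group holds at least one element.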
students.stream().collect(groupingBy(Student::getName,collectingAndThen(minBy(Comparator.comparingInt(Student::getScore)), Optional::get)));
}
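The final statement of the example builds a map and then discards it. A minimal sketch of putting that result to use follows. The nested `Student` class here is a hypothetical stand-in (the notes do not show the real one), and the class and method names are illustrative only.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

class LowestScoreByName {
    // Hypothetical stand-in for the Student class used in the notes.
    static final class Student {
        private final String name;
        private final int score;

        Student(String name, int score) {
            this.name = name;
            this.score = score;
        }

        String getName() { return name; }
        int getScore() { return score; }
    }

    // Groups by name and keeps only the lowest-scoring student of each group.
    // Optional::get cannot fail: groupingBy only creates a group once an element exists for it.
    static Map<String, Student> lowestByName(List<Student> students) {
        return students.stream().collect(Collectors.groupingBy(
                Student::getName,
                Collectors.collectingAndThen(
                        Collectors.minBy(Comparator.comparingInt(Student::getScore)),
                        Optional::get)));
    }

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("zhangsan", 80), new Student("lisi", 90),
                new Student("zhaoliu", 95), new Student("zhaoliu", 90));
        lowestByName(students).forEach(
                (name, student) -> System.out.println(name + " -> " + student.getScore()));
    }
}
```

The map values are plain `Student` objects rather than `Optional<Student>`, which is the whole point of wrapping `minBy` in `collectingAndThen`.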
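Comparator in Depth and a Special Case of Type Inference
Comparator is the comparison interface; since Java 8 it carries a number of default methods.
When several methods can do the same job, prefer the specialized one (comparingInt rather than comparing, for example): it avoids boxing and unboxing and the performance cost that comes with them.
Example: a simple use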
public static void main(String[] args) {
List<String> list = Arrays.asList("nihao", "hello", "world", "welcome");
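// Sort the list in ascending alphabetical order.
// list.stream().sorted().forEach(System.out::println);
// Sort by string length.
// Collections.sort(list, (item1, item2) -> item1.length() - item2.length());
// Collections.sort(list, Comparator.comparingInt(String::length));
// Sort by string length, descending.
// list.sort(Comparator.comparingInt(String::length).reversed());
// The form below does not compile: item is inferred as Object.
// This is lambda type inference at work. When the type cannot be inferred, you have to state it yourself.
// list.sort(Comparator.comparingInt(item-> item.length()).reversed());
// Written like this, it compiles.
list.sort(Comparator.comparingInt((String item )-> item.length()).reversed());
// Why can the type not be inferred here?
// Where inference works, e.g. list.stream()..., the Stream<T> already carries its type argument, and an exact type is enough to infer from.
// Here nothing pins the type down: comparingInt(...) is the receiver of .reversed(), so it has no target type. The parameter is ToIntFunction<? super T>, which String or any of its supertypes would satisfy, so T falls back to Object.
// list.sort(Comparator.comparingInt((Boolean item)-> 1).reversed());
// A Boolean parameter like this is rejected: it does not compile.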
System.out.println(list);
}
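Besides typing the lambda parameter explicitly, there are other ways to give the compiler the information it lacks. The sketch below shows three of them side by side; the class and method names are made up for illustration, and all three are meant to produce the same descending-by-length order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ReversedInferenceDemo {
    // Option 1: an explicit type witness on the generic factory method.
    static List<String> withTypeWitness(List<String> input) {
        List<String> list = new ArrayList<>(input);
        list.sort(Comparator.<String>comparingInt(item -> item.length()).reversed());
        return list;
    }

    // Option 2: a method reference, which names String itself.
    static List<String> withMethodReference(List<String> input) {
        List<String> list = new ArrayList<>(input);
        list.sort(Comparator.comparingInt(String::length).reversed());
        return list;
    }

    // Option 3: assign to a typed variable first, so the lambda has a target type.
    static List<String> withTypedVariable(List<String> input) {
        List<String> list = new ArrayList<>(input);
        Comparator<String> byLength = Comparator.comparingInt(item -> item.length());
        list.sort(byLength.reversed());
        return list;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("nihao", "hello", "world", "welcome");
        System.out.println(withTypeWitness(words));
        System.out.println(withMethodReference(words));
        System.out.println(withTypedVariable(words));
    }
}
```

In each variant the type of T is fixed before `.reversed()` is reached, which is exactly what the failing form lacks.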
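Comparator in Depth: Practice Examples
Example: a two-level comparison. Sort by string length ascending; for equal lengths, sort by character code (ASCII order).
Multi-level sorting practice with thenComparing():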
List<String> list = Arrays.asList("nihao", "hello", "world", "welcome");
// Two-level comparison: by length ascending, then, for equal lengths, alphabetically ignoring case. Several equivalent ways to write it:
list.sort(Comparator.comparingInt(String::length).thenComparing(String.CASE_INSENSITIVE_ORDER));
list.sort(Comparator.comparingInt(String::length).thenComparing((item1,item2) -> item1.toUpperCase().compareTo(item2.toUpperCase())));
list.sort(Comparator.comparingInt(String::length).thenComparing(Comparator.comparing(String::toUpperCase)));
// Reverse the order of the second key with Comparator.reverseOrder().
list.sort(Comparator.comparingInt(String::length).thenComparing(String::toLowerCase,Comparator.reverseOrder()));
// Sort by length descending, then by lower-cased text descending.
list.sort(Comparator.comparingInt(String::length).reversed()
.thenComparing(String::toLowerCase,Comparator.reverseOrder()));
// Three-level sort
list.sort(Comparator.comparingInt(String::length).reversed()
.thenComparing(String::toLowerCase, Comparator.reverseOrder())
.thenComparing(Comparator.reverseOrder()));
// For this list the last thenComparing() has no effect: the first two keys already order every pair of elements, so the third comparator never changes the result.
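The third comparator only matters when two strings have the same length and the same lower-cased text, that is, when they differ in case alone. A small sketch with such data follows; the class name, the method name and the sample words are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ThenComparingDemo {
    // Sorts by length descending, then lower-cased text descending,
    // and optionally by natural order descending as a final tie-breaker.
    static List<String> sort(List<String> input, boolean withTieBreaker) {
        Comparator<String> byLengthDesc = Comparator.comparingInt(String::length).reversed();
        Comparator<String> byLowerCaseDesc =
                Comparator.comparing((String s) -> s.toLowerCase(), Comparator.reverseOrder());
        Comparator<String> order = byLengthDesc.thenComparing(byLowerCaseDesc);
        if (withTieBreaker) {
            order = order.thenComparing(Comparator.reverseOrder());
        }
        List<String> list = new ArrayList<>(input);
        list.sort(order); // List.sort is stable, so ties keep their input order
        return list;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "hello", "world");
        System.out.println(sort(words, false)); // [world, Hello, hello]
        System.out.println(sort(words, true));  // [world, hello, Hello]
    }
}
```

Without the tie-breaker, "Hello" and "hello" compare as equal and stay in input order; with it, natural order reversed puts the lower-case form first.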
Writing a Simple Custom Collector
The JDK provides the Collector interface for this.
public class MySetCollector<T> implements Collector<T,Set<T>,Set<T>> {
@Override
public Supplier<Set<T>> supplier() {
// Supplies an empty result container.
System.out.println("supplier invoked! ");
return HashSet::new; // takes no argument and returns a new Set
}
@Override
public BiConsumer<Set<T>, T> accumulator() {
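// The accumulator takes two arguments and returns nothing.
// Its job: keep adding elements to the set.
System.out.println("accumulator invoked! ");
return Set<T>::add ;
// return HashSet<T>::add ; does not compile: the intermediate container type is declared as Set<T>, so the accumulator must accept any Set<T>, not only a HashSet<T>.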
}
@Override
public BinaryOperator<Set<T>> combiner() {
// Merges the partial results produced by a parallel stream.
System.out.println("combiner invoked! ");
return (set1,set2)->{
set1.addAll(set2);
return set1;
};
}
@Override
public Function<Set<T>, Set<T>> finisher() {
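// The finisher converts the intermediate container into the final result type.
// When the collector declares IDENTITY_FINISH (intermediate and result types are the same), this method is not called.
System.out.println("finisher invoked! ");
// return t -> t ;
return Function.identity(); // always returns its argument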
}
@Override
public Set<Characteristics> characteristics() {
System.out.println("characteristics invoked! ");
// Without IDENTITY_FINISH in this set, finisher() would be called.
return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH,Characteristics.UNORDERED));
}
public static void main(String[] args) {
List<String> list = Arrays.asList("hello", "world");
Set<String> collect = list.stream().collect(new MySetCollector<>());
System.out.println(collect);
}
}
Output:
supplier invoked!
accumulator invoked!
combiner invoked!
characteristics invoked!
characteristics invoked!
[world, hello]
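For a collector this small, implementing the whole interface is not the only route: the JDK also offers the static factory `Collector.of`. The sketch below builds an equivalent Set collector that way. The class and method names are illustrative, and the lambdas are written out explicitly to keep type inference simple.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collector;

class SetCollectorFactory {
    // Same behaviour as a hand-written Set collector, built from three functions.
    // This overload has no finisher argument, so IDENTITY_FINISH is implied.
    static <T> Collector<T, Set<T>, Set<T>> toHashSet() {
        return Collector.of(
                () -> new HashSet<T>(),                                 // supplier
                (set, item) -> set.add(item),                           // accumulator
                (left, right) -> { left.addAll(right); return left; },  // combiner
                Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        Set<String> result = Arrays.asList("hello", "world", "hello").stream()
                .collect(SetCollectorFactory.<String>toHashSet());
        System.out.println(result);
    }
}
```

Writing the full class remains useful for learning, since it makes every one of the five methods visible.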
Next, step into the JDK source to see how the collector is actually invoked.
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
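The last statement of `collect` is the part worth checking by experiment: the finisher is applied only when IDENTITY_FINISH is absent. The sketch below counts finisher invocations for both cases on a sequential stream. All names in it are hypothetical, and it uses `Collector.of` rather than a full interface implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Stream;

class FinisherCallDemo {
    // A list collector whose finisher records every time it is applied.
    static Collector<String, List<String>, List<String>> countingFinisher(
            AtomicInteger calls, Collector.Characteristics... characteristics) {
        return Collector.of(
                () -> new ArrayList<String>(),
                (list, item) -> list.add(item),
                (left, right) -> { left.addAll(right); return left; },
                list -> { calls.incrementAndGet(); return list; },
                characteristics);
    }

    // Runs a small sequential collect and reports how often the finisher ran.
    static int finisherCalls(Collector.Characteristics... characteristics) {
        AtomicInteger calls = new AtomicInteger();
        Stream.of("hello", "world").collect(countingFinisher(calls, characteristics));
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(finisherCalls());                                          // 1
        System.out.println(finisherCalls(Collector.Characteristics.IDENTITY_FINISH)); // 0
    }
}
```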
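A Closer Look at Custom Collectors and the Pitfalls of Parallel Streams
// Example requirement: collect a Set and enrich the result by wrapping it in a Map.
// Input: Set<String>
// Output: Map<String,String>
// Sample input: [hello, world, hello world]
// Sample output: {hello=hello, world=world, hello world=hello world}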
public class MySetCollector2<T> implements Collector<T, Set<T>, Map<T, T>> {
@Override
public Supplier<Set<T>> supplier() {
System.out.println("supplier invoked!");
return HashSet::new;
}
@Override
public BiConsumer<Set<T>, T> accumulator() {
System.out.println("accumlator invoked!");
return (set, item) -> {
set.add(item);
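// Print the set and the current thread on every call; with this data the line is printed 6 times.
System.out.println("accunlator : " +set+ ", "+ Thread.currentThread().getName());
// This is where the exception comes from:
// printing the set iterates over it, so when one thread modifies the set while another iterates, the program throws ConcurrentModificationException. In a parallel operation, do not add extra work to the accumulator: just add the element and do not print the container.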
};
}
@Override
public BinaryOperator<Set<T>> combiner() {
System.out.println("combiner invoked!");
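// Only called for parallel streams: merges the partial results.
return (set1, set2) -> {
set1.addAll(set2);
return set1; // return the merged set; returning set2 would drop the elements of set1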
};
}
@Override
public Function<Set<T>, Map<T, T>> finisher() {
System.out.println("finisher invoked!");
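// When the intermediate and result types are the same (IDENTITY_FINISH), the finisher is not called.
// Here they differ, so it is called.
return set -> {
Map<T, T> map = new HashMap<>();
// Map<T, T> map = new TreeMap<>(); would return a sorted Map instead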
set.forEach(item -> map.put(item,item));
return map;
};
}
@Override
public Set<Characteristics> characteristics() {
System.out.println(" characteristics invoked");
return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED)); // Do not pick these values at random; understand what each enum constant means.
// return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED,Characteristics.CONCURRENT));
// With Characteristics.CONCURRENT added, the run sometimes throws and sometimes completes normally:
// Caused by: java.util.ConcurrentModificationException
// return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED,Characteristics.IDENTITY_FINISH));
// With Characteristics.IDENTITY_FINISH added, the program fails:
// Process 'command '/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home/bin/java'' finished with non-zero exit value 1
// What IDENTITY_FINISH really means: the finisher is the identity function, and the unchecked cast from the intermediate type to the result type must succeed. Here a Set cannot be cast to a Map, so an exception is thrown.
// The characteristics define how the collector is treated. The framework trusts them: even when they do not match the implementation, it acts on them anyway.
}
public static void main(String[] args) {
List<String> list = Arrays.asList("hello","hello", "world", "helloworld","1","4","j");
Set<String> set = new HashSet<>(list);
System.out.println("set"+set);
// Map<String, String> collect = set.stream().collect(new MySetCollector2<>());
Map<String, String> collect = set.parallelStream().collect(new MySetCollector2<>()); // parallel stream
System.out.println(collect);
}
}
Parallel Stream Pitfalls in Detail
Parallel stream with CONCURRENT (every thread accumulates into the same shared container):
accumlator invoked!
accunlator : [j], main
accunlator : [j, hello], main
accunlator : [helloworld, 4, j, hello], ForkJoinPool.commonPool-worker-2
accunlator : [helloworld, 1, 4, j, hello], ForkJoinPool.commonPool-worker-2
accunlator : [helloworld, 1, world, 4, j, hello], ForkJoinPool.commonPool-worker-2
Parallel stream without CONCURRENT (each thread accumulates into its own container):
accunlator : [j], main
accunlator : [helloworld], ForkJoinPool.commonPool-worker-11
accunlator : [helloworld, 1], ForkJoinPool.commonPool-worker-11
accunlator : [helloworld, 1, world], ForkJoinPool.commonPool-worker-11
accunlator : [4], ForkJoinPool.commonPool-worker-9
accunlator : [j, hello], main
/**
* Characteristics indicating properties of a {@code Collector}, which can
* be used to optimize reduction implementations.
*/
enum Characteristics { // 特征
/**
* Indicates that this collector is <em>concurrent</em>, meaning that
* the result container can support the accumulator function being
* called concurrently with the same result container from multiple
* threads.
* (Concurrent: the same result container can be accumulated into by several threads at once.)
* <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
* then it should only be evaluated concurrently if applied to an
* unordered data source.
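* (If the collector is CONCURRENT but not UNORDERED, it is only evaluated concurrently for an unordered data source.)
* (Without CONCURRENT a parallel stream still works, but each thread gets its own result container, so the combiner has to merge them.)
* (With CONCURRENT, all threads accumulate into one shared result container, so no combiner call is needed.)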
*/
CONCURRENT,
/**
* Indicates that the collection operation does not commit to preserving
* the encounter order of input elements. (This might be true if the
* result container has no intrinsic order, such as a {@link Set}.)
* (The collection operation does not preserve encounter order.)
*/
UNORDERED,
/**
* Indicates that the finisher function is the identity function and
* can be elided. If set, it must be the case that an unchecked cast
* from A to R will succeed.
* (With this characteristic the finisher is the identity function and is not called; the cast from A to R must succeed.)
*/
IDENTITY_FINISH
}
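The root cause of the exception:
One thread modifies a collection while another thread iterates over it, and the program throws ConcurrentModificationException.
In a parallel operation, do not add extra work to the accumulator: add the element, and do not print the container.
Without CONCURRENT, a parallel stream still works, but each thread accumulates into its own result container, so the combiner must merge them.
With CONCURRENT, all threads accumulate into the same result container, so the combiner is not needed.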
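Declaring CONCURRENT is a promise that the result container tolerates simultaneous accumulation, and HashSet does not keep that promise. The sketch below shows one way a collector could legitimately claim the characteristic, assuming a thread-safe set from `ConcurrentHashMap.newKeySet()` is an acceptable container. The class and method names are illustrative.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class ConcurrentSetCollector {
    // The container is thread-safe, so CONCURRENT is an honest claim here.
    static <T> Collector<T, Set<T>, Set<T>> toConcurrentSet() {
        return Collector.of(
                () -> ConcurrentHashMap.<T>newKeySet(),
                (set, item) -> set.add(item),
                (left, right) -> { left.addAll(right); return left; },
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    // Collects 0..upperBound-1 from a parallel stream and returns the set size.
    static int distinctCount(int upperBound) {
        Set<Integer> result = IntStream.range(0, upperBound).boxed().parallel()
                .collect(ConcurrentSetCollector.<Integer>toConcurrentSet());
        return result.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount(10_000)); // 10000
    }
}
```

The iterators of this set are weakly consistent, so even printing it from inside the accumulator would not raise ConcurrentModificationException, although doing so would still be wasted work.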
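About Hyper-Threading:
Hyper-Threading (HT) is a technology developed by Intel and released in 2002. It was first used only in Xeon processors, where it was called "Super-Threading", and was later brought to the Pentium 4 HT. Its early code name was Jackson.
With it, Intel exposes two logical threads on a single physical CPU. The later Pentium D did not support Hyper-Threading but integrated two physical cores, so two threads are still visible. The direction of Hyper-Threading is to raise the number of logical threads per processor: the Core i7-6950X that Intel released in 2016 combines 10 cores with Hyper-Threading to offer 20 logical threads.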
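Collector summary:
This part walks through how the methods of the Collectors class are implemented. A collector always has an intermediate container, so the methods are worth summarizing one by one.
Once the prerequisites are in place, the harder material starts to look obvious.
The implementations in the Collectors static factory class fall into two cases:
- Implemented directly with CollectorImpl.
- Implemented with the reducing method, which is itself implemented with CollectorImpl.
So in the end everything is implemented through CollectorImpl.
1. toCollection(collectionFactory): collects the elements into the collection type you specify.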
public static <T, C extends Collection<T>>
Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
return new CollectorImpl<>(collectionFactory, Collection<T>::add,
(r1, r2) -> { r1.addAll(r2); return r1; },
CH_ID);
}
2. toList(): a concrete case of the toCollection() pattern, fixed to ArrayList.
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
3. toSet(): a concrete case of the toCollection() pattern, fixed to HashSet.
public static <T>
Collector<T, ?, Set<T>> toSet() {
return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
(left, right) -> { left.addAll(right); return left; },
CH_UNORDERED_ID);
}
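4. joining(): concatenates the elements into one String. There are two more overloads, one taking a delimiter and one taking a delimiter, a prefix and a suffix.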
public static Collector<CharSequence, ?, String> joining() {
return new CollectorImpl<CharSequence, StringBuilder, String>(
StringBuilder::new, StringBuilder::append,
(r1, r2) -> { r1.append(r2); return r1; },
StringBuilder::toString, CH_NOID);
}
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter) {
return joining(delimiter, "", "");
}
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
CharSequence prefix,
CharSequence suffix) {
return new CollectorImpl<>(
() -> new StringJoiner(delimiter, prefix, suffix),
StringJoiner::add, StringJoiner::merge,
StringJoiner::toString, CH_NOID);
}
5. mapping(): applies a function to each element (from T to U) before handing it to the downstream collector.
public static <T, U, A, R>
Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
Collector<? super U, A, R> downstream) {
BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
return new CollectorImpl<>(downstream.supplier(),
(r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
downstream.combiner(), downstream.finisher(),
downstream.characteristics());
}
For example:
Map<City, Set<String>> lastNamesByCity
= people.stream().collect(groupingBy(Person::getCity, mapping(Person::getLastName, toSet())));
6. collectingAndThen(): runs the downstream collector, then applies one more transformation to its result.
public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream,
Function<R,RR> finisher) {
Set<Collector.Characteristics> characteristics = downstream.characteristics();
if (characteristics.contains(Collector.Characteristics.IDENTITY_FINISH)) {
if (characteristics.size() == 1)
characteristics = Collectors.CH_NOID;
else {
characteristics = EnumSet.copyOf(characteristics);
characteristics.remove(Collector.Characteristics.IDENTITY_FINISH);
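// Why is IDENTITY_FINISH removed here?
// If it stayed, the new finisher would be skipped and the intermediate container would be returned as the final result.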
characteristics = Collections.unmodifiableSet(characteristics);
}
}
return new CollectorImpl<>(downstream.supplier(),
downstream.accumulator(),
downstream.combiner(),
downstream.finisher().andThen(finisher),
characteristics);
}
For example:
List<Person> unmodifiablePeople
= people.stream().collect(collectingAndThen(toList(),Collections::unmodifiableList));
7. counting(): counts the elements.
public static <T> Collector<T, ?, Long>
counting() {
return reducing(0L, e -> 1L, Long::sum);
}
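To see that counting() in the listing above really is nothing more than a reduction, the sketch below writes the same reducing call by hand and compares it with the library method. The class and method names are made up for the example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;

class CountingViaReducing {
    // Maps every element to 1L and sums the ones, starting from the identity 0L.
    static <T> long countWithReducing(Collection<T> items) {
        Long count = items.stream().collect(Collectors.reducing(0L, e -> 1L, Long::sum));
        return count;
    }

    // The library collector, for comparison.
    static <T> long countWithCounting(Collection<T> items) {
        Long count = items.stream().collect(Collectors.counting());
        return count;
    }

    public static void main(String[] args) {
        Collection<String> names = Arrays.asList("zhangsan", "lisi", "wangwu");
        System.out.println(countWithReducing(names)); // 3
        System.out.println(countWithCounting(names)); // 3
    }
}
```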
8. minBy() and maxBy(): minimum and maximum.
public static <T> Collector<T, ?, Optional<T>>
minBy(Comparator<? super T> comparator) {
return reducing(BinaryOperator.minBy(comparator));
}
public static <T> Collector<T, ?, Optional<T>>
maxBy(Comparator<? super T> comparator) {
return reducing(BinaryOperator.maxBy(comparator));
}
9. summingInt(): sums the values.
public static <T> Collector<T, ?, Integer>
summingInt(ToIntFunction<? super T> mapper) {
return new CollectorImpl<>(
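// Why not use a plain 0 as the intermediate value? A number is an immutable value, so the accumulator has nothing it can update in place. An array is a reference type that can be passed around and mutated: it serves as the container.
() -> new int[1],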
(a, t) -> { a[0] += mapper.applyAsInt(t); },
(a, b) -> { a[0] += b[0]; return a; },
a -> a[0], CH_NOID);
}
public static <T> Collector<T, ?, Long>
summingLong(ToLongFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new long[1],
(a, t) -> { a[0] += mapper.applyAsLong(t); },
(a, b) -> { a[0] += b[0]; return a; },
a -> a[0], CH_NOID);
}
public static <T> Collector<T, ?, Double>
summingDouble(ToDoubleFunction<? super T> mapper) {
/*
* In the arrays allocated for the collect operation, index 0
* holds the high-order bits of the running sum, index 1 holds
* the low-order bits of the sum computed via compensated
* summation, and index 2 holds the simple sum used to compute
* the proper result if the stream contains infinite values of
* the same sign.
*/
return new CollectorImpl<>(
() -> new double[3],
(a, t) -> { sumWithCompensation(a, mapper.applyAsDouble(t));
a[2] += mapper.applyAsDouble(t);},
(a, b) -> { sumWithCompensation(a, b[0]);
a[2] += b[2];
return sumWithCompensation(a, b[1]); },
a -> computeFinalSum(a),
CH_NOID);
}
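The one-element array trick is easy to reproduce outside the JDK. The sketch below rebuilds an int-summing collector with `Collector.of`, using an `int[1]` as the mutable container. It is an illustration of the idea, not the JDK's code, and every name in it is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;
import java.util.stream.Collector;

class ArraySumCollector {
    // The int[1] is a mutable holder: the accumulator updates slot 0 in place.
    static <T> Collector<T, int[], Integer> summing(ToIntFunction<? super T> mapper) {
        return Collector.of(
                () -> new int[1],
                (acc, item) -> acc[0] += mapper.applyAsInt(item),
                (left, right) -> { left[0] += right[0]; return left; },
                acc -> acc[0]); // finisher unwraps the single value
    }

    // Convenience wrapper: total length of all words.
    static int sumOfLengths(List<String> words) {
        return words.stream().collect(ArraySumCollector.<String>summing(String::length));
    }

    public static void main(String[] args) {
        System.out.println(sumOfLengths(Arrays.asList("a", "bb", "ccc"))); // 6
    }
}
```

Because the finisher converts `int[]` to `Integer`, this collector must not declare IDENTITY_FINISH, which matches the CH_NOID used in the JDK version.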
10. averagingInt(): computes the average.
public static <T> Collector<T, ?, Double>
averagingInt(ToIntFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new long[2],
(a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; },
(a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
}
public static <T> Collector<T, ?, Double>
averagingLong(ToLongFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new long[2],
(a, t) -> { a[0] += mapper.applyAsLong(t); a[1]++; },
(a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
}
public static <T> Collector<T, ?, Double>
averagingDouble(ToDoubleFunction<? super T> mapper) {
/*
* In the arrays allocated for the collect operation, index 0
* holds the high-order bits of the running sum, index 1 holds
* the low-order bits of the sum computed via compensated
* summation, and index 2 holds the number of values seen.
*/
return new CollectorImpl<>(
() -> new double[4],
(a, t) -> { sumWithCompensation(a, mapper.applyAsDouble(t)); a[2]++; a[3]+= mapper.applyAsDouble(t);},
(a, b) -> { sumWithCompensation(a, b[0]); sumWithCompensation(a, b[1]); a[2] += b[2]; a[3] += b[3]; return a; },
a -> (a[2] == 0) ? 0.0d : (computeFinalSum(a) / a[2]),
CH_NOID);
}
11. reducing() in detail.
public static <T> Collector<T, ?, T>
reducing(T identity, BinaryOperator<T> op) {
return new CollectorImpl<>(
boxSupplier(identity),
(a, t) -> { a[0] = op.apply(a[0], t); },
(a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
a -> a[0],
CH_NOID);
}
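Two of the reducing overloads are worth trying directly: the one with an identity value, shown above, and the one without, which returns an Optional because there may be nothing to reduce. A small sketch with made-up names and data:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

class ReducingDemo {
    // With an identity: an empty input simply yields the identity (1 here).
    static int product(List<Integer> numbers) {
        Integer result = numbers.stream().collect(Collectors.reducing(1, (a, b) -> a * b));
        return result;
    }

    // Without an identity: the result is Optional, empty for an empty input.
    static Optional<String> longest(List<String> words) {
        return words.stream().collect(Collectors.reducing(
                BinaryOperator.maxBy(Comparator.comparingInt(String::length))));
    }

    public static void main(String[] args) {
        System.out.println(product(Arrays.asList(2, 3, 4)));         // 24
        System.out.println(longest(Arrays.asList("a", "ccc", "bb"))); // Optional[ccc]
    }
}
```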
12. groupingBy() in detail.
public static <T, K> Collector<T, ?, Map<K, List<T>>> // callers do not care how the intermediate type is handled, hence the ?
groupingBy(Function<? super T, ? extends K> classifier) {
return groupingBy(classifier, toList()); // delegates to the two-argument groupingBy()
}
/**
* @param <T> the type of the input elements (the element type the collector accepts)
* @param <K> the type of the keys (the type returned by the classifier function)
* @param <A> the intermediate accumulation type of the downstream collector
* @param <D> the result type of the downstream reduction
*/
public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream) {
return groupingBy(classifier, HashMap::new, downstream); // delegates to the three-argument groupingBy()
}
// The most general groupingBy():
/**
* Returns a {@code Collector} implementing a cascaded "group by" operation
* on input elements of type {@code T}, grouping elements according to a
* classification function, and then performing a reduction operation on
* the values associated with a given key using the specified downstream
* {@code Collector}. The {@code Map} produced by the Collector is created
* with the supplied factory function.
*
* <p>The classification function maps elements to some key type {@code K}.
* The downstream collector operates on elements of type {@code T} and
* produces a result of type {@code D}. The resulting collector produces a
* {@code Map<K, D>}.
*
* <p>For example, to compute the set of last names of people in each city,
* where the city names are sorted:
* <pre>{@code
* Map<City, Set<String>> namesByCity
* = people.stream().collect(groupingBy(Person::getCity, TreeMap::new,
* mapping(Person::getLastName, toSet())));
* }</pre>
*
* @implNote
* The returned {@code Collector} is not concurrent. For parallel stream
* pipelines, the {@code combiner} function operates by merging the keys
* from one map into another, which can be an expensive operation. If
* preservation of the order in which elements are presented to the downstream
* collector is not required, using {@link #groupingByConcurrent(Function, Supplier, Collector)}
* may offer better parallel performance.
* (The returned collector is not concurrent. If order does not matter, prefer the concurrent variant, groupingByConcurrent().)
* @param <T> the type of the input elements
* @param <K> the type of the keys
* @param <A> the intermediate accumulation type of the downstream collector
* @param <D> the result type of the downstream reduction
* @param <M> the type of the resulting {@code Map}
* @param classifier a classifier function mapping input elements to keys
* @param downstream a {@code Collector} implementing the downstream reduction
* @param mapFactory a function which, when called, produces a new empty
* {@code Map} of the desired type
* @return a {@code Collector} implementing the cascaded group-by operation
*
* @see #groupingBy(Function, Collector)
* @see #groupingBy(Function)
* @see #groupingByConcurrent(Function, Supplier, Collector)
*/
public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
Supplier<A> downstreamSupplier = downstream.supplier();
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
downstreamAccumulator.accept(container, t);
};
BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner()); // takes two maps and returns one merged result
@SuppressWarnings("unchecked")
Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory; // an unchecked cast
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
// With IDENTITY_FINISH there is no need to call the finisher.
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
}
else {
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
Function<Map<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
}
}
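The three-argument form is the one to reach for when the type of the resulting map matters. The sketch below groups words by length into a TreeMap so the keys come out sorted, and uses mapping() with toSet() as the downstream collector. Class name, method name and data are assumptions for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

class SortedGroupingDemo {
    // classifier = word length, mapFactory = TreeMap (sorted keys),
    // downstream = upper-case each word and collect the group into a Set.
    static Map<Integer, Set<String>> upperCaseWordsByLength(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(
                String::length,
                TreeMap::new,
                Collectors.mapping(word -> word.toUpperCase(), Collectors.toSet())));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "world", "hi", "HELLO");
        // Keys are iterated in ascending order because the map is a TreeMap.
        upperCaseWordsByLength(words).forEach(
                (length, group) -> System.out.println(length + " -> " + group));
    }
}
```

Here "hello" and "HELLO" collapse into one entry of the length-5 group, since the downstream Set sees both as "HELLO".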
13. groupingByConcurrent(): the concurrent grouping method. Use it only when the order of the data does not matter.
/**
* Returns a concurrent {@code Collector} implementing a cascaded "group by"
* operation on input elements of type {@code T}, grouping elements
* according to a classification function, and then performing a reduction
* operation on the values associated with a given key using the specified
* downstream {@code Collector}.
*/ // ConcurrentHashMap is a Map that supports concurrent access
public static <T, K>
Collector<T, ?, ConcurrentMap<K, List<T>>>
groupingByConcurrent(Function<? super T, ? extends K> classifier) {
return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
}
public static <T, K, A, D>
Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream) {
return groupingByConcurrent(classifier, ConcurrentHashMap::new, downstream);
}
public static <T, K, A, D, M extends ConcurrentMap<K, D>>
Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
Supplier<A> downstreamSupplier = downstream.supplier();
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BinaryOperator<ConcurrentMap<K, A>> merger = Collectors.<K, A, ConcurrentMap<K, A>>mapMerger(downstream.combiner());
@SuppressWarnings("unchecked")
Supplier<ConcurrentMap<K, A>> mangledFactory = (Supplier<ConcurrentMap<K, A>>) mapFactory;
BiConsumer<ConcurrentMap<K, A>, T> accumulator;
if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
downstreamAccumulator.accept(resultContainer, t);
};
}
else {
accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
synchronized (resultContainer) { // Synchronized: several threads share the container, but only one of them accumulates into it at a time.
downstreamAccumulator.accept(resultContainer, t);
}
};
}
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID);
}
else {
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
Function<ConcurrentMap<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID);
}
}
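A typical use of groupingByConcurrent is counting over a parallel stream, where the order of arrival is irrelevant. The sketch below counts the integers below a bound by their last digit. The names are illustrative, and since counting() is not itself a CONCURRENT collector, this call goes through the synchronized branch shown above.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ConcurrentGroupingDemo {
    // All worker threads accumulate into one shared ConcurrentMap.
    static ConcurrentMap<Integer, Long> countByLastDigit(int upperBound) {
        return IntStream.range(0, upperBound).boxed().parallel()
                .collect(Collectors.groupingByConcurrent(n -> n % 10, Collectors.counting()));
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, Long> counts = countByLastDigit(1000);
        System.out.println(counts.get(7)); // 100
        System.out.println(counts.size()); // 10
    }
}
```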
14. partitioningBy() in detail.
public static <T>
Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
return partitioningBy(predicate, toList());
}
public static <T, D, A>
Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
Collector<? super T, A, D> downstream) {
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
BiConsumer<Partition<A>, T> accumulator = (result, t) ->
downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
BinaryOperator<A> op = downstream.combiner();
BinaryOperator<Partition<A>> merger = (left, right) ->
new Partition<>(op.apply(left.forTrue, right.forTrue),
op.apply(left.forFalse, right.forFalse));
Supplier<Partition<A>> supplier = () ->
new Partition<>(downstream.supplier().get(),
downstream.supplier().get());
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
}
else {
Function<Partition<A>, Map<Boolean, D>> finisher = par ->
new Partition<>(downstream.finisher().apply(par.forTrue),
downstream.finisher().apply(par.forFalse));
return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
}
}
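Because the supplier creates both the forTrue and the forFalse container up front, a partitioned result always has both keys, even when one side is empty. groupingBy with a boolean classifier behaves differently: it only creates keys that actually occur. A small sketch comparing the two, with hypothetical names and data:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionKeysDemo {
    // Always contains both keys, false and true.
    static Map<Boolean, List<Integer>> partitionAbove(List<Integer> scores, int threshold) {
        return scores.stream().collect(Collectors.partitioningBy(score -> score > threshold));
    }

    // Contains only the keys that at least one element mapped to.
    static Map<Boolean, List<Integer>> groupAbove(List<Integer> scores, int threshold) {
        return scores.stream().collect(Collectors.groupingBy(score -> score > threshold));
    }

    public static void main(String[] args) {
        List<Integer> scores = Arrays.asList(95, 100);
        System.out.println(partitionAbove(scores, 90)); // {false=[], true=[95, 100]}
        System.out.println(groupAbove(scores, 90));     // {true=[95, 100]}
    }
}
```

This is why code that reads `result.get(false)` is safe after partitioningBy but may see null after groupingBy.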
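The JDK source is the model to learn from.
The point of going into this much detail is not to write these collectors yourself, but to understand how they work inside, so that you can call them with confidence.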