java8 Stream之原理

  Stream
  
  java8的Stream很重要,spring-reactor里面用到reactor-core,而java8的stream与之很相似,搞懂了再看reactor-core必定事半功倍。
  
  先看一下它的强大,这里只是冰山一角:
  
  从List<Student> 列表中取出name,将name组成一个List。
  
  老代码
  
  List<String> nameList = new ArrayList();
  
  if(null != list){
  
  for(Student stu : list){
  
  nameList.add(stu.getName());
  
  }
  
  }
  
  JAVA8
  
  List<String> nameList = Optional.ofNullable(list).orElse(Collections.emptyList()).stream()
  
  .map(Stu::getName).collect(Collectors.toList());
  
  Stream.of 创建Stream
  
  这里给大家演示一下通过Stream.of创建Stream。
  
  常见的集合通过stream()方法都可以创建Stream。 其实他们最终都是调用以下方法创建的。
  
  public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
  
  Objects.requireNonNull(spliterator);
  
  return new ReferencePipeline.Head<>(spliterator,
  
  StreamOpFlag.fromCharacteristics(spliterator),
  
  parallel);
  
  }
  
  Stream.of有两种创建Stream的方法。
  
  第一种
  
  Stream.of("a1")
  
  第二种
  
  Stream.of("a1","a2"); //这种通过Arrays.stream 构建
  
  这里介绍两个相关的类:
  
  如果是单个元素,直接使用Spliterator进行构建。 如果是多个元素,会有一个优化,使用SpineBuffer构建。
  
  如果是大数组,使用SpineBuffer,小数组是使用ArrayList。 如何使用SpineBuffer构建?
  
  Stream.builder().add("a1").add("a2").build();
  
  Stream 相关概念
  
  stream的操作分为两种:
  
  一种是中间操作,就是不需要结果,只需要记录这个过程,一般返回Stream对象都是属于这种
  
  一种是终极操作,就是立即需要返回结果,一般返回非Stream对象,都是属于这种。
  
  stream的状态分为三种:
  
  第一种:Head,第一次创建的时候就是这种
  
  第二种:Stateless,无状态,每个对象的操作是独立的。
  
  第三种:Stateful,有状态,需要联合多个象才能得出结果。
  
  stream操作特性:
  
  操作特性是指:该stream有固定大小,大小不固定,操作有序,数据有序等。
  
  Stream.filter
  
  顾名思义:对 Stream进行filter,然后返回新的Stream。 由前一节我们知道,stream的具体数据存储在Spliterator中。而它本身可以理解为只是一个算法。
  
  filter只是一个中间操作,我们只需要记录这一个过程就OK了。然后返回新的Stream。如果再次调用fileter,会再次返回一个新Stream。
  
  上面是一个流程图,Sink是包装算子的一个类,比如调用filter,从Head里面拿到对象,经过第一个Sink,再经过第二个Sink的运算,最终得到结果。
  
  下面是Strea.filter的源码实现:
  
  public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
  
  Objects.requireNonNull(predicate);
  
  return new StatelessOp<P_OUT, P_OUT>www.frgjyL.cn(this, StreamShape.REFERENCE,
  
  StreamOpFlag.NOT_SIZED) {
  
  @Override
  
  Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
  
  return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
  
  @Override
  
  public void begin(long size) {
  
  downstream.begin(-1);
  
  }
  
  @Override
  
  public void accept(P_OUT u) {
  
  //如果通过当前filter,就进入下一个算子
  
  if (predicate.test(u))
  
  downstream.accept(u);
  
  }
  
  };
  
  }
  
  };
  
  }
  
  Stream.peek
  
  这个方法可以理解为调试方法,它不对结果产生任何影响,将数据原封不动的传给下一个算子
  
  public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
  
  Objects.requireNonNull(action);
  
  return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
  
  0) {
  
  @Override
  
  Sink<P_OUT> opWrapSink(int flags,www.guochengzy.com Sink<P_OUT> sink) {
  
  return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
  
  @Override
  
  public void accept(P_OUT u) {
  
  action.accept(u);
  
  downstream.accept(u);
  
  }
  
  };
  
  }
  
  };
  
  }
  
  Stream.flatMap
  
  算子应该是通过一个对象映身成一个Stream,然后调用foreach,将每个元素传递到下一个算子。
  
  public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
  
  Objects.requireNonNull(mapper);
  
  // We can do better than this, by polling cancellationRequested when stream is infinite
  
  return new StatelessOp<P_OUT, R>www.yongshiyule178.com(this, StreamShape.REFERENCE,
  
  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
  
  @Override
  
  Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
  
  return new Sink.ChainedReference<P_OUT, R>(sink) {
  
  @Override
  
  public void begin(long size) {
  
  downstream.begin(-1);
  
  }
  
  @Override
  
  public void accept(P_OUT u) {
  
  try (Stream<? extends R> result = mapper.apply(u)) {
  
  // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
  
  if (result !www.honglpt.cn= null)
  
  result.sequential().forEach(downstream);
  
  Stream.map
  
  与上面的类似,只是映射成另一个对象
  
  public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
  
  Objects.requireNonNull(mapper);
  
  return new StatelessOp<P_OUT, R>www.cmyLgw.cn (this, StreamShape.REFERENCE,
  
  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
  
  @Override
  
  Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
  
  return new Sink.ChainedReference<P_OUT, R>(sink) {
  
  @Override
  
  public void accept(P_OUT u) {
  
  downstream.accept(mapper.apply(u));
  
  }
  
  };
  
  }
  
  };
  
  }
  
  Stream.limit
  
  这是一个有状态的操作,因为它返回一定数据的数据组成的Stream。 这里只贴一段核心算法:
  
  Sink<T> opWrapSink(int flags, Sink<T> sink) {
  
  return new Sink.ChainedReference<T, T>(sink) {
  
  long n = skip;
  
  long m = limit >= 0 ? limit : Long.MAX_VALUE;
  
  @Override
  
  public void begin(long size) {
  
  downstream.begin(calcSize(size, skip, m));
  
  }
  
  @Override
  
  public void accept(T www.chengmyuLegw.cn) {
  
  if (n == 0) {
  
  if (m > 0) {
  
  m--;
  
  downstream.accept(t);
  
  }
  
  }
  
  else {
  
  n--;
  
  }
  
  }
  
  @Override
  
  public boolean cancellationRequested() {
  
  return m == 0 |www.huishenggw.cn| downstream.cancellationRequested();
  
  }
  
  };
  
  }
  
  Stream.skip
  
  这个与Stram.limit类似,两个联合起来就可以分面查询了。
  
  Stream.sorted
  
  排序,如果没传比较器就用默认的。
  
  如果有顺序,就不用排序了,如果给定大小了就用一个固定大小的数组来排序,否则用一个列来来排序。
  
  public Sink<T> opWrapSink(int flags, Sink<T> sink) {
  
  Objects.requireNonNull(sink);
  
  // If the input is already naturally sorted and this operation
  
  // also naturally sorted then this is a no-op
  
  if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
  
  return sink;
  
  else if (StreamOpFlag.SIZED.isKnown(flags))
  
  return new SizedRefSortingSink<>(sink, comparator);
  
  else
  
  return new RefSortingSink<>(sink, comparator);
  
  }
  
  通过排序,分页,说明这个算子需要支持开始,结束方法。还需要一个取消方法,为什么了,比如第一个Stream有20个对象,但是后面只需要第一个,所以我第一个算子给到你一个数据时,第一个算子就需要终止了。
  
  Stream.anyMatch
  
  下面看一个anyMatch是怎么实现的。
  
  @Override
  
  public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
  
  return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
  
  }
  
  第二步,主要是用当前stream,和原始的数据容器spliterator
  
  final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
  
  assert getOutputShape() == terminalOp.inputShape();
  
  if (linkedOrConsumed)
  
  throw new IllegalStateException(MSG_STREAM_LINKED);
  
  linkedOrConsumed = true;
  
  return isParallel()
  
  ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
  
  : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
  
  }
  
  第三步,最后一个算子和原始容器
  
  @Override
  
  public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
  
  Spliterator<S> spliterator) {
  
  return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
  
  }
  
  第四步 包装算子
  
  final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
  
  Objects.requireNonNull(sink);
  
  for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
  
  sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
  
  }
  
  return (Sink<P_IN>) sink;
  
  }
  
  第五步 数据传递
  
  @Override
  
  final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
  
  copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
  
  return sink;
  
  }
  
  final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
  
  Objects.requireNonNull(wrappedSink);
  
  //满足要求后,是否需要停止计算
  
  if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
  
  wrappedSink.begin(spliterator.getExactSizeIfKnown());
  
  spliterator.forEachRemaining(wrappedSink);
  
  wrappedSink.end();
  
  }
  
  else {
  
  //需要停止计算
  
  copyIntoWithCancel(wrappedSink, spliterator);
  
  }
  
  }
  
  Stream.spliterator
  
  只需要一个Sink,然后调用wrapSink,再copyInto就可以实现了
  
  final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
  
  Supplier<Spliterator<P_IN>> supplier,
  
  boolean isParallel) {
  
  return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);

上一篇:排序之堆排序


下一篇:flume