【修炼内功】[Java8] Stream是怎么工作的

栏目: Java · 发布时间: 5年前

内容简介:Java8中新增的Stream,相信使用过的同学都已经感受到了它的便利,允许你以声明性的方式处理集合,而不用去做繁琐的for-loop/while-loop,并且可以以极低的成本并行地处理集合数据如果需要从菜单中筛选出卡路里在400以下的菜品,并按卡路里排序后,输出菜品名称

【修炼内功】[Java8] Stream是怎么工作的

Java8中新增的Stream,相信使用过的同学都已经感受到了它的便利,允许你以声明性的方式处理集合,而不用去做繁琐的for-loop/while-loop,并且可以以极低的成本并行地处理集合数据

如果需要从菜单中筛选出卡路里在400以下的菜品,并按卡路里 排序 后,输出菜品名称

java 8之前,需要进行两次显示迭代,并且还需要借助中间结果存储

List<Menu> lowCaloricDishes = new LinkedList<>();

// 按照热量值进行筛选
for(Dish dish : dishes) {
  if (dish.getCalories() < 400) {
    lowCaloricDishes.add(dish);
  }
}

// 按照热量进行排序
lowCaloricDishes.sort(new Comparator<Dish>() {
  @Override
  public int compare(Dish d1, Dish d2) {
    return d1.getCalories().compareTo(d2.getCalories);
  }
})

// 提取名称
List<String> lowCaloricDishesName = new LinkedList<>();
for(Dish dish : lowCaloricDishes) {
  lowCaloricDishesName.add(dish.getName());
}

如果使用Stream API,只需要

List<String> lowCaloricDishesName = 
    dishes.parallelStream() // 开启并行处理
          .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
          .sorted(Comparator.comparing(Dish::getCalories)) // 按照热量进行排序
          .map(Dish::getName) // 提取名称
          .collect(Collectors.toList()); // 将结果存入List

甚至,可以写出更复杂的功能

Map<Integer, List<String>> lowCaloricDishesNameGroup = 
    dishes.parallelStream() // 开启并行处理
          .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
          .sorted(comparing(Dish::getCalories)) // 按照热量进行排序
          .collect(Collectors.groupingBy( // 将菜品名按照热量进行分组
              Dish::getCalories, 
              Collectors.mapping(Dish::getName, Collectors.toList())
          ));

是不是非常简洁,并且越发形似SQL

如此简洁的API是如何实现的?中间过程是如何衔接起来的?每一步都会进行一次迭代么,需要中间结果存储么?并行处理是怎么做到的?

什么是Stream?

Stream使用一种类似 SQL 语句的方式,提供对集合运算的高阶抽象,可以将其处理的元素集合看做一种数据流,流在管道中传输,数据在管道节点上进行处理,比如筛选、排序、聚合等

【修炼内功】[Java8] Stream是怎么工作的

数据流在管道中经过中间操作(intermediate operation)处理,由终止操作(terminal operation)得到前面处理的结果

和以往的集合操作不同,Stream操作有两个基础特征:

  • pipelining: 中间操作 会返回流对象,多个操作最终串联成一个管道,管道并不直接操作数据,最终由 终止操作 触发数据在管道中的流动及处理,并收集最终的结果

    Stream的实现使用流水线( pipelining )的方式巧妙的避免了多次迭代,其基本思想是在 一次迭代 中尽可能多的执行用户指定的操作

  • 内部迭代:区别于以往使用iterator或者for-each等显示地在集合外部进行迭代计算的方式,内部迭代隐式的在集合内部进行迭代计算

Stream操作分为两类: 中间操作终止操作

  • 中间操作:将流一层层的进行处理,并向下一层进行传递,如 filter map sorted

    中间操作又分为有状态(stateful)及无状态(stateless)

    • 有状态:必须等上一步操作完拿到全部元素后才可操作,如 sorted
    • 无状态:该操作的数据不收上一步操作的影响,如 filter map
  • 终止操作:触发数据的流动,并收集结果,如 collect findFirst forEach

    终止操作又分为短路操作(short-circuiting)及非短路操作(non-short-circuiting)

    • 短路操作:会在适当的时刻终止遍历,类似于break,如 anyMatch findFirst
    • 非短路操作:会遍历所有元素,如 collect max

【修炼内功】[Java8] Stream是怎么工作的

Stream采用某种方式记录用户每一步的操作,当用户调用终止操作时将之前记录的操作叠加到一起,尽可能地在一次迭代中全部执行掉,那么

  1. 用户的操作如何记录?
  2. 操作如何叠加?
  3. 叠加后的操作如何执行?
  4. 执行后的结果(如果有)在哪里?

Stream如何实现

操作如何记录

Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将各Pipeline按照先后顺序连接到一起,就构成了整个流水线

与Stream相关类和接口的继承关系如下图

【修炼内功】[Java8] Stream是怎么工作的

Head用于表示第一个Stage,该Stage不包含任何操作

StatelessOp和StatefulOp分别表示无状态和有状态的Stage

【修炼内功】[Java8] Stream是怎么工作的

使用 Collection.stream Arrays.streamStream.of 等接口会生成 Head ,其内部均采用 StreamSupport.stream 方法,将原始数据包装为 Spliterator 存放在Stage中

  • Head记录Stream起始操作,将包装为Spliterator的原始数据存放在Stage中
  • StatelessOp记录无状态的中间操作
  • StatefulOp记录有状态的中间操作
  • TerminalOp用于触发数据数据在各Stage间的流动及处理,并收集最终数据(如果有)

Head StatelessOp StatefulOp三个操作实例化会指向其父类AbstractPipeline

对于Head

/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 * {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

其会将包装为Spliterator的原始数据存放在Stage中,并将自身存放在sourceStage中

对于StatelessOp及StatefulOp

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

每一个Stage都会存放原始的sourceStage,即Head

通过previousStage及nextStage,将各Stage串联为一个双向链表,使得每一步都知道上一步与下一步的操作

操作如何叠加

以上已经解决了如何记录操作的问题,想要让pipeline运行起来,需要一种将所有操作叠加到一起的方案

由于前面的Stage并不知道后面的Stage导致需要执行何种操作,只有当前Stage本身知道该如何执行自己包含的动作,这就需要某种协议来协调相邻Stage之间的调用关系

Stream类库采用了Sink接口来协调各Stage之间的关系

interface Sink<T> extends Consumer<T> {
    /**
     * Resets the sink state to receive a fresh data set.  This must be called
     * before sending any data to the sink.  After calling {@link #end()},
     * you may call this method to reset the sink for another calculation.
     * @param size The exact size of the data to be pushed downstream, if
     * known or {@code -1} if unknown or infinite.
     *
     * <p>Prior to this call, the sink must be in the initial state, and after
     * this call it is in the active state.
     *
     * 开始遍历前调用,通知Sink做好准备
     */
    default void begin(long size) {}

    /**
     * Indicates that all elements have been pushed.  If the {@code Sink} is
     * stateful, it should send any stored state downstream at this time, and
     * should clear any accumulated state (and associated resources).
     *
     * <p>Prior to this call, the sink must be in the active state, and after
     * this call it is returned to the initial state.
     *
     * 所有元素遍历完成后调用,通知Sink没有更多元素了
     */
    default void end() {}
    
    /**
     * Indicates that this {@code Sink} does not wish to receive any more data.
     *
     * @implSpec The default implementation always returns false.
     *
     * @return true if cancellation is requested
     *
     * 是否可以结束操作,可以让短路操作尽早结束
     */
    default boolean cancellationRequested() {}

    /**
     * Accepts a value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept values
     *
     * 遍历时调用,接收的一个待处理元素,并对元素进行处理
     * Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept方法即可
     */
    default void accept(T value) {}
}

Sink的四个接口方法常常相互协作,共同完成计算任务

实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法,下面结合具体源码来理解Stage是如何将自身的操作包装秤Sink,以及Sink是如何将处理结果转发给下一个Sink的

无状态Stage,Stream.map

// Stream.map 将生成一个新Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        // 该方法将回调函数(处理逻辑)包装成Sink
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // 接收数据,使用当前包装的回调函数处理数据,并传递给下游Sink
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

上述代码逻辑非常简单,接下来可以看一下有状态Stage,Stream.sorted

private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    // 存放用于排序的元素
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    @Override
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        // 创建用于存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    @Override
    public void end() {
        // 只有在接收到所有元素后才开始排序
        list.sort(comparator);
        downstream.begin(list.size());
        // 排序完成后,将数据传递给下游Sink
        if (!cancellationWasRequested) {
            // 下游Sink不包含短路操作,将数据依次传递给下游Sink
            list.forEach(downstream::accept);
        }
        else {
            // 下游Sink包含短路操作
            for (T t : list) {
                // 对于每一个元素,都要询问是否可以结束处理
                if (downstream.cancellationRequested()) break;
                // 将元素传递给下游Sink
                downstream.accept(t);
            }
        }
        // 告知下游Sink数据传递完毕
        downstream.end();
        list = null;
    }

    @Override
    public void accept(T t) {
        // 依次将需要排序的元素加入到临时列表中
        list.add(t);
    }
}

Stream.sorted会在接收到所有元素之后再进行排序,在此之后才开始将数据依次传递给下游Sink

【修炼内功】[Java8] Stream是怎么工作的

叠加后的操作如何执行

Sink就如齿轮,每一步的操作逻辑是封装在Sink中的,那各Sink是如何串联咬合在一起的,首个Sink又是如何启动来触发整个pipeline执行的?

结束操作(TerminalOp)之后不能再有别的操作,结束操作会创建一个包装了自己操作的Sink,这个Sink只处理数据而不会将数据传递到下游Sink

TerminalOp的类图非常简单

【修炼内功】[Java8] Stream是怎么工作的

FindOp: 用于查找,如 findFirstfindAny ,生成 FindSink
ReduceOp: 用于规约,如 reduce collect ,生成 ReduceSink
MatchOp: 用于匹配,如 allMatch anyMatch ,生成 MatchSink
ForEachOp: 用于遍历,如 forEach ,生成 ForEachSink

【修炼内功】[Java8] Stream是怎么工作的

在调用Stream的终止操作时,会执行 AbstractPipeline.evaluate

/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
}

最终会根据是否并行执行TerminalOp中不同的的evaluate方法,在TerminalOp的evaluate方法中会调用 helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get() 来串联各层Sink,触发pipeline,并获取最终结果,那TerminalOp到底是如何串联各层Sink的?

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

其中玄机尽在 warpSink

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    // AbstractPipeline.this,最后一层Stage
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // 从下游向上游遍历,不断包装Sink
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层Stage的Sink */);
    }
    return (Sink<P_IN>) sink;
}

还记得 opWrapSink 么?它会返回一个新的Sink,实现 begin end accept 等方法,当前Stage的处理逻辑封装在其中,并将处理后的结果传递给下游的Sink

这样,便将从开始到结束的所有操作都包装到了一个Sink里,执行这个Sink就相当于执行首个Sink,并带动所有下游的Sink,使整个pipeline运行起来

有了包含所有操作的Sink,如何执行Sink呢? wrapAndCopyInto 中还有一个 copyInto 方法

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // 不包含短路操作
        
        // 1. begin
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // 2. 遍历调用 sink.accept
        spliterator.forEachRemaining(wrappedSink);
        // 3. end
        wrappedSink.end();
    }
    else {
        // 包含短路操作
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    @SuppressWarnings({"rawtypes","unchecked"})
    AbstractPipeline p = AbstractPipeline.this;
    while (p.depth > 0) {
        p = p.previousStage;
    }
    // 1. begin
    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    // 2. 遍历调用 sink.accept
    //    每一次遍历都询问cancellationRequested结果
    //    如果cancellationRequested为true,则中断遍历
    p.forEachWithCancel(spliterator, wrappedSink);
    // 3. end
    wrappedSink.end();
}

copyInto 会根据不同的情况依次

  1. 调用 sink.bigin
  2. 遍历调用 sink.accept

    如果包含短路操作,则每次遍历都需要询问cancellationRequested,适时中断遍历

  3. 调用 sink.end

执行后的结果在哪里

各层Stage通过Sink协议将所有的操作串联到一起,遍历原始数据并执行,终止操作会创建一个包装了自己操作的TerminalSink,该Sink中处理最终的数据并做数据收集(如果需要),每一种TerminalSink中均会提供一个获取最终数据的方法

【修炼内功】[Java8] Stream是怎么工作的

TerminalOp通过调用TerminalSink中的对应方法,获取最终的数据并返回,如ReduceOp中

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator)/* 执行各Sink */.get()/* 获取最终数据 */;
}

并发是如何做到的

使用 Collection.parallelStreamStream.parallel 等方法可以将当前的流 标记 为并发,重新来看 AbstractPipeline.evaluate ,该方法会在终止操作时被执行

/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
}

如果被标记为 sequential ,则会调用 TerminalOp.evaluateSequential ,evaluateSequential的调用过程上文已经讲述的很清楚

如果被标记为 parallel ,则会调用 TerminalOp.evaluateParallel ,对于该方法不同的TerminalOp会有不同的实现,但都使用了ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述 evaluateSequential 类似的动作,最后将每一个单元计算的结果依次整合,得到最终结果

【修炼内功】[Java8] Stream是怎么工作的

默认情况下,ForkJoin的线程数即为机器的CPU核数,如果想自定义Stream并行执行的线程数,可以参考 Custom Thread Pools In Java 8 Parallel Streams

在将原始数据进行拆分的时候,拆分的策略是什么?拆分的粒度又是什么(拆分到什么程度)?

还记得上文所说,原始数据是如何存放的么? Spliterator (可分迭代器 splitable iterator),无论使用何种API,均会将原始数据封装为 Spliterator 后存放在Stage中,在进行parallel计算时,对原始数据的拆分以及拆分粒度都是基于 Spliterator 的,和Iterator一样,Spliterator也用于遍历数据源中的数据,但它是专门为并行执行而设计的

public interface Spliterator<T> {
    /**
     * 如果还有元素需要遍历,则遍历该元素并执行action,返回true,否则返回false
     */
    boolean tryAdvance(Consumer<? super T> action);
    
    /**
     * 如果可以,则将一部分元素划分出去,构造另一个Spliterator,使得两个Spliterator可以并行处理
     */
    Spliterator<T> trySplit();
    
    /**
     * 估算还有多少元素需要遍历
     */
    long estimateSize();
    
    /**
     * 遍历所有未遍历的元素
     */
    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }
}

【修炼内功】[Java8] Stream是怎么工作的

动图如下

【修炼内功】[Java8] Stream是怎么工作的

在使用Stream parallel时,如果默认Spliterator的拆分逻辑不能满足你的需求,便可以自定义Spliterator,具体示例可以参考《Java 8 in Action》中『7.3.2 实现你自己的Spliterator』

结语

  1. Head 会生成一个不包含任何操作的Stage,并将原始数据 Spliterator 存放在 sourceStage
  2. 中间操作 StagelessOp StagefulOp 将当前操作封装在Sink中,生成一个新的Stage,并使用双链表结构将前后的Stage链接在一起,Sink用于调用当前指定的操作处理数据,并将处理后的结果传递给下游Sink
  3. 终止操作 TerminalOp 生成一个 TerminalSink ,从下游向上游遍历Stage,不断包装各Stage中的Sink,最终生成一个串联了所有操作的TerminalSink,适时调用该Sink的 begin accept end 等方法,触发整个pipeline的数据流动及处理,最终调用TerminalSink的 get 方法,获取最终结果(如果有)
  4. 被标记为parallel的流,会使用ForkJoin框架,将原始流拆分为更小的单元,对每一个单元分别作计算,并将各单元的计算结果进行整合,得到最终结果

JavaLambdaInternals - 6-Stream Pipelines)

java8实战:Stream执行原理

以上所述就是小编给大家介绍的《【修炼内功】[Java8] Stream是怎么工作的》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

暗趋势

暗趋势

王煜全 / 中信出版集团 / 2019-1 / 59元

《暗趋势》由得到“全球创新260讲”专栏主讲人王煜全,为你揭示藏在科技浪潮中的商业机会,教你获得把握趋势的能力,发现小趋势,抓住大机遇。 《暗趋势》聚焦于改变你生活和未来的产业,深度解读人工智能、混合现实、区块链、生物医疗等你必须关注的科技行业,并分析新科技给企业和个人带来的发展机遇,前瞻性提出企业和个人的思维与行动应对策略。 王煜全作为全球科技前哨侦察兵,以其每年5亿元的科技投资及2......一起来看看 《暗趋势》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具