Stream—一个早产的婴儿

栏目: IT技术 · 发布时间: 4年前

内容简介:当你会关注这篇文章时,那么意味着你对在前面的我们可以先看一个

当你会关注这篇文章时,那么意味着你对 Stream 或多或少有些了解,甚至你在许多业务中有所应用。正如你所知,业界对 Streamlambda 褒贬不一,有人认为它是银弹,也有人认为其降低了代码的可读性。事实上,很多东西我们应该辩证的去看待,一方面 Stream 相关的api的确提供了诸多的便利,如果你愿意花时间去理解和使用的话;然而另一方面,它像一个早产的婴儿,当你去阅读它源码时,你会觉得差异,像是一个临时拼凑而成的模块。

在前面的 Java函数式编程的前生今世 篇章中,我们已经了解了 lambda 表达式的原理,以及常见的四大函数式接口。

我们可以先看一个 Stream 的demo:

Stream.of(1, 2, 3)
                .filter(num -> num > 2)
                .forEach(System.out::println);

语义比较清晰,从一个 array 中获取数值大于2的,最后给打印出来。

源头

在调用 StreamAPI 之前,我们都需要先创建一个 Steam 流, Stream 流的创建方式有很多种,比如上述 demo 中的 Stream.of ,其使用的是 StreamSupport 这个类提供的方法;还有在集合类中在 1.8 之后预留了 stream 的获取方法等。

//StreamSupport
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
//Collection
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

这里可以稍微留意一下,有一个 parallel 参数,为我们后文去执行作准备。

不知道看到这里你是否也会有同样的疑惑:为什么 Stream 明明是一个接口,要在里面做 static 的实现?

这与以往的 JDK 代码有较大的出入,一般静态功能都会提供一个 xxxs 来处理,比如 PathPathsFileFiles 等。而且更令人诧异的是,在 1.8 之后,这种静态方法在 ListCollection 中比比皆是。

坦率地讲,这并非一种好的设计,严格来讲,接口只是声明,不应该承载具体实现,虽然从语法而言提供了这种能力。而像也像是为过去设计的妥协。

我们回到 Stream ,前面两种方法都提到了,会返回一个 Stream 流。

default Stream<E> stream() {
   return StreamSupport.stream(spliterator(), false);
}

最开始当我看到 StreamSupport 这个类时,我第一感觉是类似于 LockSupport ,用于「辅助」,而非「创建」。然而事与愿违的是,它更多的做的是「创建」。其实熟悉 JDK 源码的人应该比较清楚,这种「创建」的事情,一般是在 xxs (比如 Paths )这种类中处理。

当然,这个仅是个人主观的臆断,也许他们内部并没有这种「约定俗成」的东西。

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

ReferencePipeline.Head 是所有流处理的源头, ReferencePipeline 继承自 AbstractPipelineSpliterator 用于对数据迭代并加工,其中有一个较为关键的方法 forEachRemaining ,我们后面也会提到。

//创建头节点
    AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

头节点,包括后面流水线的节点都继承自这个 AbstractPipeline ,你会发现这里的结构是一个双向链表,通过 previousStagenextStage 来分别用于指向前一个和后一个节点。

Stream—一个早产的婴儿

流水线

Stream 体系中,操作被划分成了两种,一种流操作,他所做的事情是对数据的加工,而在流操作内部,又被划分成了两种,一种是有状态的流( StatefulOp ),一种是无状态的流( StatelessOp ),二者的区别在于, 数据是否会随着操作中的变化而变化 ,举个例子, filter 是无状态的,你要过滤什么就是什么,而 sort 是有状态的,如果你在数据层增加了数据或修改了数据,那么最后的结果有可能不同;另外一种是终止操作( TerminalOp ),他意味着开始对流进行执行操作,如果代码中仅有流操作,那么这个流是不会开始执行的。

A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline. On the other hand, a stateless lambda expression is one whose result does not depend on any state that might change during the execution of a pipeline.

Stream 中,流操作有很多种,比如常见的 filtermapmapToInt 等,都会在方法中返回一个新建的流操作对象,而这个对象也继承了 AbstractPipeline

//filter操作
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        //这里的this就是前面提到的流的源头
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

//StatelessOp类
    abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

//StatelessOp最终也继承自AbstractPipeline
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;

        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }

StatelessOp 对象在创建时,会注入一个参数 this ,而这个 this 即我们前面提到的流的源头。在 AbstractPipeline 的另外一个构造方法中,完成了双向链表的指定以及深度的自增。

这里有一个方法 opIsStateful ,用于判定前面提到的是否是有状态的。

终止符

所有的流操作的执行,都取决于最终的终止操作( TerminalOp ),如果流中没有这个操作,那么前面提到的操作流都无法执行。

而所有的终止操作都实现了 TerminalOp 这个接口,包括向我们常见的 foreachreducefind 等。我们还是以前面例子中提到的 foreach 来演示我们的原理。

//Stream
void forEach(Consumer<? super T> action);

//ReferencePipeline中的forEach实现
    @Override
public void forEach(Consumer<? super P_OUT> action) {
   evaluate(ForEachOps.makeRef(action, false));
}

StreamforEach 方法中,有一个参数 Consumer ,是一个函数式接口,我们在前面的文章中有所涉及,有兴趣的可以自行查阅其原理。

//ForEachOps
static final class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }

ForEachOps 有一个 ForEachOp 类用于生成操作类,同时, ForEachOp 还实现了 TerminalSink ,后面会提到。不过,还有另外一个 OfRef 来继承自 ForEachOp 作为调用入口去使用,不过至今我还没明白这里为何单独需要在 ForEachOp 下面再嵌套一层,有了解的可以告知我一下。

//AbstractPipeline
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

//用于判定是并行还是串行
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    @Override
    public final boolean isParallel() {
        return sourceStage.parallel;
    }

这里会根据最开始的源头注入的 parallel 来判定,在前面也有所提及。这里有一个方法 sourceSpliterator 用于协助我们去获取数据源分割器,其实在前面有所提及,在创建流的时候,就已经有自动创建一个 spliterator ,如果是串行流,那么会直接使用源头流的分割器,如果是并行流,而且其中有有状态的操作,那么会使用这个状态流实现的方法去返回。

//AbstractPipeline
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // Get the source spliterator of the pipeline
        Spliterator<?> spliterator = null;
       //最开始的源头流的分割器
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

     //如果是并行流并且有有状态的操作流
        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op
            // in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;

                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        // Clear the short circuit flag for next pipeline stage
                        // This stage encapsulates short-circuiting, the next
                        // stage may not have any short-circuit operations, and
                        // if so spliterator.forEachRemaining should be used
                        // for traversal
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage
                    // based on the stage's spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        if (terminalFlags != 0)  {
            // Apply flags from the terminal operation to last pipeline stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }

在我们拿到分割器之后,我们会调用 terminalOp.evaluateSequential 方法去处理。需要说明的是,并行流我暂时没有深入研究,所以暂时不在此章的讨论范畴,后续有机会我会补充上去。

//ForEachOps
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<S> spliterator) {
//这里的helper也就是前面在AbstractPipeline中注入的this
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

//AbstractPipeline 
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
//遍历流链表,逐一执行前面的opWrapSink方法
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

在操作流中,一般会返回一个 StatelessOp 类,这里前面有所提及,中间有一个 opWrapSink 就是现在我们在调用的方法,而在这个方法中,又会继续返回一个类 Sink.ChainedReference ,这个类会在 downstream 记录我们传入的 sink ,也就是我们目前正在操作的 ForEachOp

//前面的filter
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                //继续返回一个类,记录terminalOp
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

sink 也是一个简单的单项链表,他的顺序与 Stream 相反,通过 downStream 一层层向前指定。在获取到最前面一层包装好的 sink 之后,我们继续看 copyInto 方法。

//AbstractPipeline
    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        //这里的wrappedSink是最前面的流操作,也就是我们生成流之后的第一个操作,在此案例中也就是filter
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            //调用分割器的遍历方法
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

//Spliterators
        public void forEachRemaining(Consumer<? super T> action) {
            Object[] a; int i, hi; // hoist accesses and checks from loop
            if (action == null)
                throw new NullPointerException();
            if ((a = array).length >= (hi = fence) &&
                (i = index) >= 0 && i < (index = hi)) {
               //将数据源遍历,执行sink中的accept方法
                do { action.accept((T)a[i]); } while (++i < hi);
            }
        }

//filter accept方法被遍历执行
                    @Override
                    public void accept(P_OUT u) {
//这里的predicate也就是我们最开始通过lambda表达式创建的action
                        if (predicate.test(u))
//如果检测通过,那么执行downstream也就是ForEach.OfRef类的accept方法
                            downstream.accept(u);
                    }

//OfRef accept被调用
            @Override
            public void accept(T t) {
//这里的consumer也就是我们stream.foreach调用时注入的System.out::println
                consumer.accept(t);
            }

Spliterators 通过遍历所有数据源,执行 filteraccept 方法,如果校验通过,那么会执行 downstreamaccept 方法,而这个 downstream 我们已经提及很多次,也就是我们这个例子中的 foreachforeachaccept 被调用时,此时又有一个 consumer ,这里的 consumer 也就是我们最开始例子中的 System.out::println 。至此,整体流程就执行完毕了。

回到我们的标题,为什么说 stream 是一个“早产的婴儿”呢?在对 stream 整体源码有所大体阅读之后,你会发现很多类的命名、类的设计风格、以及结构的整理设计能力与之前的模块有较大的差异,有些命名明明可以更为规范,有些设计明明可以设计的更为优雅,甚至于,许多地方的设计还不够简练,这里就不一一举例了。当然,这一切都只是我个人的想法,也有可能是我的水平还没到达另外一个层次吧,或许几年之后再拜读时又会有不一样的感悟。

本文由nine 创作,采用 知识共享署名4.0 国际许可协议进行许可

本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名

最后编辑时间为: Apr 23, 2020 at 11:00 pm


以上所述就是小编给大家介绍的《Stream—一个早产的婴儿》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro HTML5 and CSS3 Design Patterns

Pro HTML5 and CSS3 Design Patterns

Michael Bowers / Apress / 2011-11-15 / GBP 35.50

Pro HTML5 and CSS3 Design Patterns is a reference book and a cookbook on how to style web pages using CSS3 and HTML5. It contains 350 ready--to--use patterns (CSS3 and HTML5 code snippets) that you ca......一起来看看 《Pro HTML5 and CSS3 Design Patterns》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具