A look at the iterate operation on Flink's DataStream

Category: Programming tools · Published: 5 years ago

This article takes a look at the iterate operation on Flink's DataStream.

Example

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});
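  • This example shows the basic usage of IterativeStream: iterate() creates the IterativeStream, and its closeWith method closes the iteration by registering feedback as the stream that flows back to the iteration head. Elements that pass the first filter (value > 0) loop back; elements that pass the second filter (value <= 0) leave the loop as output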
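
To make the data flow concrete, here is a plain-Java model of the loop above. It is only a sketch and does not use Flink: a queue stands in for the iteration head, the class and method names are hypothetical, and because the original leaves the map step as /*do something*/, the body is assumed here to subtract 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of the iterate/closeWith data flow; it does not use Flink.
class IterateLoopSketch {

    // Assumed iteration body: the original leaves the map step unspecified.
    static long body(long value) {
        return value - 1;
    }

    static List<Long> run(List<Long> initial) {
        // The head receives the initial input plus everything fed back.
        Deque<Long> head = new ArrayDeque<>(initial);
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long result = body(head.poll());
            if (result > 0) {
                head.add(result);   // feedback filter: value > 0 re-enters the head
            } else {
                output.add(result); // output filter: value <= 0 leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(3L, 1L, 0L))); // prints [0, -1, 0]
    }
}
```

In a real job the head and the body run as parallel operators and the order of results is not fixed; the single-threaded queue here only illustrates which elements loop and which ones exit.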

DataStream.iterate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
    //......

    @PublicEvolving
    public IterativeStream<T> iterate() {
        return new IterativeStream<>(this, 0);
    }

    @PublicEvolving
    public IterativeStream<T> iterate(long maxWaitTimeMillis) {
        return new IterativeStream<>(this, maxWaitTimeMillis);
    }

    //......
}
  • DataStream provides two iterate methods, both of which create and return an IterativeStream; the no-argument version passes 0 as maxWaitTimeMillis

IterativeStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.java

@PublicEvolving
public class IterativeStream<T> extends SingleOutputStreamOperator<T> {

    // We store these so that we can create a co-iteration if we need to
    private DataStream<T> originalInput;
    private long maxWaitTime;

    protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
        super(dataStream.getExecutionEnvironment(),
                new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
        this.originalInput = dataStream;
        this.maxWaitTime = maxWaitTime;
        setBufferTimeout(dataStream.environment.getBufferTimeout());
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public DataStream<T> closeWith(DataStream<T> feedbackStream) {

        Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

        if (!predecessors.contains(this.transformation)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }

        ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());

        return feedbackStream;
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
        return withFeedbackType(TypeInformation.of(feedbackTypeClass));
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) {
        return withFeedbackType(TypeInformation.of(feedbackTypeHint));
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
        return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime);
    }

    @Public
    public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {

        private CoFeedbackTransformation<F> coFeedbackTransformation;

        public ConnectedIterativeStreams(DataStream<I> input,
                TypeInformation<F> feedbackType,
                long waitTime) {
            super(input.getExecutionEnvironment(),
                    input,
                    new DataStream<>(input.getExecutionEnvironment(),
                            new CoFeedbackTransformation<>(input.getParallelism(),
                                    feedbackType,
                                    waitTime)));
            this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
        }

        public DataStream<F> closeWith(DataStream<F> feedbackStream) {

            Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

            if (!predecessors.contains(this.coFeedbackTransformation)) {
                throw new UnsupportedOperationException(
                        "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
            }

            coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());

            return feedbackStream;
        }

        private UnsupportedOperationException groupingException =
                new UnsupportedOperationException("Cannot change the input partitioning of an" +
                        "iteration head directly. Apply the partitioning on the input and" +
                        "feedback streams instead.");

        @Override
        public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(String field1, String field2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {
            throw groupingException;
        }

        @Override
        public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
            throw groupingException;
        }
    }
}
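  • An iteration can be thought of as a backflow, or as something like recursion: the filter defines the recursion condition, and elements that pass the filter re-enter the head of the IterativeStream and go through the downstream operations again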
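
The check that closeWith performs before registering the feedback edge can be modeled in a few lines of plain Java. This is a sketch, not Flink's implementation: Node and FeedbackNode are hypothetical stand-ins for StreamTransformation and FeedbackTransformation, each node has a single input, and the exception message is shortened.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Plain-Java model of the closeWith check; Node and FeedbackNode are
// hypothetical stand-ins for StreamTransformation and FeedbackTransformation.
class CloseWithCheckSketch {

    static class Node {
        final String name;
        final Node input; // null for a source

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }

        // This node plus everything upstream of it.
        Collection<Node> getTransitivePredecessors() {
            List<Node> result = new ArrayList<>();
            result.add(this);
            if (input != null) {
                result.addAll(input.getTransitivePredecessors());
            }
            return result;
        }
    }

    static class FeedbackNode extends Node {
        final List<Node> feedbackEdges = new ArrayList<>();

        FeedbackNode(String name, Node input) {
            super(name, input);
        }

        // Accepts the feedback node only if the iteration head is upstream of it.
        Node closeWith(Node feedback) {
            if (!feedback.getTransitivePredecessors().contains(this)) {
                throw new UnsupportedOperationException(
                        "feedback does not originate from this iteration");
            }
            feedbackEdges.add(feedback);
            return feedback;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", null);
        FeedbackNode head = new FeedbackNode("head", source);
        Node feedback = new Node("filter", new Node("map", head));
        head.closeWith(feedback); // accepted: head is upstream of feedback
        System.out.println("feedback edges: " + head.feedbackEdges.size());

        Node unrelated = new Node("other", source); // bypasses the head
        try {
            head.closeWith(unrelated);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the check is that a stream built directly on the source, without passing through the iteration head, cannot be used to close the loop.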

FeedbackTransformation

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java

@Internal
public class FeedbackTransformation<T> extends StreamTransformation<T> {

    private final StreamTransformation<T> input;

    private final List<StreamTransformation<T>> feedbackEdges;

    private final Long waitTime;

    public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
        super("Feedback", input.getOutputType(), input.getParallelism());
        this.input = input;
        this.waitTime = waitTime;
        this.feedbackEdges = Lists.newArrayList();
    }

    public StreamTransformation<T> getInput() {
        return input;
    }

    public void addFeedbackEdge(StreamTransformation<T> transform) {

        if (transform.getParallelism() != this.getParallelism()) {
            throw new UnsupportedOperationException(
                    "Parallelism of the feedback stream must match the parallelism of the original" +
                            " stream. Parallelism of original stream: " + this.getParallelism() +
                            "; parallelism of feedback stream: " + transform.getParallelism() +
                            ". Parallelism can be modified using DataStream#setParallelism() method");
        }

        feedbackEdges.add(transform);
    }

    public List<StreamTransformation<T>> getFeedbackEdges() {
        return feedbackEdges;
    }

    public Long getWaitTime() {
        return waitTime;
    }

    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
    }

    @Override
    public Collection<StreamTransformation<?>> getTransitivePredecessors() {
        List<StreamTransformation<?>> result = Lists.newArrayList();
        result.add(this);
        result.addAll(input.getTransitivePredecessors());
        return result;
    }
}
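  • FeedbackTransformation extends StreamTransformation and has properties such as feedbackEdges and waitTime
  • addFeedbackEdge adds a feedback edge after checking that the parallelism of the feedback stream matches that of the original stream; IterativeStream's closeWith calls addFeedbackEdge to register the feedback stream's StreamTransformation
  • waitTime specifies how long the feedback operator waits for feedback elements; once waitTime elapses without a new element, the operator shuts down and accepts no further feedback elements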
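
The waitTime behavior can be sketched with a BlockingQueue in plain Java. This is a model, not Flink's runtime code: the class and method names are hypothetical, and treating a wait time of 0 as "block indefinitely" is an assumption about the runtime that the source quoted above does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of a feedback consumer with a maximum wait time.
class FeedbackWaitSketch {

    // Collects feedback elements until none arrives within waitTimeMillis.
    static List<Long> drain(BlockingQueue<Long> feedback, long waitTimeMillis) {
        List<Long> received = new ArrayList<>();
        try {
            while (true) {
                // Assumption: a wait time of 0 means "wait forever".
                Long next = waitTimeMillis > 0
                        ? feedback.poll(waitTimeMillis, TimeUnit.MILLISECONDS)
                        : feedback.take();
                if (next == null) {
                    break; // timed out: stop accepting feedback elements
                }
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<Long> feedback =
                new LinkedBlockingQueue<>(Arrays.asList(1L, 2L, 3L));
        // Returns after the queue has been empty for 50 ms.
        System.out.println(drain(feedback, 50));
    }
}
```

Under this model, calling drain with a wait time of 0 on a queue that nobody interrupts never returns, which is one way to read why a job using the no-argument iterate() does not finish by itself.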

Summary

  • DataStream provides two iterate methods, both of which create and return an IterativeStream; the no-argument version passes 0 as maxWaitTimeMillis
  • The IterativeStream constructor takes two parameters, dataStream (kept as originalInput) and maxWaitTime. It creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime, and it also sets the transformation's bufferTimeout from dataStream.environment.getBufferTimeout(). FeedbackTransformation extends StreamTransformation and has properties such as feedbackEdges and waitTime; waitTime specifies how long the feedback operator waits for feedback elements, and once it elapses without a new element the operator shuts down and accepts no further feedback elements
  • IterativeStream extends SingleOutputStreamOperator and mainly provides two methods. closeWith closes the iteration: it defines the stream that is fed back to the head of the iteration. withFeedbackType creates a ConnectedIterativeStreams, which extends ConnectedStreams and allows the type of the feedback stream to differ from the type of originalInput; it defines its own closeWith method, and it overrides the keyBy methods of ConnectedStreams so that they throw UnsupportedOperationException
