Preface
This article takes a look at the iterate operation of Flink's DataStream.
Example
    IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.map(/* do something */);

    // Elements greater than 0 are fed back to the head of the iteration
    DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);

    // Elements less than or equal to 0 leave the iteration
    DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value <= 0;
        }
    });
- This example shows the basic usage of IterativeStream: iterate() creates the IterativeStream, and its closeWith method closes the loop by registering the feedback stream
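To make the data flow of the example easier to follow, here is a minimal plain-Java model of it. It is not Flink code: the class IterationFlowSketch, the method names, and the "subtract one" body are all assumptions standing in for the elided map step, and a simple queue stands in for the head of the iteration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of the example's data flow; this is not Flink code.
class IterationFlowSketch {

    // Hypothetical stand-in for the elided map step: subtract one.
    static long iterationBody(long value) {
        return value - 1;
    }

    static List<Long> run(List<Long> initialStream) {
        // The queue plays the role of the iteration head.
        Deque<Long> head = new ArrayDeque<>(initialStream);
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long mapped = iterationBody(head.poll());
            if (mapped > 0) {
                head.add(mapped);   // feedback filter: goes back to the head
            } else {
                output.add(mapped); // output filter: leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // 3 loops three times, 1 once, 2 twice; all of them leave as 0.
        System.out.println(run(Arrays.asList(3L, 1L, 2L))); // prints [0, 0, 0]
    }
}
```

Unlike this model, a real streaming job has no "queue is empty" end condition, since input can keep arriving while elements are still circulating.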
DataStream.iterate
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
    @Public
    public class DataStream<T> {

        //......

        @PublicEvolving
        public IterativeStream<T> iterate() {
            return new IterativeStream<>(this, 0);
        }

        @PublicEvolving
        public IterativeStream<T> iterate(long maxWaitTimeMillis) {
            return new IterativeStream<>(this, maxWaitTimeMillis);
        }

        //......
    }
- DataStream provides two iterate methods, both of which create and return an IterativeStream; the no-arg iterate() passes 0 as maxWaitTimeMillis
IterativeStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.java
    @PublicEvolving
    public class IterativeStream<T> extends SingleOutputStreamOperator<T> {

        // We store these so that we can create a co-iteration if we need to
        private DataStream<T> originalInput;
        private long maxWaitTime;

        protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
            super(dataStream.getExecutionEnvironment(),
                    new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
            this.originalInput = dataStream;
            this.maxWaitTime = maxWaitTime;
            setBufferTimeout(dataStream.environment.getBufferTimeout());
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        public DataStream<T> closeWith(DataStream<T> feedbackStream) {

            Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

            if (!predecessors.contains(this.transformation)) {
                throw new UnsupportedOperationException(
                        "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
            }

            ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());

            return feedbackStream;
        }

        public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
            return withFeedbackType(TypeInformation.of(feedbackTypeClass));
        }

        public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) {
            return withFeedbackType(TypeInformation.of(feedbackTypeHint));
        }

        public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
            return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime);
        }

        @Public
        public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {

            private CoFeedbackTransformation<F> coFeedbackTransformation;

            public ConnectedIterativeStreams(DataStream<I> input,
                    TypeInformation<F> feedbackType,
                    long waitTime) {
                super(input.getExecutionEnvironment(),
                        input,
                        new DataStream<>(input.getExecutionEnvironment(),
                                new CoFeedbackTransformation<>(input.getParallelism(),
                                        feedbackType,
                                        waitTime)));
                this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
            }

            public DataStream<F> closeWith(DataStream<F> feedbackStream) {

                Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

                if (!predecessors.contains(this.coFeedbackTransformation)) {
                    throw new UnsupportedOperationException(
                            "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
                }

                coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());

                return feedbackStream;
            }

            private UnsupportedOperationException groupingException =
                    new UnsupportedOperationException("Cannot change the input partitioning of an" +
                            "iteration head directly. Apply the partitioning on the input and" +
                            "feedback streams instead.");

            @Override
            public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {
                throw groupingException;
            }

            @Override
            public ConnectedStreams<I, F> keyBy(String field1, String field2) {
                throw groupingException;
            }

            @Override
            public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {
                throw groupingException;
            }

            @Override
            public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {
                throw groupingException;
            }

            @Override
            public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
                throw groupingException;
            }
        }
    }
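The iteration can be thought of as a backflow, or as something like recursion: the filter on the feedback stream controls the recursion condition, and elements that pass it re-enter the head of the IterativeStream and go through the downstream operations again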
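The closeWith method in the listing above refuses a feedback stream that does not originate from the iteration. A minimal plain-Java sketch of that check follows; Node is a hypothetical stand-in for StreamTransformation, and CloseWithCheckSketch and its method names are assumptions made for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the closeWith check; Node is a hypothetical
// stand-in for StreamTransformation, not a Flink class.
class CloseWithCheckSketch {

    static class Node {
        final String name;
        final List<Node> inputs;
        final List<Node> feedbackEdges = new ArrayList<>();

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }

        // This node plus everything upstream of it.
        // Feedback edges are deliberately not followed, so the walk terminates.
        List<Node> transitivePredecessors() {
            List<Node> result = new ArrayList<>();
            result.add(this);
            for (Node input : inputs) {
                result.addAll(input.transitivePredecessors());
            }
            return result;
        }
    }

    static Node closeWith(Node iterationHead, Node feedback) {
        if (!feedback.transitivePredecessors().contains(iterationHead)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback stream that does not originate from it.");
        }
        iterationHead.feedbackEdges.add(feedback);
        return feedback;
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node head = new Node("iterationHead", source);
        Node body = new Node("map", head);
        Node feedback = new Node("filter", body);
        closeWith(head, feedback); // accepted: filter is downstream of the head

        Node unrelated = new Node("otherSource");
        try {
            closeWith(head, unrelated); // rejected: not derived from the head
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The design point the sketch tries to capture is that the check walks only the regular inputs, so a feedback stream is valid exactly when the iteration head is somewhere upstream of it.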
FeedbackTransformation
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
    @Internal
    public class FeedbackTransformation<T> extends StreamTransformation<T> {

        private final StreamTransformation<T> input;

        private final List<StreamTransformation<T>> feedbackEdges;

        private final Long waitTime;

        public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
            super("Feedback", input.getOutputType(), input.getParallelism());
            this.input = input;
            this.waitTime = waitTime;
            this.feedbackEdges = Lists.newArrayList();
        }

        public StreamTransformation<T> getInput() {
            return input;
        }

        public void addFeedbackEdge(StreamTransformation<T> transform) {

            if (transform.getParallelism() != this.getParallelism()) {
                throw new UnsupportedOperationException(
                        "Parallelism of the feedback stream must match the parallelism of the original" +
                                " stream. Parallelism of original stream: " + this.getParallelism() +
                                "; parallelism of feedback stream: " + transform.getParallelism() +
                                ". Parallelism can be modified using DataStream#setParallelism() method");
            }

            feedbackEdges.add(transform);
        }

        public List<StreamTransformation<T>> getFeedbackEdges() {
            return feedbackEdges;
        }

        public Long getWaitTime() {
            return waitTime;
        }

        @Override
        public final void setChainingStrategy(ChainingStrategy strategy) {
            throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
        }

        @Override
        public Collection<StreamTransformation<?>> getTransitivePredecessors() {
            List<StreamTransformation<?>> result = Lists.newArrayList();
            result.add(this);
            result.addAll(input.getTransitivePredecessors());
            return result;
        }
    }
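- FeedbackTransformation extends StreamTransformation and holds, among other fields, feedbackEdges and waitTime
- addFeedbackEdge adds a feedback edge and rejects a transformation whose parallelism differs from that of the original stream; IterativeStream's closeWith calls addFeedbackEdge to register the feedback stream's StreamTransformation
- waitTime is how long the feedback operator waits for feedback elements; once waitTime has passed, the operation closes and no longer accepts new feedback elements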
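The waitTime behaviour described above can be pictured as a timed poll on a queue. The sketch below is a plain-Java model under that assumption; it is not taken from Flink's implementation, which this article does not show, and FeedbackWaitSketch and drain are hypothetical names. It only covers a positive wait time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of "wait at most waitTime for the next feedback element".
// Assumption: the wait is modelled as a timed poll on a queue.
class FeedbackWaitSketch {

    static List<Long> drain(BlockingQueue<Long> feedback, long waitTimeMillis) {
        if (waitTimeMillis <= 0) {
            throw new IllegalArgumentException("this sketch only models a positive wait time");
        }
        List<Long> received = new ArrayList<>();
        try {
            while (true) {
                // Wait up to waitTimeMillis for the next feedback element.
                Long next = feedback.poll(waitTimeMillis, TimeUnit.MILLISECONDS);
                if (next == null) {
                    break; // nothing arrived in time: stop accepting feedback
                }
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag and stop
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<Long> feedback = new LinkedBlockingQueue<>();
        feedback.add(1L);
        feedback.add(2L);
        // Both elements are consumed, then the loop gives up after 100 ms of silence.
        System.out.println(drain(feedback, 100));
    }
}
```

Note that the timer restarts with every element received, so in this model the loop ends only after a full waitTime of silence, not after a fixed total running time.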
Summary
- DataStream provides two iterate methods, both of which create and return an IterativeStream; the no-arg iterate() passes 0 as maxWaitTimeMillis
- The IterativeStream constructor takes two parameters, the original input DataStream and maxWaitTime. It creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime, and sets the transformation's bufferTimeout from dataStream.environment.getBufferTimeout(). FeedbackTransformation extends StreamTransformation and holds, among other fields, feedbackEdges and waitTime; waitTime is how long the feedback operator waits for feedback elements, and once it has passed, the operation closes and no longer accepts new feedback elements
- IterativeStream extends SingleOutputStreamOperator and mainly offers two methods. closeWith closes the iteration: it defines the part of the stream that is fed back to the head of the iteration. withFeedbackType creates a ConnectedIterativeStreams, which extends ConnectedStreams and allows the type of the feedback stream to differ from the type of originalInput. ConnectedIterativeStreams defines its own closeWith method and overrides the keyBy methods of ConnectedStreams so that they throw UnsupportedOperationException