Preface
This article takes a look at the iterate operation of Flink's DataStream.
Example
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

// initialStream is assumed to be an existing DataStream<Long>
IterativeStream<Long> iteration = initialStream.iterate();
// Iteration body; subtracting 1 is only a placeholder for the real per-round logic
DataStream<Long> iterationBody = iteration.map(value -> value - 1);
// Elements that are still > 0 are fed back to the head of the iteration
DataStream<Long> feedback = iterationBody.filter(value -> value > 0);
iteration.closeWith(feedback);
// Elements that are <= 0 leave the loop and continue downstream
DataStream<Long> output = iterationBody.filter(value -> value <= 0);
```
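- This example shows the basic usage of IterativeStream. Calling iterate on a DataStream creates an IterativeStream. Calling closeWith on that IterativeStream with the feedback stream closes the iteration, so that the elements of the feedback stream are sent back to the head of the iteration.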
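The following minimal, single-threaded sketch in plain Java makes the data flow of the example concrete. It is not Flink code. The class IterationModel and its run method are hypothetical, and a deque stands in for the iteration head. The map is assumed to subtract 1 from each value, since the example leaves the body open. Because the two filters in the example are complementary, a single predicate decides whether an element is fed back or emitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.LongPredicate;
import java.util.function.LongUnaryOperator;

/**
 * Hypothetical single-threaded model of the iterate/closeWith example (not Flink code).
 * The deque plays the role of the iteration head: it receives the original input
 * elements as well as every element that is fed back.
 */
final class IterationModel {

    static List<Long> run(List<Long> input, LongUnaryOperator body, LongPredicate feedbackCondition) {
        Deque<Long> head = new ArrayDeque<>(input);
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long value = body.applyAsLong(head.poll()); // iteration body (the map)
            if (feedbackCondition.test(value)) {
                head.add(value);   // feedback filter: the element goes back to the head
            } else {
                output.add(value); // complementary output filter: the element leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Body subtracts 1, feedback while value > 0: every positive input ends up as 0
        System.out.println(run(Arrays.asList(3L, 1L), v -> v - 1, v -> v > 0)); // [0, 0]
        // A non-positive input passes through the body once and exits immediately
        System.out.println(run(Arrays.asList(-5L), v -> v - 1, v -> v > 0)); // [-6]
    }
}
```

In this model the element 3 passes through the body three times (3, 2, 1) before it leaves the loop as 0. Each round trip corresponds to one pass over the feedback edge.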
DataStream.iterate
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
```java
@Public
public class DataStream<T> {

    //......

    @PublicEvolving
    public IterativeStream<T> iterate() {
        return new IterativeStream<>(this, 0);
    }

    @PublicEvolving
    public IterativeStream<T> iterate(long maxWaitTimeMillis) {
        return new IterativeStream<>(this, maxWaitTimeMillis);
    }

    //......
}
```
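- DataStream provides two iterate methods, and both create and return an IterativeStream. The no-argument iterate uses a maxWaitTimeMillis of 0, which means the iteration head waits for feedback indefinitely and the iteration never terminates on its own.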
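The meaning of maxWaitTimeMillis can be sketched as follows, assuming the iteration head simply polls a queue of feedback elements. With 0 it blocks until the next element arrives. With a positive value it gives up once nothing arrives within that many milliseconds. This matches what the DataStream.iterate Javadoc describes: by default the iteration never terminates, and with a max wait time the stream terminates if no data arrives in that time. FeedbackHeadModel and drain are hypothetical names, and this is a simplified model rather than Flink's actual iteration head.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of an iteration head reading its feedback queue.
 * maxWaitTimeMillis == 0: block until the next feedback element (never terminate on its own).
 * maxWaitTimeMillis > 0: stop once no feedback element arrives within that time.
 */
final class FeedbackHeadModel {

    static List<Long> drain(BlockingQueue<Long> feedback, long maxWaitTimeMillis) {
        List<Long> received = new ArrayList<>();
        boolean shouldWait = maxWaitTimeMillis > 0;
        try {
            while (true) {
                Long next = shouldWait
                        ? feedback.poll(maxWaitTimeMillis, TimeUnit.MILLISECONDS) // null on timeout
                        : feedback.take();                                        // blocks indefinitely
                if (next == null) {
                    break; // waited maxWaitTimeMillis without feedback: close the iteration
                }
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat interruption as cancellation
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<Long> feedback = new LinkedBlockingQueue<>(Arrays.asList(1L, 2L, 3L));
        // Drains the three queued elements, then returns after about 100 ms without feedback
        System.out.println(drain(feedback, 100)); // [1, 2, 3]
    }
}
```

Calling drain with 0 on an empty queue would block forever. That is the model's equivalent of an iteration created with the no-argument iterate().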
IterativeStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.java
```java
@PublicEvolving
public class IterativeStream<T> extends SingleOutputStreamOperator<T> {

    // We store these so that we can create a co-iteration if we need to
    private DataStream<T> originalInput;
    private long maxWaitTime;

    protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
        super(dataStream.getExecutionEnvironment(),
                new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
        this.originalInput = dataStream;
        this.maxWaitTime = maxWaitTime;
        setBufferTimeout(dataStream.environment.getBufferTimeout());
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    public DataStream<T> closeWith(DataStream<T> feedbackStream) {

        Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

        if (!predecessors.contains(this.transformation)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }

        ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());

        return feedbackStream;
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
        return withFeedbackType(TypeInformation.of(feedbackTypeClass));
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) {
        return withFeedbackType(TypeInformation.of(feedbackTypeHint));
    }

    public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
        return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime);
    }

    @Public
    public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {

        private CoFeedbackTransformation<F> coFeedbackTransformation;

        public ConnectedIterativeStreams(DataStream<I> input,
                TypeInformation<F> feedbackType,
                long waitTime) {
            super(input.getExecutionEnvironment(),
                    input,
                    new DataStream<>(input.getExecutionEnvironment(),
                            new CoFeedbackTransformation<>(input.getParallelism(),
                                    feedbackType,
                                    waitTime)));
            this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
        }

        public DataStream<F> closeWith(DataStream<F> feedbackStream) {

            Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

            if (!predecessors.contains(this.coFeedbackTransformation)) {
                throw new UnsupportedOperationException(
                        "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
            }

            coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());

            return feedbackStream;
        }

        private UnsupportedOperationException groupingException =
                new UnsupportedOperationException("Cannot change the input partitioning of an" +
                        "iteration head directly. Apply the partitioning on the input and" +
                        "feedback streams instead.");

        @Override
        public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(String field1, String field2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {
            throw groupingException;
        }

        @Override
        public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {
            throw groupingException;
        }

        @Override
        public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
            throw groupingException;
        }
    }
}
```
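An iteration can be understood as a feedback loop, similar to recursion. The filter that produces the feedback stream plays the role of the recursion condition. Elements that pass it re-enter the head of the IterativeStream and go through the subsequent operations again.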
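closeWith in the code above only accepts a feedback stream whose transitive predecessors include the iteration's own transformation. In other words, the feedback must be computed downstream of the iteration head. The following plain-Java sketch models that check. It uses a hypothetical Node class as a stand-in for StreamTransformation, and a hypothetical IterationCloseModel in place of IterativeStream. It is an illustration, not the Flink implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for StreamTransformation: a named node with its upstream inputs. */
final class Node {
    final String name;
    final List<Node> inputs;

    Node(String name, Node... inputs) {
        this.name = name;
        this.inputs = Arrays.asList(inputs);
    }

    /** This node plus everything upstream of it, like getTransitivePredecessors(). */
    Set<Node> getTransitivePredecessors() {
        Set<Node> result = new LinkedHashSet<>();
        result.add(this);
        for (Node input : inputs) {
            result.addAll(input.getTransitivePredecessors());
        }
        return result;
    }
}

/** Hypothetical model of the check performed by IterativeStream.closeWith. */
final class IterationCloseModel {
    final Node head;
    final List<Node> feedbackEdges = new ArrayList<>();

    IterationCloseModel(Node head) {
        this.head = head;
    }

    Node closeWith(Node feedback) {
        // The feedback must be computed downstream of the iteration head
        if (!feedback.getTransitivePredecessors().contains(head)) {
            throw new UnsupportedOperationException(
                    "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }
        feedbackEdges.add(feedback);
        return feedback;
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node head = new Node("iteration", source);    // initialStream.iterate()
        Node body = new Node("map", head);            // iteration.map(...)
        Node feedback = new Node("filter > 0", body); // iterationBody.filter(...)
        IterationCloseModel iteration = new IterationCloseModel(head);
        iteration.closeWith(feedback);                // accepted
        try {
            iteration.closeWith(new Node("unrelated", source)); // not downstream of the head
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```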
FeedbackTransformation
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
```java
@Internal
public class FeedbackTransformation<T> extends StreamTransformation<T> {

    private final StreamTransformation<T> input;

    private final List<StreamTransformation<T>> feedbackEdges;

    private final Long waitTime;

    public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
        super("Feedback", input.getOutputType(), input.getParallelism());
        this.input = input;
        this.waitTime = waitTime;
        this.feedbackEdges = Lists.newArrayList();
    }

    public StreamTransformation<T> getInput() {
        return input;
    }

    public void addFeedbackEdge(StreamTransformation<T> transform) {

        if (transform.getParallelism() != this.getParallelism()) {
            throw new UnsupportedOperationException(
                    "Parallelism of the feedback stream must match the parallelism of the original" +
                            " stream. Parallelism of original stream: " + this.getParallelism() +
                            "; parallelism of feedback stream: " + transform.getParallelism() +
                            ". Parallelism can be modified using DataStream#setParallelism() method");
        }

        feedbackEdges.add(transform);
    }

    public List<StreamTransformation<T>> getFeedbackEdges() {
        return feedbackEdges;
    }

    public Long getWaitTime() {
        return waitTime;
    }

    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
    }

    @Override
    public Collection<StreamTransformation<?>> getTransitivePredecessors() {
        List<StreamTransformation<?>> result = Lists.newArrayList();
        result.add(this);
        result.addAll(input.getTransitivePredecessors());
        return result;
    }
}
```
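- FeedbackTransformation extends StreamTransformation; its main fields are input, feedbackEdges and waitTime
- addFeedbackEdge adds a feedback edge. It first checks that the parallelism of the given transformation equals the parallelism of the FeedbackTransformation (which is taken from its input), and throws an UnsupportedOperationException otherwise. IterativeStream's closeWith method calls addFeedbackEdge to register the feedback stream's StreamTransformation
- waitTime is the time the feedback operator waits for feedback elements. Once it expires without a new feedback element arriving, the operation closes and accepts no further feedback elements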
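The parallelism check in addFeedbackEdge can be modeled the same way. In the sketch below, FeedbackTransformationModel and its nested Edge class are hypothetical stand-ins that keep only a name and a parallelism. As in the source above, an edge whose parallelism differs from the head's is rejected with an UnsupportedOperationException. In a real job, the fix suggested by Flink's error message is to call DataStream#setParallelism() on the feedback stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of FeedbackTransformation's parallelism check (not Flink code). */
final class FeedbackTransformationModel {

    /** Minimal stand-in for a StreamTransformation: only a name and a parallelism. */
    static final class Edge {
        final String name;
        final int parallelism;

        Edge(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }
    }

    private final int parallelism; // in Flink this is taken from the iteration's input
    private final long waitTime;
    private final List<Edge> feedbackEdges = new ArrayList<>();

    FeedbackTransformationModel(int parallelism, long waitTime) {
        this.parallelism = parallelism;
        this.waitTime = waitTime;
    }

    void addFeedbackEdge(Edge edge) {
        if (edge.parallelism != parallelism) {
            throw new UnsupportedOperationException(
                    "Parallelism of the feedback stream must match the parallelism of the original stream. "
                            + "Parallelism of original stream: " + parallelism
                            + "; parallelism of feedback stream: " + edge.parallelism);
        }
        feedbackEdges.add(edge);
    }

    List<Edge> getFeedbackEdges() {
        return Collections.unmodifiableList(feedbackEdges);
    }

    long getWaitTime() {
        return waitTime;
    }

    public static void main(String[] args) {
        FeedbackTransformationModel head = new FeedbackTransformationModel(4, 0L);
        head.addFeedbackEdge(new Edge("filter", 4)); // same parallelism: accepted
        try {
            head.addFeedbackEdge(new Edge("filter", 2)); // different parallelism: rejected
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(head.getFeedbackEdges().size()); // 1
    }
}
```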
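Summary
- DataStream provides two iterate methods, and both create and return an IterativeStream. The no-argument iterate uses a maxWaitTimeMillis of 0, so the iteration head waits for feedback indefinitely
- The IterativeStream constructor takes two parameters: the input DataStream (stored as originalInput) and maxWaitTime. It creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime. It also sets the transformation's bufferTimeout from dataStream.environment.getBufferTimeout(). FeedbackTransformation extends StreamTransformation and has fields such as feedbackEdges and waitTime. waitTime is the time the feedback operator waits for feedback elements; once it expires, the operation closes and accepts no further feedback elements
- IterativeStream extends SingleOutputStreamOperator and mainly provides two methods. closeWith closes the iteration, i.e. it defines which part of the iteration is fed back to the iteration head. withFeedbackType creates a ConnectedIterativeStreams, which extends ConnectedStreams and allows the fed-back stream to have a different type from originalInput. ConnectedIterativeStreams defines its own closeWith method. It also overrides all keyBy methods of ConnectedStreams to throw an UnsupportedOperationException, because partitioning has to be applied to the input and feedback streams instead of the iteration head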
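withFeedbackType makes it possible for the feedback type to differ from the input type, and a hypothetical single-threaded model can illustrate this. Here the original input carries Long start values, while the feedback edge carries a separate CollatzState type. Handling an input element and handling a feedback element are two separate steps, much like the two sides of an operator applied to ConnectedIterativeStreams. The names CoIterationModel and CollatzState, and the Collatz computation itself, are illustrative assumptions, not part of Flink.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical single-threaded model of a co-iteration (not Flink code): the original
 * input carries Long values, the feedback edge carries a different type, CollatzState.
 */
final class CoIterationModel {

    /** Feedback element type, deliberately different from the input type. */
    static final class CollatzState {
        final long start;
        final long current;
        final int steps;

        CollatzState(long start, long current, int steps) {
            this.start = start;
            this.current = current;
            this.steps = steps;
        }
    }

    /** Returns, for each positive start value, the number of Collatz steps needed to reach 1. */
    static Map<Long, Integer> run(List<Long> inputs) {
        Deque<CollatzState> feedback = new ArrayDeque<>();
        Map<Long, Integer> output = new LinkedHashMap<>();
        for (long start : inputs) {
            if (start < 1) {
                throw new IllegalArgumentException("start values must be positive: " + start);
            }
            // Input side: convert an input element (Long) into the feedback type
            route(new CollatzState(start, start, 0), feedback, output);
        }
        while (!feedback.isEmpty()) {
            CollatzState s = feedback.poll();
            // Feedback side: one iteration step on a feedback element
            long next = s.current % 2 == 0 ? s.current / 2 : 3 * s.current + 1;
            route(new CollatzState(s.start, next, s.steps + 1), feedback, output);
        }
        return output;
    }

    // Plays the role of the filters: unfinished states are fed back, finished ones leave the loop
    private static void route(CollatzState s, Deque<CollatzState> feedback, Map<Long, Integer> output) {
        if (s.current == 1) {
            output.put(s.start, s.steps);
        } else {
            feedback.add(s);
        }
    }

    public static void main(String[] args) {
        Map<Long, Integer> steps = run(Arrays.asList(6L, 7L, 1L));
        System.out.println(steps.get(6L) + " " + steps.get(7L) + " " + steps.get(1L)); // 8 16 0
    }
}
```

Only CollatzState values ever travel over the feedback edge, while the input stays a plain Long. That separation is what withFeedbackType expresses with its feedback TypeInformation.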