内容简介:本文主要研究一下flink DataStream的split操作flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.javaflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
序
本文主要研究一下flink DataStream的split操作
实例
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });
- 本实例将dataStream split为两个dataStream,一个outputName为even,另一个outputName为odd
DataStream.split
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public public class DataStream<T> { //...... public SplitStream<T> split(OutputSelector<T> outputSelector) { return new SplitStream<>(this, clean(outputSelector)); } //...... }
- DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream
OutputSelector
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@PublicEvolving public interface OutputSelector<OUT> extends Serializable { Iterable<String> select(OUT value); }
- OutputSelector定义了select方法用于给element打上outputNames
SplitStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java
@PublicEvolving public class SplitStream<OUT> extends DataStream<OUT> { protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) { super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector)); } public DataStream<OUT> select(String... outputNames) { return selectOutput(outputNames); } private DataStream<OUT> selectOutput(String[] outputNames) { for (String outName : outputNames) { if (outName == null) { throw new RuntimeException("Selected names must not be null"); } } SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames)); return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform); } }
- SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream;select方法创建了SelectTransformation
StreamGraphGenerator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@Internal public class StreamGraphGenerator { //...... private Collection<Integer> transform(StreamTransformation<?> transform) { if (alreadyTransformed.containsKey(transform)) { return alreadyTransformed.get(transform); } LOG.debug("Transforming " + transform); if (transform.getMaxParallelism() <= 0) { // if the max parallelism hasn't been set, then first use the job wide max parallelism // from theExecutionConfig. int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism(); if (globalMaxParallelismFromConfig > 0) { transform.setMaxParallelism(globalMaxParallelismFromConfig); } } // call at least once to trigger exceptions about MissingTypeInfo transform.getOutputType(); Collection<Integer> transformedIds; if (transform instanceof OneInputTransformation<?, ?>) { transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform); } else if (transform instanceof TwoInputTransformation<?, ?, ?>) { transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform); } else if (transform instanceof SourceTransformation<?>) { transformedIds = transformSource((SourceTransformation<?>) transform); } else if (transform instanceof SinkTransformation<?>) { transformedIds = transformSink((SinkTransformation<?>) transform); } else if (transform instanceof UnionTransformation<?>) { transformedIds = transformUnion((UnionTransformation<?>) transform); } else if (transform instanceof SplitTransformation<?>) { transformedIds = transformSplit((SplitTransformation<?>) transform); } else if (transform instanceof SelectTransformation<?>) { transformedIds = transformSelect((SelectTransformation<?>) transform); } else if (transform instanceof FeedbackTransformation<?>) { transformedIds = transformFeedback((FeedbackTransformation<?>) transform); } else if (transform instanceof CoFeedbackTransformation<?>) { transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform); } else if (transform instanceof PartitionTransformation<?>) { transformedIds = transformPartition((PartitionTransformation<?>) transform); } else if (transform instanceof SideOutputTransformation<?>) { transformedIds = transformSideOutput((SideOutputTransformation<?>) transform); } else { throw new IllegalStateException("Unknown transformation: " + transform); } // need this check because the iterate transformation adds itself before // transforming the feedback edges if (!alreadyTransformed.containsKey(transform)) { alreadyTransformed.put(transform, transformedIds); } if (transform.getBufferTimeout() >= 0) { streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout()); } if (transform.getUid() != null) { streamGraph.setTransformationUID(transform.getId(), transform.getUid()); } if (transform.getUserProvidedNodeHash() != null) { streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } if (transform.getMinResources() != null && transform.getPreferredResources() != null) { streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); } return transformedIds; } private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) { StreamTransformation<T> input = select.getInput(); Collection<Integer> resultIds = transform(input); // the recursive transform might have already transformed this if (alreadyTransformed.containsKey(select)) { return alreadyTransformed.get(select); } List<Integer> virtualResultIds = new ArrayList<>(); for (int inputId : resultIds) { int virtualId = StreamTransformation.getNewNodeId(); streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames()); virtualResultIds.add(virtualId); } return virtualResultIds; } private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) { StreamTransformation<T> input = split.getInput(); Collection<Integer> resultIds = transform(input); // the recursive transform call might have transformed this already if (alreadyTransformed.containsKey(split)) { return alreadyTransformed.get(split); } for (int inputId : resultIds) { streamGraph.addOutputSelector(inputId, split.getOutputSelector()); } return resultIds; } //...... }
- StreamGraphGenerator里头的transform会对SelectTransformation以及SplitTransformation进行相应的处理
- transformSelect方法会根据select.getSelectedNames()来addVirtualSelectNode
- transformSplit方法则根据split.getOutputSelector()来addOutputSelector
小结
- DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream
- OutputSelector定义了select方法用于给element打上outputNames
- SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream
doc
以上所述就是小编给大家介绍的《聊聊flink DataStream的split操作》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 聊聊jdbc的batch操作
- 聊聊flink DataStream的join操作
- 聊聊flink DataStream的iterate操作
- 聊聊flink Table的where及filter操作
- 聊聊从逻辑门到操作系统的计算机
- 聊聊动态规划(2) -- 特征
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
JavaScript设计模式
Ross Harmes、Dustin Diaz / 谢廷晟 / 人民邮电出版社 / 2008 / 45.00元
本书共有两部分。第一部分给出了实现具体设计模式所需要的面向对象特性的基础知识,主要包括接口、封装和信息隐藏、继承、单体模式等内容。第二部分则专注于各种具体的设计模式及其在JavaScript语言中的应用,主要介绍了工厂模式、桥接模式、组合模式、门面模式等几种常见的模式。为了让每一章中的示例都尽可能地贴近实际应用,书中同时列举了一些JavaScript 程序员最常见的任务,然后运用设计模式使其解决方......一起来看看 《JavaScript设计模式》 这本书的介绍吧!