聊聊flink DataStream的split操作

栏目: 编程工具 · 发布时间: 7年前

内容简介:本文主要研究一下flink DataStream的split操作flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.javaflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

本文主要研究一下flink DataStream的split操作

实例

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});
  • 本实例将dataStream split为两个dataStream,一个outputName为even,另一个outputName为odd

DataStream.split

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public SplitStream<T> split(OutputSelector<T> outputSelector) {
        return new SplitStream<>(this, clean(outputSelector));
    }

    //......
}
  • DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream

OutputSelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {

    Iterable<String> select(OUT value);

}
  • OutputSelector定义了select方法用于给element打上outputNames

SplitStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {

    protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
        super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
    }

    public DataStream<OUT> select(String... outputNames) {
        return selectOutput(outputNames);
    }

    private DataStream<OUT> selectOutput(String[] outputNames) {
        for (String outName : outputNames) {
            if (outName == null) {
                throw new RuntimeException("Selected names must not be null");
            }
        }

        SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
        return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
    }

}
  • SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream;select方法创建了SelectTransformation

StreamGraphGenerator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

@Internal
public class StreamGraphGenerator {

    //......

    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from theExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() >= 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

    private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
        StreamTransformation<T> input = select.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform might have already transformed this
        if (alreadyTransformed.containsKey(select)) {
            return alreadyTransformed.get(select);
        }

        List<Integer> virtualResultIds = new ArrayList<>();

        for (int inputId : resultIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

    private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {

        StreamTransformation<T> input = split.getInput();
        Collection<Integer> resultIds = transform(input);

        // the recursive transform call might have transformed this already
        if (alreadyTransformed.containsKey(split)) {
            return alreadyTransformed.get(split);
        }

        for (int inputId : resultIds) {
            streamGraph.addOutputSelector(inputId, split.getOutputSelector());
        }

        return resultIds;
    }

    //......
}
  • StreamGraphGenerator里头的transform会对SelectTransformation以及SplitTransformation进行相应的处理
  • transformSelect方法会根据select.getSelectedNames()来addVirtualSelectNode
  • transformSplit方法则根据split.getOutputSelector()来addOutputSelector

小结

  • DataStream的split操作接收OutputSelector参数,然后创建并返回SplitStream
  • OutputSelector定义了select方法用于给element打上outputNames
  • SplitStream继承了DataStream,它定义了select方法,可以用来根据outputNames选择split出来的dataStream

doc


以上所述就是小编给大家介绍的《聊聊flink DataStream的split操作》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Creative Curve

The Creative Curve

Allen Gannett / Knopf Doubleday Publishing Group / 2018-6-12

Big data entrepreneur Allen Gannett overturns the mythology around creative genius, and reveals the science and secrets behind achieving breakout commercial success in any field. We have been s......一起来看看 《The Creative Curve》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器