Flink: 两个递归彻底搞懂operator chain

栏目: IT技术 · 发布时间: 4年前

内容简介:operator chain是指将满足一定条件的operator 链在一起,放在同一个task里面执行,是Flink任务优化的一种方式,在同一个task里面的operator的数据传输变成函数调用关系,这种方式减少数据传输过程。常见的chain例如:source->map->filter,这样的任务链可以chain在一起,那么其内部是如何决定是否能够chain在一起与chain一起之后如何执行就是本篇文章将要剖析的重点。

operator chain是指将满足一定条件的operator 链在一起,放在同一个task里面执行,是Flink任务优化的一种方式,在同一个task里面的operator的数据传输变成函数调用关系,这种方式减少数据传输过程。常见的chain例如:source->map->filter,这样的任务链可以chain在一起,那么其内部是如何决定是否能够chain在一起与chain一起之后如何执行就是本篇文章将要剖析的重点。

第一个递归:JobGraph生成

Flink中划分了四种图: StreamGraph、JobGraph、ExecutionGraph、物理执行图,前两种StreamGraph、JobGraph是在客户端生成,ExecutionGraph在jobMaster中生成,最后一种物理执行图是一种虚拟的图,不存在的数据结构,运行在每一个TaskExecutor中。 我们在Flink Web UI中看到的就是JobGraph,如下:

Flink: 两个递归彻底搞懂operator chain

JobGraph相对于StreamGraph,可以理解为优化过后的StreamGraph,将能够chain一起的operator chain在一起,上图将source与filter两个operator chain在一起了,这个步骤在生成JobGraph过程中完成。 其具体实现在StreamingJobGraphGenerator中:

private JobGraph createJobGraph() {

.....

Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

setChaining(hashes, legacyHashes, chainedOperatorHashes);

setPhysicalEdges();

setSlotSharingAndCoLocation();

configureCheckpointing();

....

}

重点就在setChaining方法中,在里面调用createChain方法,构造JobVertix的同时完成operator chain的操作,createChain方法:

    

private List<StreamEdge> createChain(

Integer startNodeId,

Integer currentNodeId,

Map<Integer, byte[]> hashes,

List<Map<Integer, byte[]>> legacyHashes,

int chainIndex,

Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {


if (!builtVertices.contains(startNodeId)) {

//chain 的出边

List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

//能够chain在一起的边

List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();

//不能够chain一起的边

List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();


for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {

//isChainable判断是否能够chain在一起

if (isChainable(outEdge, streamGraph)) {

chainableOutputs.add(outEdge);

} else {

nonChainableOutputs.add(outEdge);

}

}


for (StreamEdge chainable : chainableOutputs) {

//能够chain在一起那么遍历下一个节点

transitiveOutEdges.addAll(

createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));

}


for (StreamEdge nonChainable : nonChainableOutputs) {

transitiveOutEdges.add(nonChainable);

//以不能chain在一起的节点为起始点重新开始往下遍历

createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);

}


List<Tuple2<byte[], byte[]>> operatorHashes =

chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());


byte[] primaryHashBytes = hashes.get(currentNodeId);


for (Map<Integer, byte[]> legacyHash : legacyHashes) {

operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));

}


chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));

chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));

chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));


//如果currentNodeId=startNodeId 那么就说明是一个chain的起点,则需要创建jobVertix

//不是则表示是chain的一部分,只需要创建StreamConfig即可

StreamConfig config = currentNodeId.equals(startNodeId)

? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)

: new StreamConfig(new Configuration());


setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);


if (currentNodeId.equals(startNodeId)) {

config.setChainStart(); //起始chain

config.setChainIndex(0);

config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());

config.setOutEdgesInOrder(transitiveOutEdges);

config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

//连接边

for (StreamEdge edge : transitiveOutEdges) {

connect(startNodeId, edge);

}


config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));


} else {

//currentNodeId属于chain的一部分

Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);


if (chainedConfs == null) {

chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());

}

config.setChainIndex(chainIndex);

StreamNode node = streamGraph.getStreamNode(currentNodeId);

config.setOperatorName(node.getOperatorName());

chainedConfigs.get(startNodeId).put(currentNodeId, config);

}


config.setOperatorID(new OperatorID(primaryHashBytes));


if (chainableOutputs.isEmpty()) {

config.setChainEnd();

}

//返回chain的出边

return transitiveOutEdges;


} else {

return new ArrayList<>();

}

}

整个过程就是一个递归的过程,createChain 过程就是不断寻找一个chain的出边,如果邻接的两个operator(source与filter)能够chain在一起,那么就以下一个能够chain一起的operator(filter)为起点,继续寻找,直到找到不能够chain一起的operator(process1),但是此时并没有立刻返回,而是以当前不能chain再一起的operator(process1)为起点继续往下寻找,直到终点(sink)开始一层一层返回,实际上其构造过程是一个反向过程: sink->process2->process1->(source&filter) 这样的一个过程完成operator chain并且构造JobVertix (可通过debug方式查看其详细过程)。

如何判断两个相邻的operator(StreamNode)能够chain在一起? 通过isChainable方法判断:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {

StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);

StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);


StreamOperator<?> headOperator = upStreamVertex.getOperator();

StreamOperator<?> outOperator = downStreamVertex.getOperator();


return downStreamVertex.getInEdges().size() == 1 //下游的数据流入只有一个节点

&& outOperator != null

&& headOperator != null

&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex) //在同一个slotGroup中

&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS //开启operator chain策略

&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||

headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) //head 表示的是一个起点

&& (edge.getPartitioner() instanceof ForwardPartitioner) //直接转发方式

&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism() //并行度相同

&& streamGraph.isChainingEnabled(); //默认允许chain

}

第二个递归:OperatorChain生成

当Execution在deploy的过程中,也就是Task在TaskExecutor启动过程中, 会生成一个OperatorChain对象,在该OperatorChain对象中包含了所有的能够chain在一起的operator(source&filter),其内部会生成一个名为chainEntryPoint的WatermarkGaugeExposingOutput对象,一个将数据输出的对象,其输出有两种形式:

1.函数调用,将数据推送给chain在一起的下一个operator节点(filter中)                2.输出到下一个没有被chain的operator(process1)

那么chainEntryPoint是如何生成的?

OperatorChain的初始化是在StreamTask中被调用的:

operatorChain = new OperatorChain<>(this, recordWriters);

headOperator = operatorChain.getHeadOperator();

在其构造函数中调用:

this.chainEntryPoint = createOutputCollector(

containingTask, //当前的streamTask

configuration, //chain的第一个节点的StreamConfig

chainedConfigs, //该chain的所有StreamConfig

userCodeClassloader,

streamOutputMap,

allOps);

createOutputCollector获取当前节点的out,

private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(

StreamTask<?, ?> containingTask,

StreamConfig operatorConfig,

Map<Integer, StreamConfig> chainedConfigs,

ClassLoader userCodeClassloader,

Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,

List<StreamOperator<?>> allOperators) {

List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);


// 当前operator的网络方式输出(filter->process1)

for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {

@SuppressWarnings("unchecked")

RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);


allOutputs.add(new Tuple2<>(output, outputEdge));

}


// chain out (source->filter)

for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {

int outputId = outputEdge.getTargetId();

StreamConfig chainedOpConfig = chainedConfigs.get(outputId);

//寻找被chain在一起的下一个operator(filter)的out

WatermarkGaugeExposingOutput<StreamRecord<T>> output = createChainedOperator(

containingTask,

chainedOpConfig,

chainedConfigs,

userCodeClassloader,

streamOutputs,

allOperators,

outputEdge.getOutputTag());

allOutputs.add(new Tuple2<>(output, outputEdge));

}


//最后将当前节点的out返回

.....


}

}

createChainedOperator方法:

private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(

StreamTask<?, ?> containingTask,

StreamConfig operatorConfig,

Map<Integer, StreamConfig> chainedConfigs,

ClassLoader userCodeClassloader,

Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,

List<StreamOperator<?>> allOperators,

OutputTag<IN> outputTag) {

// 调用createOutputCollector 获取当前operator的out

WatermarkGaugeExposingOutput<StreamRecord<OUT>> chainedOperatorOutput = createOutputCollector(

containingTask,

operatorConfig,

chainedConfigs,

userCodeClassloader,

streamOutputs,

allOperators);


// 获取当前的StreamOperator

OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);

//作为当前的out 传入setup方法中

chainedOperator.setup(containingTask, operatorConfig, chainedOperatorOutput);


allOperators.add(chainedOperator);


//将被chain的operator(filter)传给上一个operator(source)的out

//那么在out中就可以直接调用filter的处理source的输出数据了

WatermarkGaugeExposingOutput<StreamRecord<IN>> currentOperatorOutput;

if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {

currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);

}

else {

TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);

currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);

}

....

return currentOperatorOutput;

}

可以看出整个构造chainEntryPoint的过程也是一个递归的过程,会不断寻找被chain在一起的下一个operator的out,直到下游没有可chain的位置,返回网络out作为最后一个operator的out,然后通过使用当前operator构造前一个operator的out,同样是一个反向构造out的过程(filterOut->sourceOut)。 简化一下逻辑代码:

createOut(currentOperator){


if(currentOperator.isNetworkOut){

currOut=networkOut();

}else{

currOut=creatOut(nextOperstor);

}

currentOperator.setOut(currOut);

preOut=CopyingOut(currentOperator);

return preOut;

}

最终得到的chainEntryPoint就是headOperator的out,这样在其内部不断的通过out调用operator的方式实现了chain的函数调用链关系。

总结

透过以上分析,operator chain就是将满足一定条件的的operator通过函数调用方式传递数据,避免了数据传输的中间过程。

Flink: 两个递归彻底搞懂operator chain


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

剑指Offer

剑指Offer

何海涛 / 电子工业出版社 / 2012-1 / 45.00元

《剑指Offer:名企面试官精讲典型编程题》剖析了50个典型的程序员面试题,从基础知识、代码质量、解题思路、优化效率和综合能力五个方面系统整理了影响面试的5个要点。全书分为7章,主要包括面试的流程,讨论面试流程中每一环节需要注意的问题;面试需要的基础知识,从编程语言、数据结构及算法三方面总结了程序员面试的知识点;高质量的代码,讨论影响代码质量的3个要素(规范性、完整性和鲁棒性),强调高质量的代码除......一起来看看 《剑指Offer》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

在线进制转换器
在线进制转换器

各进制数互转换器