WeChat official account: 深广大数据Club
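This article walks through the source code behind job submission when Apache Flink runs in On YARN mode.
For local mode and cluster mode, see the following two articles:
Flink Source Code Analysis | Starting from an Example: Understanding the Local Job Execution Flow
Flink Source Code Analysis | Starting from an Example: Understanding the Cluster Job Execution Flow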
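Entry point of the environment deployment script
A long-running Flink cluster is started on a YARN cluster with the yarn-session.sh script.
    ./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
We start from yarn-session.sh itself, so let's first look at what the script contains.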
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`

    # get Flink config
    . "$bin"/config.sh

    if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
    fi

    JVM_ARGS="$JVM_ARGS -Xmx512m"

    CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

    log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
    log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

    export FLINK_CONF_DIR

    $JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
Script flow
- Load the configuration
- Set the JVM arguments
- Set the logging configuration
- Invoke FlinkYarnSessionCli
Script usage
    Usage:
       Required
         -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
       Optional
         -D <arg>                        Dynamic properties
         -d,--detached                   Start detached
         -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
         -nm,--name                      Set a custom name for the application on YARN
         -q,--query                      Display available YARN resources (memory, cores)
         -qu,--queue <arg>               Specify YARN queue.
         -s,--slots <arg>                Number of slots per TaskManager
         -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
         -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode
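The -jm and -tm options above take a memory value with an optional unit, defaulting to MB. The following is a minimal sketch of how such a value could be normalized to megabytes. MemoryArg and parseToMegabytes are hypothetical names used only for illustration; this is not Flink's own parser, and it handles only plain numbers and the m/mb/g/gb suffixes.

```java
import java.util.Locale;

// Hypothetical helper, not part of Flink: normalizes "-jm"/"-tm" style values to megabytes.
final class MemoryArg {

    private MemoryArg() {}

    static long parseToMegabytes(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        long factor = 1; // no unit means megabytes, matching "(default: MB)" in the usage text
        if (v.endsWith("gb")) {
            factor = 1024;
            v = v.substring(0, v.length() - 2);
        } else if (v.endsWith("g")) {
            factor = 1024;
            v = v.substring(0, v.length() - 1);
        } else if (v.endsWith("mb")) {
            v = v.substring(0, v.length() - 2);
        } else if (v.endsWith("m")) {
            v = v.substring(0, v.length() - 1);
        }
        // Long.parseLong throws NumberFormatException for anything that is not a number.
        long amount = Long.parseLong(v.trim());
        if (amount <= 0) {
            throw new IllegalArgumentException("Memory must be positive: " + value);
        }
        return amount * factor;
    }
}
```

With this sketch, the values from the example command line (1024m and 4096m) both come back unchanged as megabyte counts.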
FlinkYarnSessionCli.java
Entry class: org.apache.flink.yarn.cli.FlinkYarnSessionCli
The main method calls run, which performs the deployment on YARN.
    // main
    ...
    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        flinkConfiguration,
        configurationDirectory,
        "",
        ""); // no prefix for the YARN session

    SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));

    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
    ...
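The first step in run is to parse the command-line arguments:
    final CommandLine cmd = parseCommandLineOptions(args, true);
Once the command line is parsed, help mode is checked first. If help was requested, the usage text is printed and the method returns.
    if (cmd.hasOption(help.getOpt())) {
        printUsage();
        return 0;
    }
If the -q option is present, yarnClusterDescriptor.getClusterDescription() is called to print the YARN resource information, and the method then exits.
    if (cmd.hasOption(query.getOpt())) {
        final String description = yarnClusterDescriptor.getClusterDescription();
        System.out.println(description);
        return 0;
    }
If the command is neither -q nor -h, the main flow begins.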
The main flow first checks whether an applicationId was supplied.
That option is used as follows:
    Usage:
       Required
         -id,--applicationId <yarnAppId>   YARN application Id

    # Attach to the running Flink YARN session application_1463870264508_0029
    # Example: ./bin/yarn-session.sh -id application_1463870264508_0029
When an applicationId is passed in, yarnClusterDescriptor.retrieve looks up the running application and returns the clusterClient object.
    if (cmd.hasOption(applicationId.getOpt())) {
        yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
        clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
    }
If no applicationId is given, deploySessionCluster is called to deploy a new cluster.
    final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);

    clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);

    //------------------ ClusterClient deployed, handle connection details
    yarnApplicationId = clusterClient.getClusterId();

    try {
        final LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();

        System.out.println("Flink JobManager is now running on " + connectionInfo.getHostname() + ':' + connectionInfo.getPort() + " with leader id " + connectionInfo.getLeaderSessionID() + '.');
        System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());

        writeYarnPropertiesFile(
            yarnApplicationId,
            clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
            yarnClusterDescriptor.getDynamicPropertiesEncoded());
    } catch (Exception e) {
        try {
            clusterClient.shutdown();
        } catch (Exception ex) {
            LOG.info("Could not properly shutdown cluster client.", ex);
        }

        try {
            yarnClusterDescriptor.killCluster(yarnApplicationId);
        } catch (FlinkException fe) {
            LOG.info("Could not properly terminate the Flink cluster.", fe);
        }

        throw new FlinkException("Could not write the Yarn connection information.", e);
    }
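This involves the following steps:
- deploySessionCluster deploys the cluster and returns the clusterClient
- Get the ApplicationId
- Get the LeaderConnectionInfo through clusterClient
- Write the YARN properties file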
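The last step records the application ID together with the total slot count, computed as the number of TaskManagers times the slots per TaskManager. The sketch below illustrates that idea with java.util.Properties. YarnPropertiesSketch and the key names "applicationID" and "parallelism" are assumptions made for this example; the article does not show the body of writeYarnPropertiesFile.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: class name and key names are assumptions, not taken from the article.
final class YarnPropertiesSketch {

    private YarnPropertiesSketch() {}

    static Properties build(String applicationId, int numberTaskManagers, int slotsPerTaskManager) {
        Properties properties = new Properties();
        properties.setProperty("applicationID", applicationId);
        // Total slots = TaskManagers * slots per TaskManager, as in the call shown in the article.
        properties.setProperty("parallelism", Integer.toString(numberTaskManagers * slotsPerTaskManager));
        return properties;
    }

    // Renders the properties in the standard .properties text format.
    static String render(Properties properties) {
        StringWriter writer = new StringWriter();
        try {
            properties.store(writer, "YARN properties (sketch)");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return writer.toString();
    }
}
```

For a session started with 4 TaskManagers and 2 slots each, the recorded parallelism in this sketch would be 8.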
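Deployment on YARN also involves the question of whether the client is detached. Passing -d or --detached to yarn-session.sh starts a detached YARN session, so the client does not need to keep running.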
    if (yarnClusterDescriptor.isDetachedMode()) {
        LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
            "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
            "yarn application -kill " + yarnApplicationId);
    }
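Note: in this case the Flink YARN client only submits Flink to the cluster and then shuts itself down. The YARN session can then no longer be stopped through Flink.
Use the YARN utilities instead (yarn application -kill with the application ID, as printed in the log message above).
If detached mode is not specified, the client has to keep running and can be stopped with ctrl+c or by entering stop.
The code for the non-detached case is as follows: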
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
        yarnClusterDescriptor.getYarnClient(),
        yarnApplicationId,
        new ScheduledExecutorServiceAdapter(scheduledExecutorService));

    try {
        runInteractiveCli(
            clusterClient,
            yarnApplicationStatusMonitor,
            acceptInteractiveInput);
    } finally {
        try {
            yarnApplicationStatusMonitor.close();
        } catch (Exception e) {
            LOG.info("Could not properly close the Yarn application status monitor.", e);
        }

        clusterClient.shutDownCluster();

        try {
            clusterClient.shutdown();
        } catch (Exception e) {
            LOG.info("Could not properly shutdown cluster client.", e);
        }

        // shut down the scheduled executor service
        ExecutorUtils.gracefulShutdown(
            1000L,
            TimeUnit.MILLISECONDS,
            scheduledExecutorService);

        deleteYarnPropertiesFile();

        ApplicationReport applicationReport;

        try {
            applicationReport = yarnClusterDescriptor
                .getYarnClient()
                .getApplicationReport(yarnApplicationId);
        } catch (YarnException | IOException e) {
            LOG.info("Could not log the final application report.", e);
            applicationReport = null;
        }

        if (applicationReport != null) {
            logFinalApplicationReport(applicationReport);
        }
    }
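- Create the scheduledExecutorService
- Create the status monitor yarnApplicationStatusMonitor
- Run the interactive client through runInteractiveCli
- Perform a series of shutdown operations
- Log the final application report
The runInteractiveCli method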
    while (continueRepl) {

        final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow();

        switch (applicationStatus) {
            case FAILED:
            case CANCELED:
                System.err.println("The Flink Yarn cluster has failed.");
                continueRepl = false;
                break;
            case UNKNOWN:
                if (!isLastStatusUnknown) {
                    unknownStatusSince = System.nanoTime();
                    isLastStatusUnknown = true;
                }

                if ((System.nanoTime() - unknownStatusSince) > 5L * CLIENT_POLLING_INTERVAL_MS * 1_000_000L) {
                    System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
                    continueRepl = false;
                } else {
                    continueRepl = repStep(in, readConsoleInput);
                }
                break;
            case SUCCEEDED:
                if (isLastStatusUnknown) {
                    isLastStatusUnknown = false;
                }

                // ------------------ check if there are updates by the cluster -----------
                try {
                    final GetClusterStatusResponse status = clusterClient.getClusterStatus();

                    if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
                        System.err.println("Number of connected TaskManagers changed to " +
                            status.numRegisteredTaskManagers() + ". " +
                            "Slots available: " + status.totalNumberOfSlots());
                        numTaskmanagers = status.numRegisteredTaskManagers();
                    }
                } catch (Exception e) {
                    LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
                }

                printClusterMessages(clusterClient);

                continueRepl = repStep(in, readConsoleInput);
        }
    }
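The body of the method is essentially a while loop whose condition is continueRepl. Inside the loop:
- Get the ApplicationStatus through yarnApplicationStatusMonitor.getApplicationStatusNow()
- Switch on whether the ApplicationStatus is FAILED, CANCELED, UNKNOWN, or SUCCEEDED
- FAILED, CANCELED: continueRepl is set to false and the loop exits
- UNKNOWN: the current time is compared with the time at which the status first became UNKNOWN. If the difference exceeds 5L * CLIENT_POLLING_INTERVAL_MS * 1_000_000L nanoseconds, continueRepl is set to false and the loop exits; otherwise repStep is called and the loop continues
- SUCCEEDED:
    - Get the cluster status
    - Check whether the number of registered TaskManagers differs from the previously recorded number (numTaskmanagers); if it does, print a message to stderr and record the new number
    - Print the cluster messages
    - Call repStep to obtain the new value of continueRepl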
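The UNKNOWN branch described above is a small timeout state machine: the timestamp is recorded only when the status first turns UNKNOWN, and the loop gives up once five polling intervals have passed. A minimal standalone sketch of that logic follows. UnknownStatusTracker is a hypothetical class written for this illustration, and the polling interval is passed in because the article does not state the value of CLIENT_POLLING_INTERVAL_MS.

```java
// Hypothetical helper that isolates the UNKNOWN-status timeout from the loop in the article.
final class UnknownStatusTracker {

    private final long timeoutNanos;
    private boolean lastStatusUnknown;
    private long unknownSinceNanos;

    UnknownStatusTracker(long pollingIntervalMs) {
        // Same expression as in the article: five polling intervals, converted from ms to ns.
        this.timeoutNanos = 5L * pollingIntervalMs * 1_000_000L;
    }

    // Call when the status is UNKNOWN. Returns true if the loop should give up.
    boolean onUnknown(long nowNanos) {
        if (!lastStatusUnknown) {
            // Remember only the first moment of the current UNKNOWN streak.
            unknownSinceNanos = nowNanos;
            lastStatusUnknown = true;
        }
        return (nowNanos - unknownSinceNanos) > timeoutNanos;
    }

    // Call when a known status is observed; this resets the streak.
    void onKnown() {
        lastStatusUnknown = false;
    }
}
```

Passing the clock value in as a parameter, rather than calling System.nanoTime() inside, is a choice made here only so that the behaviour can be checked deterministically.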
The repStep method handles interactive user input: quit, stop, and help.
    private static boolean repStep(
            BufferedReader in,
            boolean readConsoleInput) throws IOException, InterruptedException {

        // wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
        long startTime = System.currentTimeMillis();
        while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
            && (!readConsoleInput || !in.ready())) {
            Thread.sleep(200L);
        }
        //------------- handle interactive command by user. ----------------------

        if (readConsoleInput && in.ready()) {
            String command = in.readLine();
            switch (command) {
                case "quit":
                case "stop":
                    return false;

                case "help":
                    System.err.println(YARN_SESSION_HELP);
                    break;
                default:
                    System.err.println("Unknown command '" + command + "'. Showing help:");
                    System.err.println(YARN_SESSION_HELP);
                    break;
            }
        }

        return true;
    }
If the user enters quit or stop in the client, repStep returns false; in all other cases it returns true.
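The return-value rule of repStep can be isolated into a tiny function, which makes it easy to see that only quit and stop end the loop. This is a sketch under that reading of the code; ReplCommands and shouldContinue are hypothetical names, and the printing of the help text is left out.

```java
// Hypothetical helper: maps a console command to "keep looping?" as repStep does.
final class ReplCommands {

    private ReplCommands() {}

    static boolean shouldContinue(String command) {
        switch (command) {
            case "quit":
            case "stop":
                return false; // the caller leaves the while loop
            default:
                return true;  // "help" and unknown commands only print help and continue
        }
    }
}
```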
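At the end of run(), yarnClusterDescriptor.close is called:
    yarnClusterDescriptor.close();
That covers the outer flow. Next, let's look at how the YARN session cluster itself is deployed.
YarnSession deployment
Deployment goes through AbstractYarnClusterDescriptor.deploySessionCluster, which calls deployInternal() to do the work.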
    @Override
    public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
        try {
            return deployInternal(
                clusterSpecification,
                "Flink session cluster",
                getYarnSessionClusterEntrypoint(),
                null,
                false);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
        }
    }
deployInternal() blocks until the ApplicationMaster/JobManager has been deployed on YARN.
Because the method is long, it is split into pieces below for analysis.
1. Configuration validation
    validateClusterSpecification(clusterSpecification);

    if (UserGroupInformation.isSecurityEnabled()) {
        // note: UGI::hasKerberosCredentials inaccurately reports false
        // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
        // so we check only in ticket cache scenario.
        boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

        UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
        if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
            && useTicketCache && !loginUser.hasKerberosCredentials()) {
            LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
            throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                "does not have Kerberos credentials");
        }
    }

    isReadyForDeployment(clusterSpecification);
validateClusterSpecification mainly reads taskManagerMemorySize and computes the cutoff. Calling the calculation methods is itself the validation: they throw an IllegalArgumentException if the cluster cannot be started with the given memory settings.
    private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
        try {
            final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
            // We do the validation by calling the calculation methods here
            // Internally these methods will check whether the cluster can be started with the provided
            // ClusterSpecification and the configured memory requirements
            final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
            TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
        } catch (IllegalArgumentException iae) {
            throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
                "cluster specification. Please increase the memory of the cluster.", iae);
        }
    }
After that, deployInternal() checks whether security is enabled and, if so, performs the security validation. Finally, it checks whether the deployment can proceed.
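To make the cutoff idea concrete, here is a minimal sketch of a ratio-plus-minimum cutoff calculation. It is an assumption about the general shape of such a calculation, not the body of ContaineredTaskManagerParameters.calculateCutoffMB, which the article does not show. CutoffSketch is a hypothetical class, and the ratio 0.25 and minimum 600 MB mentioned below are example inputs chosen for illustration.

```java
// Hypothetical sketch: reserve part of the container memory as a cutoff for non-heap use.
final class CutoffSketch {

    private CutoffSketch() {}

    static long calculateCutoffMB(long containerMemoryMB, double cutoffRatio, long minCutoffMB) {
        if (cutoffRatio <= 0 || cutoffRatio >= 1) {
            throw new IllegalArgumentException("Cutoff ratio must be in (0, 1): " + cutoffRatio);
        }
        if (minCutoffMB >= containerMemoryMB) {
            // Nothing would be left for the heap, so the specification cannot be fulfilled.
            throw new IllegalArgumentException(
                "Minimum cutoff " + minCutoffMB + " MB needs to be less than the container memory "
                    + containerMemoryMB + " MB");
        }
        // Take the larger of the proportional cutoff and the configured minimum.
        return Math.max((long) (containerMemoryMB * cutoffRatio), minCutoffMB);
    }
}
```

Under these example inputs, a 4096 MB TaskManager container with ratio 0.25 and minimum 600 MB gives a cutoff of 1024 MB, leaving 3072 MB for the subsequent heap-size calculation, while a 512 MB container is rejected outright.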
2. Check the specified YARN queue
    // ------------------ Check if the specified queue exists --------------------
    checkYarnQueues(yarnClient);
3. Read and apply the dynamic properties
    // ------------------ Add dynamic properties to local flinkConfiguraton ------
    Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
    for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
        flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
    }
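The dynamic properties arrive as a single encoded string and are turned into key/value pairs before being copied into the configuration. The sketch below shows one way such decoding could work. DynamicPropertiesSketch is hypothetical, and the "@@" separator between pairs is an assumption made for this example; the article does not show the body of getDynamicProperties.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical decoder for a string such as "key.a=1@@key.b=2".
final class DynamicPropertiesSketch {

    // Assumed separator between key=value pairs; not confirmed by the article.
    static final String SEPARATOR = "@@";

    private DynamicPropertiesSketch() {}

    static Map<String, String> decode(String encoded) {
        Map<String, String> result = new LinkedHashMap<>();
        if (encoded == null || encoded.isEmpty()) {
            return result;
        }
        for (String pair : encoded.split(SEPARATOR)) {
            int idx = pair.indexOf('=');
            if (idx <= 0) {
                continue; // skip entries without a key
            }
            // Split at the first '=' only, so values may themselves contain '='.
            result.put(pair.substring(0, idx).trim(), pair.substring(idx + 1).trim());
        }
        return result;
    }
}
```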
4. Check whether the YARN cluster can satisfy the resource request
- Create the YarnClientApplication and the GetNewApplicationResponse
    // Create application via yarnClient
    final YarnClientApplication yarnApplication = yarnClient.createApplication();
    final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
- Validate the cluster resources
    Resource maxRes = appResponse.getMaximumResourceCapability();

    final ClusterResourceDescription freeClusterMem;
    try {
        freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    } catch (YarnException | IOException e) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    }

    final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

    final ClusterSpecification validClusterSpecification;
    try {
        validClusterSpecification = validateClusterResources(
            clusterSpecification,
            yarnMinAllocationMB,
            maxRes,
            freeClusterMem);
    } catch (YarnDeploymentException yde) {
        failSessionDuringDeployment(yarnClient, yarnApplication);
        throw yde;
    }

    LOG.info("Cluster specification: {}", validClusterSpecification);
- Determine the ExecutionMode, run startAppMaster, and return a YarnClusterClient object
    final ClusterEntrypoint.ExecutionMode executionMode = detached ?
        ClusterEntrypoint.ExecutionMode.DETACHED
        : ClusterEntrypoint.ExecutionMode.NORMAL;

    flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

    ApplicationReport report = startAppMaster(
        flinkConfiguration,
        applicationName,
        yarnClusterEntrypoint,
        jobGraph,
        yarnClient,
        yarnApplication,
        validClusterSpecification);

    String host = report.getHost();
    int port = report.getRpcPort();

    // Correctly initialize the Flink config
    flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
    flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

    flinkConfiguration.setString(RestOptions.ADDRESS, host);
    flinkConfiguration.setInteger(RestOptions.PORT, port);

    // the Flink cluster is deployed in YARN. Represent cluster
    return createYarnClusterClient(
        this,
        validClusterSpecification.getNumberTaskManagers(),
        validClusterSpecification.getSlotsPerTaskManager(),
        report,
        flinkConfiguration,
        true);
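The resource validation step in this list takes the requested memory, the YARN minimum allocation, and the maximum resource capability as inputs. A heavily simplified sketch of what a check over those inputs might look like is given below. ResourceCheckSketch and its behaviour (raising a request to the minimum allocation, rejecting one above the maximum) are assumptions for illustration; the article does not show validateClusterResources, and the comparison against free cluster memory is omitted.

```java
// Hypothetical, simplified check of a single memory request against YARN allocation limits.
final class ResourceCheckSketch {

    private ResourceCheckSketch() {}

    static int normalizeMemoryMB(int requestedMB, int yarnMinAllocationMB, int yarnMaxAllocationMB) {
        // Assumption: a request below the scheduler minimum is treated as the minimum.
        int effectiveMB = Math.max(requestedMB, yarnMinAllocationMB);
        if (effectiveMB > yarnMaxAllocationMB) {
            throw new IllegalArgumentException(
                "Requested " + effectiveMB + " MB exceeds the maximum container size of "
                    + yarnMaxAllocationMB + " MB");
        }
        return effectiveMB;
    }
}
```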
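For reasons of length, startAppMaster is not analyzed further here. Interested readers can read it on their own, or it may be covered in detail in a later article.
Job execution entry point
The entry point for running a program is again $FLINK_HOME/bin/flink run: CliFrontend.run calls runProgram() to run the program.
1. Instantiate the ClusterDescriptor
    ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
Here the customCommandLine object is a FlinkYarnSessionCli instance.
2. Get the clusterId and combine it with detached mode to decide how to proceed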
    if (clusterId == null && runOptions.getDetachedMode()) {
        ...
        final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

        final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
        client = clusterDescriptor.deployJobCluster(
            clusterSpecification,
            jobGraph,
            runOptions.getDetachedMode());
        ...
    } else {
        if (clusterId != null) {
            client = clusterDescriptor.retrieve(clusterId);
            shutdownHook = null;
        } else {
            // also in job mode we have to deploy a session cluster because the job
            // might consist of multiple parts (e.g. when using collect)
            final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
            client = clusterDescriptor.deploySessionCluster(clusterSpecification);
            // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
            // there's a race-condition here if cli is killed before shutdown hook is installed
            if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
                shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
            } else {
                shutdownHook = null;
            }
        }
        ...
    }
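The first if branch handles detached mode (the client detaches) when no clusterId is given:
- Create the JobGraph
- Get the ClusterSpecification
- Deploy a job cluster with deployJobCluster
If that condition does not hold, the else branch is taken.
In the nested if, when clusterId is not null, the client object is obtained through clusterDescriptor.retrieve.
Otherwise, a session cluster is deployed through clusterDescriptor.deploySessionCluster, which returns the client object.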
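The branching just described depends on only two inputs: whether a clusterId is present and whether detached mode was requested. The sketch below condenses it into a pure function so the three outcomes are easy to compare. DeploymentDecisionSketch and the Action enum are hypothetical names introduced for illustration; they are not part of CliFrontend.

```java
// Hypothetical condensation of the branch structure in runProgram().
final class DeploymentDecisionSketch {

    enum Action { DEPLOY_JOB_CLUSTER, RETRIEVE_EXISTING_CLUSTER, DEPLOY_SESSION_CLUSTER }

    private DeploymentDecisionSketch() {}

    static Action decide(String clusterId, boolean detached) {
        if (clusterId == null && detached) {
            // No existing cluster and the client detaches: build the JobGraph and deploy a job cluster.
            return Action.DEPLOY_JOB_CLUSTER;
        }
        if (clusterId != null) {
            // An existing cluster was specified: attach to it.
            return Action.RETRIEVE_EXISTING_CLUSTER;
        }
        // No existing cluster and the client stays attached: deploy a session cluster.
        return Action.DEPLOY_SESSION_CLUSTER;
    }
}
```

Note that a given clusterId takes the retrieve path regardless of the detached flag, because the first condition requires clusterId to be null.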
3. Run the job through the client object
    executeProgram(program, client, userParallelism);
4. Execute and obtain the JobSubmissionResult
    protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
        logAndSysout("Starting execution of program");

        final JobSubmissionResult result = client.run(program, parallelism);
        ....
    }
5. The client submits the job
After several layers of run calls, YarnClusterClient.submitJob() is finally executed.
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (isDetached()) {
            if (newlyCreatedCluster) {
                stopAfterJob(jobGraph.getJobID());
            }
            return super.runDetached(jobGraph, classLoader);
        } else {
            return super.run(jobGraph, classLoader);
        }
    }
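The subsequent method calls are the same as in the previous article, "Flink Source Code Analysis | Starting from an Example: Understanding the Cluster Job Execution Flow"; refer to it for further detail.
Upcoming articles will cover the flow when using the start-scala-shell.sh script, the generation of the various Graphs, and the actor system. These articles will then be consolidated into a complete summary of the Apache Flink job execution flow. Stay tuned.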