Flink 源码解析之从 Example 出发:读懂 Flink On Yarn 任务执行流程

栏目: 编程工具 · 发布时间: 5年前

Flink 源码解析之从 Example 出发:读懂 Flink On Yarn 任务执行流程

微信公众号: 深广大数据Club

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果你觉得深广大数据Club对你有帮助,欢迎赞赏

本文主要讲述Apache Flink在On Yarn模式下提交任务的执行流程源码分析。

关于本地模式以及集群模式,请阅读以下两篇文章:

Flink源码解析 | 从Example出发:读懂本地任务执行流程

Flink源码解析 | 从Example出发:读懂集群任务执行流程

环境部署脚本入口

Flink 源码解析之从 Example 出发:读懂 Flink On Yarn 任务执行流程

在yarn集群上启动一个长时间运行的flink集群,通过脚本yarn-session.sh来启动。

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

我们从yarn-session.sh脚本入手,先来看下脚本的内容。

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

JVM_ARGS="$JVM_ARGS -Xmx512m"

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

export FLINK_CONF_DIR

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

脚本流程

  • 获取配置

  • 设置jvm参数

  • 设置log配置

  • 调用FlinkYarnSessionCli执行

脚本使用指南

Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -nm,--name                      Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode

FlinkYarnSessionCli.java

入口类:org.apache.flink.yarn.cli.FlinkYarnSessionCli.java

main 方法中调用 run 方法执行on yarn的部署。

# main
...
final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
    flinkConfiguration,
    configurationDirectory,
    "",
    ""); // no prefix for the YARN session

    SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));

    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
...

run 方法中,第一步是获取参数进行参数解析

final CommandLine cmd = parseCommandLineOptions(args, true);

命令行解析完毕后有限匹配是否为help模式。如果是直接打印帮助信息。

if (cmd.hasOption(help.getOpt())) {
    printUsage();
    return 0;
}

判断是否包含 -q 的操作,则调用yarnClusterDescriptor.getClusterDescription()打印yarn的资源信息后退出

if (cmd.hasOption(query.getOpt())) {
    final String description = yarnClusterDescriptor.getClusterDescription();
    System.out.println(description);
    return 0;
}

如果不是 -q 或者 -h 这类操作,则进入主流程。

主流程开始先判断是否包含applicationId操作。

该操作命令用法如下:

Usage:
   Required
     -id,--applicationId <yarnAppId> YARN application Id

   # 附加到正在运行的Flink YARN会话application_1463870264508_0029
   # 示例:./bin/yarn-session.sh -id application_1463870264508_0029

传入applicationId,通过yarnClusterDescriptor进行retrieve(检索)获取clusterClient对象。

if (cmd.hasOption(applicationId.getOpt())) {
    yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));

    clusterClient = yarnClusterDescriptor.retrieve(yarnApplicationId);
}

若不包含applicationId,则调用deploySessionCluster进行部署。

final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);

//------------------ ClusterClient deployed, handle connection details
yarnApplicationId = clusterClient.getClusterId();

try {
    final LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();

    System.out.println("Flink JobManager is now running on " + connectionInfo.getHostname() +':' + connectionInfo.getPort() + " with leader id " + connectionInfo.getLeaderSessionID() + '.');
    System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());

    writeYarnPropertiesFile(
        yarnApplicationId,
        clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
        yarnClusterDescriptor.getDynamicPropertiesEncoded());
} catch (Exception e) {
    try {
        clusterClient.shutdown();
    } catch (Exception ex) {
        LOG.info("Could not properly shutdown cluster client.", ex);
    }

    try {
        yarnClusterDescriptor.killCluster(yarnApplicationId);
    } catch (FlinkException fe) {
        LOG.info("Could not properly terminate the Flink cluster.", fe);
    }
    throw new FlinkException("Could not write the Yarn connection information.", e);
}

包含如下步骤

  • deploySessionCluster部署clusterClient

  • 获取ApplicationId

  • 通过clusterClient 获取LeaderConnectionInfo

  • 写入yarn属性信息

on yarn部署还涉及到客户端是否分离的问题,yarn-sesion.sh脚本中指定 -d--detached ,可以启动分离的YarnSession,而不需要客户端一直运行。

if (yarnClusterDescriptor.isDetachedMode()) {
    LOG.info("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + "yarn application -kill " + yarnApplicationId);
}

注:在这种情况下,Flink YARN客户端将仅向群集提交Flink,然后自行关闭。请注意,在这种情况下,无法使用Flink停止YARN会话。

使用YARN实用程序(yarn application -kill )来停止YARN会话。

如不指定分离方式,客户端需要持续运行,可以通过 ctrl+c 或者 stop 来停止。

以下是非分离方式的代码内容:

ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
    yarnClusterDescriptor.getYarnClient(),
    yarnApplicationId,
    new ScheduledExecutorServiceAdapter(scheduledExecutorService));

try {
    runInteractiveCli(
        clusterClient,
        yarnApplicationStatusMonitor,
        acceptInteractiveInput);
 } finally {
    try {
        yarnApplicationStatusMonitor.close();
    } catch (Exception e) {
        LOG.info("Could not properly close the Yarn application status monitor.", e);
    }

    clusterClient.shutDownCluster();

    try {
        clusterClient.shutdown();
    } catch (Exception e) {
        LOG.info("Could not properly shutdown cluster client.", e);
    }

    // shut down the scheduled executor service
    ExecutorUtils.gracefulShutdown(
        1000L,
        TimeUnit.MILLISECONDS,
        scheduledExecutorService);

    deleteYarnPropertiesFile();

    ApplicationReport applicationReport;

    try {
        applicationReport = yarnClusterDescriptor
            .getYarnClient()
            .getApplicationReport(yarnApplicationId);
    } catch (YarnException | IOException e) {
        LOG.info("Could not log the final application report.", e);
        applicationReport = null;
    }

    if (applicationReport != null) {
        logFinalApplicationReport(applicationReport);
    }
}
  • 创建scheduledExecutorService

  • 创建状态监听器yarnApplicationStatusMonitor

  • 运行交互式客户端runInteractiveCli

  • 一系列关闭操作。

  • 生成Application报表

runInteractiveCli方法

while (continueRepl) {
    final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow();

    switch (applicationStatus) {
        case FAILED:
        case CANCELED:
            System.err.println("The Flink Yarn cluster has failed.");
            continueRepl = false;
            break;
        case UNKNOWN:
            if (!isLastStatusUnknown) {
                unknownStatusSince = System.nanoTime();
                isLastStatusUnknown = true;
            }

            if ((System.nanoTime() - unknownStatusSince) > 5L * CLIENT_POLLING_INTERVAL_MS * 1_000_000L) {
                System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
                continueRepl = false;
            } else {
                continueRepl = repStep(in, readConsoleInput);
            }
            break;
        case SUCCEEDED:
            if (isLastStatusUnknown) {
                isLastStatusUnknown = false;
            }

             // ------------------ check if there are updates by the cluster -----------
            try {
                final GetClusterStatusResponse status = clusterClient.getClusterStatus();

                if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
                    System.err.println("Number of connected TaskManagers changed to " +
                    status.numRegisteredTaskManagers() + ". " +
                    "Slots available: " + status.totalNumberOfSlots());
                    numTaskmanagers = status.numRegisteredTaskManagers();
                 }
            } catch (Exception e) {
                LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
            }

            printClusterMessages(clusterClient);

            continueRepl = repStep(in, readConsoleInput);
    }
}

方法内部主要是一个while循环体,判断条件是continueRepl,循环体内部逻辑

  • 通过yarnApplicationStatusMonitor.getApplicationStatusNow()获取ApplicationStatus状态信息

  • switch判断ApplicationStatus是FAILED、CANCELED、UNKNOWN还是SUCCEEDED

  • FAILED、CANCELED continueRepl为false,跳出循环

  • UNKNOWN 会对比当前的时间与最后unknown的时间,如果大于5L*CLIENT_POLLING_INTERVAL_MS * 1_000_000L,则continueRepl为false,跳出循环;否则继续循环

  • SUCCEEDED

    • 获取集群状态信息

    • 验证集群所注册的TaskManager数量与所指定的数量是否相符,不相符则打印err日志

    • 打印集群消息

    • 调用repStep方法获取continueRepl值

repStep方法主要是用于交互式方式接受用户输入:quit,stop,help。

private static boolean repStep(
    BufferedReader in,
    boolean readConsoleInput) throws IOException, InterruptedException {

    // wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
    long startTime = System.currentTimeMillis();
    while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
        && (!readConsoleInput || !in.ready())) {
        Thread.sleep(200L);
    }
    //------------- handle interactive command by user. ----------------------

    if (readConsoleInput && in.ready()) {
        String command = in.readLine();
        switch (command) {
            case "quit":
            case "stop":
                return false;

            case "help":
                System.err.println(YARN_SESSION_HELP);
                break;
            default:
                System.err.println("Unknown command '" + command + "'. Showing help:");
                System.err.println(YARN_SESSION_HELP);
                break;
        }
    }

    return true;
}

用户通过客户端输入 quitstop ,返回 false ;其他则返回 true

run()方法最后执行yarnClusterDescriptor.close

yarnClusterDescriptor.close();

外层的流程讲完了,我们来看下部署YarnSession集群的过程。

YarnSession部署

Flink 源码解析之从 Example 出发:读懂 Flink On Yarn 任务执行流程

部署通过AbstractYarnClusterDescriptor.deploySessionCluster方法,调用deployInternal()执行部署。

@Override
public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
    try {
        return deployInternal(
            clusterSpecification,
            "Flink session cluster",
            getYarnSessionClusterEntrypoint(),
            null,
            false);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
    }
}

deployInternal()方法将阻塞,直到将ApplicationMaster/JobManager部署到纱线上为止。

由于代码块较长,这里我们做代码拆分展示分析。

1、配置验证

validateClusterSpecification(clusterSpecification);

if (UserGroupInformation.isSecurityEnabled()) {
    // note: UGI::hasKerberosCredentials inaccurately reports false
    // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
    // so we check only in ticket cache scenario.
    boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

    UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
    if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
    && useTicketCache && !loginUser.hasKerberosCredentials()) {
        LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
        throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
        "does not have Kerberos credentials");
    }
}

isReadyForDeployment(clusterSpecification);

validateClusterSpecification方法主要是读取taskManagerMemorySize以及计算cutoff

private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException {
        try {
            final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
            // We do the validation by calling the calculation methods here
            // Internally these methods will check whether the cluster can be started with the provided
            // ClusterSpecification and the configured memory requirements
            final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
            TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
        } catch (IllegalArgumentException iae) {
            throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " +
                "cluster specification. Please increase the memory of the cluster.", iae);
        }
    }

之后则是验证是否启动Security,进行安全验证,最后判断是否可执行部署。

2、检查指定的yarn queue

// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);

3、读取并设置动态属性

// ------------------ Add dynamic properties to local flinkConfiguraton ------
Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
    flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}

4、检查yarn集群是否能满足资源请求.

  • 创建YarnClientApplication 以及 GetNewApplicationResponse

// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
  • 验证集群资源

Resource maxRes = appResponse.getMaximumResourceCapability();

final ClusterResourceDescription freeClusterMem;
try {
    freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
    failSessionDuringDeployment(yarnClient, yarnApplication);
    throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
}

final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

final ClusterSpecification validClusterSpecification;
try {
    validClusterSpecification = validateClusterResources(
        clusterSpecification,
        yarnMinAllocationMB,
        maxRes,
        freeClusterMem);
} catch (YarnDeploymentException yde) {
    failSessionDuringDeployment(yarnClient, yarnApplication);
    throw yde;
}

LOG.info("Cluster specification: {}", validClusterSpecification);
  • 获取ExecutionMode,执行startAppMaster并返回YarnClusterClient对象

final ClusterEntrypoint.ExecutionMode executionMode = detached ?
    ClusterEntrypoint.ExecutionMode.DETACHED
    : ClusterEntrypoint.ExecutionMode.NORMAL;

flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

ApplicationReport report = startAppMaster(
    flinkConfiguration,
    applicationName,
    yarnClusterEntrypoint,
    jobGraph,
    yarnClient,
    yarnApplication,
    validClusterSpecification);

String host = report.getHost();
int port = report.getRpcPort();

// Correctly initialize the Flink config
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);

// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(
    this,
    validClusterSpecification.getNumberTaskManagers(),
    validClusterSpecification.getSlotsPerTaskManager(),
    report,
    flinkConfiguration,
    true);

由于篇幅问题,至于startAppMaster这里就不再深入分析,有兴趣的朋友可以自行阅读。或者在后续文章中再做详细讲解。

任务执行入口

Flink 源码解析之从 Example 出发:读懂 Flink On Yarn 任务执行流程

程序执行入口同样是$FLINK_HOME/bin/flink run,通过CliFrontend.run调用runProgram()来运行项目。

1、实例化ClusterDescriptor

ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

此处的customCommandLine对象一个FlinkYarnSessionCli实例。

2、获取clusterId并结合DetachedMode判断处理逻辑

if (clusterId == null && runOptions.getDetachedMode()) {
    ...
    final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);

    final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
    client = clusterDescriptor.deployJobCluster(
        clusterSpecification,
        jobGraph,
        runOptions.getDetachedMode());
    ...
}else{
    if (clusterId != null) {
        client = clusterDescriptor.retrieve(clusterId);
        shutdownHook = null;
    } else {
        // also in job mode we have to deploy a session cluster because the job
        // might consist of multiple parts (e.g. when using collect)
        final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
        client = clusterDescriptor.deploySessionCluster(clusterSpecification);
        // if not running in detached mode, add a shutdown hook to shut down cluster if client exits
        // there's a race-condition here if cli is killed before shutdown hook is installed
        if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
            shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
        } else {
            shutdownHook = null;
        }
    }
    ...
}

第一个if中主要是用于DetachedMode(客户端分离模式)

  • 创建JobGraph

  • 获取ClusterSpecification

  • 部署任务集群deployJobCluster

    如果条件不成立则走else。

    第二个if中如果clusterId不为空,则通过clusterDescriptor.retrieve获取client对象。

    否则通过clusterDescriptor.deploySessionCluster部署,获取client对象

3、通过client对象执行任务

executeProgram(program, client, userParallelism);

4、执行获取JobSubmissionResult结果

protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
    logAndSysout("Starting execution of program");

    final JobSubmissionResult result = client.run(program, parallelism);
    ....
}

5、client提交任务

经过多层run方法调用最终执行YarnClusterClient.submitJob()方法。

public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (isDetached()) {
            if (newlyCreatedCluster) {
                stopAfterJob(jobGraph.getJobID());
            }
            return super.runDetached(jobGraph, classLoader);
        } else {
            return super.run(jobGraph, classLoader);
        }
    }

之后的方法调用与前一篇文章《Flink源码解析 | 从Example出发:读懂集群任务执行流程》一致。需要进一步去阅读了解。

之后的文章准备讲下采用start-scala-shell.sh脚本执行的流程,各类Graph的生成以及actor系统。之后会将以上这些文章整理出一篇完整的Apache Flink的任务执行流程总结文章出来,尽请期待。

关注公众号

Flink 源码解析之从 Example 出发:读懂 Flink On Yarn 任务执行流程


以上所述就是小编给大家介绍的《Flink 源码解析之从 Example 出发:读懂 Flink On Yarn 任务执行流程》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

黑客

黑客

Steven Levy / 赵俐、刁海鹏、田俊静 / 机械工业出版社华章公司 / 2011-10-31 / 69.00元

黑客文化和伦理的奠基之作,计算机专业人士必读。 二十五周年新版,涵盖比尔·盖茨、马克·扎克伯格、理查德·斯托曼、史蒂夫·沃兹尼克等著名黑客的最新资料。 多年前,射击游戏之父、Doom游戏的作者约翰·卡马克由于读到本书,坚定了游戏开发的决心。 谷歌首席信息官本·弗里德也是本书的忠实读者。 探寻黑客文化的本质,体会黑客精神的精髓。一起来看看 《黑客》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

URL 编码/解码
URL 编码/解码

URL 编码/解码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具