Spark 源码系列(七)Spark on yarn 具体实现

栏目: 编程工具 · 发布时间: 5年前

内容简介:本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下 Spark on yarn 的实现,1.0.0 里面已经是一个 stable 的版本了,可是 1.0.1 也出来了,离 1.0.0 发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲 1.0.0 的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于 1.0.0 的代码。在第一章找到 main 函数,里面调用了 run 方法,我们直接看 run 方法。

本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下 Spark on yarn 的实现,1.0.0 里面已经是一个 stable 的版本了,可是 1.0.1 也出来了,离 1.0.0 发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲 1.0.0 的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于 1.0.0 的代码。

在第一章 《spark-submit 提交作业过程》 的时候,我们讲过 Spark on yarn 的在 cluster 模式下它的 main class 是 org.apache.spark.deploy.yarn.Client。okay,这个就是我们的头号目标。

提交作业

找到 main 函数,里面调用了 run 方法,我们直接看 run 方法。

val appId = runApp()
    monitorApplication(appId)
    System.exit(0)
复制代码

运行 App,跟踪 App,最后退出。我们先看 runApp 吧。

def runApp(): ApplicationId = {
    // 校验参数,内存不能小于384Mb,Executor的数量不能少于1个。
    validateArgs()
    // 这两个是父类的方法,初始化并且启动Client
    init(yarnConf)
    start()

    // 记录集群的信息(e.g, NodeManagers的数量,队列的信息).
    logClusterResourceDetails()

    // 准备提交请求到ResourcManager (specifically its ApplicationsManager (ASM)// Get a new client application.
    val newApp = super.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    val appId = newAppResponse.getApplicationId()
    // 检查集群的内存是否满足当前的作业需求
    verifyClusterResources(newAppResponse)

    // 准备资源和环境变量.
    //1.获得工作目录的具体地址: /.sparkStaging/appId/
    val appStagingDir = getAppStagingDir(appId)
  //2.创建工作目录,设置工作目录权限,上传运行时所需要的jar包
    val localResources = prepareLocalResources(appStagingDir)
    //3.设置运行时需要的环境变量
    val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  //4.设置运行时JVM参数,设置SPARK_USE_CONC_INCR_GC为true的话,就使用CMS的垃圾回收机制
    val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)

    // 设置application submission context. 
    val appContext = newApp.getApplicationSubmissionContext()
    appContext.setApplicationName(args.appName)
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(amContainer)
    appContext.setApplicationType("SPARK")

    // 设置ApplicationMaster的内存,Resource是表示资源的类,目前有CPU和内存两种.
    val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
    memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
    appContext.setResource(memoryResource)

    // 提交Application.
    submitApp(appContext)
    appId
  }
复制代码

monitorApplication 就不说了,不停的调用 getApplicationReport 方法获得最新的 Report,然后调用 getYarnApplicationState 获取当前状态,如果状态为 FINISHED、FAILED、KILLED 就退出。

说到这里,顺便把跟 yarn 相关的参数也贴出来一下,大家一看就清楚了。

while (!args.isEmpty) {
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--args" | "--arg") :: value :: tail =>
          if (args(0) == "--args") {
            println("--args is deprecated. Use --arg instead.")
          }
          userArgsBuffer += value
          args = tail

        case ("--master-class" | "--am-class") :: value :: tail =>
          if (args(0) == "--master-class") {
            println("--master-class is deprecated. Use --am-class instead.")
          }
          amClass = value
          args = tail

        case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
          if (args(0) == "--master-memory") {
            println("--master-memory is deprecated. Use --driver-memory instead.")
          }
          amMemory = value
          args = tail

        case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
          if (args(0) == "--num-workers") {
            println("--num-workers is deprecated. Use --num-executors instead.")
          }
          numExecutors = value
          args = tail

        case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
          if (args(0) == "--worker-memory") {
            println("--worker-memory is deprecated. Use --executor-memory instead.")
          }
          executorMemory = value
          args = tail

        case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
          if (args(0) == "--worker-cores") {
            println("--worker-cores is deprecated. Use --executor-cores instead.")
          }
          executorCores = value
          args = tail

        case ("--queue") :: value :: tail =>
          amQueue = value
          args = tail

        case ("--name") :: value :: tail =>
          appName = value
          args = tail

        case ("--addJars") :: value :: tail =>
          addJars = value
          args = tail

        case ("--files") :: value :: tail =>
          files = value
          args = tail

        case ("--archives") :: value :: tail =>
          archives = value
          args = tail

        case Nil =>
          if (userClass == null) {
            printUsageAndExit(1)
          }

        case _ =>
          printUsageAndExit(1, args)
      }
    }
复制代码

ApplicationMaster

直接看 run 方法就可以了,main 函数就干了那么一件事...

def run() {
    // 设置本地目录,默认是先使用yarn的YARN_LOCAL_DIRS目录,再到LOCAL_DIRS
    System.setProperty("spark.local.dir", getLocalDirs())

    // set the web ui port to be ephemeral for yarn so we don't conflict with
    // other spark processes running on the same box
    System.setProperty("spark.ui.port", "0")

    // when running the AM, the Spark master is always "yarn-cluster"
    System.setProperty("spark.master", "yarn-cluster")

   // 设置优先级为30,和mapreduce的优先级一样。它比HDFS的优先级高,因为它的操作是清理该作业在hdfs上面的Staging目录
    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

    appAttemptId = getApplicationAttemptId()
  // 通过yarn.resourcemanager.am.max-attempts来设置,默认是2
  // 目前发现它只在清理Staging目录的时候用
    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
    amClient = AMRMClient.createAMRMClient()
    amClient.init(yarnConf)
    amClient.start()

    // setup AmIpFilter for the SparkUI - do this before we start the UI
  //  方法的介绍说是yarn用来保护ui界面的,我感觉是设置ip代理的
    addAmIpFilter()
  //  注册ApplicationMaster到内部的列表里
    ApplicationMaster.register(this)

    // 安全认证相关的东西,默认是不开启的,省得给自己找事
    val securityMgr = new SecurityManager(sparkConf)

    // 启动driver程序 
    userThread = startUserClass()

    // 等待SparkContext被实例化,主要是等待spark.driver.port property被使用
  // 等待结束之后,实例化一个YarnAllocationHandler
    waitForSparkContextInitialized()

    // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
  // 向yarn注册当前的ApplicationMaster, 这个时候isFinished不能为true,是true就说明程序失败了
    synchronized {
      if (!isFinished) {
        registerApplicationMaster()
        registered = true
      }
    }

    // 申请Container来启动Executor
    allocateExecutors()

    // 等待程序运行结束
    userThread.join()

    System.exit(0)
  }
复制代码

run 方法里面主要干了 5 项工作:

1、初始化工作

2、启动 driver 程序

3、注册 ApplicationMaster

4、分配 Executors

5、等待程序运行结束

我们重点看分配 Executor 方法。

private def allocateExecutors() {
    try {
      logInfo("Allocating " + args.numExecutors + " executors.")
      // 分host、rack、任意机器三种类型向ResourceManager提交ContainerRequest
    // 请求的Container数量可能大于需要的数量
      yarnAllocator.addResourceRequests(args.numExecutors)
      // Exits the loop if the user thread exits.
      while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
        if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
          finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached")
        }
     // 把请求回来的资源进行分配,并释放掉多余的资源
        yarnAllocator.allocateResources()
        ApplicationMaster.incrementAllocatorLoop(1)
        Thread.sleep(100)
      }
    } finally {
      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
    }
    logInfo("All executors have launched.")

    // 启动一个线程来状态报告
    if (userThread.isAlive) {
      // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
      val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)

      // we want to be reasonably responsive without causing too many requests to RM.
      val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)

      // must be <= timeoutInterval / 2.
      val interval = math.min(timeoutInterval / 2, schedulerInterval)

      launchReporterThread(interval)
    }
  }
复制代码

这里面我们只需要看 addResourceRequests 和 allocateResources 方法即可。

先说 addResourceRequests 方法,代码就不贴了。

Client 向 ResourceManager 提交 Container 的请求,分三种类型:优先选择机器、同一个 rack 的机器、任意机器。

优先选择机器是在 RDD 里面的 getPreferredLocations 获得的机器位置,如果没有优先选择机器,也就没有同一个 rack 之说了,可以是任意机器。

下面我们接着看 allocateResources 方法。

def allocateResources() {
    // We have already set the container request. Poll the ResourceManager for a response.
    // This doubles as a heartbeat if there are no pending container requests.
  // 之前已经提交过Container请求了,现在只需要获取response即可 
    val progressIndicator = 0.1f
    val allocateResponse = amClient.allocate(progressIndicator)

    val allocatedContainers = allocateResponse.getAllocatedContainers()
    if (allocatedContainers.size > 0) {
      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)

      if (numPendingAllocateNow < 0) {
        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
      }

      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()

      for (container <- allocatedContainers) {
     // 内存 > Executor所需内存 + 384
        if (isResourceConstraintSatisfied(container)) {
          // 把container收入名册当中,等待发落
          val host = container.getNodeId.getHost
          val containersForHost = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
          containersForHost += container
        } else {
          // 内存不够,释放掉它
          releaseContainer(container)
        }
      }

      // 找到合适的container来使用.
      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
    // 遍历所有的host
      for (candidateHost <- hostToContainers.keySet) {
        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)

        val remainingContainersOpt = hostToContainers.get(candidateHost)
        var remainingContainers = remainingContainersOpt.get
      
        if (requiredHostCount >= remainingContainers.size) {
          // 需要的比现有的多,把符合数据本地性的添加到dataLocalContainers映射关系里
          dataLocalContainers.put(candidateHost, remainingContainers)
          // 没有containner剩下的.
          remainingContainers = null
        } else if (requiredHostCount > 0) {
          // 获得的container比所需要的多,把多余的释放掉
          val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
          dataLocalContainers.put(candidateHost, dataLocal)

          for (container <- remaining) releaseContainer(container)
          remainingContainers = null
        }

        // 数据所在机器已经分配满任务了,只能在同一个rack里面挑选了
        if (remainingContainers != null) {
          val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
          if (rack != null) {
            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
              rackLocalContainers.getOrElse(rack, List()).size

            if (requiredRackCount >= remainingContainers.size) {
              // Add all remaining containers to to `dataLocalContainers`.
              dataLocalContainers.put(rack, remainingContainers)
              remainingContainers = null
            } else if (requiredRackCount > 0) {
              // Container list has more containers that we need for data locality.
              val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())

              existingRackLocal ++= rackLocal
              remainingContainers = remaining
            }
          }
        }

        if (remainingContainers != null) {
          // 还是不够,只能放到别的rack的机器上运行了
          offRackContainers.put(candidateHost, remainingContainers)
        }
      }

      // 按照数据所在机器、同一个rack、任意机器来排序
      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)

      // 遍历选择了的Container,为每个Container启动一个ExecutorRunnable线程专门负责给它发送命令
      for (container <- allocatedContainersToProcess) {
        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
        val executorHostname = container.getNodeId.getHost
        val containerId = container.getId
     // 内存需要大于Executor的内存 + 384
        val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)

        if (numExecutorsRunningNow > maxExecutors) {
          // 正在运行的比需要的多了,释放掉多余的Container
          releaseContainer(container)
          numExecutorsRunning.decrementAndGet()
        } else {
          val executorId = executorIdCounter.incrementAndGet().toString
          val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
            sparkConf.get("spark.driver.host"),
            sparkConf.get("spark.driver.port"),
            CoarseGrainedSchedulerBackend.ACTOR_NAME)


          // To be safe, remove the container from `pendingReleaseContainers`.
          pendingReleaseContainers.remove(containerId)
         // 把container记录到已分配的rack的映射关系当中
          val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
          allocatedHostToContainersMap.synchronized {
            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
              new HashSet[ContainerId]())

            containerSet += containerId
            allocatedContainerToHostMap.put(containerId, executorHostname)

            if (rack != null) {
              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
            }
          }
      // 启动一个线程给它进行跟踪服务,给它发送运行Executor的命令
          val executorRunnable = new ExecutorRunnable(
            container,
            conf,
            sparkConf,
            driverUrl,
            executorId,
            executorHostname,
            executorMemory,
            executorCores)
          new Thread(executorRunnable).start()
        }
      }
      
  }
复制代码

1、把从 ResourceManager 中获得的 Container 进行选择,选择顺序是按照前面的介绍的三种类别依次进行,优先选择机器 > 同一个 rack 的机器 > 任意机器。

2、选择了 Container 之后,给每一个 Container 都启动一个 ExecutorRunner 一对一贴身服务,给它发送运行 CoarseGrainedExecutorBackend 的命令。

3、ExecutorRunner 通过 NMClient 来向 NodeManager 发送请求。

总结:

把作业发布到 yarn 上面去执行这块涉及到的类不多,主要是涉及到 Client、ApplicationMaster、YarnAllocationHandler、ExecutorRunner 这四个类。

1、Client 作为 Yarn 的客户端,负责向 Yarn 发送启动 ApplicationMaster 的命令。

2、ApplicationMaster 就像项目经理一样负责整个项目所需要的工作,包括请求资源,分配资源,启动 Driver 和 Executor,Executor 启动失败的错误处理。

3、ApplicationMaster 的请求、分配资源是通过 YarnAllocationHandler 来进行的。

4、Container 选择的顺序是:优先选择机器 > 同一个 rack 的机器 > 任意机器。

5、ExecutorRunner 只负责向 Container 发送启动 CoarseGrainedExecutorBackend 的命令。

6、Executor 的错误处理是在 ApplicationMaster 的 launchReporterThread 方法里面,它启动的线程除了报告运行状态,还会监控 Executor 的运行,一旦发现有丢失的 Executor 就重新请求。

7、在 yarn 目录下看到的名称里面带有 YarnClient 的是属于 yarn-client 模式的类,实现和前面的也差不多。

其它的内容更多是 Yarn 的客户端 api 使用,我也不太会,只是看到了能懂个意思,哈哈。

参考文献

提交作业 ApplicationMaster


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

CASIO fx-5800P编程计算器公路与铁路施工测量程序

CASIO fx-5800P编程计算器公路与铁路施工测量程序

2011-8 / 40.00元

《CASIO fx-5800P 编程计算器公路与铁路施工测量程序(第2版)》内容简介:第2版是一本全新的图书。书中的QH2-7T与QH2-8T程序都具有三维中边桩坐标正、反算,路基超高及边桩设计高程计算,边坡坡口与坡脚计算,桥墩桩基坐标计算,隧道超欠挖计算等功能。QH2-7T为交点法程序,QH2-8T为线元法程序,两个程序均使用数据库子程序输入平竖曲线的全部设计数据。测试程序各项功能所用的案例均取......一起来看看 《CASIO fx-5800P编程计算器公路与铁路施工测量程序》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具