内容简介:本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下 Spark on yarn 的实现,1.0.0 里面已经是一个 stable 的版本了,可是 1.0.1 也出来了,离 1.0.0 发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲 1.0.0 的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于 1.0.0 的代码。在第一章找到 main 函数,里面调用了 run 方法,我们直接看 run 方法。
本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下 Spark on yarn 的实现,1.0.0 里面已经是一个 stable 的版本了,可是 1.0.1 也出来了,离 1.0.0 发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲 1.0.0 的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于 1.0.0 的代码。
在第一章 《spark-submit 提交作业过程》 的时候,我们讲过 Spark on yarn 的在 cluster 模式下它的 main class 是 org.apache.spark.deploy.yarn.Client。okay,这个就是我们的头号目标。
找到 main 函数,里面调用了 run 方法,我们直接看 run 方法。
val appId = runApp() monitorApplication(appId) System.exit(0) 复制代码
运行 App,跟踪 App,最后退出。我们先看 runApp 吧。
def runApp(): ApplicationId = { // 校验参数,内存不能小于384Mb,Executor的数量不能少于1个。 validateArgs() // 这两个是父类的方法,初始化并且启动Client init(yarnConf) start() // 记录集群的信息(e.g, NodeManagers的数量,队列的信息). logClusterResourceDetails() // 准备提交请求到ResourcManager (specifically its ApplicationsManager (ASM)// Get a new client application. val newApp = super.createApplication() val newAppResponse = newApp.getNewApplicationResponse() val appId = newAppResponse.getApplicationId() // 检查集群的内存是否满足当前的作业需求 verifyClusterResources(newAppResponse) // 准备资源和环境变量. //1.获得工作目录的具体地址: /.sparkStaging/appId/ val appStagingDir = getAppStagingDir(appId) //2.创建工作目录,设置工作目录权限,上传运行时所需要的jar包 val localResources = prepareLocalResources(appStagingDir) //3.设置运行时需要的环境变量 val launchEnv = setupLaunchEnv(localResources, appStagingDir) //4.设置运行时JVM参数,设置SPARK_USE_CONC_INCR_GC为true的话,就使用CMS的垃圾回收机制 val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv) // 设置application submission context. val appContext = newApp.getApplicationSubmissionContext() appContext.setApplicationName(args.appName) appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType("SPARK") // 设置ApplicationMaster的内存,Resource是表示资源的类,目前有CPU和内存两种. val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) appContext.setResource(memoryResource) // 提交Application. submitApp(appContext) appId } 复制代码
monitorApplication 就不说了,不停的调用 getApplicationReport 方法获得最新的 Report,然后调用 getYarnApplicationState 获取当前状态,如果状态为 FINISHED、FAILED、KILLED 就退出。
说到这里,顺便把跟 yarn 相关的参数也贴出来一下,大家一看就清楚了。
while (!args.isEmpty) { args match { case ("--jar") :: value :: tail => userJar = value args = tail case ("--class") :: value :: tail => userClass = value args = tail case ("--args" | "--arg") :: value :: tail => if (args(0) == "--args") { println("--args is deprecated. Use --arg instead.") } userArgsBuffer += value args = tail case ("--master-class" | "--am-class") :: value :: tail => if (args(0) == "--master-class") { println("--master-class is deprecated. Use --am-class instead.") } amClass = value args = tail case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail => if (args(0) == "--master-memory") { println("--master-memory is deprecated. Use --driver-memory instead.") } amMemory = value args = tail case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail => if (args(0) == "--num-workers") { println("--num-workers is deprecated. Use --num-executors instead.") } numExecutors = value args = tail case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => if (args(0) == "--worker-memory") { println("--worker-memory is deprecated. Use --executor-memory instead.") } executorMemory = value args = tail case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => if (args(0) == "--worker-cores") { println("--worker-cores is deprecated. Use --executor-cores instead.") } executorCores = value args = tail case ("--queue") :: value :: tail => amQueue = value args = tail case ("--name") :: value :: tail => appName = value args = tail case ("--addJars") :: value :: tail => addJars = value args = tail case ("--files") :: value :: tail => files = value args = tail case ("--archives") :: value :: tail => archives = value args = tail case Nil => if (userClass == null) { printUsageAndExit(1) } case _ => printUsageAndExit(1, args) } } 复制代码
直接看 run 方法就可以了,main 函数就干了那么一件事...
def run() { // 设置本地目录,默认是先使用yarn的YARN_LOCAL_DIRS目录,再到LOCAL_DIRS System.setProperty("spark.local.dir", getLocalDirs()) // set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box System.setProperty("spark.ui.port", "0") // when running the AM, the Spark master is always "yarn-cluster" System.setProperty("spark.master", "yarn-cluster") // 设置优先级为30,和mapreduce的优先级一样。它比HDFS的优先级高,因为它的操作是清理该作业在hdfs上面的Staging目录 ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() // 通过yarn.resourcemanager.am.max-attempts来设置,默认是2 // 目前发现它只在清理Staging目录的时候用 isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts amClient = AMRMClient.createAMRMClient() amClient.init(yarnConf) amClient.start() // setup AmIpFilter for the SparkUI - do this before we start the UI // 方法的介绍说是yarn用来保护ui界面的,我感觉是设置ip代理的 addAmIpFilter() // 注册ApplicationMaster到内部的列表里 ApplicationMaster.register(this) // 安全认证相关的东西,默认是不开启的,省得给自己找事 val securityMgr = new SecurityManager(sparkConf) // 启动driver程序 userThread = startUserClass() // 等待SparkContext被实例化,主要是等待spark.driver.port property被使用 // 等待结束之后,实例化一个YarnAllocationHandler waitForSparkContextInitialized() // Do this after Spark master is up and SparkContext is created so that we can register UI Url. // 向yarn注册当前的ApplicationMaster, 这个时候isFinished不能为true,是true就说明程序失败了 synchronized { if (!isFinished) { registerApplicationMaster() registered = true } } // 申请Container来启动Executor allocateExecutors() // 等待程序运行结束 userThread.join() System.exit(0) } 复制代码
run 方法里面主要干了 5 项工作:
2、启动 driver 程序
3、注册 ApplicationMaster
4、分配 Executors
我们重点看分配 Executor 方法。
private def allocateExecutors() { try { logInfo("Allocating " + args.numExecutors + " executors.") // 分host、rack、任意机器三种类型向ResourceManager提交ContainerRequest // 请求的Container数量可能大于需要的数量 yarnAllocator.addResourceRequests(args.numExecutors) // Exits the loop if the user thread exits. while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) { if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) { finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached") } // 把请求回来的资源进行分配,并释放掉多余的资源 yarnAllocator.allocateResources() ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } } finally { // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } logInfo("All executors have launched.") // 启动一个线程来状态报告 if (userThread.isAlive) { // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) // we want to be reasonably responsive without causing too many requests to RM. val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000) // must be <= timeoutInterval / 2. val interval = math.min(timeoutInterval / 2, schedulerInterval) launchReporterThread(interval) } } 复制代码
这里面我们只需要看 addResourceRequests 和 allocateResources 方法即可。
先说 addResourceRequests 方法,代码就不贴了。
Client 向 ResourceManager 提交 Container 的请求,分三种类型:优先选择机器、同一个 rack 的机器、任意机器。
优先选择机器是在 RDD 里面的 getPreferredLocations 获得的机器位置,如果没有优先选择机器,也就没有同一个 rack 之说了,可以是任意机器。
下面我们接着看 allocateResources 方法。
def allocateResources() { // We have already set the container request. Poll the ResourceManager for a response. // This doubles as a heartbeat if there are no pending container requests. // 之前已经提交过Container请求了,现在只需要获取response即可 val progressIndicator = 0.1f val allocateResponse = amClient.allocate(progressIndicator) val allocatedContainers = allocateResponse.getAllocatedContainers() if (allocatedContainers.size > 0) { var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size) if (numPendingAllocateNow < 0) { numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow) } val hostToContainers = new HashMap[String, ArrayBuffer[Container]]() for (container <- allocatedContainers) { // 内存 > Executor所需内存 + 384 if (isResourceConstraintSatisfied(container)) { // 把container收入名册当中,等待发落 val host = container.getNodeId.getHost val containersForHost = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]()) containersForHost += container } else { // 内存不够,释放掉它 releaseContainer(container) } } // 找到合适的container来使用. val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]() val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]() val offRackContainers = new HashMap[String, ArrayBuffer[Container]]() // 遍历所有的host for (candidateHost <- hostToContainers.keySet) { val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0) val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost) val remainingContainersOpt = hostToContainers.get(candidateHost) var remainingContainers = remainingContainersOpt.get if (requiredHostCount >= remainingContainers.size) { // 需要的比现有的多,把符合数据本地性的添加到dataLocalContainers映射关系里 dataLocalContainers.put(candidateHost, remainingContainers) // 没有containner剩下的. remainingContainers = null } else if (requiredHostCount > 0) { // 获得的container比所需要的多,把多余的释放掉 val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount) dataLocalContainers.put(candidateHost, dataLocal) for (container <- remaining) releaseContainer(container) remainingContainers = null } // 数据所在机器已经分配满任务了,只能在同一个rack里面挑选了 if (remainingContainers != null) { val rack = YarnAllocationHandler.lookupRack(conf, candidateHost) if (rack != null) { val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - rackLocalContainers.getOrElse(rack, List()).size if (requiredRackCount >= remainingContainers.size) { // Add all remaining containers to to `dataLocalContainers`. dataLocalContainers.put(rack, remainingContainers) remainingContainers = null } else if (requiredRackCount > 0) { // Container list has more containers that we need for data locality. val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount) val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]()) existingRackLocal ++= rackLocal remainingContainers = remaining } } } if (remainingContainers != null) { // 还是不够,只能放到别的rack的机器上运行了 offRackContainers.put(candidateHost, remainingContainers) } } // 按照数据所在机器、同一个rack、任意机器来排序 val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size) allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers) allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers) allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers) // 遍历选择了的Container,为每个Container启动一个ExecutorRunnable线程专门负责给它发送命令 for (container <- allocatedContainersToProcess) { val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet() val executorHostname = container.getNodeId.getHost val containerId = container.getId // 内存需要大于Executor的内存 + 384 val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD) if (numExecutorsRunningNow > maxExecutors) { // 正在运行的比需要的多了,释放掉多余的Container releaseContainer(container) numExecutorsRunning.decrementAndGet() } else { val executorId = executorIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) // To be safe, remove the container from `pendingReleaseContainers`. pendingReleaseContainers.remove(containerId) // 把container记录到已分配的rack的映射关系当中 val rack = YarnAllocationHandler.lookupRack(conf, executorHostname) allocatedHostToContainersMap.synchronized { val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname, new HashSet[ContainerId]()) containerSet += containerId allocatedContainerToHostMap.put(containerId, executorHostname) if (rack != null) { allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1) } } // 启动一个线程给它进行跟踪服务,给它发送运行Executor的命令 val executorRunnable = new ExecutorRunnable( container, conf, sparkConf, driverUrl, executorId, executorHostname, executorMemory, executorCores) new Thread(executorRunnable).start() } } } 复制代码
1、把从 ResourceManager 中获得的 Container 进行选择,选择顺序是按照前面的介绍的三种类别依次进行,优先选择机器 > 同一个 rack 的机器 > 任意机器。
2、选择了 Container 之后,给每一个 Container 都启动一个 ExecutorRunner 一对一贴身服务,给它发送运行 CoarseGrainedExecutorBackend 的命令。
3、ExecutorRunner 通过 NMClient 来向 NodeManager 发送请求。
把作业发布到 yarn 上面去执行这块涉及到的类不多,主要是涉及到 Client、ApplicationMaster、YarnAllocationHandler、ExecutorRunner 这四个类。
1、Client 作为 Yarn 的客户端,负责向 Yarn 发送启动 ApplicationMaster 的命令。
2、ApplicationMaster 就像项目经理一样负责整个项目所需要的工作,包括请求资源,分配资源,启动 Driver 和 Executor,Executor 启动失败的错误处理。
3、ApplicationMaster 的请求、分配资源是通过 YarnAllocationHandler 来进行的。
4、Container 选择的顺序是:优先选择机器 > 同一个 rack 的机器 > 任意机器。
5、ExecutorRunner 只负责向 Container 发送启动 CoarseGrainedExecutorBackend 的命令。
6、Executor 的错误处理是在 ApplicationMaster 的 launchReporterThread 方法里面,它启动的线程除了报告运行状态,还会监控 Executor 的运行,一旦发现有丢失的 Executor 就重新请求。
7、在 yarn 目录下看到的名称里面带有 YarnClient 的是属于 yarn-client 模式的类,实现和前面的也差不多。
其它的内容更多是 Yarn 的客户端 api 使用,我也不太会,只是看到了能懂个意思,哈哈。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- libgo 源码剖析(2. libgo调度策略源码实现)
- HashMap源码实现分析
- 浅谈AsyncTask源码实现
- 【React源码解读】- 组件的实现
- HashMap 实现原理与源码分析
- 手写源码(四):自己实现Mybatis
张向南 勾俊伟 / 人民邮电出版社 / 2017-5 / 39.80元
《新媒体运营实战技能》共7章。第1章重点介绍了新媒体图片的创意思路及制作技巧,包括微信公众号封面图、信息长图、icon图标、九宫图、gif图片的具体实战操作;第2章重点介绍了创意云文字、微信排版、滑动看图等新媒体文字的排版方法与处理技巧;第3章是新媒体表单,引导读者对表单结构、设计场景及具体应用全面了解;第4章关于H5的创意思路及制作方法,解析了引发H5传播的心理因素,并重点介绍H5的制作工具与具......一起来看看 《新媒体运营实战技能》 这本书的介绍吧!