内容简介:研究 Spark 内部是怎么运行的,怎么将 Spark 的任务从开始运行到结束的,先从 spark-submit 这个 shell 脚本提交用户程序开始。下面的分析都是基于 spark 2.1.1 版本。我们一般提交 Spark 任务时,都会写一个如下的脚本,里面指定 spark-submit 脚本的位置,配置好一些参数,然后运行:上面那个脚本实际上会将参数带到 spark-submit 脚本中去执行,看一下 spark-submit 脚本:
研究 Spark 内部是怎么运行的,怎么将 Spark 的任务从开始运行到结束的,先从 spark-submit 这个 shell 脚本提交用户程序开始。下面的分析都是基于 spark 2.1.1 版本。
我们一般提交 Spark 任务时,都会写一个如下的脚本,里面指定 spark-submit 脚本的位置,配置好一些参数,然后运行:
./bin/spark-submit \ --class <main-class> \ --master <master-url> \ --deploy-mode <deploy-mode> \ --conf <key>=<value> \ ... # other options <application-jar> \ [application-arguments] 复制代码
上面那个脚本实际上会将参数带到 spark-submit 脚本中去执行,看一下 spark-submit 脚本:
if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@" 复制代码
脚本最后调用 exec 执行 "${SPARK_HOME}"/bin/spark-class,调用的 class 为: org.apache.spark.deploy.SparkSubmit
,后面的 "$@" 是脚本执行的所有参数。
通过 spark-class 脚本,最终执行的命令中,制定了程序的入口为 org.apache.spark.deploy.SparkSubmit
。
一,org.apache.spark.deploy.SparkSubmit
1,main 方法
def main(args: Array[String]): Unit = { val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { // scalastyle:off println printStream.println(appArgs) // scalastyle:on println } appArgs.action match { case SparkSubmitAction.SUBMIT => submit(appArgs) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) } } 复制代码
从 main 方法中可以看出,根据解析后的参数中的 action 进行模式匹配,是什么操作就执行什么方法,我们这边是 submit 操作,则调用 submit 方法。
2,submit 方法
submit 方法做两件事情,第一件事为通过 clusterManager 和 dploymode 去决定下一步要执行的类的 main 方法,第二件事是根据反射执行这个 main 方法。
2.1,submit 方法第一步
这部分主要是准备下一步要执行的相关类及参数:
private def submit(args: SparkSubmitArguments): Unit = { val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) 复制代码
2.1.1,prepareSubmitEnvironment 方法
通过调用 prepareSubmitEnvironment
方法来准备下一步要执行的类的 main 方法及相关参数,看一下这个方法,下面这部分是根据参数中的 master 和 deploy-mode 来设置对应的 cluasterManager 和部署模式:
private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments) : (Seq[String], Seq[String], Map[String, String], String) = { // 要返回的四个参数 val childArgs = new ArrayBuffer[String]() val childClasspath = new ArrayBuffer[String]() val sysProps = new HashMap[String, String]() var childMainClass = "" // 根据脚本中配置的 master 参数去模式匹配出 clusterManager val clusterManager: Int = args.master match { case "yarn" => YARN case "yarn-client" | "yarn-cluster" => printWarning(s"Master ${args.master} is deprecated since 2.0." + " Please use master \"yarn\" with specified deploy mode instead.") YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS case m if m.startsWith("local") => LOCAL case _ => printErrorAndExit("Master must either be yarn or start with spark, mesos, local") -1 } // 根据 deployMode 参数去模式匹配出部署模式 var deployMode: Int = args.deployMode match { case "client" | null => CLIENT case "cluster" => CLUSTER case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1 } 复制代码
然后会根据上面匹配出的集群以及部署模式决定怎么提交 application,我们这边看一下 standalone 集群部署模式,看下面这部分代码:
// standalone cluster 模式下的 childMainClass 以及参数的配置 if (args.isStandaloneCluster) { //如果参数中配置了 useRest 则为 RestSubmissionClient 的方式去提交 application if (args.useRest) { childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient" childArgs += (args.primaryResource, args.mainClass) } else { // 否则使用 Client 放是去提交 application childMainClass = "org.apache.spark.deploy.Client" if (args.supervise) { childArgs += "--supervise" } Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) } Option(args.driverCores).foreach { c => childArgs += ("--cores", c) } childArgs += "launch" childArgs += (args.master, args.primaryResource, args.mainClass) } if (args.childArgs != null) { childArgs ++= args.childArgs } } 复制代码
在 standalone 集群模式下,有两个提交网关:
1,使用 org.apache.spark.deploy.Client
作为包装器来使用传统的 RPC 网关;
2,使用 Spark 1.3 中引入的基于 rest 的网关。
2.2,submit 方法第二步
这里我们的参数已经准备好了,然后根据我们 standalone cluster 部署模式决定下一步怎么执行:
/* 在standalone集群模式下,有两个提交网关: * 1.使用org.apache.spark.deploy.Client作为包装器来使用传统的RPC网关 * 2.Spark 1.3中引入的基于rest的网关 * 第二种方法是Spark 1.3的默认行为,但是Spark submit将会失败 * 如果master不是一个REST服务器,那么它将无法使用REST网关。 */ if (args.isStandaloneCluster && args.useRest) { try { // scalastyle:off println printStream.println("Running Spark using the REST application submission protocol.") // scalastyle:on println doRunMain() } catch { // Fail over to use the legacy submission gateway case e: SubmitRestConnectionException => printWarning(s"Master endpoint ${args.master} was not a REST server. " + "Falling back to legacy submission gateway instead.") args.useRest = false submit(args) } } else { // 其他模式,直接调用doRunMain方法 doRunMain() } 复制代码
接着会调用到 doRunMain 方法,内部其实调用了 runMain 方法,所以我们直接看 runMain 方法。
2.2.1,runMain 方法
//实际上这个方法就是根据我们上面 prepareSubmitEnvironment 方法准备好的参数,通过反射的方法去执行我们 //下一步要执行的类及方法 private def runMain( childArgs: Seq[String], childClasspath: Seq[String], sysProps: Map[String, String], childMainClass: String, verbose: Boolean): Unit = { // scalastyle:off println if (verbose) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") printStream.println(s"System properties:\n${sysProps.mkString("\n")}") printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}") printStream.println("\n") } // scalastyle:on println val loader = if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) { new ChildFirstURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader) } else { new MutableURLClassLoader(new Array[URL](0), Thread.currentThread.getContextClassLoader) } Thread.currentThread.setContextClassLoader(loader) for (jar <- childClasspath) { addJarToClasspath(jar, loader) } for ((key, value) <- sysProps) { System.setProperty(key, value) } var mainClass: Class[_] = null try { mainClass = Utils.classForName(childMainClass) } catch { case e: ClassNotFoundException => e.printStackTrace(printStream) if (childMainClass.contains("thriftserver")) { // scalastyle:off println printStream.println(s"Failed to load main class $childMainClass.") printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.") // scalastyle:on println } System.exit(CLASS_NOT_FOUND_EXIT_STATUS) case e: NoClassDefFoundError => e.printStackTrace(printStream) if (e.getMessage.contains("org/apache/hadoop/hive")) { // scalastyle:off println printStream.println(s"Failed to load hive class.") printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.") // scalastyle:on println } System.exit(CLASS_NOT_FOUND_EXIT_STATUS) } // SPARK-4170 if (classOf[scala.App].isAssignableFrom(mainClass)) { printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.") } val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass) if (!Modifier.isStatic(mainMethod.getModifiers)) { throw new IllegalStateException("The main method in the given main class must be static") } @tailrec def findCause(t: Throwable): Throwable = t match { case e: UndeclaredThrowableException => if (e.getCause() != null) findCause(e.getCause()) else e case e: InvocationTargetException => if (e.getCause() != null) findCause(e.getCause()) else e case e: Throwable => e } try { //通过反射去执行准备好的 mainClass 的 main 方法 mainMethod.invoke(null, childArgs.toArray) } catch { case t: Throwable => findCause(t) match { case SparkUserAppException(exitCode) => System.exit(exitCode) case t: Throwable => throw t } } } 复制代码
我们选取的 standalone cluster 模式去分析的,根据上面的 prepareSubmitEnvironment 方法可以知道我们要使用 org.apache.spark.deploy.Client
这个 childMainClass,然后根据上面的代码知道,我们下一步是将相关参数带进 org.apache.spark.deploy.Client
这个类的 main 方法中去执行。
所以下面开始看 org.apache.spark.deploy.Client
二,org.apache.spark.deploy.Client
Client 用于启动和终止 standalone 集群中的 Driver 程序。
1,main 方法
def main(args: Array[String]) { // scalastyle:off println if (!sys.props.contains("SPARK_SUBMIT")) { println("WARNING: This client is deprecated and will be removed in a future version of Spark") println("Use ./bin/spark-submit with \"--master spark://host:port\"") } // scalastyle:on println val conf = new SparkConf() val driverArgs = new ClientArguments(args) if (!conf.contains("spark.rpc.askTimeout")) { conf.set("spark.rpc.askTimeout", "10s") } Logger.getRootLogger.setLevel(driverArgs.logLevel) //创建 rpcEnv val rpcEnv = RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf)) //获取 master 节点的 RpcEndPoint 的引用,用于和 master 进行 Rpc 通信 val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL). map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME)) //注册 rpcEndpoint,调用 onStart方法 rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf)) // rpcEnv.awaitTermination() } 复制代码
这里开始创建 rpcEnv 了,关于 Rpc 这块的知识点,可以看前面这篇文章了解一下:Spark 中的 RPC,拿到 master 的 rpcEndpoint 的引用去注册 rpcEndpoint,这里会去调用 ClientEndpoint 的 onstart 方法。
三,org.apache.spark.deploy.ClientEndpoint
ClientEndPoint 是一个 ThreadSafeRpcEndpoint,下面看下它的 onStart 方法。
1,onStart 方法
override def onStart(): Unit = { driverArgs.cmd match { case "launch" => // TODO: We could add an env variable here and intercept it in `sc.addJar` that would // truncate filesystem paths similar to what YARN does. For now, we just require // people call `addJar` assuming the jar is in the same directory. val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" val classPathConf = "spark.driver.extraClassPath" val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val libraryPathConf = "spark.driver.extraLibraryPath" val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val extraJavaOptsConf = "spark.driver.extraJavaOptions" val extraJavaOpts = sys.props.get(extraJavaOptsConf) .map(Utils.splitCommandString).getOrElse(Seq.empty) val sparkJavaOpts = Utils.sparkJavaOpts(conf) val javaOpts = sparkJavaOpts ++ extraJavaOpts // 将classPathEntries,libraryPathEntries,javaOpts,drvierArgs信息封装成Command // 这里的mainClass为org.apache.spark.deploy.worker.DriverWrapper val command = new Command(mainClass, Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions, sys.env, classPathEntries, libraryPathEntries, javaOpts) // 将drvierArgs,command信息封装成DriverDescription val driverDescription = new DriverDescription( driverArgs.jarUrl, driverArgs.memory, driverArgs.cores, driverArgs.supervise, command) // 向master发送RequestSubmitDriver,注册Driver ayncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription)) case "kill" => val driverId = driverArgs.driverId ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) } } 复制代码
这里也会根据 cmd 进行模式匹配,,如果命令为 launch,就去获取 driver 的额外的 java 依赖,classpath,java 配置。然后将这些信息封装为一个 Command 对象,再降 driver 的参数和 command 信息一起封装成 DriverDescription 对象,调用 ayncSendToMasterAndForwardReply 发送信息。
2,ayncSendToMasterAndForwardReply 方法
private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = { for (masterEndpoint <- masterEndpoints) { masterEndpoint.ask[T](message).onComplete { case Success(v) => self.send(v) case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e) }(forwardMessageExecutionContext) } } 复制代码
这个方法实际上就是将信息发送到 masterEndpoint 上去。
四,总结
至此,我们整个 spark-submit 任务提交就完成了,接下来就是等待 master 返回 driver 的注册结果,启动 driver。
最后可以看一下 spark-submit 过程的流程图:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- RocketMQ 位移提交源码分析
- JStorm 源码解析:拓扑的构建和提交过程
- Spring 事务提交回滚源码解析 原 荐
- 如何提交takedown,删除泄漏源码的仓库和Fork
- mysql复制那点事(2)-binlog组提交源码分析和实现
- spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。