Continuing from the previous article, 《Spark2.2.0精通:详解Master端任务调度schedule()函数》 (on the Master-side schedule() function), this post starts from the client and walks through how the Spark cluster handles the submit command after a user runs spark-submit. The previous article only covered how the Master launches Executors and skipped Driver startup, so Driver submission and launch are covered here together with the spark-submit flow.
1. Submitting a job with spark-submit invokes the bin/spark-submit script. The script does almost nothing itself: it simply delegates to the org.apache.spark.deploy.SparkSubmit class:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
2. Go straight to the main() function of SparkSubmit; the code is as follows:
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
// For a normal submission, call submit() with the arguments the user specified
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
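A quick note on the match above: appArgs.action is filled in while SparkSubmitArguments parses the command line, and unless --kill or --status was passed it defaults to SUBMIT, so a normal submission always falls into submit(appArgs). Paraphrasing from memory (treat this as a sketch, not a verbatim quote of Spark 2.2.0):
// Inside SparkSubmitArguments.loadEnvironmentArguments():
// action = Option(action).getOrElse(SparkSubmitAction.SUBMIT)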
3. The submit() function mainly prepares the submission environment and then runs the class that will submit the Driver:
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
// prepareSubmitEnvironment() is fairly long; it mainly does the following:
//   1. determines the submission mode from the user arguments (mainly --master and --deploy-mode)
//   2. resolves, based on that mode, the main class that will be run next
// childArgs:      the arguments to pass to that main class
// childClasspath: the classpath entries the JVM will load classes from
// sysProps:       system properties to set
// childMainClass: the main class that will be run next
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
// proxyUser is normally null, so skip to the else branch below
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
// With the values prepared above, call runMain(); in verbose mode it also prints some diagnostic info
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
// After all the branching above, this is where doRunMain() actually gets called
doRunMain()
}
}
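For orientation, here is a simplified sketch (my own summary of prepareSubmitEnvironment() in Spark 2.2.0, recalled from memory rather than quoted verbatim) of which childMainClass gets chosen for the common deploy modes:
// How prepareSubmitEnvironment() resolves childMainClass (simplified):
//   client mode                      -> the user's own --class
//   standalone cluster + REST        -> org.apache.spark.deploy.rest.RestSubmissionClient
//   standalone cluster + legacy RPC  -> org.apache.spark.deploy.Client
//   YARN cluster                     -> org.apache.spark.deploy.yarn.Client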
4. doRunMain() is the core of spark-submit, although there is not much inside it: since a proxy user is normally not set, execution goes straight to the else branch:
/* In standalone cluster mode there are two submission gateways:
 * 1. the traditional RPC gateway, using org.apache.spark.deploy.Client as a wrapper
 * 2. the REST-based gateway introduced in Spark 1.3
 * The latter is the default behavior as of Spark 1.3, but spark-submit falls back to
 * the legacy RPC gateway if the master endpoint turns out not to be a REST server.
 */
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
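As an aside, the proxyUser branch above is only taken when the job was submitted with --proxy-user. Below is a minimal, self-contained sketch of the same UserGroupInformation.doAs pattern (it assumes the Hadoop client libraries are on the classpath; the user name "alice" is made up for illustration):
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  def main(args: Array[String]): Unit = {
    // Build a UGI that impersonates "alice" on top of the currently logged-in user.
    // Whether impersonation is actually allowed is enforced by the cluster, not here.
    val proxyUser = UserGroupInformation.createProxyUser("alice",
      UserGroupInformation.getCurrentUser())
    // Everything inside run() executes as the proxied user, just like runMain() above.
    proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit =
        println("running as " + UserGroupInformation.getCurrentUser.getShortUserName)
    })
  }
}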
5. Finally, runMain() is called; the code is as follows:
// When the deploy mode is client, childMainClass is the user's own main class.
// When the deploy mode is cluster, it depends on the gateway: with REST submission it is
// org.apache.spark.deploy.rest.RestSubmissionClient, otherwise it is
// org.apache.spark.deploy.Client.
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
// By default the jars shipped with Spark take precedence over the jars in the user application.
// To give the user's jars priority, set spark.driver.userClassPathFirst and
// spark.executor.userClassPathFirst (or spark.yarn.user.classpath.first on older YARN versions).
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
// Add each jar on the child classpath to the class loader created above
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
// Load the main class resolved earlier (childMainClass)
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
// mainMethod.invoke calls the corresponding main() function via reflection
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
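The heart of runMain() is plain JVM reflection: load a class by name, find its static main(Array[String]) method and invoke it. A minimal, self-contained sketch of that pattern follows (HelloApp is a made-up class; both objects are assumed to be compiled together in the default package):
object HelloApp {
  def main(args: Array[String]): Unit =
    println("HelloApp started with args: " + args.mkString(", "))
}

object ReflectiveInvokeSketch {
  def main(args: Array[String]): Unit = {
    // Load the class by name, like Utils.classForName(childMainClass) above
    val cls = Class.forName("HelloApp")
    // Look up the static main(String[]) method, like mainClass.getMethod("main", ...)
    val mainMethod = cls.getMethod("main", classOf[Array[String]])
    // Invoke it reflectively with the arguments, like mainMethod.invoke(null, childArgs.toArray)
    mainMethod.invoke(null, Array("--demo", "1"))
  }
}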
6. Since we are looking at standalone cluster mode (legacy RPC gateway), childMainClass maps to org.apache.spark.deploy.Client, and its main() function is invoked next; the code is as follows:
def main(args: Array[String]) {
// scalastyle:off println
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}
// scalastyle:on println
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
// Create the RpcEnv, then use the master URLs to obtain references to the Master endpoints.
// A ClientEndpoint is then registered with the RpcEnv; note that the Master endpoint refs are
// passed into the ClientEndpoint constructor, i.e. this ClientEndpoint will talk to the Masters.
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
// As mentioned before, registering an endpoint via setupEndpoint() triggers its onStart().
// Since we are submitting a job, onStart() goes straight into the "launch" branch.
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
7. Now look at ClientEndpoint.onStart(). It matches on driverArgs.cmd, and for a submission the command is "launch", so the relevant branch is:
case "launch" =>
//The class to be launched on a Worker is org.apache.spark.deploy.worker.DriverWrapper
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
//The following lines just collect the driver's extra classpath, library path and Java options; not covered in detail
val classPathConf = "spark.driver.extraClassPath"
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathConf = "spark.driver.extraLibraryPath"
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
//DriverWrapper and its arguments are wrapped in a Command, which in turn is wrapped in a DriverDescription
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
//Send a RequestSubmitDriver message to the Master.
//Note that after the Master has processed it, it replies with a SubmitDriverResponse message,
//which is handled in this endpoint's receive() function.
ayncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
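DriverWrapper itself is not shown in this article; roughly speaking (my paraphrase of org.apache.spark.deploy.worker.DriverWrapper in Spark 2.2.0, from memory), once a Worker launches the command built above it does the following:
// Rough outline of DriverWrapper.main(workerUrl, userJar, mainClass, extraArgs*):
//   1. create its own RpcEnv and register a WorkerWatcher against {{WORKER_URL}},
//      so the driver exits if its Worker dies;
//   2. add {{USER_JAR}} to the context class loader
//      (child-first if spark.driver.userClassPathFirst is set);
//   3. load driverArgs.mainClass (the user's class) and invoke its main() via reflection
//      with the remaining arguments;
//   4. shut the RpcEnv down once the user's main() returns.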
8. When the Master receives the RequestSubmitDriver message, it handles it in its receiveAndReply() function; the code is as follows:
case RequestSubmitDriver(description) =>
//Reject the submission unless the Master is in ALIVE state (it may still be recovering or be a standby)
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
//Create the driver, add it to the persistence engine, and append it to the waiting queue
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
//The resource-scheduling function schedule() was covered in the previous article: 《Spark2.2.0精通:详解Master端任务调度schedule()函数》
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
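For completeness, createDriver() only wraps the DriverDescription in a DriverInfo with a freshly generated driver id; a sketch paraphrased from Master.scala in Spark 2.2.0 (from memory, so the exact wording may differ):
// private def createDriver(desc: DriverDescription): DriverInfo = {
//   val now = System.currentTimeMillis()
//   val date = new Date(now)
//   new DriverInfo(now, newDriverId(date), desc, date)
// }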
At this point the whole flow, from the user submitting a job with spark-submit to the Driver being launched, has been traced.