Continuing from the previous article on the Master-side schedule() scheduling function, this article starts from the client side and walks through, in detail, how the Spark cluster handles the submission command after a user runs spark-submit. The previous article only covered how the Master launches Executors and skipped Driver startup, so Driver submission and startup are covered here together with the spark-submit flow.
1. spark-submit: the bin/spark-submit script does very little; it resolves SPARK_HOME and then delegates to the org.apache.spark.deploy.SparkSubmit class via spark-class:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
2. Go straight to SparkSubmit's main() function, shown below:
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
// Call submit() with the parsed user arguments
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
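For context, the three actions matched above are values of a small enumeration in SparkSubmit.scala; SparkSubmitArguments sets the action to KILL or REQUEST_STATUS when --kill or --status is passed, and to SUBMIT otherwise. In Spark 2.2 the enumeration looks roughly like this:
private[deploy] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS = Value
}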
3. The submit() function prepares the submission environment and then hands off to the class that will actually submit the Driver:
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
// prepareSubmitEnvironment() is fairly long; it mainly does the following:
// 1. determine the submission mode from the user arguments, chiefly --master and --deploy-mode
// 2. pick the main class to run based on that mode
// childArgs: the arguments to pass to that main class
// childClasspath: the classpath entries for the child JVM
// sysProps: system properties to set
// childMainClass: the main class that will be run next
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
// proxyUser is usually null, so go straight to the else branch below
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
// Call runMain() with the values returned by prepareSubmitEnvironment()
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
// After all the detours above, this simply calls doRunMain()
doRunMain()
}
}
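Before moving on, it helps to see how prepareSubmitEnvironment() chooses childMainClass from the master/deploy-mode arguments. The following is only a rough sketch in the form of a hypothetical helper, not the real function, and it keeps just the cases relevant to this article:
// Hypothetical helper mirroring, in simplified form, how childMainClass is chosen
def chooseChildMainClass(
    deployMode: String,
    isStandaloneCluster: Boolean,
    useRest: Boolean,
    userMainClass: String): String = {
  if (deployMode == "client") {
    userMainClass // the user's own main class runs in the current JVM
  } else if (isStandaloneCluster && useRest) {
    "org.apache.spark.deploy.rest.RestSubmissionClient" // REST gateway, default since Spark 1.3
  } else if (isStandaloneCluster) {
    "org.apache.spark.deploy.Client" // legacy RPC gateway
  } else {
    userMainClass // YARN/Mesos cluster modes are out of scope here
  }
}
In client mode the user's class runs in the current JVM; in standalone cluster mode the submission is delegated to a gateway class, which is the path the rest of this article follows.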
4. doRunMain() is the core of the dispatch, though there is not much in it: unless a proxy user is configured (it usually is not), execution goes straight to the else branch. As the comment in submit() explains, standalone cluster mode has two submission gateways: the legacy RPC gateway wrapped by org.apache.spark.deploy.Client, and the REST gateway introduced in Spark 1.3, which is the default and falls back to the legacy path if the master endpoint turns out not to be a REST server.
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
5. Finally runMain() is invoked. Its code follows:
// When deploy mode is client, it runs the user's own main class.
// When deploy mode is cluster, it depends on the gateway: with REST submission the
// main class is org.apache.spark.deploy.rest.RestSubmissionClient, otherwise it is
// org.apache.spark.deploy.Client.
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
// sysProps may contain sensitive information, so redact before printing
printStream.println(s"System properties:\n${Utils.redact(sysProps).mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
// By default the jars shipped with Spark take precedence over jars in the user application.
// To let user jars win instead, set spark.yarn.user.classpath.first (before Spark 1.3) or the
// spark.driver.userClassPathFirst / spark.executor.userClassPathFirst properties (see the SparkConf sketch after this function).
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
// Add each jar on the child classpath to the classloader
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
// Load the main class resolved by prepareSubmitEnvironment()
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
// mainMethod.invoke calls the corresponding main() via reflection
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
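As the classloader comment above notes, user jars can be made to take precedence over Spark's bundled jars. A minimal sketch of setting this programmatically is shown below; both keys are real Spark configuration properties and can equally be passed with --conf on the spark-submit command line:
import org.apache.spark.SparkConf

// Ask both the driver and the executors to load user classes before Spark's bundled jars
val conf = new SparkConf()
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")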
6. Since we are looking at standalone cluster mode over the legacy gateway, childMainClass maps to org.apache.spark.deploy.Client, whose main() is invoked directly:
def main(args: Array[String]) {
// scalastyle:off println
if (!sys.props.contains("SPARK_SUBMIT")) {
println("WARNING: This client is deprecated and will be removed in a future version of Spark")
println("Use ./bin/spark-submit with \"--master spark://host:port\"")
}
// scalastyle:on println
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
if (!conf.contains("spark.rpc.askTimeout")) {
conf.set("spark.rpc.askTimeout", "10s")
}
Logger.getRootLogger.setLevel(driverArgs.logLevel)
// Prepare the RpcEnv, then resolve references to the Master endpoints from the master URLs,
// and register a ClientEndpoint with the RpcEnv. Note that the Master endpoint references are
// passed into the ClientEndpoint constructor, so this ClientEndpoint can talk to the Masters.
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
// As mentioned before, registering an endpoint via setupEndpoint triggers its onStart().
// Since we are submitting a job, onStart() goes straight into the "launch" branch
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
}
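Registering ClientEndpoint triggers its onStart(), which simply dispatches on the command; abridged, the Spark 2.2 code looks roughly like this:
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // ... the "launch" branch, shown in the next section ...
    case "kill" =>
      val driverId = driverArgs.driverId
      ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}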
7. Now look at the "launch" branch inside ClientEndpoint.onStart():
case "launch" =>
// The class to be launched on the Worker is DriverWrapper
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
// The lines below just collect the classpath, library path and Java options; no further explanation needed
val classPathConf = "spark.driver.extraClassPath"
val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathConf = "spark.driver.extraLibraryPath"
val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val extraJavaOptsConf = "spark.driver.extraJavaOptions"
val extraJavaOpts = sys.props.get(extraJavaOptsConf)
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = new Command(mainClass,
Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
sys.env, classPathEntries, libraryPathEntries, javaOpts)
// DriverWrapper is wrapped into a Command, which is in turn wrapped into a DriverDescription
val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
command)
// Send a RequestSubmitDriver message to the Master.
// Note that after the Master has processed it, it replies with a SubmitDriverResponse message,
// which is handled in this endpoint's receive() function
ayncSendToMasterAndForwardReply[SubmitDriverResponse](
RequestSubmitDriver(driverDescription))
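The Master's reply comes back to ClientEndpoint.receive(); abridged, the Spark 2.2 handling looks roughly like this:
override def receive: PartialFunction[Any, Unit] = {
  case SubmitDriverResponse(master, success, driverId, message) =>
    logInfo(message)
    if (success) {
      // remember which Master answered, then poll it for the driver's status and exit
      activeMasterEndpoint = master
      pollAndReportStatus(driverId.get)
    } else if (!Utils.responseFromBackup(message)) {
      System.exit(-1)
    }
}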
8. The Master receives the RequestSubmitDriver message and handles it in its receiveAndReply() function:
case RequestSubmitDriver(description) =>
// If the Master is not ALIVE (e.g. still recovering), reject the submission
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
// Create the driver, add it to the persistence engine, and put it on the waiting queue
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
// The resource scheduling function schedule() was covered in the previous article on Master-side scheduling
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
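For completeness, createDriver() just wraps the description into a DriverInfo with a freshly generated id; abridged from the Spark 2.2 Master, it looks roughly like this:
private def createDriver(desc: DriverDescription): DriverInfo = {
  val now = System.currentTimeMillis()
  val date = new Date(now)
  new DriverInfo(now, newDriverId(date), desc, date)
}
schedule() then picks a Worker with enough free memory and cores and sends it a LaunchDriver message, which is where the DriverWrapper command built in step 7 is finally executed; that part was covered in the previous article.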
At this point the whole flow, from a user running spark-submit to the Driver being submitted and launched, has been traced end to end.