spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)

栏目: 服务器 · 发布时间: 6年前

内容简介:spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)

本文分析的源码基于Spark2.1.0版本,如果有理解不当的地方欢迎批评指正。

在之前的一篇文章中我们分析了Spark-submit脚本,发现该脚本会调用spark-class脚本检查参数设置,以及提交任务。最后发现,提交任务的入口类是org.apache.spark.deploy.SparkSubmit 我们接下来深入这个类,看看从提交任务到执行用户jar包之间都发生了什么;

首先找到org.apache.spark.deploy.SparkSubmit类的main方法:

def main (args: Array[ String ]): Unit

= {

val appArgs = new

SparkSubmitArguments(args)

if (appArgs. verbose

) {

// scalastyle:off println

printStream

.println(appArgs)

// scalastyle:on println

}

appArgs.

action match

{

case SparkSubmitAction. SUBMIT

=> submit (appArgs)

case SparkSubmitAction. KILL

=> kill (appArgs)

case SparkSubmitAction. REQUEST_STATUS

=> requestStatus (appArgs)

}

}

main方法很简单,首先利用参数创建了一个SparkSubmitArguments 这个类是SparkSubmitArgumentsParser的子类,主要工作就是对Spark应用的参数进行解析,以及加载当前和Spark相关的环境变量。

在SparkSubmitArguments中有一个action成员,用于表示spark-submit的动作,一般来说使用spark-submit有三个目的,第一是提交应用(这也是最常用的),第二是可以通过spark-submit杀死某个任务,第三是获取某个正在执行的任务的状态。这个action是通过参数指定的,默认值为submit即提交一个任务。

我们可以跳到submit方法看看,该方法定义如下:

private def submit(args: SparkSubmitArguments): Unit

仅接受一个SparkSubmitArguments实例作为参数,这个方法执行两个步骤,首先是基于集群管理器和部署模式设置合适的classpath、系统属性和应用程序参数,以此为运行用户的main方法做环境准备。

然后,使用第一步准备好的环境来启动main方法,这是通过反射完成的,我们下面再看。

Submit方法最终执行了其内部定义的doRunMain,而doRunMain方法会调用runMain(line 169)

runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

在runMain方法中可以看到这三行代码:分别是line 695:

mainClass = Utils.classForName(childMainClass)

这行代码利用反射加载main方法所在类,Utils.classForName方法最终还是调用的Class.forName方法;

line 722:

valmainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

以及line 738:

mainMethod.invoke(null, childArgs.toArray)

上述两行分别获取main方法,然后执行main方法。这些动作都是在driver端完成的。

再理一下思路,spark提交jar的过程如下:

Spark-submit -> Spark-class -> org.apache.spark.deploy.SparkSubmit-> { main -> submit -> doRunMan -> runMain}

org.apache.spark.deploy.SparkSubmit主要负责准备运行环境以及通过反射获取app的main方法并执行。

为了方便理解,接下来我们对Spark利用反射加载运行用户应用程序的main方法做一个简易实现:

新建一个scala工程,在com.load包下新建一个Main类:

package com.load
/**
  * Created by hunan on 2017/5/19.
  */
object Main {
  def main(args: Array[String]): Unit = {
    println("Run main method success!")
  }
}

这个用来模拟我们自己开发的Spark应用,然后打jar包,生成ScalaTest.jar

再创建另一个工程,将刚才的jar作为依赖添加进来,并写如下类:

package com.example
/**
  * Created by hunan on 2017/5/19.
  */
object Test {
  def main(args: Array[String]): Unit = {
    var mainClass:Class[_]=null
    try {
      mainClass = Class.forName("com.load.Main", true, Thread.currentThread().getContextClassLoader)
    }catch{
      case e:Exception=>
        e.printStackTrace()
    }

    if(mainClass!=null){
      val mainMethod = mainClass.getMethod("main",new Array[String](0).getClass)
      mainMethod.invoke(null,Array[String]())
    }
  }
}

这个用来模拟Spark本身,运行即可发现输出如下:

spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)

这就是Spark提交jar的机制,我们也可以发现这里的main方法仅作为普通方法执行,只不过Spark会检查该mian方法是不是静态的,如果不是静态就抛出异常拒绝执行,如果修改722行的“main”字串也可以实现以任意方法名为Spark app执行入口。


以上所述就是小编给大家介绍的《spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

How to Design Programs, 2nd Edition

How to Design Programs, 2nd Edition

Matthias Felleisen、Robert Bruce Findler、Matthew Flatt、Shriram Krishnamurthi / MIT Press / 2018-5-4 / USD 57.00

A completely revised edition, offering new design recipes for interactive programs and support for images as plain values, testing, event-driven programming, and even distributed programming. This ......一起来看看 《How to Design Programs, 2nd Edition》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具