Spark源码拜读(一)

栏目: 编程工具 · 发布时间: 5年前

内容简介:RDD从一个样子转换成另一个状态,代码执行了,啥也没干,到了最后一步一下干了!懒加载是怎么做到的?打开RDD.scala,看最基础的map方法其中,

RDD从一个样子转换成另一个状态,代码执行了,啥也没干,到了最后一步一下干了!懒加载是怎么做到的?

打开RDD.scala,看最基础的map方法

/**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

其中, val cleanF = sc.clean(f) 是把函数 f 发送到各个task上。返回的还是 f

map 会创建一个 MapPartitionsRDD ,可以看到 f 最后还是由iter调用它自己的 map 方法来执行的,而这里 (context, pid, iter) => iter.map(cleanF) 整个是一个函数,也就是说,这个RDD的 map 方法是把自己和函数传进 MapPartitionsRDD 了,并没有任何执行。进入MapPartitionsRDD.scala,它是一个RDD的实现类,但里面并没有装数据,只有个函数传进来。

/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}

这就是为什么代码都执行了,rdd该转换了,可是数据并没有动。

这里也可以看到,无论是 map 还是 mapPartition ,都是把一个分区的数据封装成 iterator ,执行 iterator 的同名函数,这个 map 函数是scala的,不是RDD的。

这是transform算子,直到action算子, foreach

/**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

map 不同,有个 sc.runJob ,才开始真正执行。

sc.runJob 进去是对每一个分区执行函数

/**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

注意到执行job的是 dagScheduler ,在 sc 初始化的时候创建了,并且还创建了 taskScheduler

继续看,

eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))

这个job是封装成了 DAGSchedulerEvent 提交给了一个阻塞队列,由一个线程循环地从队列中取事件进行消费。

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

最后有一个waiter等待job执行结束返回结果

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }

以上所述就是小编给大家介绍的《Spark源码拜读(一)》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

爆款文案

爆款文案

关健明 / 北京联合出版公司 / 2017-12 / 42.00元

爆款10W+文案是怎么写出来的?前奥美金牌广告人、知乎25K高赞回答者关健明力作《爆款文案》解构文案打动人的4大黄金法则,公开18种文案写法,75篇实战案例,100多幅释义插图,透露把文案变成“印钞机”的私密武器,手把手教你写出爆款销售力。 市面上有很多大而全的文案书,往往比较宽泛,本书只聚焦一个点:文案如何卖掉产品,赚到钱。 前奥美金牌广告人、知乎25K高赞回答者:关键明,擅长撰写销......一起来看看 《爆款文案》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

MD5 加密
MD5 加密

MD5 加密工具

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器