Spark统一内存管理

栏目: 编程工具 · 发布时间: 5年前

内容简介:Spark1.6 以后,增加统一内存管理机制内存管理模块包括堆内内存(On-heap Memory),堆外内存(Off-heap Memory)两大区域。Executor Memory:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据

Spark1.6 以后,增加统一内存管理机制内存管理模块包括堆内内存(On-heap Memory),堆外内存(Off-heap Memory)两大区域。

1.堆内内存

Spark统一内存管理

Executor Memory:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据

Storage Memory:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据

User Memory:主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息

Reserved Memory:系统预留内存,会用来存储Spark内部对象

private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)

systemMemory:

val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

Runtime.getRuntime.maxMemory 就是JVM运行时的堆内存,在 Java 程序中通过 -Xmx -Xms 配置,spark中通过 spark.executor.memory--executor-memory 配置的。

useableMemory:spark可用内存

val usableMemory = systemMemory - reservedMemory

补充:

val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

execution Memory不得小于reservedMemory 的1.5倍。

2.堆外内存

Spark 1.6 开始引入了Off-heap memory,调用Java的Unsafe类API申请堆外的内存资源,这种方式不进行Java内存管理,可避免频繁GC,但需要自己实现内存申请和释放的逻辑。

Spark统一内存管理

Spark统一内存管理

3.堆内内存动态调整

初始化:程序提交时,execution和storage各占0.5(通过 spark.memory.storageFraction 配置)

onHeapStorageRegionSize =
    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

这意味着

  • 在程序运行时,如果双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策略是按照 LRU 规则进行的。若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
  • Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借用的空间
  • Storage 内存的空间被对方占用后,目前的实现是无法让对方"归还",因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂;而且 Shuffle 过程产生的文件在后面一定会被使用到,而 Cache 在内存的数据不一定在后面使用。

注意,上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。

4.Task内存分配

/**
 * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
 * obtained, or 0 if none can be allocated.
 *  * This call may block until there is enough free memory in some situations, to make sure each
 * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
 * active tasks) before it is forced to spill. This can happen if the number of tasks increase
 * but an older task had a lot of memory already.
 *  * @param numBytes number of bytes to acquire
 * @param taskAttemptId the task attempt acquiring memory
 * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
 *                      one parameter (Long) that represents the desired amount of memory by
 *                      which this pool should be expanded.
 * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
 *                           at this given moment. This is not a field because the max pool
 *                           size is variable in certain cases. For instance, in unified
 *                           memory management, the execution pool can be expanded by evicting
 *                           cached blocks, thereby shrinking the storage pool.
 *  * @return the number of bytes granted to the task.
   */
  private[memory] def acquireMemory(
      numBytes: Long,
      taskAttemptId: Long,
      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
    assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")

    // TODO: clean up this clunky method signature

    // Add this task to the taskMemory map just so we can keep an accurate count of the number
    // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
    if (!memoryForTask.contains(taskAttemptId)) {
      memoryForTask(taskAttemptId) = 0L
      // This will later cause waiting tasks to wake up and check numTasks again
      lock.notifyAll()
    }

    // Keep looping until we're either sure that we don't want to grant this request (because this
    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
    // TODO: simplify this to limit each task to its own slot
    while (true) {
      val numActiveTasks = memoryForTask.keys.size
      val curMem = memoryForTask(taskAttemptId)

      // In every iteration of this loop, we should first try to reclaim any borrowed execution
      // space from storage. This is necessary because of the potential race condition where new
      // storage blocks may steal the free execution memory that this task was waiting for.
      maybeGrowPool(numBytes - memoryFree)

      // Maximum size the pool would have after potentially growing the pool.
      // This is used to compute the upper bound of how much memory each task can occupy. This
      // must take into account potential free memory as well as the amount this pool currently
      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
      // we did not take into account space that could have been freed by evicting cached blocks.
      val maxPoolSize = computeMaxPoolSize()
      val maxMemoryPerTask = maxPoolSize / numActiveTasks
      val minMemoryPerTask = poolSize / (2 * numActiveTasks)

      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
      // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
      val toGrant = math.min(maxToGrant, memoryFree)

      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }
    }
    0L  // Never reached
  }
memoryForTask
val minMemoryPerTask = poolSize / (2 * numActiveTasks)

5.Spark UI 数据解释

内存分配池的堆部分分为 Eden,Survivor 和 Tenured 三部分空间,而这里面一共包含了两个 Survivor 区域,而这两个 Survivor 区域在任何时候我们只能用到其中一个,所以我们可以使用下面的公式进行描述:

ExecutorMemory = Eden + 2 * Survivor + Tenured

Runtime.getRuntime.maxMemory的差异取决于GC配置

spark.executor.memory 设为1g,如图

Spark统一内存管理

384.1MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) × 页面以1000为换算单位(1000/1024 × 1000/1024)

Spark统一内存管理

366.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6)

加上1g堆外内存:

spark.memory.offHeap.enabled    true
spark.memory.offHeap.size       1G

Spark统一内存管理

1390.3MB = (Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB

Spark统一内存管理

1.5G ≈ 1457.8MB = ((Runtime.getRuntime.maxMemory (910.5MB) - ReservedMemory (300MB)) × spark.memory.fraction (0.6) + 1 × 1024MB) × 页面以1000为换算单位(1000/1024 × 1000/1024)


以上所述就是小编给大家介绍的《Spark统一内存管理》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Writing Apache Modules with Perl and C

Writing Apache Modules with Perl and C

Lincoln Stein、Doug MacEachern / O'Reilly Media, Inc. / 1999-03 / USD 39.95

Apache is the most popular Web server on the Internet because it is free, reliable, and extensible. The availability of the source code and the modular design of Apache makes it possible to extend Web......一起来看看 《Writing Apache Modules with Perl and C》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具