Spark持久化以及checkpoint原理分析

栏目: 服务器 · 发布时间: 6年前

内容简介:在Spark 的持久化使用中,我们会将一些经常使用到的数据进行持久化,比如使用cache()或者persist()方法进行持久化操作,但是当某个节点或者executor挂掉之后,持久化的数据会丢失,因为我们的数据是保存在内存当中的,这时就会重新计算RDD,如果某个之前的RDD需要大量的计算时间,这时将会浪费很多时间,因此,我们有时候需要使用checkpoint操作来将一些数据持久化可容错文件系统中,比如HDFS文件系统中,虽然这种方式可能对性能带来了一定的影响(磁盘IO),但是为了避免大量的重复计算操作,

在Spark 的持久化使用中,我们会将一些经常使用到的数据进行持久化,比如使用cache()或者persist()方法进行持久化操作,但是当某个节点或者executor挂掉之后,持久化的数据会丢失,因为我们的数据是保存在内存当中的,这时就会重新计算RDD,如果某个之前的RDD需要大量的计算时间,这时将会浪费很多时间,因此,我们有时候需要使用checkpoint操作来将一些数据持久化可容错文件系统中,比如HDFS文件系统中,虽然这种方式可能对性能带来了一定的影响(磁盘IO),但是为了避免大量的重复计算操作,有时也可以使用性能代价来换取时间效率上的提升。

当我们对某个RDD进行了缓存操作之后,首先会去CaacheManager中去找,然后紧接着去BlockManager中去获取内存或者磁盘中缓存的数据,如果没有进行缓存或者缓存丢失,那么就会去checkpoint的容错文件系统中查找数据,如果最终没有找到,那就会按照RDD lineage重新计算。

checkpoint原理

1.在代码中,当使用SparkContext可以设置一个checkpointFile文件目录,比如HDFS文件目录。

2.在代码中对需要checkpoint的RDD调用checkpoint方法。

3.RDDCheckpointData(spark内部的API),接管你的RDD,会标记为marked for checkpointing,准备进行checkpoint。

4.你的job运行完之后,会调用一个finalRDD.doCheckpoint()方法,会顺着rdd lineage,回溯扫描,发现有标记为待checkpoint的rdd,就会进行二次标记,标记为checkpointing in progress,正在接受checkpoint操作。

5.job执行完之后,就会启动一个内部的新job,去将标记为checkpointing in progress的rdd的数据,都写入hdfs文件中。(如果rdd之前cache过,会直接从缓存中获取数据,写入hdfs中;如果没有cache过,那么就会重新计算一遍这个rdd,再checkpoint)

6.将checkpoint过的rdd之前的依赖rdd,改成一个CheckpointRDD*,强制改变你的rdd的lineage。后面如果rdd的cache数据获取失败,直接会通过它的上游CheckpointRDD,去容错的文件系统,比如hdfs,中,获取checkpoint的数据。

RDDCheckpointData源码如下:

/**

* Enumeration to manage state transitions of an RDD through checkpointing

* [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]

*/

private[spark] object CheckpointState extends Enumeration {

type CheckpointState = Value

val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value

}

/**

* This class contains all the information related to RDD checkpointing. Each instance of this

* class is associated with a RDD. It manages process of checkpointing of the associated RDD,

* as well as, manages the post-checkpoint state by providing the updated partitions,

* iterator and preferred locations of the checkpointed RDD.

*/

private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])

extends Logging with Serializable {

import CheckpointState._

// The checkpoint state of the associated RDD.

var cpState = Initialized

// The file to which the associated RDD has been checkpointed to

@transient var cpFile: Option[String] = None

// The CheckpointRDD created from the checkpoint file, that is, the new parent the associated RDD.

var cpRDD: Option[RDD[T]] = None

// Mark the RDD for checkpointing

def markForCheckpoint() {

RDDCheckpointData.synchronized {

if (cpState == Initialized) cpState = MarkedForCheckpoint

}

}

// Is the RDD already checkpointed

def isCheckpointed: Boolean = {

RDDCheckpointData.synchronized { cpState == Checkpointed }

}

// Get the file to which this RDD was checkpointed to as an Option

def getCheckpointFile: Option[String] = {

RDDCheckpointData.synchronized { cpFile }

}

// Do the checkpointing of the RDD. Called after the first job using that RDD is over.

def doCheckpoint() {

// If it is marked for checkpointing AND checkpointing is not already in progress,

// then set it to be in progress, else return

RDDCheckpointData.synchronized {

if (cpState == MarkedForCheckpoint) {

cpState = CheckpointingInProgress

} else {

return

}

}

// Create the output path for the checkpoint

val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)

//获取checkpoint文件路径

val fs = path.getFileSystem(rdd.context.HadoopConfiguration)

if (!fs.mkdirs(path)) {

throw new SparkException("Failed to create checkpoint path " + path)

}

// Save to file, and reload it as an RDD

val broadcastedConf = rdd.context.broadcast(

new SerializableWritable(rdd.context.hadoopConfiguration))

//checkpoint数据到文件系统中

rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)

val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

if (newRDD.partitions.size != rdd.partitions.size) {

throw new SparkException(

"Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +

"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")

}

// Change the dependencies and partitions of the RDD

RDDCheckpointData.synchronized {

cpFile = Some(path.toString)

cpRDD = Some(newRDD)

rdd.markCheckpointed(newRDD)  // Update the RDD's dependencies and partitions

cpState = Checkpointed

}

logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)

}

// Get preferred location of a split after checkpointing

def getPreferredLocations(split: Partition): Seq[String] = {

RDDCheckpointData.synchronized {

cpRDD.get.preferredLocations(split)

}

}

def getPartitions: Array[Partition] = {

RDDCheckpointData.synchronized {

cpRDD.get.partitions

}

}

def checkpointRDD: Option[RDD[T]] = {

RDDCheckpointData.synchronized {

cpRDD

}

}

}

在CheckPointRDD中写文件的操作如下:

def writeToFile[T: ClassTag](

path: String,

broadcastedConf: Broadcast[SerializableWritable[Configuration]],

blockSize: Int = -1

)(ctx: TaskContext, iterator: Iterator[T]) {

val env = SparkEnv.get

val outputDir = new Path(path)

val fs = outputDir.getFileSystem(broadcastedConf.value.value)

val finalOutputName = splitIdToFile(ctx.partitionId)

val finalOutputPath = new Path(outputDir, finalOutputName)

val tempOutputPath =

new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptNumber)

if (fs.exists(tempOutputPath)) {

throw new IOException("Checkpoint failed: temporary path " +

tempOutputPath + " already exists")

}

val bufferSize = env.conf.getInt("spark.buffer.size", 65536)

val fileOutputStream = if (blockSize < 0) {

fs.create(tempOutputPath, false, bufferSize)

} else {

// This is mainly for testing purpose

fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)

}

val serializer = env.serializer.newInstance()

val serializeStream = serializer.serializeStream(fileOutputStream)

serializeStream.writeAll(iterator)

serializeStream.close()

if (!fs.rename(tempOutputPath, finalOutputPath)) {

if (!fs.exists(finalOutputPath)) {

logInfo("Deleting tempOutputPath " + tempOutputPath)

fs.delete(tempOutputPath, false)

throw new IOException("Checkpoint failed: failed to save output of task: "

+ ctx.attemptNumber + " and final output path does not exist")

} else {

// Some other copy of this task must've finished before us and renamed it

logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")

fs.delete(tempOutputPath, false)

}

}

}

在RDD类的源码中,两个方法如下所示:

/**

* Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD

* has completed (therefore the RDD has been materialized and potentially stored in memory).

* doCheckpoint() is called recursively on the parent RDDs.

*/

private[spark] def doCheckpoint() {

if (!doCheckpointCalled) {

doCheckpointCalled = true

if (checkpointData.isDefined) {

checkpointData.get.doCheckpoint()

} else {

dependencies.foreach(_.rdd.doCheckpoint())

}

}

}

/**

* Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)

* created from the checkpoint file, and forget its old dependencies and partitions.

*/

private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {

clearDependencies()

partitions_ = null

deps = null    // Forget the constructor argument for dependencies too

}

在我们的应用程序中,在使用checkpoint的时候只需要进行两步简单的操作即可,使用SparkContext设置一个checkPoint文件目录,在需要checkpoint的RDD中调用doCheckpoint方法即可。

Linux公社的RSS地址: https://www.linuxidc.com/rssFeed.aspx

本文永久更新链接地址: https://www.linuxidc.com/Linux/2018-09/154132.htm


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据结构算法与应用

数据结构算法与应用

塞尼 / 机械工业出版社 / 1999-3 / 49.00元

数据结构、算法与应用—C++语言描述(英文版),ISBN:9787111070177,作者:(美)塞尼 著一起来看看 《数据结构算法与应用》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具