A Look at Flink's FsCheckpointStorage

Category: Java · Published: 5 years ago


CheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/CheckpointStorage.java

/**
 * CheckpointStorage implements the durable storage of checkpoint data and metadata streams.
 * An individual checkpoint or savepoint is stored to a {@link CheckpointStorageLocation},
 * created by this class.
 */
public interface CheckpointStorage {


	boolean supportsHighlyAvailableStorage();

	boolean hasDefaultSavepointLocation();

	CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

	CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException;

	CheckpointStorageLocation initializeLocationForSavepoint(
			long checkpointId,
			@Nullable String externalLocationPointer) throws IOException;

	CheckpointStreamFactory resolveCheckpointStorageLocation(
			long checkpointId,
			CheckpointStorageLocationReference reference) throws IOException;

	CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
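  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. supportsHighlyAvailableStorage reports whether the backend supports highly available storage; hasDefaultSavepointLocation reports whether a default savepoint location is configured; resolveCheckpoint resolves a checkpoint location (an external pointer) and returns a CompletedCheckpointStorageLocation; initializeLocationForCheckpoint initializes the storage location for a given checkpointId; initializeLocationForSavepoint initializes the storage location for a savepoint, given a checkpointId and an optional external location pointer; resolveCheckpointStorageLocation resolves a CheckpointStorageLocationReference and returns a CheckpointStreamFactory; createTaskOwnedStateStream opens a stream for persisting checkpoint state that is owned by the tasks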

AbstractFsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {

	// ------------------------------------------------------------------------
	//  Constants
	// ------------------------------------------------------------------------

	/** The prefix of the directory containing the data exclusive to a checkpoint. */
	public static final String CHECKPOINT_DIR_PREFIX = "chk-";

	/** The name of the directory for shared checkpoint state. */
	public static final String CHECKPOINT_SHARED_STATE_DIR = "shared";

	/** The name of the directory for state not owned/released by the master, but by the TaskManagers. */
	public static final String CHECKPOINT_TASK_OWNED_STATE_DIR = "taskowned";

	/** The name of the metadata files in checkpoints / savepoints. */
	public static final String METADATA_FILE_NAME = "_metadata";

	/** The magic number that is put in front of any reference. */
	private static final byte[] REFERENCE_MAGIC_NUMBER = new byte[] { 0x05, 0x5F, 0x3F, 0x18 };

	// ------------------------------------------------------------------------
	//  Fields and properties
	// ------------------------------------------------------------------------

	/** The jobId, written into the generated savepoint directories. */
	private final JobID jobId;

	/** The default location for savepoints. Null, if none is configured. */
	@Nullable
	private final Path defaultSavepointDirectory;

	@Override
	public boolean hasDefaultSavepointLocation() {
		return defaultSavepointDirectory != null;
	}

	@Override
	public CompletedCheckpointStorageLocation resolveCheckpoint(String checkpointPointer) throws IOException {
		return resolveCheckpointPointer(checkpointPointer);
	}

	/**
	 * Creates a file system based storage location for a savepoint.
	 *
	 * <p>This method implements the logic that decides which location to use (given optional
	 * parameters for a configured location and a location passed for this specific savepoint)
	 * and how to name and initialize the savepoint directory.
	 *
	 * @param externalLocationPointer    The target location pointer for the savepoint.
	 *                                   Must be a valid URI. Null, if not supplied.
	 * @param checkpointId               The checkpoint ID of the savepoint.
	 *
	 * @return The checkpoint storage location for the savepoint.
	 *
	 * @throws IOException Thrown if the target directory could not be created.
	 */
	@Override
	public CheckpointStorageLocation initializeLocationForSavepoint(
			@SuppressWarnings("unused") long checkpointId,
			@Nullable String externalLocationPointer) throws IOException {

		// determine where to write the savepoint to

		final Path savepointBasePath;
		if (externalLocationPointer != null) {
			savepointBasePath = new Path(externalLocationPointer);
		}
		else if (defaultSavepointDirectory != null) {
			savepointBasePath = defaultSavepointDirectory;
		}
		else {
			throw new IllegalArgumentException("No savepoint location given and no default location configured.");
		}

		// generate the savepoint directory

		final FileSystem fs = savepointBasePath.getFileSystem();
		final String prefix = "savepoint-" + jobId.toString().substring(0, 6) + '-';

		Exception latestException = null;
		for (int attempt = 0; attempt < 10; attempt++) {
			final Path path = new Path(savepointBasePath, FileUtils.getRandomFilename(prefix));

			try {
				if (fs.mkdirs(path)) {
					// we make the path qualified, to make it independent of default schemes and authorities
					final Path qp = path.makeQualified(fs);

					return createSavepointLocation(fs, qp);
				}
			} catch (Exception e) {
				latestException = e;
			}
		}

		throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
	}

	protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;

	//......
}
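  • AbstractFsCheckpointStorage implements three methods of the CheckpointStorage interface: hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint
  • resolveCheckpoint delegates to resolveCheckpointPointer (elided in the excerpt above), which does two things: it resolves the checkpoint/savepoint path and the path of its metadata file, obtains their FileStatus, and then creates an FsCompletedCheckpointStorageLocation
  • initializeLocationForSavepoint creates a CheckpointStorageLocation for a savepoint. It uses externalLocationPointer when one is given and falls back to defaultSavepointDirectory when it is null; if neither is available it throws an IllegalArgumentException. It then tries up to 10 times to create a randomly named "savepoint-" directory and passes the qualified path to the abstract method createSavepointLocation, which subclasses implement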
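The location choice and retry loop described above can be tried out without a Flink dependency. The following is only a minimal sketch built on java.nio.file: the class SavepointDirSketch and its methods are hypothetical names, a UUID fragment stands in for FileUtils.getRandomFilename, and toAbsolutePath stands in for makeQualified. It is not the Flink implementation.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

/** Hypothetical stand-in for the savepoint directory logic; not Flink code. */
final class SavepointDirSketch {

    /** Picks the explicit target if one is given, otherwise the configured default. */
    static Path chooseBasePath(String externalPointer, Path defaultDir) {
        if (externalPointer != null) {
            return Paths.get(externalPointer);
        }
        if (defaultDir != null) {
            return defaultDir;
        }
        throw new IllegalArgumentException(
                "No savepoint location given and no default location configured.");
    }

    /** Creates "savepoint-<first 6 chars of job id>-<random>" under the base path. */
    static Path createSavepointDir(Path base, String jobId) throws IOException {
        String prefix = "savepoint-" + jobId.substring(0, 6) + '-';
        IOException latest = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            // Assumption: a UUID fragment replaces FileUtils.getRandomFilename
            String random = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
            Path candidate = base.resolve(prefix + random);
            try {
                Files.createDirectories(base);
                // createDirectory fails if the name is already taken, which triggers a retry
                return Files.createDirectory(candidate).toAbsolutePath();
            } catch (IOException e) {
                latest = e;
            }
        }
        throw new IOException("Failed to create savepoint directory at " + base, latest);
    }

    public static void main(String[] args) throws IOException {
        Path defaultDir = Files.createTempDirectory("savepoints");
        // No explicit pointer, so the default directory is used
        Path base = chooseBasePath(null, defaultDir);
        System.out.println(createSavepointDir(base, "0123456789abcdef"));
    }
}
```

Running main prints a freshly created directory whose name starts with savepoint-012345-, mirroring the prefix built from the first six characters of the job ID.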

FsCheckpointStorage

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java

/**
 * An implementation of durable checkpoint storage to file systems.
 */
public class FsCheckpointStorage extends AbstractFsCheckpointStorage {

	private final FileSystem fileSystem;

	private final Path checkpointsDirectory;

	private final Path sharedStateDirectory;

	private final Path taskOwnedStateDirectory;

	private final int fileSizeThreshold;

	public FsCheckpointStorage(
			Path checkpointBaseDirectory,
			@Nullable Path defaultSavepointDirectory,
			JobID jobId,
			int fileSizeThreshold) throws IOException {

		this(checkpointBaseDirectory.getFileSystem(),
				checkpointBaseDirectory,
				defaultSavepointDirectory,
				jobId,
				fileSizeThreshold);
	}

	public FsCheckpointStorage(
			FileSystem fs,
			Path checkpointBaseDirectory,
			@Nullable Path defaultSavepointDirectory,
			JobID jobId,
			int fileSizeThreshold) throws IOException {

		super(jobId, defaultSavepointDirectory);

		checkArgument(fileSizeThreshold >= 0);

		this.fileSystem = checkNotNull(fs);
		this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
		this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
		this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
		this.fileSizeThreshold = fileSizeThreshold;

		// initialize the dedicated directories
		fileSystem.mkdirs(checkpointsDirectory);
		fileSystem.mkdirs(sharedStateDirectory);
		fileSystem.mkdirs(taskOwnedStateDirectory);
	}

	// ------------------------------------------------------------------------

	public Path getCheckpointsDirectory() {
		return checkpointsDirectory;
	}

	// ------------------------------------------------------------------------
	//  CheckpointStorage implementation
	// ------------------------------------------------------------------------

	@Override
	public boolean supportsHighlyAvailableStorage() {
		return true;
	}

	@Override
	public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
		checkArgument(checkpointId >= 0);

		// prepare all the paths needed for the checkpoints
		final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

		// create the checkpoint exclusive directory
		fileSystem.mkdirs(checkpointDir);

		return new FsCheckpointStorageLocation(
				fileSystem,
				checkpointDir,
				sharedStateDirectory,
				taskOwnedStateDirectory,
				CheckpointStorageLocationReference.getDefault(),
				fileSizeThreshold);
	}

	@Override
	public CheckpointStreamFactory resolveCheckpointStorageLocation(
			long checkpointId,
			CheckpointStorageLocationReference reference) throws IOException {

		if (reference.isDefaultReference()) {
			// default reference, construct the default location for that particular checkpoint
			final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

			return new FsCheckpointStorageLocation(
					fileSystem,
					checkpointDir,
					sharedStateDirectory,
					taskOwnedStateDirectory,
					reference,
					fileSizeThreshold);
		}
		else {
			// location encoded in the reference
			final Path path = decodePathFromReference(reference);

			return new FsCheckpointStorageLocation(
					path.getFileSystem(),
					path,
					path,
					path,
					reference,
					fileSizeThreshold);
		}
	}

	@Override
	public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
		return new FsCheckpointStateOutputStream(
				taskOwnedStateDirectory,
				fileSystem,
				FsCheckpointStreamFactory.DEFAULT_WRITE_BUFFER_SIZE,
				fileSizeThreshold);
	}

	@Override
	protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
		final CheckpointStorageLocationReference reference = encodePathAsReference(location);
		return new FsCheckpointStorageLocation(fs, location, location, location, reference, fileSizeThreshold);
	}
}
  • FsCheckpointStorage extends AbstractFsCheckpointStorage and implements its abstract createSavepointLocation method, which returns an FsCheckpointStorageLocation
  • FsCheckpointStorage also implements the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream
  • supportsHighlyAvailableStorage simply returns true; initializeLocationForCheckpoint and resolveCheckpointStorageLocation both create an FsCheckpointStorageLocation; createTaskOwnedStateStream creates an FsCheckpointStateOutputStream
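To make the directory layout and the two branches of resolveCheckpointStorageLocation more concrete, here is a small sketch using plain java.nio.file paths. Several points are assumptions rather than facts taken from the excerpt: that getCheckpointDirectoryForJob yields <base>/<jobId>, that createCheckpointDirectory appends "chk-" plus the checkpoint ID, and that a non-default reference is the magic number followed by the UTF-8 bytes of the path. The class FsLayoutSketch is a hypothetical name, and null models the default reference.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

/** Hypothetical sketch of the directory layout and reference handling; not Flink code. */
final class FsLayoutSketch {

    static final String CHECKPOINT_DIR_PREFIX = "chk-";
    static final String SHARED = "shared";
    static final String TASK_OWNED = "taskowned";
    static final byte[] MAGIC = {0x05, 0x5F, 0x3F, 0x18};

    final Path checkpointsDir;
    final Path sharedDir;
    final Path taskOwnedDir;

    FsLayoutSketch(Path baseDir, String jobId) {
        // Assumption: the per-job directory is <base>/<jobId>
        this.checkpointsDir = baseDir.resolve(jobId);
        this.sharedDir = checkpointsDir.resolve(SHARED);
        this.taskOwnedDir = checkpointsDir.resolve(TASK_OWNED);
    }

    /** Directory holding the data exclusive to one checkpoint. */
    Path checkpointDir(long checkpointId) {
        return checkpointsDir.resolve(CHECKPOINT_DIR_PREFIX + checkpointId);
    }

    /** Assumption: a reference is the magic number followed by the UTF-8 path bytes. */
    static byte[] encode(Path path) {
        byte[] pathBytes = path.toString().getBytes(StandardCharsets.UTF_8);
        byte[] ref = Arrays.copyOf(MAGIC, MAGIC.length + pathBytes.length);
        System.arraycopy(pathBytes, 0, ref, MAGIC.length, pathBytes.length);
        return ref;
    }

    static Path decode(byte[] ref) {
        if (ref.length <= MAGIC.length
                || !Arrays.equals(Arrays.copyOf(ref, MAGIC.length), MAGIC)) {
            throw new IllegalArgumentException("Not a valid file system reference");
        }
        return Paths.get(new String(
                ref, MAGIC.length, ref.length - MAGIC.length, StandardCharsets.UTF_8));
    }

    /** A null reference models the default reference; otherwise the path is decoded. */
    Path resolve(long checkpointId, byte[] reference) {
        return reference == null ? checkpointDir(checkpointId) : decode(reference);
    }

    public static void main(String[] args) {
        FsLayoutSketch layout = new FsLayoutSketch(Paths.get("/tmp/checkpoints"), "job-1");
        System.out.println(layout.resolve(7, null));
        System.out.println(layout.resolve(7, encode(Paths.get("/tmp/savepoints/sp-1"))));
    }
}
```

In the real class the non-default branch passes the decoded path as the exclusive, shared, and task-owned directory alike, which is also what createSavepointLocation does for savepoints, so a savepoint keeps all of its files in one directory.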

Summary

  • The CheckpointStorage interface defines the basic methods for durably storing checkpoint data and metadata streams. AbstractFsCheckpointStorage implements its hasDefaultSavepointLocation, resolveCheckpoint, and initializeLocationForSavepoint methods and declares the abstract method createSavepointLocation
  • FsCheckpointStorage extends AbstractFsCheckpointStorage and implements createSavepointLocation, along with the CheckpointStorage methods that AbstractFsCheckpointStorage leaves unimplemented: supportsHighlyAvailableStorage, initializeLocationForCheckpoint, resolveCheckpointStorageLocation, and createTaskOwnedStateStream
  • In FsCheckpointStorage, supportsHighlyAvailableStorage simply returns true; initializeLocationForCheckpoint and resolveCheckpointStorageLocation both create an FsCheckpointStorageLocation; createTaskOwnedStateStream creates an FsCheckpointStateOutputStream
