Preface
This article takes a look at Flink's BlobWriter.
BlobWriter
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java
/**
* BlobWriter is used to upload data to the BLOB store.
*/
public interface BlobWriter {
Logger LOG = LoggerFactory.getLogger(BlobWriter.class);
/**
* Uploads the data of the given byte array for the given job to the BLOB server and makes it
* a permanent BLOB.
*
* @param jobId
* the ID of the job the BLOB belongs to
* @param value
* the buffer to upload
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
* store
*/
PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;
/**
* Uploads the data from the given input stream for the given job to the BLOB server and makes it
* a permanent BLOB.
*
* @param jobId
* ID of the job this blob belongs to
* @param inputStream
* the input stream to read the data from
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while reading the data from the input stream, writing it to a
* local file, or uploading it to the HA store
*/
PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;
/**
* Returns the min size before data will be offloaded to the BLOB store.
*
* @return minimum offloading size
*/
int getMinOffloadingSize();
/**
* Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum
* offloading size of the BlobServer.
*
* @param value to serialize
* @param jobId to which the value belongs.
* @param blobWriter to use to offload the serialized value
* @param <T> type of the value to serialize
* @return Either the serialized value or the stored blob key
* @throws IOException if the data cannot be serialized
*/
static <T> Either<SerializedValue<T>, PermanentBlobKey> serializeAndTryOffload(
T value,
JobID jobId,
BlobWriter blobWriter) throws IOException {
Preconditions.checkNotNull(value);
Preconditions.checkNotNull(jobId);
Preconditions.checkNotNull(blobWriter);
final SerializedValue<T> serializedValue = new SerializedValue<>(value);
if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) {
return Either.Left(new SerializedValue<>(value));
} else {
try {
final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray());
return Either.Right(permanentBlobKey);
} catch (IOException e) {
LOG.warn("Failed to offload value {} for job {} to BLOB store.", value, jobId, e);
return Either.Left(serializedValue);
}
}
}
}
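- BlobWriter defines the putPermanent and getMinOffloadingSize methods. It also provides the static method serializeAndTryOffload, which serializes the given value and, when the serialized size is at least the minimum offloading size, calls blobWriter.putPermanent to store it in the BlobServer. If that upload throws an IOException, the method logs a warning and falls back to returning the serialized value itself.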
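To make the threshold logic of serializeAndTryOffload concrete, here is a minimal standalone sketch. It is not Flink code: SimpleBlobWriter, Result and the 1024-byte threshold are hypothetical, plain Java serialization stands in for SerializedValue, and a String stands in for PermanentBlobKey. It mirrors the three outcomes of the original method: small values stay inline, large values are uploaded, and a failed upload falls back to the inline bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical, simplified sketch of the serializeAndTryOffload pattern (not Flink classes). */
public class OffloadSketch {

    /** Hypothetical minimal writer; the key is a plain String instead of a PermanentBlobKey. */
    interface SimpleBlobWriter {
        String putPermanent(String jobId, byte[] value) throws IOException;

        int getMinOffloadingSize();
    }

    /** Poor man's Either: exactly one of inlineBytes and blobKey is non-null. */
    static final class Result {
        final byte[] inlineBytes;
        final String blobKey;

        private Result(byte[] inlineBytes, String blobKey) {
            this.inlineBytes = inlineBytes;
            this.blobKey = blobKey;
        }

        static Result inline(byte[] bytes) {
            return new Result(bytes, null);
        }

        static Result offloaded(String key) {
            return new Result(null, key);
        }

        boolean isOffloaded() {
            return blobKey != null;
        }
    }

    /** Plain Java serialization stands in for Flink's SerializedValue. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        return bos.toByteArray();
    }

    static Result serializeAndTryOffload(Serializable value, String jobId, SimpleBlobWriter writer)
            throws IOException {
        Objects.requireNonNull(value);
        Objects.requireNonNull(jobId);
        Objects.requireNonNull(writer);

        byte[] bytes = serialize(value);
        // Below the threshold: keep the bytes inline instead of uploading them.
        if (bytes.length < writer.getMinOffloadingSize()) {
            return Result.inline(bytes);
        }
        try {
            return Result.offloaded(writer.putPermanent(jobId, bytes));
        } catch (IOException e) {
            // As in the Flink code, a failed upload is not fatal: fall back to the inline bytes.
            return Result.inline(bytes);
        }
    }

    public static void main(String[] args) throws IOException {
        SimpleBlobWriter writer = new SimpleBlobWriter() {
            @Override
            public String putPermanent(String jobId, byte[] value) {
                return jobId + "/blob-" + value.length;
            }

            @Override
            public int getMinOffloadingSize() {
                return 1024;
            }
        };
        System.out.println(serializeAndTryOffload("tiny", "job-1", writer).isOffloaded());        // false
        System.out.println(serializeAndTryOffload(new byte[4096], "job-1", writer).isOffloaded()); // true
    }
}
```

Returning the serialized bytes on failure, rather than rethrowing, matches the original design choice: offloading is an optimization, so losing it should not fail the job submission.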
BlobServer
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
/**
* This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and
* spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store
* the BLOBs or temporarily cache them.
*/
public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {
//......
@Override
public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
checkNotNull(jobId);
return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);
}
@Override
public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
checkNotNull(jobId);
return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);
}
/**
* Returns the configuration used by the BLOB server.
*
* @return configuration
*/
@Override
public final int getMinOffloadingSize() {
return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
}
/**
* Uploads the data of the given byte array for the given job to the BLOB server.
*
* @param jobId
* the ID of the job the BLOB belongs to
* @param value
* the buffer to upload
* @param blobType
* whether to make the data permanent or transient
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA
* store
*/
private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received PUT call for BLOB of job {}.", jobId);
}
File incomingFile = createTemporaryFilename();
MessageDigest md = BlobUtils.createMessageDigest();
BlobKey blobKey = null;
try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
md.update(value);
fos.write(value);
} catch (IOException ioe) {
// delete incomingFile from a failed download
if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn("Could not delete the staging file {} for job {}.",
incomingFile, jobId);
}
throw ioe;
}
try {
// persist file
blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
return blobKey;
} finally {
// delete incomingFile from a failed download
if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
incomingFile, blobKey, jobId);
}
}
}
/**
* Uploads the data from the given input stream for the given job to the BLOB server.
*
* @param jobId
* the ID of the job the BLOB belongs to
* @param inputStream
* the input stream to read the data from
* @param blobType
* whether to make the data permanent or transient
*
* @return the computed BLOB key identifying the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while reading the data from the input stream, writing it to a
* local file, or uploading it to the HA store
*/
private BlobKey putInputStream(
@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Received PUT call for BLOB of job {}.", jobId);
}
File incomingFile = createTemporaryFilename();
MessageDigest md = BlobUtils.createMessageDigest();
BlobKey blobKey = null;
try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
// read stream
byte[] buf = new byte[BUFFER_SIZE];
while (true) {
final int bytesRead = inputStream.read(buf);
if (bytesRead == -1) {
// done
break;
}
fos.write(buf, 0, bytesRead);
md.update(buf, 0, bytesRead);
}
// persist file
blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
return blobKey;
} finally {
// delete incomingFile from a failed download
if (!incomingFile.delete() && incomingFile.exists()) {
LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
incomingFile, blobKey, jobId);
}
}
}
/**
* Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
* use.
*
* @param incomingFile
* temporary file created during transfer
* @param jobId
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
* @param digest
* BLOB content digest, i.e. hash
* @param blobType
* whether this file is a permanent or transient BLOB
*
* @return unique BLOB key that identifies the BLOB on the server
*
* @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/
BlobKey moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)
throws IOException {
int retries = 10;
int attempt = 0;
while (true) {
// add unique component independent of the BLOB content
BlobKey blobKey = BlobKey.createKey(blobType, digest);
File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
// try again until the key is unique (put the existence check into the lock!)
readWriteLock.writeLock().lock();
try {
if (!storageFile.exists()) {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, storageFile, LOG,
blobKey instanceof PermanentBlobKey ? blobStore : null);
// add TTL for transient BLOBs:
if (blobKey instanceof TransientBlobKey) {
// must be inside read or write lock to add a TTL
blobExpiryTimes
.put(Tuple2.of(jobId, (TransientBlobKey) blobKey),
System.currentTimeMillis() + cleanupInterval);
}
return blobKey;
}
} finally {
readWriteLock.writeLock().unlock();
}
++attempt;
if (attempt >= retries) {
String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";
LOG.error(message + " No retries left.");
throw new IOException(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",
jobId, attempt, storageFile.getAbsolutePath());
}
}
}
}
/**
* Returns a temporary file inside the BLOB server's incoming directory.
*
* @return a temporary file inside the BLOB server's incoming directory
*
* @throws IOException
* if creating the directory fails
*/
File createTemporaryFilename() throws IOException {
return new File(BlobUtils.getIncomingDirectory(storageDir),
String.format("temp-%08d", tempFileCounter.getAndIncrement()));
}
//......
}
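- BlobServer implements the BlobWriter interface. Its two putPermanent overloads delegate to putBuffer and putInputStream respectively, while getMinOffloadingSize reads BlobServerOptions.OFFLOAD_MINSIZE from blobServiceConfiguration (default 1 MiB).
- putBuffer takes a byte[]: it first writes the bytes to a temporary file, updating a message digest over the same bytes, and then calls moveTempFileToStore to persist the file. putInputStream takes an InputStream: it likewise copies the stream into a temporary file while updating the digest, and then calls moveTempFileToStore.
- moveTempFileToStore builds a BlobKey from the digest plus a unique component independent of the content, obtains storageFile via BlobUtils.getStorageLocation(storageDir, jobId, blobKey), and, while holding the write lock, calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location. If storageFile already exists it tries a new key, giving up after 10 attempts. storageDir itself is initialized by BlobUtils.initLocalStorageDirectory(config).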
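The staging flow shared by putBuffer and putInputStream (copy into a temporary file while hashing, then move the file into place only if the target does not exist) can be sketched with plain java.nio. This is a simplified, hypothetical sketch: the incoming/store directory names, the 4 KiB buffer and SHA-1 as the digest are assumptions. Unlike Flink's BlobKey.createKey, the key here is purely content-derived with no random component, so a second upload of identical content is simply dropped instead of triggering a retry, and there is no HA BlobStore upload.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of "stage to a temp file, hash, then move into place". */
public class StagedPutSketch {
    private static final int BUFFER_SIZE = 4096; // assumed buffer size
    private final Path incomingDir;
    private final Path storageDir;
    private final AtomicInteger tempFileCounter = new AtomicInteger();

    StagedPutSketch(Path root) throws IOException {
        this.incomingDir = Files.createDirectories(root.resolve("incoming"));
        this.storageDir = Files.createDirectories(root.resolve("store"));
    }

    Path storageDir() {
        return storageDir;
    }

    /** Streams the input into a temp file while hashing it, then moves it to a content-addressed path. */
    String put(InputStream in) throws IOException {
        Path incoming = incomingDir.resolve(String.format("temp-%08d", tempFileCounter.getAndIncrement()));
        MessageDigest md = newDigest();
        try {
            try (OutputStream out = Files.newOutputStream(incoming)) {
                byte[] buf = new byte[BUFFER_SIZE];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                    md.update(buf, 0, n);
                }
            }
            String key = toHex(md.digest());
            Path target = storageDir.resolve("blob_" + key);
            if (!Files.exists(target)) {
                try {
                    Files.move(incoming, target);
                } catch (FileAlreadyExistsException ignored) {
                    // A concurrent writer won the race; same content, so keep its file.
                }
            }
            return key;
        } finally {
            // Always drop the staging file; this is a no-op after a successful move.
            Files.deleteIfExists(incoming);
        }
    }

    private static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```

Hashing while copying means the data is read only once, which is also why the Flink code updates the MessageDigest inside the same read loop that writes the temporary file.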
BlobUtils
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
/**
* Utility class to work with blob data.
*/
public class BlobUtils {
//......
/**
* Creates a local storage directory for a blob service under the configuration parameter given
* by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will
* fall back to Flink's temp directories (given by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at
* random.
*
* @param config
* Flink configuration
*
* @return a new local storage directory
*
* @throws IOException
* thrown if the local file storage cannot be created or is not usable
*/
static File initLocalStorageDirectory(Configuration config) throws IOException {
String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
File baseDir;
if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);
baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
}
else {
baseDir = new File(basePath);
}
File storageDir;
// NOTE: although we will be using UUIDs, there may be collisions
int maxAttempts = 10;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
storageDir = new File(baseDir, String.format(
"blobStore-%s", UUID.randomUUID().toString()));
// Create the storage dir if it doesn't exist. Only return it when the operation was
// successful.
if (storageDir.mkdirs()) {
return storageDir;
}
}
// max attempts exceeded to find a storage directory
throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");
}
/**
* Returns the (designated) physical storage location of the BLOB with the given key.
*
* @param storageDir
* storage directory used be the BLOB service
* @param key
* the key identifying the BLOB
* @param jobId
* ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
*
* @return the (designated) physical storage location of the BLOB
*
* @throws IOException
* if creating the directory fails
*/
static File getStorageLocation(
File storageDir, @Nullable JobID jobId, BlobKey key) throws IOException {
File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
Files.createDirectories(file.getParentFile().toPath());
return file;
}
/**
* Returns the path for the given blob key.
*
* <p>The returned path can be used with the (local or HA) BLOB store file system back-end for
* recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID,
* BlobKey)}.
*
* @param storageDir
* storage directory used be the BLOB service
* @param key
* the key identifying the BLOB
* @param jobId
* ID of the job for the incoming files
*
* @return the path to the given BLOB
*/
static String getStorageLocationPath(
String storageDir, @Nullable JobID jobId, BlobKey key) {
if (jobId == null) {
// format: $base/no_job/blob_$key
return String.format("%s/%s/%s%s",
storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString());
} else {
// format: $base/job_$jobId/blob_$key
return String.format("%s/%s%s/%s%s",
storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString());
}
}
/**
* Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for
* use (not thread-safe!).
*
* @param incomingFile
* temporary file created during transfer
* @param jobId
* ID of the job this blob belongs to or <tt>null</tt> if job-unrelated
* @param blobKey
* BLOB key identifying the file
* @param storageFile
* (local) file where the blob is/should be stored
* @param log
* logger for debug information
* @param blobStore
* HA store (or <tt>null</tt> if unavailable)
*
* @throws IOException
* thrown if an I/O error occurs while moving the file or uploading it to the HA store
*/
static void moveTempFileToStore(
File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile,
Logger log, @Nullable BlobStore blobStore) throws IOException {
try {
// first check whether the file already exists
if (!storageFile.exists()) {
try {
// only move the file if it does not yet exist
Files.move(incomingFile.toPath(), storageFile.toPath());
incomingFile = null;
} catch (FileAlreadyExistsException ignored) {
log.warn("Detected concurrent file modifications. This should only happen if multiple" +
"BlobServer use the same storage directory.");
// we cannot be sure at this point whether the file has already been uploaded to the blob
// store or not. Even if the blobStore might shortly be in an inconsistent state, we have
// to persist the blob. Otherwise we might not be able to recover the job.
}
if (blobStore != null) {
// only the one moving the incoming file to its final destination is allowed to upload the
// file to the blob store
blobStore.put(storageFile, jobId, blobKey);
}
} else {
log.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId);
}
storageFile = null;
} finally {
// we failed to either create the local storage file or to upload it --> try to delete the local file
// while still having the write lock
if (storageFile != null && !storageFile.delete() && storageFile.exists()) {
log.warn("Could not delete the storage file {}.", storageFile);
}
if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId);
}
}
}
//......
}
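- initLocalStorageDirectory reads BlobServerOptions.STORAGE_DIRECTORY (blob.storage.directory) from the configuration. If it is not set, it obtains tmpDirPaths via ConfigurationUtils.parseTempDirectories and picks one of them at random as baseDir. storageDir is then a subdirectory of baseDir named blobStore- followed by a random UUID (up to 10 attempts).
- getStorageLocation builds the concrete storage path under storageDir from the JobID and BlobKey, in the form $base/no_job/blob_$key or $base/job_$jobId/blob_$key, and creates the parent directories.
- moveTempFileToStore uses Files.move to move incomingFile to storageFile only when the target file does not exist yet; if blobStore is not null, it also puts storageFile into the BlobStore. If the target already exists, it logs a warning and ignores the newest upload.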
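The path scheme and the storage-directory creation described above fit in a few lines. In this hypothetical sketch the prefixes no_job, job_ and blob_ are taken from the format comments in getStorageLocationPath, while job IDs and keys are plain strings rather than Flink's JobID and BlobKey.

```java
import java.io.File;
import java.io.IOException;
import java.util.UUID;

/** Hypothetical re-creation of the BLOB directory layout (not Flink's BlobUtils). */
public class BlobLayoutSketch {
    // Prefixes as implied by the format comments: $base/no_job/blob_$key and $base/job_$jobId/blob_$key
    static final String NO_JOB_DIR = "no_job";
    static final String JOB_DIR_PREFIX = "job_";
    static final String BLOB_FILE_PREFIX = "blob_";

    /** Builds the storage path for a blob; a null jobId means the blob is job-unrelated. */
    static String storagePath(String storageDir, String jobId, String key) {
        if (jobId == null) {
            return String.format("%s/%s/%s%s", storageDir, NO_JOB_DIR, BLOB_FILE_PREFIX, key);
        }
        return String.format("%s/%s%s/%s%s", storageDir, JOB_DIR_PREFIX, jobId, BLOB_FILE_PREFIX, key);
    }

    /** Creates baseDir/blobStore-<uuid>, retrying with a fresh UUID if mkdirs fails. */
    static File createStorageDir(File baseDir) throws IOException {
        for (int attempt = 0; attempt < 10; attempt++) {
            File dir = new File(baseDir, "blobStore-" + UUID.randomUUID());
            if (dir.mkdirs()) {
                return dir;
            }
        }
        throw new IOException("Could not create storage directory in '" + baseDir + "'.");
    }

    public static void main(String[] args) {
        System.out.println(storagePath("/tmp/blobStore-x", null, "abc")); // prints /tmp/blobStore-x/no_job/blob_abc
        System.out.println(storagePath("/tmp/blobStore-x", "42", "abc")); // prints /tmp/blobStore-x/job_42/blob_abc
    }
}
```

Retrying mkdirs with a fresh UUID follows the NOTE in initLocalStorageDirectory: a collision is unlikely, but a failed mkdirs is cheap to retry.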
Summary
- BlobWriter defines the putPermanent and getMinOffloadingSize methods, and also provides the static method serializeAndTryOffload, which serializes the given value and, when its size is at least the minimum offloading size, calls blobWriter.putPermanent to store it in the BlobServer.
- BlobServer implements the BlobWriter interface. Its putPermanent overloads delegate to putBuffer and putInputStream, and getMinOffloadingSize reads BlobServerOptions.OFFLOAD_MINSIZE from blobServiceConfiguration (default 1 MiB). putBuffer takes a byte[], writes it to a temporary file first, and then calls moveTempFileToStore to persist it; putInputStream takes an InputStream, likewise copies it to a temporary file first, and then calls moveTempFileToStore. moveTempFileToStore calls BlobUtils.moveTempFileToStore to move the local temporary file to its permanent location, where storageDir is initialized by BlobUtils.initLocalStorageDirectory(config) and storageFile is obtained via BlobUtils.getStorageLocation(storageDir, jobId, blobKey).
- BlobUtils.initLocalStorageDirectory reads BlobServerOptions.STORAGE_DIRECTORY (blob.storage.directory) from the configuration; if it is not set, it obtains tmpDirPaths via ConfigurationUtils.parseTempDirectories and picks one at random as baseDir, and storageDir is a subdirectory of baseDir whose name starts with blobStore-. getStorageLocation builds the concrete path under storageDir from the JobID and BlobKey, in the form $base/no_job/blob_$key or $base/job_$jobId/blob_$key. moveTempFileToStore uses Files.move to move incomingFile to storageFile only when the target does not exist yet, and, if blobStore is not null, also puts storageFile into the BlobStore.