内容简介:本文主要研究一下flink的BlobServiceflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.javaflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
序
本文主要研究一下flink的BlobService
BlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
/** * A simple store and retrieve binary large objects (BLOBs). */ public interface BlobService extends Closeable { /** * Returns a BLOB service for accessing permanent BLOBs. * * @return BLOB service */ PermanentBlobService getPermanentBlobService(); /** * Returns a BLOB service for accessing transient BLOBs. * * @return BLOB service */ TransientBlobService getTransientBlobService(); /** * Returns the port of the BLOB server that this BLOB service is working with. * * @return the port the blob server. */ int getPort(); }
- BlobService定义了getPermanentBlobService方法用于获取PermanentBlobService;getTransientBlobService方法用于获取TransientBlobService
PermanentBlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobService.java
/** * A service to retrieve permanent binary large objects (BLOBs). * * <p>These may include per-job BLOBs that are covered by high-availability (HA) mode, e.g. a job's * JAR files or (parts of) an off-loaded {@link org.apache.flink.runtime.deployment.TaskDeploymentDescriptor} * or files in the {@link org.apache.flink.api.common.cache.DistributedCache}. */ public interface PermanentBlobService extends Closeable { /** * Returns the path to a local copy of the file associated with the provided job ID and blob * key. * * @param jobId * ID of the job this blob belongs to * @param key * BLOB key associated with the requested file * * @return The path to the file. * * @throws java.io.FileNotFoundException * if the BLOB does not exist; * @throws IOException * if any other error occurs when retrieving the file */ File getFile(JobID jobId, PermanentBlobKey key) throws IOException; }
- PermanentBlobService提供了getFile方法,它根据JobID及PermanentBlobKey来获取File
TransientBlobService
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobService.java
/** * A service to retrieve transient binary large objects (BLOBs) which are deleted on the * {@link BlobServer} when they are retrieved. * * <p>These may include per-job BLOBs like files in the {@link * org.apache.flink.api.common.cache.DistributedCache}, for example. * * <p>Note: None of these BLOBs is highly available (HA). This case is covered by BLOBs in the * {@link PermanentBlobService}. * * <p>TODO: change API to not rely on local files but return {@link InputStream} objects */ public interface TransientBlobService extends Closeable { // -------------------------------------------------------------------------------------------- // GET // -------------------------------------------------------------------------------------------- /** * Returns the path to a local copy of the (job-unrelated) file associated with the provided * blob key. * * @param key * blob key associated with the requested file * * @return The path to the file. * * @throws java.io.FileNotFoundException * when the path does not exist; * @throws IOException * if any other error occurs when retrieving the file */ File getFile(TransientBlobKey key) throws IOException; /** * Returns the path to a local copy of the file associated with the provided job ID and blob * key. * * @param jobId * ID of the job this blob belongs to * @param key * blob key associated with the requested file * * @return The path to the file. * * @throws java.io.FileNotFoundException * when the path does not exist; * @throws IOException * if any other error occurs when retrieving the file */ File getFile(JobID jobId, TransientBlobKey key) throws IOException; // -------------------------------------------------------------------------------------------- // PUT // -------------------------------------------------------------------------------------------- /** * Uploads the (job-unrelated) data of the given byte array to the BLOB server. * * @param value * the buffer to upload * * @return the computed BLOB key identifying the BLOB on the server * * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ TransientBlobKey putTransient(byte[] value) throws IOException; /** * Uploads the data of the given byte array for the given job to the BLOB server. * * @param jobId * the ID of the job the BLOB belongs to * @param value * the buffer to upload * * @return the computed BLOB key identifying the BLOB on the server * * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException; /** * Uploads the (job-unrelated) data from the given input stream to the BLOB server. * * @param inputStream * the input stream to read the data from * * @return the computed BLOB key identifying the BLOB on the server * * @throws IOException * thrown if an I/O error occurs while reading the data from the input stream or uploading the * data to the BLOB server */ TransientBlobKey putTransient(InputStream inputStream) throws IOException; /** * Uploads the data from the given input stream for the given job to the BLOB server. * * @param jobId * ID of the job this blob belongs to * @param inputStream * the input stream to read the data from * * @return the computed BLOB key identifying the BLOB on the server * * @throws IOException * thrown if an I/O error occurs while reading the data from the input stream or uploading the * data to the BLOB server */ TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException; // -------------------------------------------------------------------------------------------- // DELETE // -------------------------------------------------------------------------------------------- /** * Deletes the (job-unrelated) file associated with the provided blob key from the local cache. * * @param key * associated with the file to be deleted * * @return <tt>true</tt> if the given blob is successfully deleted or non-existing; * <tt>false</tt> otherwise */ boolean deleteFromCache(TransientBlobKey key); /** * Deletes the file associated with the provided job ID and blob key from the local cache. * * @param jobId * ID of the job this blob belongs to * @param key * associated with the file to be deleted * * @return <tt>true</tt> if the given blob is successfully deleted or non-existing; * <tt>false</tt> otherwise */ boolean deleteFromCache(JobID jobId, TransientBlobKey key); }
- TransientBlobService用于获取transient binary large objects (BLOBs),这些blobs在获取时就会在BlobServer上删掉;它提供了getFile、putTransient、deleteFromCache方法
BlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
/** * A BLOB key uniquely identifies a BLOB. */ public abstract class BlobKey implements Serializable, Comparable<BlobKey> { private static final long serialVersionUID = 3847117712521785209L; /** Size of the internal BLOB key in bytes. */ public static final int SIZE = 20; /** The byte buffer storing the actual key data. */ private final byte[] key; /** * (Internal) BLOB type - to be reflected by the inheriting sub-class. */ private final BlobType type; /** * BLOB type, i.e. permanent or transient. */ enum BlobType { /** * Indicates a permanent BLOB whose lifecycle is that of a job and which is made highly * available. */ PERMANENT_BLOB, /** * Indicates a transient BLOB whose lifecycle is managed by the user and which is not made * highly available. */ TRANSIENT_BLOB } /** * Random component of the key. */ private final AbstractID random; /** * Constructs a new BLOB key. * * @param type * whether the referenced BLOB is permanent or transient */ protected BlobKey(BlobType type) { this.type = checkNotNull(type); this.key = new byte[SIZE]; this.random = new AbstractID(); } /** * Constructs a new BLOB key from the given byte array. * * @param type * whether the referenced BLOB is permanent or transient * @param key * the actual key data */ protected BlobKey(BlobType type, byte[] key) { if (key == null || key.length != SIZE) { throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes"); } this.type = checkNotNull(type); this.key = key; this.random = new AbstractID(); } /** * Constructs a new BLOB key from the given byte array. * * @param type * whether the referenced BLOB is permanent or transient * @param key * the actual key data * @param random * the random component of the key */ protected BlobKey(BlobType type, byte[] key, byte[] random) { if (key == null || key.length != SIZE) { throw new IllegalArgumentException("BLOB key must have a size of " + SIZE + " bytes"); } this.type = checkNotNull(type); this.key = key; this.random = new AbstractID(random); } /** * Returns the right {@link BlobKey} subclass for the given parameters. * * @param type * whether the referenced BLOB is permanent or transient * * @return BlobKey subclass */ @VisibleForTesting static BlobKey createKey(BlobType type) { if (type == PERMANENT_BLOB) { return new PermanentBlobKey(); } else { return new TransientBlobKey(); } } /** * Returns the right {@link BlobKey} subclass for the given parameters. * * @param type * whether the referenced BLOB is permanent or transient * @param key * the actual key data * * @return BlobKey subclass */ static BlobKey createKey(BlobType type, byte[] key) { if (type == PERMANENT_BLOB) { return new PermanentBlobKey(key); } else { return new TransientBlobKey(key); } } /** * Returns the right {@link BlobKey} subclass for the given parameters. * * @param type * whether the referenced BLOB is permanent or transient * @param key * the actual key data * @param random * the random component of the key * * @return BlobKey subclass */ static BlobKey createKey(BlobType type, byte[] key, byte[] random) { if (type == PERMANENT_BLOB) { return new PermanentBlobKey(key, random); } else { return new TransientBlobKey(key, random); } } /** * Returns the hash component of this key. * * @return a 20 bit hash of the contents the key refers to */ @VisibleForTesting public byte[] getHash() { return key; } /** * Returns the (internal) BLOB type which is reflected by the inheriting sub-class. * * @return BLOB type, i.e. permanent or transient */ BlobType getType() { return type; } /** * Adds the BLOB key to the given {@link MessageDigest}. * * @param md * the message digest to add the BLOB key to */ public void addToMessageDigest(MessageDigest md) { md.update(this.key); } @Override public boolean equals(final Object obj) { if (!(obj instanceof BlobKey)) { return false; } final BlobKey bk = (BlobKey) obj; return Arrays.equals(this.key, bk.key) && this.type == bk.type && this.random.equals(bk.random); } @Override public int hashCode() { int result = Arrays.hashCode(this.key); result = 37 * result + this.type.hashCode(); result = 37 * result + this.random.hashCode(); return result; } @Override public String toString() { final String typeString; switch (this.type) { case TRANSIENT_BLOB: typeString = "t-"; break; case PERMANENT_BLOB: typeString = "p-"; break; default: // this actually never happens! throw new IllegalStateException("Invalid BLOB type"); } return typeString + StringUtils.byteToHexString(this.key) + "-" + random.toString(); } @Override public int compareTo(BlobKey o) { // compare the hashes first final byte[] aarr = this.key; final byte[] barr = o.key; final int len = Math.min(aarr.length, barr.length); for (int i = 0; i < len; ++i) { final int a = (aarr[i] & 0xff); final int b = (barr[i] & 0xff); if (a != b) { return a - b; } } if (aarr.length == barr.length) { // same hash contents - compare the BLOB types int typeCompare = this.type.compareTo(o.type); if (typeCompare == 0) { // same type - compare random components return this.random.compareTo(o.random); } else { return typeCompare; } } else { return aarr.length - barr.length; } } // -------------------------------------------------------------------------------------------- /** * Auxiliary method to read a BLOB key from an input stream. * * @param inputStream * the input stream to read the BLOB key from * @return the read BLOB key * @throws IOException * throw if an I/O error occurs while reading from the input stream */ static BlobKey readFromInputStream(InputStream inputStream) throws IOException { final byte[] key = new byte[BlobKey.SIZE]; final byte[] random = new byte[AbstractID.SIZE]; int bytesRead = 0; // read key while (bytesRead < key.length) { final int read = inputStream.read(key, bytesRead, key.length - bytesRead); if (read < 0) { throw new EOFException("Read an incomplete BLOB key"); } bytesRead += read; } // read BLOB type final BlobType blobType; { final int read = inputStream.read(); if (read < 0) { throw new EOFException("Read an incomplete BLOB type"); } else if (read == TRANSIENT_BLOB.ordinal()) { blobType = TRANSIENT_BLOB; } else if (read == PERMANENT_BLOB.ordinal()) { blobType = PERMANENT_BLOB; } else { throw new IOException("Invalid data received for the BLOB type: " + read); } } // read random component bytesRead = 0; while (bytesRead < AbstractID.SIZE) { final int read = inputStream.read(random, bytesRead, AbstractID.SIZE - bytesRead); if (read < 0) { throw new EOFException("Read an incomplete BLOB key"); } bytesRead += read; } return createKey(blobType, key, random); } /** * Auxiliary method to write this BLOB key to an output stream. * * @param outputStream * the output stream to write the BLOB key to * @throws IOException * thrown if an I/O error occurs while writing the BLOB key */ void writeToOutputStream(final OutputStream outputStream) throws IOException { outputStream.write(this.key); outputStream.write(this.type.ordinal()); outputStream.write(this.random.getBytes()); } }
- BlobKey是个抽象类,它有key、BlobType、AbstractID三个属性,其中BlobType分为PERMANENT_BLOB及TRANSIENT_BLOB;它定义了createKey静态方法,用于根据BlobType创建BlobKey;readFromInputStream方法用于从InputStream反序列化为BlobKey;writeToOutputStream方法用于将BlobKey序列化到OutputStream;它有两个子类,分别为PermanentBlobKey及TransientBlobKey
PermanentBlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
/** * BLOB key referencing permanent BLOB files. */ public final class PermanentBlobKey extends BlobKey { /** * Constructs a new BLOB key. */ @VisibleForTesting public PermanentBlobKey() { super(BlobType.PERMANENT_BLOB); } /** * Constructs a new BLOB key from the given byte array. * * @param key * the actual key data */ PermanentBlobKey(byte[] key) { super(BlobType.PERMANENT_BLOB, key); } /** * Constructs a new BLOB key from the given byte array. * * @param key * the actual key data * @param random * the random component of the key */ PermanentBlobKey(byte[] key, byte[] random) { super(BlobType.PERMANENT_BLOB, key, random); } }
- PermanentBlobKey继承了BlobKey,它的BlobType为BlobType.PERMANENT_BLOB
TransientBlobKey
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
/** * BLOB key referencing transient BLOB files. */ public final class TransientBlobKey extends BlobKey { /** * Constructs a new BLOB key. */ @VisibleForTesting public TransientBlobKey() { super(BlobType.TRANSIENT_BLOB); } /** * Constructs a new BLOB key from the given byte array. * * @param key * the actual key data */ TransientBlobKey(byte[] key) { super(BlobType.TRANSIENT_BLOB, key); } /** * Constructs a new BLOB key from the given byte array. * * @param key * the actual key data * @param random * the random component of the key */ TransientBlobKey(byte[] key, byte[] random) { super(BlobType.TRANSIENT_BLOB, key, random); } }
- TransientBlobKey继承了BlobKey,它的BlobType为BlobType.TRANSIENT_BLOB
AbstractID
flink-release-1.7.2/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
/** * A statistically unique identification number. */ @PublicEvolving public class AbstractID implements Comparable<AbstractID>, java.io.Serializable { private static final long serialVersionUID = 1L; private static final Random RND = new Random(); /** The size of a long in bytes. */ private static final int SIZE_OF_LONG = 8; /** The size of the ID in byte. */ public static final int SIZE = 2 * SIZE_OF_LONG; // ------------------------------------------------------------------------ /** The upper part of the actual ID. */ protected final long upperPart; /** The lower part of the actual ID. */ protected final long lowerPart; /** The memoized value returned by toString(). */ private transient String toString; // -------------------------------------------------------------------------------------------- /** * Constructs a new ID with a specific bytes value. */ public AbstractID(byte[] bytes) { if (bytes == null || bytes.length != SIZE) { throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes"); } this.lowerPart = byteArrayToLong(bytes, 0); this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG); } /** * Constructs a new abstract ID. * * @param lowerPart the lower bytes of the ID * @param upperPart the higher bytes of the ID */ public AbstractID(long lowerPart, long upperPart) { this.lowerPart = lowerPart; this.upperPart = upperPart; } /** * Copy constructor: Creates a new abstract ID from the given one. * * @param id the abstract ID to copy */ public AbstractID(AbstractID id) { if (id == null) { throw new IllegalArgumentException("Id must not be null."); } this.lowerPart = id.lowerPart; this.upperPart = id.upperPart; } /** * Constructs a new random ID from a uniform distribution. */ public AbstractID() { this.lowerPart = RND.nextLong(); this.upperPart = RND.nextLong(); } // -------------------------------------------------------------------------------------------- /** * Gets the lower 64 bits of the ID. * * @return The lower 64 bits of the ID. */ public long getLowerPart() { return lowerPart; } /** * Gets the upper 64 bits of the ID. * * @return The upper 64 bits of the ID. */ public long getUpperPart() { return upperPart; } /** * Gets the bytes underlying this ID. * * @return The bytes underlying this ID. */ public byte[] getBytes() { byte[] bytes = new byte[SIZE]; longToByteArray(lowerPart, bytes, 0); longToByteArray(upperPart, bytes, SIZE_OF_LONG); return bytes; } // -------------------------------------------------------------------------------------------- // Standard Utilities // -------------------------------------------------------------------------------------------- @Override public boolean equals(Object obj) { if (obj == this) { return true; } else if (obj != null && obj.getClass() == getClass()) { AbstractID that = (AbstractID) obj; return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart; } else { return false; } } @Override public int hashCode() { return ((int) this.lowerPart) ^ ((int) (this.lowerPart >>> 32)) ^ ((int) this.upperPart) ^ ((int) (this.upperPart >>> 32)); } @Override public String toString() { if (this.toString == null) { final byte[] ba = new byte[SIZE]; longToByteArray(this.lowerPart, ba, 0); longToByteArray(this.upperPart, ba, SIZE_OF_LONG); this.toString = StringUtils.byteToHexString(ba); } return this.toString; } @Override public int compareTo(AbstractID o) { int diff1 = Long.compare(this.upperPart, o.upperPart); int diff2 = Long.compare(this.lowerPart, o.lowerPart); return diff1 == 0 ? diff2 : diff1; } // -------------------------------------------------------------------------------------------- // Conversion Utilities // -------------------------------------------------------------------------------------------- /** * Converts the given byte array to a long. * * @param ba the byte array to be converted * @param offset the offset indicating at which byte inside the array the conversion shall begin * @return the long variable */ private static long byteArrayToLong(byte[] ba, int offset) { long l = 0; for (int i = 0; i < SIZE_OF_LONG; ++i) { l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3); } return l; } /** * Converts a long to a byte array. * * @param l the long variable to be converted * @param ba the byte array to store the result the of the conversion * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored */ private static void longToByteArray(long l, byte[] ba, int offset) { for (int i = 0; i < SIZE_OF_LONG; ++i) { final int shift = i << 3; // i * 8 ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift); } } }
- AbstractID由upperPart及lowerPart两个long类型的属性组成;无参构造器会使用Random.nextLong来生成upperPart及lowerPart;bytes参数的构造器则会从bytes中解析出lowerPart及upperPart;也可以直接使用lowerPart及upperPart参数的构造器直接指定
小结
- BlobService定义了getPermanentBlobService方法用于获取PermanentBlobService;getTransientBlobService方法用于获取TransientBlobService;PermanentBlobService提供了getFile方法,它根据JobID及PermanentBlobKey来获取File;TransientBlobService用于获取transient binary large objects (BLOBs),这些blobs在获取时就会在BlobServer上删掉;它提供了getFile、putTransient、deleteFromCache方法
- BlobKey是个抽象类,它有key、BlobType、AbstractID三个属性,其中BlobType分为PERMANENT_BLOB及TRANSIENT_BLOB;它定义了createKey静态方法,用于根据BlobType创建BlobKey;readFromInputStream方法用于从InputStream反序列化为BlobKey;writeToOutputStream方法用于将BlobKey序列化到OutputStream;它有两个子类,分别为PermanentBlobKey及TransientBlobKey;PermanentBlobKey继承了BlobKey,它的BlobType为BlobType.PERMANENT_BLOB;TransientBlobKey继承了BlobKey,它的BlobType为BlobType.TRANSIENT_BLOB
- AbstractID由upperPart及lowerPart两个long类型的属性组成;无参构造器会使用Random.nextLong来生成upperPart及lowerPart;bytes参数的构造器则会从bytes中解析出lowerPart及upperPart;也可以直接使用lowerPart及upperPart参数的构造器直接指定
doc
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Data Structures and Algorithm Analysis in Java
Mark A. Weiss / Pearson / 2006-3-3 / USD 143.00
As the speed and power of computers increases, so does the need for effective programming and algorithm analysis. By approaching these skills in tandem, Mark Allen Weiss teaches readers to develop wel......一起来看看 《Data Structures and Algorithm Analysis in Java》 这本书的介绍吧!