Preface
This article takes a look at Flink's FileSystem.
FileSystem
flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@Public public abstract class FileSystem { /** * The possible write modes. The write mode decides what happens if a file should be created, * but already exists. */ public enum WriteMode { /** Creates the target file only if no file exists at that path already. * Does not overwrite existing files and directories. */ NO_OVERWRITE, /** Creates a new target file regardless of any existing files or directories. * Existing files and directories will be deleted (recursively) automatically before * creating the new file. */ OVERWRITE } // ------------------------------------------------------------------------ /** Logger for all FileSystem work. */ private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class); /** This lock guards the methods {@link #initOutPathLocalFS(Path, WriteMode, boolean)} and * {@link #initOutPathDistFS(Path, WriteMode, boolean)} which are otherwise susceptible to races. */ private static final ReentrantLock OUTPUT_DIRECTORY_INIT_LOCK = new ReentrantLock(true); /** Object used to protect calls to specific methods.*/ private static final ReentrantLock LOCK = new ReentrantLock(true); /** Cache for file systems, by scheme + authority. */ private static final HashMap<FSKey, FileSystem> CACHE = new HashMap<>(); /** All available file system factories. */ private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems(); /** Mapping of file system schemes to the corresponding factories, * populated in {@link FileSystem#initialize(Configuration)}. */ private static final HashMap<String, FileSystemFactory> FS_FACTORIES = new HashMap<>(); /** The default factory that is used when no scheme matches. */ private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory(); /** The default filesystem scheme to be used, configured during process-wide initialization. * This value defaults to the local file systems scheme {@code 'file:///'} or {@code 'file:/'}. */ private static URI defaultScheme; //...... 
// ------------------------------------------------------------------------ // Initialization // ------------------------------------------------------------------------ /** * Initializes the shared file system settings. * * <p>The given configuration is passed to each file system factory to initialize the respective * file systems. Because the configuration of file systems may be different subsequent to the call * of this method, this method clears the file system instance cache. * * <p>This method also reads the default file system URI from the configuration key * {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where * the URI has no scheme will be interpreted as relative to that URI. * As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}. * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}. * * @param config the configuration from where to fetch the parameter. 
*/ public static void initialize(Configuration config) throws IOException, IllegalConfigurationException { LOCK.lock(); try { // make sure file systems are re-instantiated after re-configuration CACHE.clear(); FS_FACTORIES.clear(); // configure all file system factories for (FileSystemFactory factory : RAW_FACTORIES) { factory.configure(config); String scheme = factory.getScheme(); FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config); FS_FACTORIES.put(scheme, fsf); } // configure the default (fallback) factory FALLBACK_FACTORY.configure(config); // also read the default file system scheme final String stringifiedUri = config.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, null); if (stringifiedUri == null) { defaultScheme = null; } else { try { defaultScheme = new URI(stringifiedUri); } catch (URISyntaxException e) { throw new IllegalConfigurationException("The default file system scheme ('" + CoreOptions.DEFAULT_FILESYSTEM_SCHEME + "') is invalid: " + stringifiedUri, e); } } } finally { LOCK.unlock(); } } // ------------------------------------------------------------------------ // Obtaining File System Instances // ------------------------------------------------------------------------ public static FileSystem getLocalFileSystem() { return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(LocalFileSystem.getSharedInstance()); } public static FileSystem get(URI uri) throws IOException { return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); } @Internal public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException { checkNotNull(fsUri, "file system URI"); LOCK.lock(); try { final URI uri; if (fsUri.getScheme() != null) { uri = fsUri; } else { // Apply the default fs scheme final URI defaultUri = getDefaultFsUri(); URI rewrittenUri = null; try { rewrittenUri = new URI(defaultUri.getScheme(), null, defaultUri.getHost(), defaultUri.getPort(), fsUri.getPath(), null, 
null); } catch (URISyntaxException e) { // for local URIs, we make one more try to repair the path by making it absolute if (defaultUri.getScheme().equals("file")) { try { rewrittenUri = new URI( "file", null, new Path(new File(fsUri.getPath()).getAbsolutePath()).toUri().getPath(), null); } catch (URISyntaxException ignored) { // could not help it... } } } if (rewrittenUri != null) { uri = rewrittenUri; } else { throw new IOException("The file system URI '" + fsUri + "' declares no scheme and cannot be interpreted relative to the default file system URI (" + defaultUri + ")."); } } // print a helpful pointer for malformed local URIs (happens a lot to new users) if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) { String supposedUri = "file:///" + uri.getAuthority() + uri.getPath(); throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '" + uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')"); } final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority()); // See if there is a file system object in the cache { FileSystem cached = CACHE.get(key); if (cached != null) { return cached; } } // this "default" initialization makes sure that the FileSystem class works // even when not configured with an explicit Flink configuration, like on // JobManager or TaskManager setup if (FS_FACTORIES.isEmpty()) { initialize(new Configuration()); } // Try to create a new file system final FileSystem fs; final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme()); if (factory != null) { fs = factory.create(uri); } else { try { fs = FALLBACK_FACTORY.create(uri); } catch (UnsupportedFileSystemSchemeException e) { throw new UnsupportedFileSystemSchemeException( "Could not find a file system implementation for scheme '" + uri.getScheme() + "'. 
The scheme is not directly supported by Flink and no Hadoop file " + "system to support this scheme could be loaded.", e); } } CACHE.put(key, fs); return fs; } finally { LOCK.unlock(); } } public static URI getDefaultFsUri() { return defaultScheme != null ? defaultScheme : LocalFileSystem.getLocalFsURI(); } // ------------------------------------------------------------------------ // File System Methods // ------------------------------------------------------------------------ public abstract Path getWorkingDirectory(); public abstract Path getHomeDirectory(); public abstract URI getUri(); public abstract FileStatus getFileStatus(Path f) throws IOException; public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException; public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException; public abstract FSDataInputStream open(Path f) throws IOException; public RecoverableWriter createRecoverableWriter() throws IOException { throw new UnsupportedOperationException("This file system does not support recoverable writers."); } public abstract FileStatus[] listStatus(Path f) throws IOException; public boolean exists(final Path f) throws IOException { try { return (getFileStatus(f) != null); } catch (FileNotFoundException e) { return false; } } public abstract boolean delete(Path f, boolean recursive) throws IOException; public abstract boolean mkdirs(Path f) throws IOException; public abstract FSDataOutputStream create(Path f, WriteMode overwriteMode) throws IOException; public abstract boolean rename(Path src, Path dst) throws IOException; public abstract boolean isDistributedFS(); public abstract FileSystemKind getKind(); // ------------------------------------------------------------------------ // output directory initialization // ------------------------------------------------------------------------ public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws 
IOException { if (isDistributedFS()) { return false; } // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that // concurrently work in this method (multiple output formats writing locally) might end // up deleting each other's directories and leave non-retrievable files, without necessarily // causing an exception. That results in very subtle issues, like output files looking as if // they are not getting created. // we acquire the lock interruptibly here, to make sure that concurrent threads waiting // here can cancel faster try { OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly(); } catch (InterruptedException e) { // restore the interruption state Thread.currentThread().interrupt(); // leave the method - we don't have the lock anyways throw new IOException("The thread was interrupted while trying to initialize the output directory"); } try { FileStatus status; try { status = getFileStatus(outPath); } catch (FileNotFoundException e) { // okay, the file is not there status = null; } // check if path exists if (status != null) { // path exists, check write mode switch (writeMode) { case NO_OVERWRITE: if (status.isDir() && createDirectory) { return true; } else { // file may not be overwritten throw new IOException("File or directory " + outPath + " already exists. Existing files and directories " + "are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. 
Use " + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories."); } case OVERWRITE: if (status.isDir()) { if (createDirectory) { // directory exists and does not need to be created return true; } else { // we will write in a single file, delete directory try { delete(outPath, true); } catch (IOException e) { throw new IOException("Could not remove existing directory '" + outPath + "' to allow overwrite by result file", e); } } } else { // delete file try { delete(outPath, false); } catch (IOException e) { throw new IOException("Could not remove existing file '" + outPath + "' to allow overwrite by result file/directory", e); } } break; default: throw new IllegalArgumentException("Invalid write mode: " + writeMode); } } if (createDirectory) { // Output directory needs to be created if (!exists(outPath)) { mkdirs(outPath); } // double check that the output directory exists try { return getFileStatus(outPath).isDir(); } catch (FileNotFoundException e) { return false; } } else { // check that the output path does not exist and an output file // can be created by the output format. return !exists(outPath); } } finally { OUTPUT_DIRECTORY_INIT_LOCK.unlock(); } } public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException { if (!isDistributedFS()) { return false; } // NOTE: We actually need to lock here (process wide). Otherwise, multiple threads that // concurrently work in this method (multiple output formats writing locally) might end // up deleting each other's directories and leave non-retrievable files, without necessarily // causing an exception. That results in very subtle issues, like output files looking as if // they are not getting created. 
// we acquire the lock interruptibly here, to make sure that concurrent threads waiting // here can cancel faster try { OUTPUT_DIRECTORY_INIT_LOCK.lockInterruptibly(); } catch (InterruptedException e) { // restore the interruption state Thread.currentThread().interrupt(); // leave the method - we don't have the lock anyways throw new IOException("The thread was interrupted while trying to initialize the output directory"); } try { // check if path exists if (exists(outPath)) { // path exists, check write mode switch(writeMode) { case NO_OVERWRITE: // file or directory may not be overwritten throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories."); case OVERWRITE: // output path exists. We delete it and all contained files in case of a directory. try { delete(outPath, true); } catch (IOException e) { // Some other thread might already have deleted the path. // If - for some other reason - the path could not be deleted, // this will be handled later. } break; default: throw new IllegalArgumentException("Invalid write mode: " + writeMode); } } if (createDirectory) { // Output directory needs to be created try { if (!exists(outPath)) { mkdirs(outPath); } } catch (IOException ioe) { // Some other thread might already have created the directory. // If - for some other reason - the directory could not be created // and the path does not exist, this will be handled later. } // double check that the output directory exists return exists(outPath) && getFileStatus(outPath).isDir(); } else { // single file case: check that the output path does not exist and // an output file can be created by the output format. return !exists(outPath); } } finally { OUTPUT_DIRECTORY_INIT_LOCK.unlock(); } } //...... }
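- FileSystem is the abstract base class for the file systems that Flink uses; a subclass may implement either a local or a distributed file system.
- FileSystem declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS, and getKind, which subclasses must implement.
- FileSystem also provides the implemented instance methods initOutPathLocalFS and initOutPathDistFS, as well as the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem, and getDefaultFsUri.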
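One step in getUnguardedFileSystem that is easy to miss is how a URI without a scheme is resolved against the default file system URI. The following is a minimal standalone sketch of that step using only java.net.URI. The class DefaultSchemeSketch and the helper resolve are hypothetical names for illustration, not part of Flink, and the sketch leaves out the repair attempt for local paths as well as the cache lookup.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Standalone sketch of the scheme-defaulting step; this is not Flink code. */
final class DefaultSchemeSketch {

    /**
     * Hypothetical helper: resolves a URI against a default file system URI,
     * mirroring the rewrite done in FileSystem.getUnguardedFileSystem.
     */
    static URI resolve(URI fsUri, URI defaultUri) {
        if (fsUri.getScheme() != null) {
            // A URI that already declares a scheme is used unchanged.
            return fsUri;
        }
        try {
            // Scheme, host and port come from the default URI; only the path
            // of the scheme-less input is kept.
            return new URI(defaultUri.getScheme(), null, defaultUri.getHost(),
                    defaultUri.getPort(), fsUri.getPath(), null, null);
        } catch (URISyntaxException e) {
            // Assumption of this sketch: fail fast instead of trying to repair the path.
            throw new IllegalArgumentException(
                    "Cannot interpret '" + fsUri + "' relative to " + defaultUri, e);
        }
    }

    public static void main(String[] args) {
        URI defaultUri = URI.create("hdfs://localhost:9000/");
        System.out.println(resolve(URI.create("/user/USERNAME/in.txt"), defaultUri));
        System.out.println(resolve(URI.create("file:///tmp/in.txt"), defaultUri));
    }
}
```

In Flink itself the default URI comes from CoreOptions.DEFAULT_FILESYSTEM_SCHEME and falls back to LocalFileSystem.getLocalFsURI() when that option is not set, as getDefaultFsUri shows.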
LocalFileSystem
flink-1.7.2/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@Internal public class LocalFileSystem extends FileSystem { private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class); /** The URI representing the local file system. */ private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///"); /** The shared instance of the local file system. */ private static final LocalFileSystem INSTANCE = new LocalFileSystem(); /** Path pointing to the current working directory. * Because Paths are not immutable, we cannot cache the proper path here */ private final URI workingDir; /** Path pointing to the current working directory. * Because Paths are not immutable, we cannot cache the proper path here. */ private final URI homeDir; /** The host name of this machine. */ private final String hostName; /** * Constructs a new <code>LocalFileSystem</code> object. */ public LocalFileSystem() { this.workingDir = new File(System.getProperty("user.dir")).toURI(); this.homeDir = new File(System.getProperty("user.home")).toURI(); String tmp = "unknownHost"; try { tmp = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { LOG.error("Could not resolve local host", e); } this.hostName = tmp; } // ------------------------------------------------------------------------ @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { return new BlockLocation[] { new LocalBlockLocation(hostName, file.getLen()) }; } @Override public FileStatus getFileStatus(Path f) throws IOException { final File path = pathToFile(f); if (path.exists()) { return new LocalFileStatus(path, this); } else { throw new FileNotFoundException("File " + f + " does not exist or the user running " + "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it."); } } @Override public URI getUri() { return LOCAL_URI; } @Override public Path getWorkingDirectory() { return new Path(workingDir); } @Override 
public Path getHomeDirectory() { return new Path(homeDir); } @Override public FSDataInputStream open(final Path f, final int bufferSize) throws IOException { return open(f); } @Override public FSDataInputStream open(final Path f) throws IOException { final File file = pathToFile(f); return new LocalDataInputStream(file); } @Override public LocalRecoverableWriter createRecoverableWriter() throws IOException { return new LocalRecoverableWriter(this); } @Override public boolean exists(Path f) throws IOException { final File path = pathToFile(f); return path.exists(); } @Override public FileStatus[] listStatus(final Path f) throws IOException { final File localf = pathToFile(f); FileStatus[] results; if (!localf.exists()) { return null; } if (localf.isFile()) { return new FileStatus[] { new LocalFileStatus(localf, this) }; } final String[] names = localf.list(); if (names == null) { return null; } results = new FileStatus[names.length]; for (int i = 0; i < names.length; i++) { results[i] = getFileStatus(new Path(f, names[i])); } return results; } @Override public boolean delete(final Path f, final boolean recursive) throws IOException { final File file = pathToFile(f); if (file.isFile()) { return file.delete(); } else if ((!recursive) && file.isDirectory()) { File[] containedFiles = file.listFiles(); if (containedFiles == null) { throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred"); } else if (containedFiles.length != 0) { throw new IOException("Directory " + file.toString() + " is not empty"); } } return delete(file); } /** * Deletes the given file or directory. 
* * @param f * the file to be deleted * @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise * @throws IOException * thrown if an error occurred while deleting the files/directories */ private boolean delete(final File f) throws IOException { if (f.isDirectory()) { final File[] files = f.listFiles(); if (files != null) { for (File file : files) { final boolean del = delete(file); if (!del) { return false; } } } } else { return f.delete(); } // Now directory is empty return f.delete(); } /** * Recursively creates the directory specified by the provided path. * * @return <code>true</code>if the directories either already existed or have been created successfully, * <code>false</code> otherwise * @throws IOException * thrown if an error occurred while creating the directory/directories */ @Override public boolean mkdirs(final Path f) throws IOException { checkNotNull(f, "path is null"); return mkdirsInternal(pathToFile(f)); } private boolean mkdirsInternal(File file) throws IOException { if (file.isDirectory()) { return true; } else if (file.exists() && !file.isDirectory()) { // Important: The 'exists()' check above must come before the 'isDirectory()' check to // be safe when multiple parallel instances try to create the directory // exists and is not a directory -> is a regular file throw new FileAlreadyExistsException(file.getAbsolutePath()); } else { File parent = file.getParentFile(); return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory()); } } @Override public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException { checkNotNull(filePath, "filePath"); if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) { throw new FileAlreadyExistsException("File already exists: " + filePath); } final Path parent = filePath.getParent(); if (parent != null && !mkdirs(parent)) { throw new IOException("Mkdirs failed to create " + parent); } final File file = 
pathToFile(filePath); return new LocalDataOutputStream(file); } @Override public boolean rename(final Path src, final Path dst) throws IOException { final File srcFile = pathToFile(src); final File dstFile = pathToFile(dst); final File dstParent = dstFile.getParentFile(); // Files.move fails if the destination directory doesn't exist //noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was created dstParent.mkdirs(); try { Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING); return true; } catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException | SecurityException ex) { // catch the errors that are regular "move failed" exceptions and return false return false; } } @Override public boolean isDistributedFS() { return false; } @Override public FileSystemKind getKind() { return FileSystemKind.FILE_SYSTEM; } // ------------------------------------------------------------------------ /** * Converts the given Path to a File for this file system. * * <p>If the path is not absolute, it is interpreted relative to this FileSystem's working directory. */ public File pathToFile(Path path) { if (!path.isAbsolute()) { path = new Path(getWorkingDirectory(), path); } return new File(path.toUri().getPath()); } // ------------------------------------------------------------------------ /** * Gets the URI that represents the local file system. * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other * UNIX family platforms. * * @return The URI that represents the local file system. */ public static URI getLocalFsURI() { return LOCAL_URI; } /** * Gets the shared instance of this file system. * * @return The shared instance of this file system. */ public static LocalFileSystem getSharedInstance() { return INSTANCE; } }
- LocalFileSystem extends FileSystem and is implemented on top of the local file system; its isDistributedFS method returns false and its getKind method returns FileSystemKind.FILE_SYSTEM.
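The recursive directory creation in LocalFileSystem.mkdirsInternal can be tried outside Flink with plain java.io.File. The sketch below follows the same three branches. The class LocalMkdirsSketch and the method mkdirsRecursive are hypothetical names, and wrapping the FileAlreadyExistsException in an UncheckedIOException is a simplification of this sketch (the Flink method throws the checked exception directly).

```java
import java.io.File;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;

/** Standalone sketch of the recursion used by mkdirsInternal; this is not Flink code. */
final class LocalMkdirsSketch {

    /** Returns true if the directory already existed or was created. */
    static boolean mkdirsRecursive(File file) {
        if (file.isDirectory()) {
            // Nothing to do: the directory is already there.
            return true;
        } else if (file.exists()) {
            // The path exists but is not a directory, so it is a regular file.
            throw new UncheckedIOException(
                    new FileAlreadyExistsException(file.getAbsolutePath()));
        } else {
            // Create the parent chain first, then this directory. The trailing
            // isDirectory() check tolerates a concurrent creator winning the race.
            File parent = file.getParentFile();
            return (parent == null || mkdirsRecursive(parent))
                    && (file.mkdir() || file.isDirectory());
        }
    }

    public static void main(String[] args) {
        File base = new File(System.getProperty("java.io.tmpdir"),
                "mkdirs-sketch-" + System.nanoTime());
        File nested = new File(base, "a/b/c");
        System.out.println("created: " + mkdirsRecursive(nested));
        System.out.println("second call: " + mkdirsRecursive(nested));
    }
}
```

Calling the method a second time on the same path returns true through the first branch, which matches the Javadoc on mkdirs: the result is true when the directories either already existed or were created successfully.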
HadoopFileSystem
flink-1.7.2/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
public class HadoopFileSystem extends FileSystem { /** The wrapped Hadoop File System. */ private final org.apache.hadoop.fs.FileSystem fs; /* This field caches the file system kind. It is lazily set because the file system * URL is lazily initialized. */ private FileSystemKind fsKind; /** * Wraps the given Hadoop File System object as a Flink File System object. * The given Hadoop file system object is expected to be initialized already. * * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood. */ public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) { this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem"); } /** * Gets the underlying Hadoop FileSystem. * @return The underlying Hadoop FileSystem. */ public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() { return this.fs; } // ------------------------------------------------------------------------ // file system methods // ------------------------------------------------------------------------ @Override public Path getWorkingDirectory() { return new Path(this.fs.getWorkingDirectory().toUri()); } public Path getHomeDirectory() { return new Path(this.fs.getHomeDirectory().toUri()); } @Override public URI getUri() { return fs.getUri(); } @Override public FileStatus getFileStatus(final Path f) throws IOException { org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f)); return new HadoopFileStatus(status); } @Override public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len) throws IOException { if (!(file instanceof HadoopFileStatus)) { throw new IOException("file is not an instance of DistributedFileStatus"); } final HadoopFileStatus f = (HadoopFileStatus) file; final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(), start, len); // Wrap up HDFS specific block location objects final HadoopBlockLocation[] distBlkLocations = new 
HadoopBlockLocation[blkLocations.length]; for (int i = 0; i < distBlkLocations.length; i++) { distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]); } return distBlkLocations; } @Override public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException { final org.apache.hadoop.fs.Path path = toHadoopPath(f); final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize); return new HadoopDataInputStream(fdis); } @Override public HadoopDataInputStream open(final Path f) throws IOException { final org.apache.hadoop.fs.Path path = toHadoopPath(f); final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(path); return new HadoopDataInputStream(fdis); } @Override @SuppressWarnings("deprecation") public HadoopDataOutputStream create( final Path f, final boolean overwrite, final int bufferSize, final short replication, final long blockSize) throws IOException { final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create( toHadoopPath(f), overwrite, bufferSize, replication, blockSize); return new HadoopDataOutputStream(fdos); } @Override public HadoopDataOutputStream create(final Path f, final WriteMode overwrite) throws IOException { final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs.create(toHadoopPath(f), overwrite == WriteMode.OVERWRITE); return new HadoopDataOutputStream(fsDataOutputStream); } @Override public boolean delete(final Path f, final boolean recursive) throws IOException { return this.fs.delete(toHadoopPath(f), recursive); } @Override public boolean exists(Path f) throws IOException { return this.fs.exists(toHadoopPath(f)); } @Override public FileStatus[] listStatus(final Path f) throws IOException { final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(toHadoopPath(f)); final FileStatus[] files = new FileStatus[hadoopFiles.length]; // Convert types for (int i = 0; i < files.length; i++) { files[i] = new HadoopFileStatus(hadoopFiles[i]); } return 
files; } @Override public boolean mkdirs(final Path f) throws IOException { return this.fs.mkdirs(toHadoopPath(f)); } @Override public boolean rename(final Path src, final Path dst) throws IOException { return this.fs.rename(toHadoopPath(src), toHadoopPath(dst)); } @SuppressWarnings("deprecation") @Override public long getDefaultBlockSize() { return this.fs.getDefaultBlockSize(); } @Override public boolean isDistributedFS() { return true; } @Override public FileSystemKind getKind() { if (fsKind == null) { fsKind = getKindForScheme(this.fs.getUri().getScheme()); } return fsKind; } @Override public RecoverableWriter createRecoverableWriter() throws IOException { // This writer is only supported on a subset of file systems, and on // specific versions. We check these schemes and versions eagerly for better error // messages in the constructor of the writer. return new HadoopRecoverableWriter(fs); } // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ public static org.apache.hadoop.fs.Path toHadoopPath(Path path) { return new org.apache.hadoop.fs.Path(path.toUri()); } /** * Gets the kind of the file system from its scheme. * * <p>Implementation note: Initially, especially within the Flink 1.3.x line * (in order to not break backwards compatibility), we must only label file systems * as 'inconsistent' or as 'not proper filesystems' if we are sure about it. * Otherwise, we cause regression for example in the performance and cleanup handling * of checkpoints. * For that reason, we initially mark some filesystems as 'eventually consistent' or * as 'object stores', and leave the others as 'consistent file systems'. 
*/ static FileSystemKind getKindForScheme(String scheme) { scheme = scheme.toLowerCase(Locale.US); if (scheme.startsWith("s3") || scheme.startsWith("emr")) { // the Amazon S3 storage return FileSystemKind.OBJECT_STORE; } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) { // file servers instead of file systems // they might actually be consistent, but we have no hard guarantees // currently to rely on that return FileSystemKind.OBJECT_STORE; } else { // the remainder should include hdfs, kosmos, ceph, ... // this also includes federated HDFS (viewfs). return FileSystemKind.FILE_SYSTEM; } } }
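- HadoopFileSystem extends FileSystem and wraps a Hadoop file system (org.apache.hadoop.fs.FileSystem, for example HDFS); its isDistributedFS method returns true and its getKind method returns FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE depending on the URI scheme; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem.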
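How getKind chooses between the two kinds comes down to a prefix check on the lower-cased scheme in getKindForScheme. The following is a small standalone sketch of that mapping. The class FileSystemKindSketch and its nested Kind enum are hypothetical stand-ins for Flink's FileSystemKind, so the sketch runs without Flink on the classpath.

```java
import java.util.Locale;

/** Standalone sketch of the scheme-to-kind mapping; this is not Flink code. */
final class FileSystemKindSketch {

    /** Stand-in for Flink's FileSystemKind, limited to the two values used here. */
    enum Kind { FILE_SYSTEM, OBJECT_STORE }

    static Kind kindForScheme(String scheme) {
        String s = scheme.toLowerCase(Locale.US);
        if (s.startsWith("s3") || s.startsWith("emr")) {
            // Amazon S3 style storage
            return Kind.OBJECT_STORE;
        } else if (s.startsWith("http") || s.startsWith("ftp")) {
            // File servers rather than file systems
            return Kind.OBJECT_STORE;
        } else {
            // Everything else, for example hdfs or viewfs
            return Kind.FILE_SYSTEM;
        }
    }

    public static void main(String[] args) {
        for (String scheme : new String[] {"hdfs", "viewfs", "s3a", "HTTP", "ftp"}) {
            System.out.println(scheme + " -> " + kindForScheme(scheme));
        }
    }
}
```

Because the check uses startsWith, schemes such as s3a and s3n fall into the same branch as s3, and any scheme that matches no prefix is treated as a consistent file system.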
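Summary
- FileSystem is the abstract base class for the file systems that Flink uses; a subclass may implement either a local or a distributed file system. It declares the abstract methods getWorkingDirectory, getHomeDirectory, getUri, getFileStatus, getFileBlockLocations, open, listStatus, delete, mkdirs, create, rename, isDistributedFS, and getKind, which subclasses must implement, and it provides the implemented instance methods initOutPathLocalFS and initOutPathDistFS as well as the static methods initialize, getLocalFileSystem, get, getUnguardedFileSystem, and getDefaultFsUri.
- LocalFileSystem extends FileSystem and is implemented on top of the local file system; its isDistributedFS method returns false and its getKind method returns FileSystemKind.FILE_SYSTEM.
- HadoopFileSystem extends FileSystem and wraps a Hadoop file system (org.apache.hadoop.fs.FileSystem, for example HDFS); its isDistributedFS method returns true and its getKind method returns FileSystemKind.FILE_SYSTEM or FileSystemKind.OBJECT_STORE depending on the URI scheme; FlinkS3FileSystem and MapRFileSystem both extend HadoopFileSystem.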
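The branching inside initOutPathLocalFS is easier to follow as a decision table over three inputs: what currently exists at the output path, the WriteMode, and whether a directory is requested. The sketch below restates that branching as a pure function. The class OutputPathDecisionSketch, the PathState and Action enums, and the decide method are hypothetical names for illustration, and the sketch performs no file I/O or locking.

```java
/** Standalone sketch of the decisions in initOutPathLocalFS; this is not Flink code. */
final class OutputPathDecisionSketch {

    enum WriteMode { NO_OVERWRITE, OVERWRITE }

    /** What is currently found at the output path. */
    enum PathState { ABSENT, FILE, DIRECTORY }

    /** What the method does before the final existence checks. */
    enum Action { PROCEED, REUSE_DIRECTORY, DELETE_THEN_PROCEED, FAIL_ALREADY_EXISTS }

    static Action decide(PathState state, WriteMode mode, boolean createDirectory) {
        if (state == PathState.ABSENT) {
            // Nothing in the way: create the directory if requested, else leave the path free.
            return Action.PROCEED;
        }
        if (state == PathState.DIRECTORY && createDirectory) {
            // An existing directory is reused in both write modes.
            return Action.REUSE_DIRECTORY;
        }
        // A file, or a directory where a single file is wanted, is in the way.
        return mode == WriteMode.OVERWRITE
                ? Action.DELETE_THEN_PROCEED
                : Action.FAIL_ALREADY_EXISTS;
    }

    public static void main(String[] args) {
        for (PathState state : PathState.values()) {
            for (WriteMode mode : WriteMode.values()) {
                for (boolean dir : new boolean[] {true, false}) {
                    System.out.println(state + ", " + mode + ", createDirectory=" + dir
                            + " -> " + decide(state, mode, dir));
                }
            }
        }
    }
}
```

The distributed variant initOutPathDistFS is stricter: for an existing path, NO_OVERWRITE always throws and OVERWRITE always deletes recursively, without the directory reuse branch.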