Preface
This article takes a look at Flink's BlobServer.
BlobServer
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
```java
public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {

	/** The log object used for debugging. */
	private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

	/** Counter to generate unique names for temporary files. */
	private final AtomicLong tempFileCounter = new AtomicLong(0);

	/** The server socket listening for incoming connections. */
	private final ServerSocket serverSocket;

	/** Blob Server configuration. */
	private final Configuration blobServiceConfiguration;

	/** Indicates whether a shutdown of server component has been requested. */
	private final AtomicBoolean shutdownRequested = new AtomicBoolean();

	/** Root directory for local file storage. */
	private final File storageDir;

	/** Blob store for distributed file storage, e.g. in HA. */
	private final BlobStore blobStore;

	/** Set of currently running threads. */
	private final Set<BlobServerConnection> activeConnections = new HashSet<>();

	/** The maximum number of concurrent connections. */
	private final int maxConnections;

	/** Lock guarding concurrent file accesses. */
	private final ReadWriteLock readWriteLock;

	/** Shutdown hook thread to ensure deletion of the local storage directory. */
	private final Thread shutdownHook;

	// --------------------------------------------------------------------------------------------

	/**
	 * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link
	 * #getFile} methods.
	 **/
	private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
		new ConcurrentHashMap<>();

	/** Time interval (ms) to run the cleanup task; also used as the default TTL. */
	private final long cleanupInterval;

	/** Timer task to execute the cleanup at regular intervals. */
	private final Timer cleanupTimer;

	/**
	 * Instantiates a new BLOB server and binds it to a free network port.
	 *
	 * @param config Configuration to be used to instantiate the BlobServer
	 * @param blobStore BlobStore to store blobs persistently
	 *
	 * @throws IOException
	 * 		thrown if the BLOB server cannot bind to a free network port or if the
	 * 		(local or distributed) file storage cannot be created or is not usable
	 */
	public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
		this.blobServiceConfiguration = checkNotNull(config);
		this.blobStore = checkNotNull(blobStore);
		this.readWriteLock = new ReentrantReadWriteLock();

		// configure and create the storage directory
		this.storageDir = BlobUtils.initLocalStorageDirectory(config);
		LOG.info("Created BLOB server storage directory {}", storageDir);

		// configure the maximum number of concurrent connections
		final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
		if (maxConnections >= 1) {
			this.maxConnections = maxConnections;
		}
		else {
			LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
				maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue());
			this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
		}

		// configure the backlog of connections
		int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
		if (backlog < 1) {
			LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}",
				backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue());
			backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
		}

		// Initializing the clean up task
		this.cleanupTimer = new Timer(true);

		this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
		this.cleanupTimer.schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
			storageDir, LOG), cleanupInterval, cleanupInterval);

		this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

		// ----------------------- start the server -------------------

		final String serverPortRange = config.getString(BlobServerOptions.PORT);
		final Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);

		final ServerSocketFactory socketFactory;
		if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
			try {
				socketFactory = SSLUtils.createSSLServerSocketFactory(config);
			}
			catch (Exception e) {
				throw new IOException("Failed to initialize SSL for the blob server", e);
			}
		}
		else {
			socketFactory = ServerSocketFactory.getDefault();
		}

		final int finalBacklog = backlog;
		this.serverSocket = NetUtils.createSocketFromPorts(ports,
			(port) -> socketFactory.createServerSocket(port, finalBacklog));

		if (serverSocket == null) {
			throw new IOException("Unable to open BLOB Server in specified port range: " + serverPortRange);
		}

		// start the server thread
		setName("BLOB Server listener at " + getPort());
		setDaemon(true);

		if (LOG.isInfoEnabled()) {
			LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}",
				serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, backlog);
		}
	}

	//......

	@Override
	public void run() {
		try {
			while (!this.shutdownRequested.get()) {
				BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
				try {
					synchronized (activeConnections) {
						while (activeConnections.size() >= maxConnections) {
							activeConnections.wait(2000);
						}
						activeConnections.add(conn);
					}

					conn.start();
					conn = null;
				}
				finally {
					if (conn != null) {
						conn.close();
						synchronized (activeConnections) {
							activeConnections.remove(conn);
						}
					}
				}
			}
		}
		catch (Throwable t) {
			if (!this.shutdownRequested.get()) {
				LOG.error("BLOB server stopped working. Shutting down", t);

				try {
					close();
				} catch (Throwable closeThrowable) {
					LOG.error("Could not properly close the BlobServer.", closeThrowable);
				}
			}
		}
	}

	/**
	 * Shuts down the BLOB server.
	 */
	@Override
	public void close() throws IOException {
		cleanupTimer.cancel();

		if (shutdownRequested.compareAndSet(false, true)) {
			Exception exception = null;

			try {
				this.serverSocket.close();
			}
			catch (IOException ioe) {
				exception = ioe;
			}

			// wake the thread up, in case it is waiting on some operation
			interrupt();

			try {
				join();
			}
			catch (InterruptedException ie) {
				Thread.currentThread().interrupt();

				LOG.debug("Error while waiting for this thread to die.", ie);
			}

			synchronized (activeConnections) {
				if (!activeConnections.isEmpty()) {
					for (BlobServerConnection conn : activeConnections) {
						LOG.debug("Shutting down connection {}.", conn.getName());
						conn.close();
					}
					activeConnections.clear();
				}
			}

			// Clean up the storage directory
			try {
				FileUtils.deleteDirectory(storageDir);
			}
			catch (IOException e) {
				exception = ExceptionUtils.firstOrSuppressed(e, exception);
			}

			// Remove shutdown hook to prevent resource leaks
			ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

			if (LOG.isInfoEnabled()) {
				LOG.info("Stopped BLOB server at {}:{}",
					serverSocket.getInetAddress().getHostAddress(), getPort());
			}

			ExceptionUtils.tryRethrowIOException(exception);
		}
	}

	//......
}
```
- BlobServer extends Thread and implements the BlobService, BlobWriter, PermanentBlobService and TransientBlobService interfaces
- Its constructor creates the ServerSocket (using an SSL server socket factory when internal SSL is enabled for the BLOB server, otherwise ServerSocketFactory.getDefault()) and registers a shutdown hook via ShutdownHookUtil.addShutdownHook, which calls close() on JVM shutdown
- It overrides Thread's run method: as long as no shutdown has been requested, it keeps looping on serverSocket.accept() and wraps each accepted socket in a BlobServerConnection; if activeConnections has already reached maxConnections, it waits in 2000 ms intervals until a slot frees up, then adds the connection to activeConnections and calls conn.start() (a simplified sketch of this bounded accept loop follows below)
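
To make the throttling behaviour of run() easier to see in isolation, here is a minimal, self-contained sketch of the same bounded accept loop. The class and helper names are illustrative only (they are not Flink's), and error handling is reduced to the bare minimum:

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of a bounded accept loop, modeled on BlobServer#run (names are not Flink's).
public class BoundedAcceptLoop extends Thread {

	private final ServerSocket serverSocket;
	private final Set<Thread> activeConnections = new HashSet<>();
	private final int maxConnections;

	public BoundedAcceptLoop(ServerSocket serverSocket, int maxConnections) {
		this.serverSocket = serverSocket;
		this.maxConnections = maxConnections;
	}

	@Override
	public void run() {
		try {
			while (!isInterrupted()) {
				Socket client = serverSocket.accept();
				Thread handler = new Thread(() -> handle(client));
				synchronized (activeConnections) {
					// block the listener until a slot frees up, re-checking at least every 2 seconds
					while (activeConnections.size() >= maxConnections) {
						activeConnections.wait(2000);
					}
					activeConnections.add(handler);
				}
				handler.start();
			}
		} catch (IOException | InterruptedException e) {
			// on shutdown, accept() fails once the server socket is closed
		}
	}

	private void handle(Socket client) {
		try (Socket s = client) {
			// ... serve the request ...
		} catch (IOException ignored) {
			// best effort: the connection is closed below either way
		} finally {
			synchronized (activeConnections) {
				activeConnections.remove(Thread.currentThread());
				activeConnections.notifyAll(); // let the listener re-check the limit
			}
		}
	}
}
```

The key point is that the listener thread itself blocks while the connection limit is reached, so pending clients queue up only in the ServerSocket backlog that the constructor configured via BlobServerOptions.FETCH_BACKLOG.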
BlobServerConnection
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
```java
class BlobServerConnection extends Thread {

	/** The log object used for debugging. */
	private static final Logger LOG = LoggerFactory.getLogger(BlobServerConnection.class);

	/** The socket to communicate with the client. */
	private final Socket clientSocket;

	/** The BLOB server. */
	private final BlobServer blobServer;

	/** Read lock to synchronize file accesses. */
	private final Lock readLock;

	BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
		super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
		setDaemon(true);

		this.clientSocket = clientSocket;
		this.blobServer = checkNotNull(blobServer);

		ReadWriteLock readWriteLock = blobServer.getReadWriteLock();

		this.readLock = readWriteLock.readLock();
	}

	// --------------------------------------------------------------------------------------------
	//  Connection / Thread methods
	// --------------------------------------------------------------------------------------------

	/**
	 * Main connection work method. Accepts requests until the other side closes the connection.
	 */
	@Override
	public void run() {
		try {
			final InputStream inputStream = this.clientSocket.getInputStream();
			final OutputStream outputStream = this.clientSocket.getOutputStream();

			while (true) {
				// Read the requested operation
				final int operation = inputStream.read();
				if (operation < 0) {
					// done, no one is asking anything from us
					return;
				}

				switch (operation) {
				case PUT_OPERATION:
					put(inputStream, outputStream, new byte[BUFFER_SIZE]);
					break;
				case GET_OPERATION:
					get(inputStream, outputStream, new byte[BUFFER_SIZE]);
					break;
				default:
					throw new IOException("Unknown operation " + operation);
				}
			}
		}
		catch (SocketException e) {
			// this happens when the remote site closes the connection
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("Error while executing BLOB connection.", t);
		}
		finally {
			closeSilently(clientSocket, LOG);
			blobServer.unregisterConnection(this);
		}
	}

	/**
	 * Closes the connection socket and lets the thread exit.
	 */
	public void close() {
		closeSilently(clientSocket, LOG);
		interrupt();
	}

	// --------------------------------------------------------------------------------------------
	//  Actions
	// --------------------------------------------------------------------------------------------

	private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
		/*
		 * Retrieve the file from the (distributed?) BLOB store and store it
		 * locally, then send it to the service which requested it.
		 *
		 * Instead, we could send it from the distributed store directly but
		 * chances are high that if there is one request, there will be more
		 * so a local cache makes more sense.
		 */

		final File blobFile;
		final JobID jobId;
		final BlobKey blobKey;

		try {
			// read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
			final int mode = inputStream.read();

			if (mode < 0) {
				throw new EOFException("Premature end of GET request");
			}

			// Receive the jobId and key
			if (mode == JOB_UNRELATED_CONTENT) {
				jobId = null;
			} else if (mode == JOB_RELATED_CONTENT) {
				byte[] jidBytes = new byte[JobID.SIZE];
				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
				jobId = JobID.fromByteArray(jidBytes);
			} else {
				throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
			}
			blobKey = BlobKey.readFromInputStream(inputStream);

			checkArgument(blobKey instanceof TransientBlobKey || jobId != null,
				"Invalid BLOB addressing for permanent BLOBs");

			if (LOG.isDebugEnabled()) {
				LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
					blobKey, clientSocket.getInetAddress());
			}

			// the file's (destined) location at the BlobServer
			blobFile = blobServer.getStorageLocation(jobId, blobKey);

			// up to here, an error can give a good message
		}
		catch (Throwable t) {
			LOG.error("GET operation from {} failed.", clientSocket.getInetAddress(), t);
			try {
				writeErrorToStream(outputStream, t);
			}
			catch (IOException e) {
				// since we are in an exception case, it means that we could not send the error
				// ignore this
			}
			clientSocket.close();
			return;
		}

		try {

			readLock.lock();

			try {
				// copy the file to local store if it does not exist yet
				try {
					blobServer.getFileInternal(jobId, blobKey, blobFile);

					// enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
					if (blobFile.length() > Integer.MAX_VALUE) {
						throw new IOException("BLOB size exceeds the maximum size (2 GB).");
					}

					outputStream.write(RETURN_OKAY);
				} catch (Throwable t) {
					LOG.error("GET operation failed for BLOB {}/{} from {}.", jobId, blobKey,
						clientSocket.getInetAddress(), t);

					try {
						writeErrorToStream(outputStream, t);
					}
					catch (IOException e) {
						// since we are in an exception case, it means that we could not send the error
						// ignore this
					}
					clientSocket.close();
					return;
				}

				// from here on, we started sending data, so all we can do is close the connection when something happens
				int blobLen = (int) blobFile.length();
				writeLength(blobLen, outputStream);

				try (FileInputStream fis = new FileInputStream(blobFile)) {
					int bytesRemaining = blobLen;
					while (bytesRemaining > 0) {
						int read = fis.read(buf);
						if (read < 0) {
							throw new IOException("Premature end of BLOB file stream for " + blobFile.getAbsolutePath());
						}
						outputStream.write(buf, 0, read);
						bytesRemaining -= read;
					}
				}
			} finally {
				readLock.unlock();
			}

			// on successful transfer, delete transient files
			int result = inputStream.read();
			if (result < 0) {
				throw new EOFException("Premature end of GET request");
			} else if (blobKey instanceof TransientBlobKey && result == RETURN_OKAY) {
				// ignore the result from the operation
				if (!blobServer.deleteInternal(jobId, (TransientBlobKey) blobKey)) {
					LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", jobId,
						blobKey, clientSocket.getInetAddress());
				}
			}
		}
		catch (SocketException e) {
			// happens when the other side disconnects
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("GET operation failed", t);
			clientSocket.close();
		}
	}

	private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
		File incomingFile = null;

		try {
			// read HEADER contents: job ID, HA mode/permanent or transient BLOB
			final int mode = inputStream.read();

			if (mode < 0) {
				throw new EOFException("Premature end of PUT request");
			}

			final JobID jobId;
			if (mode == JOB_UNRELATED_CONTENT) {
				jobId = null;
			} else if (mode == JOB_RELATED_CONTENT) {
				byte[] jidBytes = new byte[JobID.SIZE];
				readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
				jobId = JobID.fromByteArray(jidBytes);
			} else {
				throw new IOException("Unknown type of BLOB addressing.");
			}

			final BlobKey.BlobType blobType;
			{
				final int read = inputStream.read();
				if (read < 0) {
					throw new EOFException("Read an incomplete BLOB type");
				} else if (read == TRANSIENT_BLOB.ordinal()) {
					blobType = TRANSIENT_BLOB;
				} else if (read == PERMANENT_BLOB.ordinal()) {
					blobType = PERMANENT_BLOB;
					checkArgument(jobId != null, "Invalid BLOB addressing for permanent BLOBs");
				} else {
					throw new IOException("Invalid data received for the BLOB type: " + read);
				}
			}

			if (LOG.isDebugEnabled()) {
				LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId,
					clientSocket.getInetAddress());
			}

			incomingFile = blobServer.createTemporaryFilename();
			byte[] digest = readFileFully(inputStream, incomingFile, buf);

			BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);

			// Return computed key to client for validation
			outputStream.write(RETURN_OKAY);
			blobKey.writeToOutputStream(outputStream);
		}
		catch (SocketException e) {
			// happens when the other side disconnects
			LOG.debug("Socket connection closed", e);
		}
		catch (Throwable t) {
			LOG.error("PUT operation failed", t);
			try {
				writeErrorToStream(outputStream, t);
			}
			catch (IOException e) {
				// since we are in an exception case, it means not much that we could not send the error
				// ignore this
			}
			clientSocket.close();
		}
		finally {
			if (incomingFile != null) {
				if (!incomingFile.delete() && incomingFile.exists()) {
					LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
				}
			}
		}
	}

	private static byte[] readFileFully(
			final InputStream inputStream, final File incomingFile, final byte[] buf)
			throws IOException {
		MessageDigest md = BlobUtils.createMessageDigest();

		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
			while (true) {
				final int bytesExpected = readLength(inputStream);
				if (bytesExpected == -1) {
					// done
					break;
				}
				if (bytesExpected > BUFFER_SIZE) {
					throw new IOException(
						"Unexpected number of incoming bytes: " + bytesExpected);
				}

				readFully(inputStream, buf, 0, bytesExpected, "buffer");
				fos.write(buf, 0, bytesExpected);

				md.update(buf, 0, bytesExpected);
			}
			return md.digest();
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Utilities
	// --------------------------------------------------------------------------------------------

	private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException {
		byte[] bytes = InstantiationUtil.serializeObject(t);
		out.write(RETURN_ERROR);
		writeLength(bytes.length, out);
		out.write(bytes);
	}
}
```
- BlobServerConnection also extends Thread; its constructor takes the clientSocket and the blobServer. It overrides Thread's run method, which first reads the requested operation byte from the clientSocket and then dispatches to put for PUT_OPERATION or get for GET_OPERATION
- The put method reads the jobId and blobType from the inputStream, creates a temporary incomingFile via blobServer.createTemporaryFilename(), streams the uploaded content into that file while computing its digest, then calls blobServer.moveTempFileToStore to move it into the blob store and writes the computed BlobKey back to the client (a sketch of the chunked upload framing follows this list)
- The get method reads the jobId and blobKey from the inputStream, resolves the local file via blobServer.getStorageLocation, copies it to the local store if it is not there yet (blobServer.getFileInternal), and then streams the file contents to the outputStream
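
The PUT path streams the blob as a sequence of length-prefixed chunks terminated by a length of -1, hashing the bytes as they arrive (see readFileFully above). The sketch below mimics that framing to illustrate the idea; the 4-byte little-endian length encoding, the BUFFER_SIZE value and the SHA-1 digest are assumptions standing in for Flink's BlobUtils.writeLength/readLength, BUFFER_SIZE and BlobUtils.createMessageDigest, so this is not the exact wire format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of length-prefixed chunk framing, in the spirit of readFileFully (encoding details assumed).
public class ChunkedFraming {

	static final int BUFFER_SIZE = 65536; // assumed chunk limit, mirrors the server's BUFFER_SIZE check

	// Assumed little-endian 4-byte length; the real encoding lives in BlobUtils.writeLength/readLength.
	static void writeLength(int len, OutputStream out) throws IOException {
		for (int i = 0; i < 4; i++) {
			out.write((len >>> (8 * i)) & 0xFF);
		}
	}

	static int readLength(InputStream in) throws IOException {
		int len = 0;
		for (int i = 0; i < 4; i++) {
			int b = in.read();
			if (b < 0) {
				throw new EOFException("Premature end of length field");
			}
			len |= b << (8 * i);
		}
		return len;
	}

	/** Sender side: split the payload into chunks, each prefixed with its length, then send -1. */
	static void sendChunks(byte[] payload, OutputStream out) throws IOException {
		int offset = 0;
		while (offset < payload.length) {
			int chunk = Math.min(BUFFER_SIZE, payload.length - offset);
			writeLength(chunk, out);
			out.write(payload, offset, chunk);
			offset += chunk;
		}
		writeLength(-1, out); // terminator, like the -1 the server's readFileFully loops on
	}

	/** Receiver side: read chunks until the -1 terminator, updating a digest like readFileFully does. */
	static byte[] receiveChunks(InputStream in, OutputStream contentSink)
			throws IOException, NoSuchAlgorithmException {
		MessageDigest md = MessageDigest.getInstance("SHA-1"); // stand-in for BlobUtils.createMessageDigest
		byte[] buf = new byte[BUFFER_SIZE];
		while (true) {
			int expected = readLength(in);
			if (expected == -1) {
				break; // done
			}
			if (expected > BUFFER_SIZE) {
				throw new IOException("Unexpected number of incoming bytes: " + expected);
			}
			int read = 0;
			while (read < expected) {
				int n = in.read(buf, read, expected - read);
				if (n < 0) {
					throw new EOFException("Premature end of chunk");
				}
				read += n;
			}
			contentSink.write(buf, 0, expected);
			md.update(buf, 0, expected);
		}
		return md.digest();
	}

	public static void main(String[] args) throws Exception {
		byte[] payload = new byte[200_000];
		ByteArrayOutputStream wire = new ByteArrayOutputStream();
		sendChunks(payload, wire);
		ByteArrayOutputStream received = new ByteArrayOutputStream();
		byte[] digest = receiveChunks(new ByteArrayInputStream(wire.toByteArray()), received);
		System.out.println("received " + received.size() + " bytes, digest length " + digest.length);
	}
}
```

Framing each chunk with its own length lets the receiver both cap per-read buffer sizes and detect a truncated upload, while the running digest becomes the content-addressed part of the BlobKey the server sends back after moveTempFileToStore.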
Summary
- BlobServer extends Thread and implements the BlobService, BlobWriter, PermanentBlobService and TransientBlobService interfaces; its constructor creates the ServerSocket (with an SSL server socket factory when internal SSL is enabled, otherwise the default factory) and registers a shutdown hook via ShutdownHookUtil.addShutdownHook that calls close() on shutdown
- BlobServer overrides Thread's run method: while no shutdown has been requested, it keeps looping on serverSocket.accept(), wraps each accepted socket in a BlobServerConnection, waits in 2000 ms intervals whenever activeConnections has reached maxConnections, then adds the connection to activeConnections and calls conn.start()
- BlobServerConnection also extends Thread; its constructor takes the clientSocket and the blobServer. Its run method reads the requested operation from the clientSocket and dispatches to put for PUT_OPERATION or get for GET_OPERATION; put reads the jobId and blobType from the inputStream, stores the uploaded content in a temporary incomingFile, and then calls blobServer.moveTempFileToStore to move it into the blob store; get reads the jobId and blobKey from the inputStream, resolves the local file via blobServer.getStorageLocation, copies it to the local store if necessary, and streams it to the outputStream