Preface

This article takes a look at Flink's log.file configuration.
log4j.properties
flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties
```properties
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
```
- Here the log.file system property is used to configure log4j.appender.file.file; see the sketch below.
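To make the substitution concrete, here is a minimal, self-contained sketch (not Flink code; the class name and the /tmp path are made up) showing log4j 1.x resolving ${log.file} from a system property — the same mechanism the conf file above relies on, with Flink's startup scripts passing the property as -Dlog.file=...:

```java
import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class LogFileDemo {
    public static void main(String[] args) {
        // In a real deployment the launch scripts pass -Dlog.file=...; we set it
        // programmatically here only for illustration (hypothetical path).
        System.setProperty("log.file", "/tmp/flink-demo.log");

        Properties props = new Properties();
        props.setProperty("log4j.rootLogger", "INFO, file");
        props.setProperty("log4j.appender.file", "org.apache.log4j.FileAppender");
        // ${log.file} is substituted from system properties by log4j
        props.setProperty("log4j.appender.file.file", "${log.file}");
        props.setProperty("log4j.appender.file.append", "false");
        props.setProperty("log4j.appender.file.layout", "org.apache.log4j.PatternLayout");
        props.setProperty("log4j.appender.file.layout.ConversionPattern",
                "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
        PropertyConfigurator.configure(props);

        // this line ends up in /tmp/flink-demo.log
        Logger.getLogger(LogFileDemo.class).info("hello from the file appender");
    }
}
```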
MiniCluster
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java
```java
/**
 * Starts the mini cluster, based on the configured properties.
 *
 * @throws Exception This method passes on any exception that occurs during the startup of
 *                   the mini cluster.
 */
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "FlinkMiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);
            this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                "localhost");

            final RpcService jobManagerRpcService;
            final RpcService resourceManagerRpcService;
            final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            // we always need the 'commonRpcService' for auxiliary calls
            commonRpcService = createRpcService(configuration, rpcTimeout, false, null);

            // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
            final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
            metricRegistry.startQueryService(actorSystem, null);

            if (useSingleRpcService) {
                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = commonRpcService;
                }
                jobManagerRpcService = commonRpcService;
                resourceManagerRpcService = commonRpcService;

                this.resourceManagerRpcService = null;
                this.jobManagerRpcService = null;
                this.taskManagerRpcServices = null;
            } else {
                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();

                jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);

                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = createRpcService(
                        configuration, rpcTimeout, true, taskManagerBindAddress);
                }

                this.jobManagerRpcService = jobManagerRpcService;
                this.taskManagerRpcServices = taskManagerRpcServices;
                this.resourceManagerRpcService = resourceManagerRpcService;
            }

            // create the high-availability services
            LOG.info("Starting high-availability services");
            haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                configuration,
                commonRpcService.getExecutor());

            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            // bring up the ResourceManager(s)
            LOG.info("Starting ResourceManger");
            resourceManagerRunner = startResourceManager(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                resourceManagerRpcService,
                new ClusterInformation("localhost", blobServer.getPort()),
                jobManagerMetricGroup);

            blobCacheService = new BlobCacheService(
                configuration,
                haServices.createBlobStore(),
                new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()));

            // bring up the TaskManager(s) for the mini cluster
            LOG.info("Starting {} TaskManger(s)", numTaskManagers);
            taskManagers = startTaskManagers(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                numTaskManagers,
                taskManagerRpcServices);

            // starting the dispatcher rest endpoint
            LOG.info("Starting dispatcher rest endpoint.");

            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));

            this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                RestServerEndpointConfiguration.fromConfiguration(configuration),
                dispatcherGatewayRetriever,
                configuration,
                RestHandlerConfiguration.fromConfiguration(configuration),
                resourceManagerGatewayRetriever,
                blobServer.getTransientBlobService(),
                WebMonitorEndpoint.createExecutorService(
                    configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                    configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                    "DispatcherRestEndpoint"),
                new AkkaQueryServiceRetriever(
                    actorSystem,
                    Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                haServices.getWebMonitorLeaderElectionService(),
                new ShutDownFatalErrorHandler());

            dispatcherRestEndpoint.start();

            restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());

            // bring up the dispatcher that launches JobManagers when jobs submitted
            LOG.info("Starting job dispatcher(s) for JobManger");

            this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");

            final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);

            dispatcher = new StandaloneDispatcher(
                jobManagerRpcService,
                Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                configuration,
                haServices,
                resourceManagerRunner.getResourceManageGateway(),
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServicePath(),
                new MemoryArchivedExecutionGraphStore(),
                Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                new ShutDownFatalErrorHandler(),
                dispatcherRestEndpoint.getRestBaseUrl(),
                historyServerArchivist);

            dispatcher.start();

            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();

            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
        } catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
```
- start() creates, in order, the metricRegistry, commonRpcService, jobManagerRpcService, resourceManagerRpcService, haServices, blobServer, heartbeatServices, resourceManagerRunner, blobCacheService, taskManagers, dispatcherGatewayRetriever, dispatcherRestEndpoint, dispatcher, and dispatcherLeaderRetriever; a minimal usage sketch follows.
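For context, this is roughly how the startup sequence above gets driven — a minimal sketch assuming the Flink 1.6 MiniClusterConfiguration.Builder API, with a hypothetical log path:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

public class MiniClusterDemo {
    public static void main(String[] args) throws Exception {
        // normally set by the startup scripts via -Dlog.file (hypothetical path);
        // without it the rest endpoint has no JobManager log file to serve
        System.setProperty("log.file", "/tmp/flink-minicluster.log");

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(new Configuration())
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(1)
            .build();

        MiniCluster miniCluster = new MiniCluster(cfg);
        try {
            miniCluster.start(); // runs the sequence shown above
            // the dispatcher rest endpoint is now reachable under restAddressURI
        } finally {
            miniCluster.close();
        }
    }
}
```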
RestServerEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/RestServerEndpoint.java
```java
/**
 * Starts this REST server endpoint.
 *
 * @throws Exception if we cannot start the RestServerEndpoint
 */
public final void start() throws Exception {
    synchronized (lock) {
        Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

        log.info("Starting rest endpoint.");

        final Router router = new Router();
        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

        List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);

        /* sort the handlers such that they are ordered the following:
         * /jobs
         * /jobs/overview
         * /jobs/:jobid
         * /jobs/:jobid/config
         * /:*
         */
        Collections.sort(
            handlers,
            RestHandlerUrlComparator.INSTANCE);

        handlers.forEach(handler -> {
            log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL());
            registerHandler(router, handler);
        });

        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                RouterHandler handler = new RouterHandler(router, responseHeaders);

                // SSL should be the first handler in the pipeline
                if (sslEngineFactory != null) {
                    ch.pipeline().addLast("ssl",
                        new RedirectingSslHandler(restAddress, restAddressFuture, sslEngineFactory));
                }

                ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new FileUploadHandler(uploadDir))
                    .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                    .addLast(new ChunkedWriteHandler())
                    .addLast(handler.getName(), handler)
                    .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss"));
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker"));

        bootstrap = new ServerBootstrap();
        bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);

        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort);
        final ChannelFuture channel;
        if (restBindAddress == null) {
            channel = bootstrap.bind(restBindPort);
        } else {
            channel = bootstrap.bind(restBindAddress, restBindPort);
        }
        serverChannel = channel.syncUninterruptibly().channel();

        final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
        final String advertisedAddress;
        if (bindAddress.getAddress().isAnyLocalAddress()) {
            advertisedAddress = this.restAddress;
        } else {
            advertisedAddress = bindAddress.getAddress().getHostAddress();
        }
        final int port = bindAddress.getPort();

        log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

        final String protocol;
        if (sslEngineFactory != null) {
            protocol = "https://";
        } else {
            protocol = "http://";
        }

        restBaseUrl = protocol + advertisedAddress + ':' + port;

        restAddressFuture.complete(restBaseUrl);

        state = State.RUNNING;

        startInternal();
    }
}
```
- start() calls initializeHandlers to obtain the ChannelInboundHandlers; initializeHandlers is overridden in the subclass DispatcherRestEndpoint.
DispatcherRestEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
```java
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);

    // Add the Dispatcher specific handlers

    final Time timeout = restConfiguration.getTimeout();

    JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
        restAddressFuture,
        leaderRetriever,
        timeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
        try {
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                restAddressFuture,
                timeout,
                responseHeaders,
                uploadDir,
                executor,
                clusterConfiguration);

            // register extension handlers
            handlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to load web based job submission extension.", e);
            } else {
                log.info("Failed to load web based job submission extension. " +
                    "Probable reason: flink-runtime-web is not in the classpath.");
            }
        }
    } else {
        log.info("Web-based job submission is not enabled.");
    }

    handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));

    return handlers;
}
```
- It first calls the parent class's initializeHandlers; the parent here is WebMonitorEndpoint (the direct subclass of RestServerEndpoint, which DispatcherRestEndpoint in turn extends).
WebMonitorEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
```java
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
    ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

    final Time timeout = restConfiguration.getTimeout();

    //......

    // TODO: Remove once the Yarn proxy can forward all REST verbs
    handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));
    handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));

    handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler));

    //......

    // load the log and stdout file handler for the main cluster component
    final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration);

    final ChannelInboundHandler logFileHandler = createStaticFileHandler(
        restAddressFuture,
        timeout,
        logFileLocation.logFile);

    final ChannelInboundHandler stdoutFileHandler = createStaticFileHandler(
        restAddressFuture,
        timeout,
        logFileLocation.stdOutFile);

    handlers.add(Tuple2.of(LogFileHandlerSpecification.getInstance(), logFileHandler));
    handlers.add(Tuple2.of(StdoutFileHandlerSpecification.getInstance(), stdoutFileHandler));

    // TaskManager log and stdout file handler

    final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval());

    final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler(
        restAddressFuture,
        leaderRetriever,
        timeout,
        responseHeaders,
        TaskManagerLogFileHeaders.getInstance(),
        resourceManagerRetriever,
        transientBlobService,
        cacheEntryDuration);

    final TaskManagerStdoutFileHandler taskManagerStdoutFileHandler = new TaskManagerStdoutFileHandler(
        restAddressFuture,
        leaderRetriever,
        timeout,
        responseHeaders,
        TaskManagerStdoutFileHeaders.getInstance(),
        resourceManagerRetriever,
        transientBlobService,
        cacheEntryDuration);

    handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
    handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));

    //......
}

@Nonnull
private ChannelInboundHandler createStaticFileHandler(
        CompletableFuture<String> restAddressFuture,
        Time timeout,
        File fileToServe) {

    if (fileToServe == null) {
        return new ConstantTextHandler("(file unavailable)");
    } else {
        try {
            return new StaticFileServerHandler<>(
                leaderRetriever,
                restAddressFuture,
                timeout,
                fileToServe);
        } catch (IOException e) {
            log.info("Cannot load log file handler.", e);
            return new ConstantTextHandler("(log file unavailable)");
        }
    }
}
```
- It instantiates a series of ChannelInboundHandlers and registers them in the handlers list.
- For the JobManager's file handlers, it first calls WebMonitorUtils.LogFileLocation.find(clusterConfiguration) to build the logFileLocation, then uses logFileLocation.logFile and logFileLocation.stdOutFile to construct logFileHandler and stdoutFileHandler, which serve downloads of the log and stdout files respectively.
- For the TaskManager's file handlers, it constructs TaskManagerLogFileHandler and TaskManagerStdoutFileHandler to serve downloads of the log and stdout files respectively.
JobManager FileHandler
WebMonitorUtils.LogFileLocation.find
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
```java
/**
 * Singleton to hold the log and stdout file.
 */
public static class LogFileLocation {

    public final File logFile;
    public final File stdOutFile;

    private LogFileLocation(File logFile, File stdOutFile) {
        this.logFile = logFile;
        this.stdOutFile = stdOutFile;
    }

    /**
     * Finds the Flink log directory using log.file Java property that is set during startup.
     */
    public static LogFileLocation find(Configuration config) {
        final String logEnv = "log.file";
        String logFilePath = System.getProperty(logEnv);

        if (logFilePath == null) {
            LOG.warn("Log file environment variable '{}' is not set.", logEnv);
            logFilePath = config.getString(WebOptions.LOG_PATH);
        }

        // not configured, cannot serve log files
        if (logFilePath == null || logFilePath.length() < 4) {
            LOG.warn("JobManager log files are unavailable in the web dashboard. " +
                "Log file location not found in environment variable '{}' or configuration key '{}'.",
                logEnv, WebOptions.LOG_PATH);
            return new LogFileLocation(null, null);
        }

        String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");

        LOG.info("Determined location of main cluster component log file: {}", logFilePath);
        LOG.info("Determined location of main cluster component stdout file: {}", outFilePath);

        return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath));
    }

    /**
     * Verify log file location.
     *
     * @param logFilePath Path to log file
     * @return File or null if not a valid log file
     */
    private static File resolveFileLocation(String logFilePath) {
        File logFile = new File(logFilePath);
        return (logFile.exists() && logFile.canRead()) ? logFile : null;
    }
}
```
- find first reads the log.file system property; if it is absent, it logs a warning (Log file environment variable 'log.file' is not set.) and falls back to the WebOptions.LOG_PATH (web.log.path) key from Flink's Configuration
- If that is also unset, or logFilePath.length() is less than 4, it logs a warning (JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'.) and returns an empty LogFileLocation
- logFilePath.length() must be at least 4 because logFilePath.substring(0, logFilePath.length() - 3).concat("out") is used to derive outFilePath; both paths are then validated via resolveFileLocation and wrapped into the returned LogFileLocation (see the worked example below)
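A worked example of that path derivation (the file name is hypothetical):

```java
public class OutPathDerivation {
    public static void main(String[] args) {
        String logFilePath = "/opt/flink/log/flink-jobmanager-0.log";
        // drop the trailing "log" (3 characters) and append "out"
        String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out");
        System.out.println(outFilePath); // /opt/flink/log/flink-jobmanager-0.out
        // the length() < 4 guard above rejects paths too short for this substring to be meaningful
    }
}
```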
StaticFileServerHandler
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
```java
/**
 * Simple file server handler that serves requests to web frontend's static files, such as
 * HTML, CSS, or JS files.
 *
 * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
 * example.</p>
 */
@ChannelHandler.Sharable
public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {

    /** Timezone in which this server answers its "if-modified" requests. */
    private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");

    /** Date format for HTTP. */
    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";

    /** Be default, we allow files to be cached for 5 minutes. */
    private static final int HTTP_CACHE_SECONDS = 300;

    // ------------------------------------------------------------------------

    /** The path in which the static documents are. */
    private final File rootPath;

    public StaticFileServerHandler(
            GatewayRetriever<? extends T> retriever,
            CompletableFuture<String> localJobManagerAddressFuture,
            Time timeout,
            File rootPath) throws IOException {

        super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap());

        this.rootPath = checkNotNull(rootPath).getCanonicalFile();
    }

    // ------------------------------------------------------------------------
    //  Responses to requests
    // ------------------------------------------------------------------------

    @Override
    protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception {
        final HttpRequest request = routedRequest.getRequest();
        final String requestPath;

        // make sure we request the "index.html" in case there is a directory request
        if (routedRequest.getPath().endsWith("/")) {
            requestPath = routedRequest.getPath() + "index.html";
        }
        // in case the files being accessed are logs or stdout files, find appropriate paths.
        else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) {
            requestPath = "";
        } else {
            requestPath = routedRequest.getPath();
        }

        respondToRequest(channelHandlerContext, request, requestPath);
    }

    //......

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (ctx.channel().isActive()) {
            logger.error("Caught exception", cause);
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
    }
}
```
- For /jobmanager/log and /jobmanager/stdout it resets requestPath to the empty string and then delegates to respondToRequest, which serves the file based on rootPath; a usage sketch follows.
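As a usage sketch, the served JobManager log can be fetched with a plain HTTP GET (assuming a locally running cluster on the default rest port 8081; the /jobmanager/log path is the one handled above):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FetchJobManagerLog {
    public static void main(String[] args) throws Exception {
        // the StaticFileServerHandler streams back the file that log.file points at
        URL url = new URL("http://localhost:8081/jobmanager/log");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            reader.lines().forEach(System.out::println);
        }
    }
}
```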
TaskManager FileHandler
TaskManagerLogFileHandler
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogFileHandler.java
```java
/**
 * Rest handler which serves the log files from {@link TaskExecutor}.
 */
public class TaskManagerLogFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

    public TaskManagerLogFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
    }

    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.LOG, timeout);
    }
}
```
- Its requestFileUpload calls ResourceManager.requestTaskManagerFileUpload, passing FileType.LOG.
TaskManagerStdoutFileHandler
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerStdoutFileHandler.java
```java
/**
 * Rest handler which serves the stdout file of the {@link TaskExecutor}.
 */
public class TaskManagerStdoutFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

    public TaskManagerStdoutFileHandler(
            @Nonnull CompletableFuture<String> localAddressFuture,
            @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            @Nonnull Time timeout,
            @Nonnull Map<String, String> responseHeaders,
            @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
            @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            @Nonnull TransientBlobService transientBlobService,
            @Nonnull Time cacheEntryDuration) {
        super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
    }

    @Override
    protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
        return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.STDOUT, timeout);
    }
}
```
- Its requestFileUpload calls ResourceManager.requestTaskManagerFileUpload, passing FileType.STDOUT.
ResourceManager.requestTaskManagerFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/resourcemanager/ResourceManager.java
```java
@Override
public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time timeout) {
    log.debug("Request file {} upload from TaskExecutor {}.", fileType, taskManagerId);

    final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId);

    if (taskExecutor == null) {
        log.debug("Requested file {} upload from unregistered TaskExecutor {}.", fileType, taskManagerId);
        return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
    } else {
        return taskExecutor.getTaskExecutorGateway().requestFileUpload(fileType, timeout);
    }
}
```
- The ResourceManager's requestTaskManagerFileUpload is implemented by delegating to TaskExecutor.requestFileUpload through the registered TaskExecutorGateway.
TaskExecutor.requestFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
```java
@Override
public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) {
    log.debug("Request file {} upload.", fileType);

    final String filePath;

    switch (fileType) {
        case LOG:
            filePath = taskManagerConfiguration.getTaskManagerLogPath();
            break;
        case STDOUT:
            filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
            break;
        default:
            filePath = null;
    }

    if (filePath != null && !filePath.isEmpty()) {
        final File file = new File(filePath);

        if (file.exists()) {
            final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
            final TransientBlobKey transientBlobKey;
            try (FileInputStream fileInputStream = new FileInputStream(file)) {
                transientBlobKey = transientBlobService.putTransient(fileInputStream);
            } catch (IOException e) {
                log.debug("Could not upload file {}.", fileType, e);
                return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
            }

            return CompletableFuture.completedFuture(transientBlobKey);
        } else {
            log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
            return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
        }
    } else {
        log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID());
        return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor."));
    }
}
```
- The TaskExecutor's requestFileUpload derives the filePath from the fileType: for LOG it uses taskManagerConfiguration.getTaskManagerLogPath(), and for STDOUT taskManagerConfiguration.getTaskManagerStdoutPath(); it then uploads the file to the transient BLOB service and returns the blob key so the file can be transferred back; a usage sketch follows.
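Correspondingly, fetching a TaskManager log through the rest endpoint triggers exactly this upload chain. A hedged sketch — the /taskmanagers/<id>/log URL shape matches the TaskManagerLogFileHeaders handler registered earlier, but the port and the TaskManager resource id below are made up:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FetchTaskManagerLog {
    public static void main(String[] args) throws Exception {
        // hypothetical TaskManager resource id; the real ids can be listed via GET /taskmanagers
        String tmId = "container_e01_1234567890123_0001_01_000002";
        URL url = new URL("http://localhost:8081/taskmanagers/" + tmId + "/log");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            reader.lines().forEach(System.out::println);
        }
    }
}
```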
TaskManagerRunner.startTaskManager
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
```java
public static TaskExecutor startTaskManager(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    checkNotNull(configuration);
    checkNotNull(resourceID);
    checkNotNull(rpcService);
    checkNotNull(highAvailabilityServices);

    LOG.info("Starting TaskManager with ResourceID: {}", resourceID);

    InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress());

    TaskManagerServicesConfiguration taskManagerServicesConfiguration =
        TaskManagerServicesConfiguration.fromConfiguration(
            configuration,
            remoteAddress,
            localCommunicationOnly);

    TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
        taskManagerServicesConfiguration,
        resourceID,
        rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io.
        EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(),
        EnvironmentInformation.getMaxJvmHeapMemory());

    TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
        metricRegistry,
        taskManagerServices.getTaskManagerLocation(),
        taskManagerServices.getNetworkEnvironment());

    TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);

    return new TaskExecutor(
        rpcService,
        taskManagerConfiguration,
        highAvailabilityServices,
        taskManagerServices,
        heartbeatServices,
        taskManagerMetricGroup,
        blobCacheService,
        fatalErrorHandler);
}
```
- TaskManagerRunner.startTaskManager builds the taskManagerConfiguration via TaskManagerConfiguration.fromConfiguration(configuration).
TaskManagerConfiguration.fromConfiguration
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
```java
public static TaskManagerConfiguration fromConfiguration(Configuration configuration) {
    int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

    if (numberSlots == -1) {
        numberSlots = 1;
    }

    //......

    final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
    final String taskManagerStdoutPath;

    if (taskManagerLogPath != null) {
        final int extension = taskManagerLogPath.lastIndexOf('.');

        if (extension > 0) {
            taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out";
        } else {
            taskManagerStdoutPath = null;
        }
    } else {
        taskManagerStdoutPath = null;
    }

    return new TaskManagerConfiguration(
        numberSlots,
        tmpDirPaths,
        timeout,
        finiteRegistrationDuration,
        initialRegistrationPause,
        maxRegistrationPause,
        refusedRegistrationPause,
        configuration,
        exitOnOom,
        FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder),
        alwaysParentFirstLoaderPatterns,
        taskManagerLogPath,
        taskManagerStdoutPath);
}
```
- TaskManagerConfiguration.fromConfiguration first reads taskManagerLogPath from Flink's Configuration under ConfigConstants.TASK_MANAGER_LOG_PATH_KEY (taskmanager.log.path), falling back to the log.file system property; if the resulting taskManagerLogPath is non-null, it swaps the file extension to .out to build taskManagerStdoutPath (see the worked example below).
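A worked example of that extension swap (the path is hypothetical):

```java
public class StdoutPathDerivation {
    public static void main(String[] args) {
        String taskManagerLogPath = "/opt/flink/log/flink-taskexecutor-0.log";
        // find the last '.' and replace everything after it with ".out"
        int extension = taskManagerLogPath.lastIndexOf('.');
        String taskManagerStdoutPath = (extension > 0)
                ? taskManagerLogPath.substring(0, extension) + ".out"
                : null; // no extension -> no stdout path
        System.out.println(taskManagerStdoutPath); // /opt/flink/log/flink-taskexecutor-0.out
    }
}
```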
Summary

- For the JobManager (the main cluster component), WebMonitorUtils.LogFileLocation.find locates the log file via the log.file system property, falling back to the web.log.path configuration key; the stdout file path is derived by replacing the trailing log with out.
- For the TaskManager, TaskManagerConfiguration.fromConfiguration reads taskmanager.log.path from the configuration, falling back to the same log.file system property; the stdout path is derived by swapping the file extension to .out.
- A sketch of setting the two fallback keys programmatically follows.
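For completeness, a small sketch of setting the two fallback keys programmatically (the paths are hypothetical; in a normal deployment the startup scripts set -Dlog.file and neither fallback is needed):

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;

public class LogPathFallbacks {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // fallback used by WebMonitorUtils.LogFileLocation.find when -Dlog.file is absent
        conf.setString(WebOptions.LOG_PATH, "/opt/flink/log/jobmanager.log");
        // fallback read by TaskManagerConfiguration.fromConfiguration
        conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/opt/flink/log/taskmanager.log");
        System.out.println(conf);
    }
}
```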