内容简介:本文主要研究一下flink的RestClusterClientConfigurationflink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.javaflink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClu
序
本文主要研究一下flink的RestClusterClientConfiguration
RestClusterClientConfiguration
flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java
public final class RestClusterClientConfiguration { private final RestClientConfiguration restClientConfiguration; private final long awaitLeaderTimeout; private final int retryMaxAttempts; private final long retryDelay; private RestClusterClientConfiguration( final RestClientConfiguration endpointConfiguration, final long awaitLeaderTimeout, final int retryMaxAttempts, final long retryDelay) { checkArgument(awaitLeaderTimeout >= 0, "awaitLeaderTimeout must be equal to or greater than 0"); checkArgument(retryMaxAttempts >= 0, "retryMaxAttempts must be equal to or greater than 0"); checkArgument(retryDelay >= 0, "retryDelay must be equal to or greater than 0"); this.restClientConfiguration = Preconditions.checkNotNull(endpointConfiguration); this.awaitLeaderTimeout = awaitLeaderTimeout; this.retryMaxAttempts = retryMaxAttempts; this.retryDelay = retryDelay; } public RestClientConfiguration getRestClientConfiguration() { return restClientConfiguration; } /** * @see RestOptions#AWAIT_LEADER_TIMEOUT */ public long getAwaitLeaderTimeout() { return awaitLeaderTimeout; } /** * @see RestOptions#RETRY_MAX_ATTEMPTS */ public int getRetryMaxAttempts() { return retryMaxAttempts; } /** * @see RestOptions#RETRY_DELAY */ public long getRetryDelay() { return retryDelay; } public static RestClusterClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException { RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config); final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT); final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS); final long retryDelay = config.getLong(RestOptions.RETRY_DELAY); return new RestClusterClientConfiguration(restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay); } }
- RestClusterClientConfiguration除了RestClientConfiguration外,还有3个属性,分别是awaitLeaderTimeout、retryMaxAttempts、retryDelay;awaitLeaderTimeout读取的是rest.await-leader-timeout配置,默认是30秒;retryMaxAttempts读取的是rest.retry.max-attempts配置,默认是20;retryDelay读取的是rest.retry.delay配置,默认是3秒
RestClusterClient
flink-release-1.7.2/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
public class RestClusterClient<T> extends ClusterClient<T> implements NewClusterClient { private final RestClusterClientConfiguration restClusterClientConfiguration; private final RestClient restClient; private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO")); private final WaitStrategy waitStrategy; private final T clusterId; private final LeaderRetrievalService webMonitorRetrievalService; private final LeaderRetrievalService dispatcherRetrievalService; private final LeaderRetriever webMonitorLeaderRetriever = new LeaderRetriever(); private final LeaderRetriever dispatcherLeaderRetriever = new LeaderRetriever(); /** ExecutorService to run operations that can be retried on exceptions. */ private ScheduledExecutorService retryExecutorService; //...... private <C> CompletableFuture<C> retry( CheckedSupplier<CompletableFuture<C>> operation, Predicate<Throwable> retryPredicate) { return FutureUtils.retryWithDelay( CheckedSupplier.unchecked(operation), restClusterClientConfiguration.getRetryMaxAttempts(), Time.milliseconds(restClusterClientConfiguration.getRetryDelay()), retryPredicate, new ScheduledExecutorServiceAdapter(retryExecutorService)); } @VisibleForTesting CompletableFuture<URL> getWebMonitorBaseUrl() { return FutureUtils.orTimeout( webMonitorLeaderRetriever.getLeaderFuture(), restClusterClientConfiguration.getAwaitLeaderTimeout(), TimeUnit.MILLISECONDS) .thenApplyAsync(leaderAddressSessionId -> { final String url = leaderAddressSessionId.f0; try { return new URL(url); } catch (MalformedURLException e) { throw new IllegalArgumentException("Could not parse URL from " + url, e); } }, executorService); } //...... }
- RestClusterClient的构造器会从使用RestClusterClientConfiguration.fromConfiguration(configuration)方法从Configuration构建RestClusterClientConfiguration
- retry方法内部使用的是FutureUtils.retryWithDelay方法,其retries参数使用的是restClusterClientConfiguration.getRetryMaxAttempts(),retryDelay参数使用的是Time.milliseconds(restClusterClientConfiguration.getRetryDelay())
- getWebMonitorBaseUrl方法内部使用的是FutureUtils.orTimeout方法,其timeout参数使用的是restClusterClientConfiguration.getAwaitLeaderTimeout()
小结
- RestClusterClientConfiguration除了RestClientConfiguration外,还有3个属性,分别是awaitLeaderTimeout、retryMaxAttempts、retryDelay
- awaitLeaderTimeout读取的是rest.await-leader-timeout配置,默认是30秒;retryMaxAttempts读取的是rest.retry.max-attempts配置,默认是20;retryDelay读取的是rest.retry.delay配置,默认是3秒
- RestClusterClient的etry方法内部使用的是FutureUtils.retryWithDelay方法,其retries参数使用的是restClusterClientConfiguration.getRetryMaxAttempts(),retryDelay参数使用的是Time.milliseconds(restClusterClientConfiguration.getRetryDelay());getWebMonitorBaseUrl方法内部使用的是FutureUtils.orTimeout方法,其timeout参数使用的是restClusterClientConfiguration.getAwaitLeaderTimeout()
doc
以上所述就是小编给大家介绍的《聊聊flink的RestClusterClientConfiguration》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Linux程序设计
Neil Matthew、Richard Stones / 陈健、宋健建 / 人民邮电出版社 / 201005 / 99.00元
时至今日,Linux系统已经从一个个人作品发展为可以用于各种关键任务的成熟、高效和稳定的操作系统,因为具备跨平台、开源、支持众多应用软件和网络协议等优点,它得到了各大主流软硬件厂商的支持,也成为广大程序设计人员理想的开发平台。 本书是Linux程序设计领域的经典名著,以简单易懂、内容全面和示例丰富而受到广泛好评。中文版前两版出版后,在国内的Linux爱好者和程序员中也引起了强烈反响,这一热潮......一起来看看 《Linux程序设计》 这本书的介绍吧!