内容简介:本文主要研究一下flink的RestClientConfigurationflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.javaflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
序
本文主要研究一下flink的RestClientConfiguration
RestClientConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
public final class RestClientConfiguration { @Nullable private final SSLHandlerFactory sslHandlerFactory; private final long connectionTimeout; private final long idlenessTimeout; private final int maxContentLength; private RestClientConfiguration( @Nullable final SSLHandlerFactory sslHandlerFactory, final long connectionTimeout, final long idlenessTimeout, final int maxContentLength) { checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength); this.sslHandlerFactory = sslHandlerFactory; this.connectionTimeout = connectionTimeout; this.idlenessTimeout = idlenessTimeout; this.maxContentLength = maxContentLength; } /** * Returns the {@link SSLEngine} that the REST client endpoint should use. * * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled */ @Nullable public SSLHandlerFactory getSslHandlerFactory() { return sslHandlerFactory; } /** * {@see RestOptions#CONNECTION_TIMEOUT}. */ public long getConnectionTimeout() { return connectionTimeout; } /** * {@see RestOptions#IDLENESS_TIMEOUT}. */ public long getIdlenessTimeout() { return idlenessTimeout; } /** * Returns the max content length that the REST client endpoint could handle. * * @return max content length that the REST client endpoint could handle */ public int getMaxContentLength() { return maxContentLength; } /** * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}. * * @param config configuration from which the REST client endpoint configuration should be created from * @return REST client endpoint configuration * @throws ConfigurationException if SSL was configured incorrectly */ public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException { Preconditions.checkNotNull(config); final SSLHandlerFactory sslHandlerFactory; if (SSLUtils.isRestSSLEnabled(config)) { try { sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config); } catch (Exception e) { throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e); } } else { sslHandlerFactory = null; } final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT); final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT); int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH); return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength); } }
- RestClientConfiguration有四个属性,分别是sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength
- fromConfiguration方法从Configuration中创建SSLHandlerFactory,其读取的是相关配置有security.ssl.rest.enabled,默认为false;security.ssl.protocol,默认为TLSv1.2;security.ssl.algorithms,默认为TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为false
- connectionTimeout读取的是rest.connection-timeout配置,默认是15000毫秒;idlenessTimeout读取的是rest.idleness-timeout配置,默认5分钟;maxContentLength读取的是rest.client.max-content-length配置,默认是104_857_600
RestClient
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
public class RestClient implements AutoCloseableAsync { private static final Logger LOG = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); // used to open connections to a rest server endpoint private final Executor executor; private final Bootstrap bootstrap; private final CompletableFuture<Void> terminationFuture; private final AtomicBoolean isRunning = new AtomicBoolean(true); public RestClient(RestClientConfiguration configuration, Executor executor) { Preconditions.checkNotNull(configuration); this.executor = Preconditions.checkNotNull(executor); this.terminationFuture = new CompletableFuture<>(); final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory(); ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) { try { // SSL should be the first handler in the pipeline if (sslHandlerFactory != null) { socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler()); } socketChannel.pipeline() .addLast(new HttpClientCodec()) .addLast(new HttpObjectAggregator(configuration.getMaxContentLength())) .addLast(new ChunkedWriteHandler()) // required for multipart-requests .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS)) .addLast(new ClientHandler()); } catch (Throwable t) { t.printStackTrace(); ExceptionUtils.rethrow(t); } } }; NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty")); bootstrap = new Bootstrap(); bootstrap .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout())) .group(group) .channel(NioSocketChannel.class) .handler(initializer); LOG.info("Rest client endpoint started."); } @Override public CompletableFuture<Void> closeAsync() { return shutdownInternally(Time.seconds(10L)); } public void shutdown(Time timeout) { final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout); try { shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); LOG.info("Rest endpoint shutdown complete."); } catch (Exception e) { LOG.warn("Rest endpoint shutdown failed.", e); } } private CompletableFuture<Void> shutdownInternally(Time timeout) { if (isRunning.compareAndSet(true, false)) { LOG.info("Shutting down rest endpoint."); if (bootstrap != null) { if (bootstrap.group() != null) { bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) .addListener(finished -> { if (finished.isSuccess()) { terminationFuture.complete(null); } else { terminationFuture.completeExceptionally(finished.cause()); } }); } } } return terminationFuture; } //...... }
- RestClient的构造器接收RestClientConfiguration及Executor两个参数,构造器里头创建了netty的Bootstrap,其中ChannelOption.CONNECT_TIMEOUT_MILLIS使用的是configuration.getConnectionTimeout();IdleStateHandler的readerIdleTime、writerIdleTime、allIdleTime使用的是configuration.getIdlenessTimeout();HttpObjectAggregator的maxContentLength使用的是configuration.getMaxContentLength();SSLHandlerFactory使用的是configuration.getSslHandlerFactory()
小结
- RestClientConfiguration有四个属性,分别是sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength;fromConfiguration方法从Configuration中创建SSLHandlerFactory,其读取的是相关配置有security.ssl.rest.enabled,默认为false;security.ssl.protocol,默认为TLSv1.2;security.ssl.algorithms,默认为TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为false
- connectionTimeout读取的是rest.connection-timeout配置,默认是15000毫秒;idlenessTimeout读取的是rest.idleness-timeout配置,默认5分钟;maxContentLength读取的是rest.client.max-content-length配置,默认是104_857_600
- RestClient的构造器接收RestClientConfiguration及Executor两个参数,构造器里头创建了netty的Bootstrap,其中ChannelOption.CONNECT_TIMEOUT_MILLIS使用的是configuration.getConnectionTimeout();IdleStateHandler的readerIdleTime、writerIdleTime、allIdleTime使用的是configuration.getIdlenessTimeout();HttpObjectAggregator的maxContentLength使用的是configuration.getMaxContentLength();SSLHandlerFactory使用的是configuration.getSslHandlerFactory()
doc
以上所述就是小编给大家介绍的《聊聊flink的RestClientConfiguration》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
互联网思维的企业
[美] Dave Gray Thomas Vander Wal / 张 玳 / 人民邮电出版社 / 2014-4-25 / 59.00元
本书指导企业跳出仅更新自家产品和服务的怪圈,在管理方式、组织结构和公司文化方面进行变革,建立具有互联网思维的企业。书中通过大量图示和示例阐述了互联式公司必需的基础元素(透明的互动和交流平台,推崇自治和应变的组织结构,实验和学习的企业文化),以及一套鼓励员工创新的新式管理和奖励体系。最后,讨论板可方便你在工作时间和同事探讨如何增加公司的互联程度。一起来看看 《互联网思维的企业》 这本书的介绍吧!