Preface
This article takes a look at Flink's RestClientConfiguration.
RestClientConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
public final class RestClientConfiguration {

    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;

    private final long connectionTimeout;

    private final long idlenessTimeout;

    private final int maxContentLength;

    private RestClientConfiguration(
            @Nullable final SSLHandlerFactory sslHandlerFactory,
            final long connectionTimeout,
            final long idlenessTimeout,
            final int maxContentLength) {
        checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
        this.sslHandlerFactory = sslHandlerFactory;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /**
     * Returns the {@link SSLEngine} that the REST client endpoint should use.
     *
     * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
     */
    @Nullable
    public SSLHandlerFactory getSslHandlerFactory() {
        return sslHandlerFactory;
    }

    /**
     * {@see RestOptions#CONNECTION_TIMEOUT}.
     */
    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * {@see RestOptions#IDLENESS_TIMEOUT}.
     */
    public long getIdlenessTimeout() {
        return idlenessTimeout;
    }

    /**
     * Returns the max content length that the REST client endpoint could handle.
     *
     * @return max content length that the REST client endpoint could handle
     */
    public int getMaxContentLength() {
        return maxContentLength;
    }

    /**
     * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
     *
     * @param config configuration from which the REST client endpoint configuration should be created from
     * @return REST client endpoint configuration
     * @throws ConfigurationException if SSL was configured incorrectly
     */
    public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        Preconditions.checkNotNull(config);

        final SSLHandlerFactory sslHandlerFactory;
        if (SSLUtils.isRestSSLEnabled(config)) {
            try {
                sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
            } catch (Exception e) {
                throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
            }
        } else {
            sslHandlerFactory = null;
        }

        final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
        final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);
        int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

        return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
    }
}
- RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength.
- The fromConfiguration method creates an SSLHandlerFactory from the Configuration only when REST SSL is enabled; otherwise the field is null. The relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false).
- connectionTimeout is read from rest.connection-timeout (default 15000 ms), idlenessTimeout from rest.idleness-timeout (default 5 minutes), and maxContentLength from rest.client.max-content-length (default 104_857_600 bytes, i.e. 100 MiB).
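To make the "read a key, fall back to a default, validate in the constructor" pattern above concrete, here is a minimal sketch that uses only the JDK. The class RestClientSettingsSketch and its fromMap method are hypothetical stand-ins, not Flink's API; a plain Map<String, String> takes the place of Flink's Configuration, and SSL handling is left out. Only the option keys and default values come from the notes above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for RestClientConfiguration; not part of Flink. */
final class RestClientSettingsSketch {

    // Option keys and defaults as listed in the notes above.
    static final String CONNECTION_TIMEOUT_KEY = "rest.connection-timeout";
    static final String IDLENESS_TIMEOUT_KEY = "rest.idleness-timeout";
    static final String MAX_CONTENT_LENGTH_KEY = "rest.client.max-content-length";

    static final long DEFAULT_CONNECTION_TIMEOUT_MS = 15_000L;
    static final long DEFAULT_IDLENESS_TIMEOUT_MS = 5L * 60L * 1000L; // 5 minutes
    static final int DEFAULT_MAX_CONTENT_LENGTH = 104_857_600;        // 100 MiB

    final long connectionTimeoutMs;
    final long idlenessTimeoutMs;
    final int maxContentLength;

    // Private constructor: instances are only created through the factory,
    // and the one invariant (positive content length) is checked here.
    private RestClientSettingsSketch(long connectionTimeoutMs, long idlenessTimeoutMs, int maxContentLength) {
        if (maxContentLength <= 0) {
            throw new IllegalArgumentException("maxContentLength must be positive, was: " + maxContentLength);
        }
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.idlenessTimeoutMs = idlenessTimeoutMs;
        this.maxContentLength = maxContentLength;
    }

    /** Builds the settings from raw key/value pairs, using defaults for missing keys. */
    static RestClientSettingsSketch fromMap(Map<String, String> raw) {
        if (raw == null) {
            throw new NullPointerException("raw");
        }
        long connectionTimeout = longOrDefault(raw, CONNECTION_TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
        long idlenessTimeout = longOrDefault(raw, IDLENESS_TIMEOUT_KEY, DEFAULT_IDLENESS_TIMEOUT_MS);
        String maxLength = raw.get(MAX_CONTENT_LENGTH_KEY);
        int maxContentLength = maxLength == null ? DEFAULT_MAX_CONTENT_LENGTH : Integer.parseInt(maxLength.trim());
        return new RestClientSettingsSketch(connectionTimeout, idlenessTimeout, maxContentLength);
    }

    private static long longOrDefault(Map<String, String> raw, String key, long defaultValue) {
        String value = raw.get(key);
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put(CONNECTION_TIMEOUT_KEY, "3000"); // override one option, keep the other defaults
        RestClientSettingsSketch settings = fromMap(raw);
        System.out.println(settings.connectionTimeoutMs);
        System.out.println(settings.idlenessTimeoutMs);
        System.out.println(settings.maxContentLength);
    }
}
```

Keeping the constructor private and validating there means an invalid value can never produce an instance, which is the same reason the real class funnels creation through fromConfiguration.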
RestClient
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
public class RestClient implements AutoCloseableAsync {

    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    // used to open connections to a rest server endpoint
    private final Executor executor;

    private final Bootstrap bootstrap;

    private final CompletableFuture<Void> terminationFuture;

    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();

        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL should be the first handler in the pipeline
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                    }

                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                        .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                        .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    t.printStackTrace();
                    ExceptionUtils.rethrow(t);
                }
            }
        };
        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

        bootstrap = new Bootstrap();
        bootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

        LOG.info("Rest client endpoint started.");
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return shutdownInternally(Time.seconds(10L));
    }

    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");

            if (bootstrap != null) {
                if (bootstrap.group() != null) {
                    bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                terminationFuture.complete(null);
                            } else {
                                terminationFuture.completeExceptionally(finished.cause());
                            }
                        });
                }
            }
        }
        return terminationFuture;
    }

    //......
}
- The RestClient constructor takes two parameters, a RestClientConfiguration and an Executor, and creates a Netty Bootstrap. ChannelOption.CONNECT_TIMEOUT_MILLIS is set from configuration.getConnectionTimeout(); the readerIdleTime, writerIdleTime, and allIdleTime of the IdleStateHandler all use configuration.getIdlenessTimeout(); the maxContentLength of the HttpObjectAggregator uses configuration.getMaxContentLength(); and the SSLHandlerFactory comes from configuration.getSslHandlerFactory().
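One detail in the constructor is easy to miss: getConnectionTimeout() returns a long, while Netty's CONNECT_TIMEOUT_MILLIS option takes an Integer, so the listing narrows the value with Math.toIntExact. The sketch below shows what that call does on its own, using only the JDK; the class ConnectTimeoutSketch and its method name are hypothetical and exist only for illustration.

```java
/** Hypothetical helper illustrating the long-to-int narrowing; not part of Flink. */
final class ConnectTimeoutSketch {

    /**
     * Narrows a millisecond timeout from long to int.
     * Math.toIntExact throws ArithmeticException when the value does not fit
     * into an int, instead of silently truncating it like a plain (int) cast.
     */
    static int toConnectTimeoutMillis(long timeoutMs) {
        return Math.toIntExact(timeoutMs);
    }

    public static void main(String[] args) {
        // The default of 15000 ms fits comfortably into an int.
        System.out.println(toConnectTimeoutMillis(15_000L));

        // A value just above Integer.MAX_VALUE is rejected rather than wrapped around.
        try {
            toConnectTimeoutMillis(Integer.MAX_VALUE + 1L);
        } catch (ArithmeticException e) {
            System.out.println("rejected oversized timeout");
        }
    }
}
```

In practice this means a rest.connection-timeout larger than Integer.MAX_VALUE milliseconds (roughly 24.8 days) would make the constructor fail with an ArithmeticException rather than produce a wrapped, possibly negative timeout.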
Summary
- RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength. The fromConfiguration method creates an SSLHandlerFactory from the Configuration only when REST SSL is enabled; otherwise the field is null. The relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false).
- connectionTimeout is read from rest.connection-timeout (default 15000 ms), idlenessTimeout from rest.idleness-timeout (default 5 minutes), and maxContentLength from rest.client.max-content-length (default 104_857_600 bytes, i.e. 100 MiB).
- The RestClient constructor takes two parameters, a RestClientConfiguration and an Executor, and creates a Netty Bootstrap. ChannelOption.CONNECT_TIMEOUT_MILLIS is set from configuration.getConnectionTimeout(); the readerIdleTime, writerIdleTime, and allIdleTime of the IdleStateHandler all use configuration.getIdlenessTimeout(); the maxContentLength of the HttpObjectAggregator uses configuration.getMaxContentLength(); and the SSLHandlerFactory comes from configuration.getSslHandlerFactory().