Preface
This article takes a look at Flink's RestClientConfiguration.
RestClientConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
public final class RestClientConfiguration {

    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;

    private final long connectionTimeout;

    private final long idlenessTimeout;

    private final int maxContentLength;

    private RestClientConfiguration(
            @Nullable final SSLHandlerFactory sslHandlerFactory,
            final long connectionTimeout,
            final long idlenessTimeout,
            final int maxContentLength) {
        checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
        this.sslHandlerFactory = sslHandlerFactory;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /**
     * Returns the {@link SSLEngine} that the REST client endpoint should use.
     *
     * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
     */
    @Nullable
    public SSLHandlerFactory getSslHandlerFactory() {
        return sslHandlerFactory;
    }

    /**
     * {@see RestOptions#CONNECTION_TIMEOUT}.
     */
    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * {@see RestOptions#IDLENESS_TIMEOUT}.
     */
    public long getIdlenessTimeout() {
        return idlenessTimeout;
    }

    /**
     * Returns the max content length that the REST client endpoint could handle.
     *
     * @return max content length that the REST client endpoint could handle
     */
    public int getMaxContentLength() {
        return maxContentLength;
    }

    /**
     * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
     *
     * @param config configuration from which the REST client endpoint configuration should be created from
     * @return REST client endpoint configuration
     * @throws ConfigurationException if SSL was configured incorrectly
     */
    public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        Preconditions.checkNotNull(config);

        final SSLHandlerFactory sslHandlerFactory;
        if (SSLUtils.isRestSSLEnabled(config)) {
            try {
                sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
            } catch (Exception e) {
                throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
            }
        } else {
            sslHandlerFactory = null;
        }

        final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

        final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

        int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

        return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
    }
}
- RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength.
- fromConfiguration creates the SSLHandlerFactory from the Configuration when REST SSL is enabled, and leaves it null otherwise. The relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false).
- connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600 bytes, i.e. 100 MiB).
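To make the "read an option, fall back to its default, then validate" flow concrete, here is a minimal sketch that does not depend on Flink. RestClientSettingsSketch and its fromMap method are hypothetical stand-ins invented for illustration, and a plain Map replaces Flink's Configuration; only the option keys, the defaults, and the positive-length check come from the listing and notes above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RestClientConfiguration; this is NOT the Flink class.
final class RestClientSettingsSketch {
    // Option keys as listed in the notes above.
    static final String CONNECTION_TIMEOUT_KEY = "rest.connection-timeout";
    static final String IDLENESS_TIMEOUT_KEY = "rest.idleness-timeout";
    static final String MAX_CONTENT_LENGTH_KEY = "rest.client.max-content-length";

    final long connectionTimeout; // milliseconds
    final long idlenessTimeout;   // milliseconds
    final int maxContentLength;   // bytes

    private RestClientSettingsSketch(long connectionTimeout, long idlenessTimeout, int maxContentLength) {
        // Mirrors the checkArgument call in the private constructor shown above.
        if (maxContentLength <= 0) {
            throw new IllegalArgumentException("maxContentLength must be positive, was: " + maxContentLength);
        }
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    // Assumption: a Map<String, String> plays the role of Flink's Configuration here.
    static RestClientSettingsSketch fromMap(Map<String, String> config) {
        long connectionTimeout = Long.parseLong(config.getOrDefault(CONNECTION_TIMEOUT_KEY, "15000"));
        long idlenessTimeout = Long.parseLong(config.getOrDefault(IDLENESS_TIMEOUT_KEY, "300000")); // 5 minutes
        int maxContentLength = Integer.parseInt(config.getOrDefault(MAX_CONTENT_LENGTH_KEY, "104857600"));
        return new RestClientSettingsSketch(connectionTimeout, idlenessTimeout, maxContentLength);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(CONNECTION_TIMEOUT_KEY, "5000"); // override one option, keep the other defaults
        RestClientSettingsSketch settings = fromMap(config);
        // prints "5000 300000 104857600"
        System.out.println(settings.connectionTimeout + " " + settings.idlenessTimeout + " " + settings.maxContentLength);
    }
}
```

As in the real class, the validation lives in the private constructor, so the static factory is the only way to build an instance and it cannot hand back one with a non-positive content length.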
RestClient
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
public class RestClient implements AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    // used to open connections to a rest server endpoint
    private final Executor executor;

    private final Bootstrap bootstrap;

    private final CompletableFuture<Void> terminationFuture;

    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();

        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL should be the first handler in the pipeline
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                    }

                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                        .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                        .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    t.printStackTrace();
                    ExceptionUtils.rethrow(t);
                }
            }
        };
        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

        bootstrap = new Bootstrap();
        bootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

        LOG.info("Rest client endpoint started.");
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return shutdownInternally(Time.seconds(10L));
    }

    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");

            if (bootstrap != null) {
                if (bootstrap.group() != null) {
                    bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                terminationFuture.complete(null);
                            } else {
                                terminationFuture.completeExceptionally(finished.cause());
                            }
                        });
                }
            }
        }
        return terminationFuture;
    }

    //......
}
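In the listing above, shutdownInternally uses isRunning.compareAndSet(true, false) so that only the first caller triggers the shutdown, while every caller receives the same terminationFuture. The following is a minimal stand-alone sketch of that pattern using only the JDK. IdempotentShutdownSketch, releaseResources, and releaseCount are hypothetical names, and the future is completed synchronously here, whereas RestClient completes it from a listener on the Netty event loop group's graceful shutdown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the compareAndSet-guarded shutdown; not Flink code.
final class IdempotentShutdownSketch {
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    // Counts how often the release step actually ran (for demonstration only).
    final AtomicInteger releaseCount = new AtomicInteger();

    CompletableFuture<Void> closeAsync() {
        // Only the first caller flips the flag, so resources are released at most once.
        if (isRunning.compareAndSet(true, false)) {
            try {
                releaseResources();
                terminationFuture.complete(null);
            } catch (Throwable t) {
                terminationFuture.completeExceptionally(t);
            }
        }
        // Every caller, first or not, observes the same future.
        return terminationFuture;
    }

    // Assumption: stands in for shutting down the Netty event loop group.
    private void releaseResources() {
        releaseCount.incrementAndGet();
    }

    public static void main(String[] args) {
        IdempotentShutdownSketch client = new IdempotentShutdownSketch();
        CompletableFuture<Void> first = client.closeAsync();
        CompletableFuture<Void> second = client.closeAsync();
        System.out.println(first == second);           // true: the same future is returned
        System.out.println(client.releaseCount.get()); // 1: the release step ran once
    }
}
```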
- The RestClient constructor takes a RestClientConfiguration and an Executor and creates a Netty Bootstrap. ChannelOption.CONNECT_TIMEOUT_MILLIS is set from configuration.getConnectionTimeout(); the readerIdleTime, writerIdleTime, and allIdleTime of the IdleStateHandler all use configuration.getIdlenessTimeout(); the maxContentLength of the HttpObjectAggregator uses configuration.getMaxContentLength(); and the SSL handler, when one is configured, comes from configuration.getSslHandlerFactory().
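One detail worth noting in the constructor is that getConnectionTimeout() returns a long, while the value is passed to ChannelOption.CONNECT_TIMEOUT_MILLIS through Math.toIntExact. The sketch below shows how that JDK method behaves: it returns the value unchanged when it fits in an int and throws ArithmeticException otherwise. ConnectTimeoutSketch and toConnectTimeoutMillis are hypothetical names used only for this illustration.

```java
// Hypothetical helper showing the long-to-int conversion used for the connect timeout.
final class ConnectTimeoutSketch {

    // Converts a millisecond timeout held as a long into an int, failing loudly on overflow.
    static int toConnectTimeoutMillis(long timeoutMillis) {
        return Math.toIntExact(timeoutMillis);
    }

    public static void main(String[] args) {
        // The default rest.connection-timeout fits comfortably in an int.
        System.out.println(toConnectTimeoutMillis(15_000L)); // 15000

        // A value beyond Integer.MAX_VALUE is rejected instead of being silently truncated.
        try {
            toConnectTimeoutMillis(Integer.MAX_VALUE + 1L);
        } catch (ArithmeticException e) {
            System.out.println("rejected oversized timeout");
        }
    }
}
```

A plain (int) cast would instead wrap around silently, so an oversized rest.connection-timeout could turn into a negative or tiny timeout without any error.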
Summary
- RestClientConfiguration has four fields: sslHandlerFactory, connectionTimeout, idlenessTimeout, and maxContentLength. fromConfiguration creates the SSLHandlerFactory from the Configuration when REST SSL is enabled; the relevant options are security.ssl.rest.enabled (default false), security.ssl.protocol (default TLSv1.2), security.ssl.algorithms (default TLS_RSA_WITH_AES_128_CBC_SHA), and security.ssl.rest.authentication-enabled (default false).
- connectionTimeout is read from rest.connection-timeout (default 15000 ms); idlenessTimeout is read from rest.idleness-timeout (default 5 minutes); maxContentLength is read from rest.client.max-content-length (default 104_857_600 bytes).
- The RestClient constructor takes a RestClientConfiguration and an Executor and creates a Netty Bootstrap, wiring the connection timeout into ChannelOption.CONNECT_TIMEOUT_MILLIS, the idleness timeout into all three idle times of the IdleStateHandler, the max content length into the HttpObjectAggregator, and the SSLHandlerFactory, when present, into the first handler of the pipeline.