聊聊flink的RestClientConfiguration

栏目: CSS · 发布时间: 5年前

内容简介:本文主要研究一下flink的RestClientConfigurationflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.javaflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

本文主要研究一下flink的RestClientConfiguration

RestClientConfiguration

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java

public final class RestClientConfiguration {

    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;

    private final long connectionTimeout;

    private final long idlenessTimeout;

    private final int maxContentLength;

    private RestClientConfiguration(
            @Nullable final SSLHandlerFactory sslHandlerFactory,
            final long connectionTimeout,
            final long idlenessTimeout,
            final int maxContentLength) {
        checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
        this.sslHandlerFactory = sslHandlerFactory;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /**
     * Returns the {@link SSLEngine} that the REST client endpoint should use.
     *
     * @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
     */
    @Nullable
    public SSLHandlerFactory getSslHandlerFactory() {
        return sslHandlerFactory;
    }

    /**
     * {@see RestOptions#CONNECTION_TIMEOUT}.
     */
    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * {@see RestOptions#IDLENESS_TIMEOUT}.
     */
    public long getIdlenessTimeout() {
        return idlenessTimeout;
    }

    /**
     * Returns the max content length that the REST client endpoint could handle.
     *
     * @return max content length that the REST client endpoint could handle
     */
    public int getMaxContentLength() {
        return maxContentLength;
    }

    /**
     * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
     *
     * @param config configuration from which the REST client endpoint configuration should be created from
     * @return REST client endpoint configuration
     * @throws ConfigurationException if SSL was configured incorrectly
     */

    public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        Preconditions.checkNotNull(config);

        final SSLHandlerFactory sslHandlerFactory;
        if (SSLUtils.isRestSSLEnabled(config)) {
            try {
                sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
            } catch (Exception e) {
                throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
            }
        } else {
            sslHandlerFactory = null;
        }

        final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

        final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

        int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

        return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
    }
}
  • RestClientConfiguration有四个属性,分别是sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength
  • fromConfiguration方法从Configuration中创建SSLHandlerFactory,其读取的是相关配置有security.ssl.rest.enabled,默认为false;security.ssl.protocol,默认为TLSv1.2;security.ssl.algorithms,默认为TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为false
  • connectionTimeout读取的是rest.connection-timeout配置,默认是15000毫秒;idlenessTimeout读取的是rest.idleness-timeout配置,默认5分钟;maxContentLength读取的是rest.client.max-content-length配置,默认是104_857_600

RestClient

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

public class RestClient implements AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    // used to open connections to a rest server endpoint
    private final Executor executor;

    private final Bootstrap bootstrap;

    private final CompletableFuture<Void> terminationFuture;

    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();

        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL should be the first handler in the pipeline
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                    }

                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
                        .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                        .addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    t.printStackTrace();
                    ExceptionUtils.rethrow(t);
                }
            }
        };
        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

        bootstrap = new Bootstrap();
        bootstrap
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

        LOG.info("Rest client endpoint started.");
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        return shutdownInternally(Time.seconds(10L));
    }

    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");

            if (bootstrap != null) {
                if (bootstrap.group() != null) {
                    bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                        .addListener(finished -> {
                            if (finished.isSuccess()) {
                                terminationFuture.complete(null);
                            } else {
                                terminationFuture.completeExceptionally(finished.cause());
                            }
                        });
                }
            }
        }
        return terminationFuture;
    }

    //......
}
  • RestClient的构造器接收RestClientConfiguration及Executor两个参数,构造器里头创建了netty的Bootstrap,其中ChannelOption.CONNECT_TIMEOUT_MILLIS使用的是configuration.getConnectionTimeout();IdleStateHandler的readerIdleTime、writerIdleTime、allIdleTime使用的是configuration.getIdlenessTimeout();HttpObjectAggregator的maxContentLength使用的是configuration.getMaxContentLength();SSLHandlerFactory使用的是configuration.getSslHandlerFactory()

小结

  • RestClientConfiguration有四个属性,分别是sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength;fromConfiguration方法从Configuration中创建SSLHandlerFactory,其读取的是相关配置有security.ssl.rest.enabled,默认为false;security.ssl.protocol,默认为TLSv1.2;security.ssl.algorithms,默认为TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为false
  • connectionTimeout读取的是rest.connection-timeout配置,默认是15000毫秒;idlenessTimeout读取的是rest.idleness-timeout配置,默认5分钟;maxContentLength读取的是rest.client.max-content-length配置,默认是104_857_600
  • RestClient的构造器接收RestClientConfiguration及Executor两个参数,构造器里头创建了netty的Bootstrap,其中ChannelOption.CONNECT_TIMEOUT_MILLIS使用的是configuration.getConnectionTimeout();IdleStateHandler的readerIdleTime、writerIdleTime、allIdleTime使用的是configuration.getIdlenessTimeout();HttpObjectAggregator的maxContentLength使用的是configuration.getMaxContentLength();SSLHandlerFactory使用的是configuration.getSslHandlerFactory()

doc


以上所述就是小编给大家介绍的《聊聊flink的RestClientConfiguration》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

互联网思维的企业

互联网思维的企业

[美] Dave Gray Thomas Vander Wal / 张 玳 / 人民邮电出版社 / 2014-4-25 / 59.00元

本书指导企业跳出仅更新自家产品和服务的怪圈,在管理方式、组织结构和公司文化方面进行变革,建立具有互联网思维的企业。书中通过大量图示和示例阐述了互联式公司必需的基础元素(透明的互动和交流平台,推崇自治和应变的组织结构,实验和学习的企业文化),以及一套鼓励员工创新的新式管理和奖励体系。最后,讨论板可方便你在工作时间和同事探讨如何增加公司的互联程度。一起来看看 《互联网思维的企业》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具