聊聊flink taskmanager的data.port与rpc.port

栏目: 编程工具 · 发布时间: 5年前

内容简介:本文主要研究一下flink taskmanager的data.port与rpc.port。涉及的源码文件:flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java、flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

本文主要研究一下flink taskmanager的data.port与rpc.port

TaskManagerServices

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

/**
 * Excerpt of {@code TaskManagerServices}; parts of the class that are not shown are marked with
 * {@code //......}.
 *
 * <p>The two methods shown here wire up the services of a TaskManager. Of interest for the data
 * port: the {@link TaskManagerLocation} is built with the port returned by
 * {@code network.getConnectionManager().getDataPort()} after the network environment was started.
 */
public class TaskManagerServices {
    //......

    /**
     * Creates the TaskManager services from the given configuration.
     *
     * <p>Steps, in order: check the temp directories, create and start the network environment,
     * build the {@link TaskManagerLocation} from the address in the configuration and the data
     * port reported by the connection manager, then create the memory manager, I/O manager,
     * broadcast variable manager, task slot table, job manager table, job leader service and the
     * local state stores manager.
     *
     * @param taskManagerServicesConfiguration configuration all services are derived from
     * @param resourceID resource ID that is stored in the {@link TaskManagerLocation}
     * @param taskIOExecutor executor handed to the {@link TaskExecutorLocalStateStoresManager}
     * @param freeHeapMemoryWithDefrag forwarded to {@code createMemoryManager}
     * @param maxJvmHeapMemory forwarded to {@code createNetworkEnvironment} and {@code createMemoryManager}
     * @return the assembled {@link TaskManagerServices}
     * @throws Exception declared by this method; the excerpt does not show which calls raise it
     */
    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            ResourceID resourceID,
            Executor taskIOExecutor,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {

        // pre-start checks
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

        // The network environment is created and started before anything else is built.
        final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
        network.start();

        // The data port is asked from the (already started) connection manager instead of being
        // read from the configuration at this point.
        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
            resourceID,
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            network.getConnectionManager().getDataPort());

        // this call has to happen strictly after the network stack has been initialized
        final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

        // One ResourceProfile.ANY entry per configured slot.
        final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());

        for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
            resourceProfiles.add(ResourceProfile.ANY);
        }

        // Timer service backed by a ScheduledThreadPoolExecutor with a core pool size of 1.
        final TimerService<AllocationID> timerService = new TimerService<>(
            new ScheduledThreadPoolExecutor(1),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

        final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

        final JobManagerTable jobManagerTable = new JobManagerTable();

        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

        // Each configured root directory gets LOCAL_STATE_SUB_DIRECTORY_ROOT appended as a child path.
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }

        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            taskIOExecutor);

        return new TaskManagerServices(
            taskManagerLocation,
            memoryManager,
            ioManager,
            network,
            broadcastVariableManager,
            taskSlotTable,
            jobManagerTable,
            jobLeaderService,
            taskStateManager);
    }

    /**
     * Builds (but does not start) the {@link NetworkEnvironment}.
     *
     * <p>The number of network buffers is the network buffer memory divided by the segment size
     * (integer division). A {@link NettyConnectionManager} is used when the network configuration
     * carries a {@link NettyConfig}, otherwise a {@link LocalConnectionManager}. The queryable
     * state client proxy and state server are created with thread counts that fall back to the
     * number of slots when the configured value is {@code 0}.
     *
     * @param taskManagerServicesConfiguration configuration providing the network and queryable state settings
     * @param maxJvmHeapMemory forwarded to {@code calculateNetworkBufferMemory}
     * @return the new, not yet started network environment
     * @throws IllegalArgumentException if the computed number of buffers exceeds {@link Integer#MAX_VALUE}
     */
    private static NetworkEnvironment createNetworkEnvironment(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long maxJvmHeapMemory) {

        NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();

        final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
        int segmentSize = networkEnvironmentConfiguration.networkBufferSize();

        // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
        final long numNetBuffersLong = networkBuf / segmentSize;
        // Checked as a long first so that the narrowing cast to int below cannot exceed the int range.
        if (numNetBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
                + ") corresponds to more than MAX_INT pages.");
        }

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(
            (int) numNetBuffersLong,
            segmentSize);

        // Netty-based connection manager when a NettyConfig is present, local one otherwise.
        // Credit-based flow control stays disabled unless the NettyConfig enables it.
        ConnectionManager connectionManager;
        boolean enableCreditBased = false;
        NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
        if (nettyConfig != null) {
            connectionManager = new NettyConnectionManager(nettyConfig);
            enableCreditBased = nettyConfig.isCreditBasedEnabled();
        } else {
            connectionManager = new LocalConnectionManager();
        }

        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

        KvStateRegistry kvStateRegistry = new KvStateRegistry();

        QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

        // A configured thread count of 0 means "use the number of slots" for all four settings below.
        int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();

        int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();

        final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getProxyPortRange(),
                numProxyServerNetworkThreads,
                numProxyServerQueryThreads,
                new DisabledKvStateRequestStats());

        int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();

        int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();

        final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getStateServerPortRange(),
                numStateServerNetworkThreads,
                numStateServerQueryThreads,
                kvStateRegistry,
                new DisabledKvStateRequestStats());

        // we start the network first, to make sure it can allocate its buffers first
        // NOTE(review): nothing is started in this method; the visible start() call is made by
        // fromConfiguration on the returned instance.
        return new NetworkEnvironment(
            networkBufferPool,
            connectionManager,
            resultPartitionManager,
            taskEventDispatcher,
            kvStateRegistry,
            kvStateServer,
            kvClientProxy,
            networkEnvironmentConfiguration.ioMode(),
            networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
            networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
            networkEnvironmentConfiguration.networkBuffersPerChannel(),
            networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
            enableCreditBased);
    }

    //......
}
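  • TaskManagerServices的fromConfiguration方法先通过createNetworkEnvironment创建NetworkEnvironment并调用network.start(),然后通过network.getConnectionManager().getDataPort()获取data port来构造TaskManagerLocation;createNetworkEnvironment方法在networkEnvironmentConfiguration.nettyConfig()不为null时创建NettyConnectionManager,否则创建LocalConnectionManager;其中的端口来自配置文件中的taskmanager.data.port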

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

/**
 * {@link ConnectionManager} implementation on top of Netty.
 *
 * <p>It owns a {@link NettyServer} and a {@link NettyClient} that are both created from the same
 * {@link NettyConfig}, a {@link NettyBufferPool} used by both, and the
 * {@link PartitionRequestClientFactory} that serves partition request clients.
 */
public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    /**
     * Creates server, client, buffer pool and client factory; nothing is initialized until
     * {@link #start} is called.
     *
     * @param nettyConfig configuration used for both the server and the client
     */
    public NettyConnectionManager(NettyConfig nettyConfig) {
        server = new NettyServer(nettyConfig);
        client = new NettyClient(nettyConfig);
        bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
        partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    /**
     * Initializes the client first and then the server with one shared protocol instance.
     *
     * @param partitionProvider provider handed to the protocol
     * @param taskEventDispatcher dispatcher handed to the protocol
     * @throws IOException propagated from client or server initialization
     */
    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        final boolean creditBased = client.getConfig().isCreditBasedEnabled();
        final NettyProtocol protocol = new NettyProtocol(partitionProvider, taskEventDispatcher, creditBased);

        client.init(protocol, bufferPool);
        server.init(protocol, bufferPool);
    }

    /** Delegates to the partition request client factory. */
    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    /** Delegates to the partition request client factory. */
    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    /** Returns the number of active clients reported by the partition request client factory. */
    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    /**
     * Returns the port of the server's local address.
     *
     * @return the port, or {@code -1} when the server or its local address is {@code null}
     */
    @Override
    public int getDataPort() {
        if (server == null || server.getLocalAddress() == null) {
            return -1;
        }
        return server.getLocalAddress().getPort();
    }

    /** Shuts down the client and then the server. */
    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
  • NettyConnectionManager的构造器根据NettyConfig构造了NettyServer,而getDataPort则取的是server.getLocalAddress().getPort()

TaskManagerRunner

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
    //......

    /**
     * Creates the {@link RpcService} of the TaskManager.
     *
     * <p>The host name is taken from {@code TaskManagerOptions.HOST}. When that option yields
     * {@code null}, a connecting address is looked up through the ResourceManager leader retriever
     * (bounded by the Akka lookup timeout) and its host name is used instead. The port definition
     * is read from {@code TaskManagerOptions.RPC_PORT}.
     *
     * @param configuration configuration to read host, port and lookup timeout from; must not be null
     * @param haServices high availability services providing the leader retriever; must not be null
     * @return the RPC service created by {@code AkkaRpcServiceUtils.createRpcService}
     * @throws Exception declared by this method; raised by the address lookup or service creation
     */
    public static RpcService createRpcService(
        final Configuration configuration,
        final HighAvailabilityServices haServices) throws Exception {

        checkNotNull(configuration);
        checkNotNull(haServices);

        String hostname = configuration.getString(TaskManagerOptions.HOST);

        if (hostname == null) {
            // No host configured: find an address that can connect to the ResourceManager leader.
            final Time timeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
            final InetAddress connectingAddress = LeaderRetrievalUtils.findConnectingAddress(
                haServices.getResourceManagerLeaderRetriever(),
                timeout);

            hostname = connectingAddress.getHostName();

            LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
                hostname, connectingAddress.getHostAddress());
        } else {
            LOG.info("Using configured hostname/address for TaskManager: {}.", hostname);
        }

        return AkkaRpcServiceUtils.createRpcService(
            hostname,
            configuration.getString(TaskManagerOptions.RPC_PORT),
            configuration);
    }

    //......
}
  • TaskManagerRunner提供了createRpcService方法,其从配置文件读取taskmanager.rpc.port,然后调用AkkaRpcServiceUtils.createRpcService来创建RpcService

小结

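  • data.port:TaskManagerServices在创建TaskManagerLocation时通过network.getConnectionManager().getDataPort()获取;NettyConnectionManager的getDataPort返回的是server.getLocalAddress().getPort(),对应配置文件中的taskmanager.data.port
  • rpc.port:TaskManagerRunner的createRpcService方法从配置文件读取taskmanager.rpc.port,然后调用AkkaRpcServiceUtils.createRpcService来创建RpcService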

doc


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

自品牌

自品牌

陈为、孙郁婷 / 机械工业出版社 / 2015-9-7 / 39

移动互联网来势汹涌,让品牌重新回到人的时代。微信旗帜鲜明地宣示,“再小的个体也有自己的品牌”。《自品牌:个人如何玩转移动互联网时代》作者历经一年,深度访谈10位嘉宾,挖掘其品牌与商业成功密码。吴晓波、雕爷、罗永浩、鬼脚七、马佳佳……这些商业新浪潮中的探路者与领军者,要么是传统领域的老将,要么是新领域里的先锋,但都能以新媒体为载体,构建个人品牌,打造商业生态,抓住互联网的时代红利,顺风而起,顺势而为......一起来看看 《自品牌》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具