聊聊flink taskmanager的data.port与rpc.port

栏目: 编程工具 · 发布时间: 6年前

内容简介:本文主要研究一下flink taskmanager的data.port与rpc.portflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.javaflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnect

本文主要研究一下flink taskmanager的data.port与rpc.port

TaskManagerServices

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java

public class TaskManagerServices {
    //......

    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            ResourceID resourceID,
            Executor taskIOExecutor,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {

        // pre-start checks
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

        final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
        network.start();

        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
            resourceID,
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            network.getConnectionManager().getDataPort());

        // this call has to happen strictly after the network stack has been initialized
        final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

        final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());

        for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
            resourceProfiles.add(ResourceProfile.ANY);
        }

        final TimerService<AllocationID> timerService = new TimerService<>(
            new ScheduledThreadPoolExecutor(1),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

        final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

        final JobManagerTable jobManagerTable = new JobManagerTable();

        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }

        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            taskIOExecutor);

        return new TaskManagerServices(
            taskManagerLocation,
            memoryManager,
            ioManager,
            network,
            broadcastVariableManager,
            taskSlotTable,
            jobManagerTable,
            jobLeaderService,
            taskStateManager);
    }

    private static NetworkEnvironment createNetworkEnvironment(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long maxJvmHeapMemory) {

        NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();

        final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
        int segmentSize = networkEnvironmentConfiguration.networkBufferSize();

        // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
        final long numNetBuffersLong = networkBuf / segmentSize;
        if (numNetBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
                + ") corresponds to more than MAX_INT pages.");
        }

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(
            (int) numNetBuffersLong,
            segmentSize);

        ConnectionManager connectionManager;
        boolean enableCreditBased = false;
        NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
        if (nettyConfig != null) {
            connectionManager = new NettyConnectionManager(nettyConfig);
            enableCreditBased = nettyConfig.isCreditBasedEnabled();
        } else {
            connectionManager = new LocalConnectionManager();
        }

        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

        KvStateRegistry kvStateRegistry = new KvStateRegistry();

        QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

        int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();

        int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();

        final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getProxyPortRange(),
                numProxyServerNetworkThreads,
                numProxyServerQueryThreads,
                new DisabledKvStateRequestStats());

        int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();

        int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
                taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();

        final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                qsConfig.getStateServerPortRange(),
                numStateServerNetworkThreads,
                numStateServerQueryThreads,
                kvStateRegistry,
                new DisabledKvStateRequestStats());

        // we start the network first, to make sure it can allocate its buffers first
        return new NetworkEnvironment(
            networkBufferPool,
            connectionManager,
            resultPartitionManager,
            taskEventDispatcher,
            kvStateRegistry,
            kvStateServer,
            kvClientProxy,
            networkEnvironmentConfiguration.ioMode(),
            networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
            networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
            networkEnvironmentConfiguration.networkBuffersPerChannel(),
            networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
            enableCreditBased);
    }

    //......
}
它从配置文件读取taskmanager.data.port

NettyConnectionManager

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
  • NettyConnectionManager的构造器根据NettyConfig构造了NettyServer,而getDataPort则取的是server.getLocalAddress().getPort()

TaskManagerRunner

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java

public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
    //......

    public static RpcService createRpcService(
        final Configuration configuration,
        final HighAvailabilityServices haServices) throws Exception {

        checkNotNull(configuration);
        checkNotNull(haServices);

        String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);

        if (taskManagerHostname != null) {
            LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
        } else {
            Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());

            InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
                haServices.getResourceManagerLeaderRetriever(),
                lookupTimeout);

            taskManagerHostname = taskManagerAddress.getHostName();

            LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
                taskManagerHostname, taskManagerAddress.getHostAddress());
        }

        final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
        return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
    }

    //......
}
  • TaskManagerRunner提供了createRpcService方法,其从配置文件读取taskmanager.rpc.port,然后调用AkkaRpcServiceUtils.createRpcService来创建RpcService

小结

它从配置文件读取taskmanager.data.port

doc


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Head First Python

Head First Python

Paul Barry / O'Reilly Media / 2010-11-30 / USD 49.99

Are you keen to add Python to your programming skills? Learn quickly and have some fun at the same time with Head First Python. This book takes you beyond typical how-to manuals with engaging images, ......一起来看看 《Head First Python》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

html转js在线工具
html转js在线工具

html转js在线工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具