内容简介:本文主要研究一下flink taskmanager的data.port与rpc.portflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.javaflink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnect
序
本文主要研究一下flink taskmanager的data.port与rpc.port
TaskManagerServices
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
public class TaskManagerServices {
//......
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
ResourceID resourceID,
Executor taskIOExecutor,
long freeHeapMemoryWithDefrag,
long maxJvmHeapMemory) throws Exception {
// pre-start checks
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
network.start();
final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
resourceID,
taskManagerServicesConfiguration.getTaskManagerAddress(),
network.getConnectionManager().getDataPort());
// this call has to happen strictly after the network stack has been initialized
final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
resourceProfiles.add(ResourceProfile.ANY);
}
final TimerService<AllocationID> timerService = new TimerService<>(
new ScheduledThreadPoolExecutor(1),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);
final JobManagerTable jobManagerTable = new JobManagerTable();
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
}
final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
stateRootDirectoryFiles,
taskIOExecutor);
return new TaskManagerServices(
taskManagerLocation,
memoryManager,
ioManager,
network,
broadcastVariableManager,
taskSlotTable,
jobManagerTable,
jobLeaderService,
taskStateManager);
}
private static NetworkEnvironment createNetworkEnvironment(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
long maxJvmHeapMemory) {
NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();
final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
int segmentSize = networkEnvironmentConfiguration.networkBufferSize();
// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
final long numNetBuffersLong = networkBuf / segmentSize;
if (numNetBuffersLong > Integer.MAX_VALUE) {
throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
+ ") corresponds to more than MAX_INT pages.");
}
NetworkBufferPool networkBufferPool = new NetworkBufferPool(
(int) numNetBuffersLong,
segmentSize);
ConnectionManager connectionManager;
boolean enableCreditBased = false;
NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
if (nettyConfig != null) {
connectionManager = new NettyConnectionManager(nettyConfig);
enableCreditBased = nettyConfig.isCreditBasedEnabled();
} else {
connectionManager = new LocalConnectionManager();
}
ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
KvStateRegistry kvStateRegistry = new KvStateRegistry();
QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();
int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();
int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();
final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getProxyPortRange(),
numProxyServerNetworkThreads,
numProxyServerQueryThreads,
new DisabledKvStateRequestStats());
int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();
int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();
final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
taskManagerServicesConfiguration.getTaskManagerAddress(),
qsConfig.getStateServerPortRange(),
numStateServerNetworkThreads,
numStateServerQueryThreads,
kvStateRegistry,
new DisabledKvStateRequestStats());
// we start the network first, to make sure it can allocate its buffers first
return new NetworkEnvironment(
networkBufferPool,
connectionManager,
resultPartitionManager,
taskEventDispatcher,
kvStateRegistry,
kvStateServer,
kvClientProxy,
networkEnvironmentConfiguration.ioMode(),
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
networkEnvironmentConfiguration.networkBuffersPerChannel(),
networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
enableCreditBased);
}
//......
}
它从配置文件读取taskmanager.data.port
NettyConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
public class NettyConnectionManager implements ConnectionManager {
private final NettyServer server;
private final NettyClient client;
private final NettyBufferPool bufferPool;
private final PartitionRequestClientFactory partitionRequestClientFactory;
public NettyConnectionManager(NettyConfig nettyConfig) {
this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
}
@Override
public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
NettyProtocol partitionRequestProtocol = new NettyProtocol(
partitionProvider,
taskEventDispatcher,
client.getConfig().isCreditBasedEnabled());
client.init(partitionRequestProtocol, bufferPool);
server.init(partitionRequestProtocol, bufferPool);
}
@Override
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
throws IOException, InterruptedException {
return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
}
@Override
public void closeOpenChannelConnections(ConnectionID connectionId) {
partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
}
@Override
public int getNumberOfActiveConnections() {
return partitionRequestClientFactory.getNumberOfActiveClients();
}
@Override
public int getDataPort() {
if (server != null && server.getLocalAddress() != null) {
return server.getLocalAddress().getPort();
} else {
return -1;
}
}
@Override
public void shutdown() {
client.shutdown();
server.shutdown();
}
NettyClient getClient() {
return client;
}
NettyServer getServer() {
return server;
}
NettyBufferPool getBufferPool() {
return bufferPool;
}
}
- NettyConnectionManager的构造器根据NettyConfig构造了NettyServer,而getDataPort则取的是server.getLocalAddress().getPort()
TaskManagerRunner
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {
//......
public static RpcService createRpcService(
final Configuration configuration,
final HighAvailabilityServices haServices) throws Exception {
checkNotNull(configuration);
checkNotNull(haServices);
String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);
if (taskManagerHostname != null) {
LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
} else {
Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
haServices.getResourceManagerLeaderRetriever(),
lookupTimeout);
taskManagerHostname = taskManagerAddress.getHostName();
LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
taskManagerHostname, taskManagerAddress.getHostAddress());
}
final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
}
//......
}
- TaskManagerRunner提供了createRpcService方法,其从配置文件读取taskmanager.rpc.port,然后调用AkkaRpcServiceUtils.createRpcService来创建RpcService
小结
它从配置文件读取taskmanager.data.port
doc
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
JavaScript精粹
爱德华兹 / 高铁军 / 人民邮电出版社 / 2007-6 / 49.00元
《JavaScript 精粹》主要介绍JavaScript应用中一些常见的问题及其解决方法,从最基础的数字、字符串、数组到进阶的DOM、表单验证、cookie,再到较为高级的Ajax,书中均有涉及。《JavaScript 精粹》覆盖现在非常流行和通用的技术,提出很多出现频率较高的Web开发常见问题,并提供了大量的技巧和解决方案,具有很强的实用性和通用性,书中的代码也具有很强的兼容性。《JavaSc......一起来看看 《JavaScript精粹》 这本书的介绍吧!