Preface
This article takes a look at the data.port and rpc.port settings of the Flink TaskManager.
TaskManagerServices
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
public class TaskManagerServices {

    //......

    public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            ResourceID resourceID,
            Executor taskIOExecutor,
            long freeHeapMemoryWithDefrag,
            long maxJvmHeapMemory) throws Exception {

        // pre-start checks
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

        final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
        network.start();

        final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
            resourceID,
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            network.getConnectionManager().getDataPort());

        // this call has to happen strictly after the network stack has been initialized
        final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);

        // start the I/O manager, it will create some temp directories.
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

        final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());

        for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
            resourceProfiles.add(ResourceProfile.ANY);
        }

        final TimerService<AllocationID> timerService = new TimerService<>(
            new ScheduledThreadPoolExecutor(1),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());

        final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

        final JobManagerTable jobManagerTable = new JobManagerTable();

        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }

        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            taskIOExecutor);

        return new TaskManagerServices(
            taskManagerLocation,
            memoryManager,
            ioManager,
            network,
            broadcastVariableManager,
            taskSlotTable,
            jobManagerTable,
            jobLeaderService,
            taskStateManager);
    }

    private static NetworkEnvironment createNetworkEnvironment(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            long maxJvmHeapMemory) {

        NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig();

        final long networkBuf = calculateNetworkBufferMemory(taskManagerServicesConfiguration, maxJvmHeapMemory);
        int segmentSize = networkEnvironmentConfiguration.networkBufferSize();

        // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
        final long numNetBuffersLong = networkBuf / segmentSize;
        if (numNetBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + networkBuf
                + ") corresponds to more than MAX_INT pages.");
        }

        NetworkBufferPool networkBufferPool = new NetworkBufferPool(
            (int) numNetBuffersLong,
            segmentSize);

        ConnectionManager connectionManager;
        boolean enableCreditBased = false;
        NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig();
        if (nettyConfig != null) {
            connectionManager = new NettyConnectionManager(nettyConfig);
            enableCreditBased = nettyConfig.isCreditBasedEnabled();
        } else {
            connectionManager = new LocalConnectionManager();
        }

        ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

        KvStateRegistry kvStateRegistry = new KvStateRegistry();

        QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

        int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ?
            taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads();

        int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ?
            taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads();

        final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy(
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            qsConfig.getProxyPortRange(),
            numProxyServerNetworkThreads,
            numProxyServerQueryThreads,
            new DisabledKvStateRequestStats());

        int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ?
            taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads();

        int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ?
            taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads();

        final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer(
            taskManagerServicesConfiguration.getTaskManagerAddress(),
            qsConfig.getStateServerPortRange(),
            numStateServerNetworkThreads,
            numStateServerQueryThreads,
            kvStateRegistry,
            new DisabledKvStateRequestStats());

        // we start the network first, to make sure it can allocate its buffers first
        return new NetworkEnvironment(
            networkBufferPool,
            connectionManager,
            resultPartitionManager,
            taskEventDispatcher,
            kvStateRegistry,
            kvStateServer,
            kvClientProxy,
            networkEnvironmentConfiguration.ioMode(),
            networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
            networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
            networkEnvironmentConfiguration.networkBuffersPerChannel(),
            networkEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
            enableCreditBased);
    }

    //......
}
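- TaskManagerServices.fromConfiguration calls createNetworkEnvironment, starts the network, and then builds the TaskManagerLocation, taking its data port from network.getConnectionManager().getDataPort(). createNetworkEnvironment creates a NettyConnectionManager when the NettyConfig is non-null and falls back to a LocalConnectionManager otherwise; the port carried by that NettyConfig is read from taskmanager.data.port in the configuration file.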
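The createNetworkEnvironment listing above derives the number of network buffers by dividing the network memory by the segment size and rejecting results that do not fit in an int. A minimal standalone sketch of that arithmetic follows; the class and method names (NetworkBufferCount, numberOfBuffers) are hypothetical and only mirror the division and range check, not Flink's actual memory calculation.

```java
// Hypothetical helper, not part of Flink: mirrors the buffer-count arithmetic
// seen in createNetworkEnvironment.
class NetworkBufferCount {

    /** Returns how many whole segments fit into the given network memory. */
    public static int numberOfBuffers(long networkMemoryBytes, int segmentSizeBytes) {
        // Integer division drops the remainder ("offcuts"), as in the listing above.
        long count = networkMemoryBytes / segmentSizeBytes;
        if (count > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes ("
                + networkMemoryBytes + ") corresponds to more than MAX_INT pages.");
        }
        return (int) count;
    }

    public static void main(String[] args) {
        // 64 MiB of network memory with 32 KiB segments.
        System.out.println(numberOfBuffers(64L * 1024 * 1024, 32 * 1024)); // prints 2048
        // 100 extra bytes do not fill another segment, so the count is unchanged.
        System.out.println(numberOfBuffers(64L * 1024 * 1024 + 100, 32 * 1024)); // prints 2048
    }
}
```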
NettyConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
public class NettyConnectionManager implements ConnectionManager {

    private final NettyServer server;

    private final NettyClient client;

    private final NettyBufferPool bufferPool;

    private final PartitionRequestClientFactory partitionRequestClientFactory;

    public NettyConnectionManager(NettyConfig nettyConfig) {
        this.server = new NettyServer(nettyConfig);
        this.client = new NettyClient(nettyConfig);
        this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

        this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
    }

    @Override
    public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
        NettyProtocol partitionRequestProtocol = new NettyProtocol(
            partitionProvider,
            taskEventDispatcher,
            client.getConfig().isCreditBasedEnabled());

        client.init(partitionRequestProtocol, bufferPool);
        server.init(partitionRequestProtocol, bufferPool);
    }

    @Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }

    @Override
    public void closeOpenChannelConnections(ConnectionID connectionId) {
        partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
    }

    @Override
    public int getNumberOfActiveConnections() {
        return partitionRequestClientFactory.getNumberOfActiveClients();
    }

    @Override
    public int getDataPort() {
        if (server != null && server.getLocalAddress() != null) {
            return server.getLocalAddress().getPort();
        } else {
            return -1;
        }
    }

    @Override
    public void shutdown() {
        client.shutdown();
        server.shutdown();
    }

    NettyClient getClient() {
        return client;
    }

    NettyServer getServer() {
        return server;
    }

    NettyBufferPool getBufferPool() {
        return bufferPool;
    }
}
- The NettyConnectionManager constructor builds a NettyServer from the NettyConfig, and getDataPort returns server.getLocalAddress().getPort(), or -1 when the server or its local address is null.
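Because getDataPort asks the server for its local address instead of echoing the configured value, it reports the port that was actually bound. This matters when the configured port is 0, which lets the operating system pick a free port. The sketch below illustrates the same idea with a plain java.net.ServerSocket rather than Flink's NettyServer; BoundPortDemo and its methods are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical illustration with a plain ServerSocket; Flink itself uses a NettyServer.
class BoundPortDemo {

    /** Binds to the requested port (0 = let the OS choose) and returns the port actually bound. */
    public static int bindAndReportPort(int requestedPort) {
        try (ServerSocket server = new ServerSocket()) {
            server.bind(new InetSocketAddress(requestedPort));
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Analogue of getDataPort(): -1 when there is no server or it is not bound yet. */
    public static int portOf(ServerSocket server) {
        return (server != null && server.isBound()) ? server.getLocalPort() : -1;
    }

    public static void main(String[] args) throws IOException {
        // The concrete port differs from run to run because the OS assigns it.
        System.out.println("requested 0, bound to " + bindAndReportPort(0));
        try (ServerSocket unbound = new ServerSocket()) {
            System.out.println("unbound socket reports " + portOf(unbound)); // -1
        }
    }
}
```

The same reasoning explains why TaskManagerServices.fromConfiguration calls network.start() before it reads the data port: a server that has not been bound yet has no local address to report.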
TaskManagerRunner
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {

    //......

    public static RpcService createRpcService(
            final Configuration configuration,
            final HighAvailabilityServices haServices) throws Exception {

        checkNotNull(configuration);
        checkNotNull(haServices);

        String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);

        if (taskManagerHostname != null) {
            LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
        } else {
            Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());

            InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
                haServices.getResourceManagerLeaderRetriever(),
                lookupTimeout);

            taskManagerHostname = taskManagerAddress.getHostName();

            LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
                taskManagerHostname, taskManagerAddress.getHostAddress());
        }

        final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);

        return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
    }

    //......
}
- TaskManagerRunner provides the createRpcService method, which reads taskmanager.rpc.port from the configuration and then calls AkkaRpcServiceUtils.createRpcService to create the RpcService.
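Note that createRpcService reads taskmanager.rpc.port as a String named portRangeDefinition, because the value may be a single port, a comma-separated list, or a range such as 50100-50200. The following sketch shows one way such a definition could be expanded into candidate ports. PortRangeSketch is a hypothetical helper written for this article and is not the parser Flink uses internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink's own parser: expands a port range definition
// such as "50100-50102,50110" into the individual candidate ports.
class PortRangeSketch {

    public static List<Integer> expand(String definition) {
        List<Integer> ports = new ArrayList<>();
        for (String part : definition.split(",")) {
            String token = part.trim();
            int dash = token.indexOf('-');
            int start;
            int end;
            if (dash < 0) {
                // A single port, e.g. "0" or "6122".
                start = parsePort(token);
                end = start;
            } else {
                // A range, e.g. "50100-50200" (both ends inclusive).
                start = parsePort(token.substring(0, dash).trim());
                end = parsePort(token.substring(dash + 1).trim());
            }
            if (start > end) {
                throw new IllegalArgumentException("Invalid port range: " + token);
            }
            for (int port = start; port <= end; port++) {
                ports.add(port);
            }
        }
        return ports;
    }

    private static int parsePort(String text) {
        int port = Integer.parseInt(text);
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        System.out.println(expand("50100-50102,50110")); // prints [50100, 50101, 50102, 50110]
    }
}
```

A caller would typically try the candidates in order and keep the first one that binds successfully, which is why a range helps when several TaskManagers share a machine.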
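Summary
- The data port is configured through taskmanager.data.port. TaskManagerServices passes the NettyConfig to NettyConnectionManager, and the TaskManagerLocation records the port returned by getDataPort, that is, the port the NettyServer is actually bound to.
- The RPC port is configured through taskmanager.rpc.port. TaskManagerRunner.createRpcService reads it as a port range definition and hands it to AkkaRpcServiceUtils.createRpcService.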