Preface
This article takes a look at Flink's NetworkEnvironmentConfiguration.
NetworkEnvironmentConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
public class NetworkEnvironmentConfiguration {
private final float networkBufFraction;
private final long networkBufMin;
private final long networkBufMax;
private final int networkBufferSize;
private final IOMode ioMode;
private final int partitionRequestInitialBackoff;
private final int partitionRequestMaxBackoff;
private final int networkBuffersPerChannel;
private final int floatingNetworkBuffersPerGate;
private final NettyConfig nettyConfig;
/**
* Constructor for a setup with purely local communication (no netty).
*/
public NetworkEnvironmentConfiguration(
float networkBufFraction,
long networkBufMin,
long networkBufMax,
int networkBufferSize,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate) {
this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
ioMode,
partitionRequestInitialBackoff, partitionRequestMaxBackoff,
networkBuffersPerChannel, floatingNetworkBuffersPerGate,
null);
}
public NetworkEnvironmentConfiguration(
float networkBufFraction,
long networkBufMin,
long networkBufMax,
int networkBufferSize,
IOMode ioMode,
int partitionRequestInitialBackoff,
int partitionRequestMaxBackoff,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
@Nullable NettyConfig nettyConfig) {
this.networkBufFraction = networkBufFraction;
this.networkBufMin = networkBufMin;
this.networkBufMax = networkBufMax;
this.networkBufferSize = networkBufferSize;
this.ioMode = ioMode;
this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
this.nettyConfig = nettyConfig;
}
// ------------------------------------------------------------------------
public float networkBufFraction() {
return networkBufFraction;
}
public long networkBufMin() {
return networkBufMin;
}
public long networkBufMax() {
return networkBufMax;
}
public int networkBufferSize() {
return networkBufferSize;
}
public IOMode ioMode() {
return ioMode;
}
public int partitionRequestInitialBackoff() {
return partitionRequestInitialBackoff;
}
public int partitionRequestMaxBackoff() {
return partitionRequestMaxBackoff;
}
public int networkBuffersPerChannel() {
return networkBuffersPerChannel;
}
public int floatingNetworkBuffersPerGate() {
return floatingNetworkBuffersPerGate;
}
public NettyConfig nettyConfig() {
return nettyConfig;
}
// ------------------------------------------------------------------------
@Override
public int hashCode() {
int result = 1;
result = 31 * result + networkBufferSize;
result = 31 * result + ioMode.hashCode();
result = 31 * result + partitionRequestInitialBackoff;
result = 31 * result + partitionRequestMaxBackoff;
result = 31 * result + networkBuffersPerChannel;
result = 31 * result + floatingNetworkBuffersPerGate;
result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
else if (obj == null || getClass() != obj.getClass()) {
return false;
}
else {
final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;
return this.networkBufFraction == that.networkBufFraction &&
this.networkBufMin == that.networkBufMin &&
this.networkBufMax == that.networkBufMax &&
this.networkBufferSize == that.networkBufferSize &&
this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
this.ioMode == that.ioMode &&
(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
}
}
@Override
public String toString() {
return "NetworkEnvironmentConfiguration{" +
"networkBufFraction=" + networkBufFraction +
", networkBufMin=" + networkBufMin +
", networkBufMax=" + networkBufMax +
", networkBufferSize=" + networkBufferSize +
", ioMode=" + ioMode +
", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
", networkBuffersPerChannel=" + networkBuffersPerChannel +
", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
", nettyConfig=" + nettyConfig +
'}';
}
}
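- NetworkEnvironmentConfiguration holds the configuration of Flink's network stack. Its fields are networkBufFraction, networkBufMin, networkBufMax, networkBufferSize, ioMode, partitionRequestInitialBackoff, partitionRequestMaxBackoff, networkBuffersPerChannel, floatingNetworkBuffersPerGate and nettyConfig. The constructor without a NettyConfig parameter passes null for nettyConfig and is meant for setups with purely local communication.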
TaskManagerServicesConfiguration
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
public class TaskManagerServicesConfiguration {
//......
/**
* Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
*
* @param configuration to create the network environment configuration from
* @param localTaskManagerCommunication true if task manager communication is local
* @param taskManagerAddress address of the task manager
* @param slots to start the task manager with
* @return Network environment configuration
*/
@SuppressWarnings("deprecation")
private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
Configuration configuration,
boolean localTaskManagerCommunication,
InetAddress taskManagerAddress,
int slots) throws Exception {
// ----> hosts / ports for communication and data exchange
int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
"Leave config parameter empty or use 0 to let the system choose a port automatically.");
checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
"Number of task slots must be at least one.");
final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
// check page size of for minimum size
checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
// check page size for power of two
checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
"Memory segment size must be a power of 2.");
// network buffer memory fraction
float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);
// fallback: number of network buffers
final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
checkNetworkConfigOld(numNetworkBuffers);
if (!hasNewNetworkBufConf(configuration)) {
// map old config to new one:
networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
} else {
if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
}
}
final NettyConfig nettyConfig;
if (!localTaskManagerCommunication) {
final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
} else {
nettyConfig = null;
}
// Default spill I/O mode for intermediate results
final String syncOrAsync = configuration.getString(
ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);
final IOManager.IOMode ioMode;
if (syncOrAsync.equals("async")) {
ioMode = IOManager.IOMode.ASYNC;
} else {
ioMode = IOManager.IOMode.SYNC;
}
int initialRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
int maxRequestBackoff = configuration.getInteger(
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
int buffersPerChannel = configuration.getInteger(
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
int extraBuffersPerGate = configuration.getInteger(
TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
return new NetworkEnvironmentConfiguration(
networkBufFraction,
networkBufMin,
networkBufMax,
pageSize,
ioMode,
initialRequestBackoff,
maxRequestBackoff,
buffersPerChannel,
extraBuffersPerGate,
nettyConfig);
}
//......
}
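- TaskManagerServicesConfiguration has a private static method, parseNetworkEnvironmentConfiguration, that creates the NetworkEnvironmentConfiguration. It reads TaskManagerOptions.MEMORY_SEGMENT_SIZE, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, TaskManagerOptions.NETWORK_NUM_BUFFERS, ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL and TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE. Along the way it checks that the memory segment size is at least MemoryManager.MIN_PAGE_SIZE and a power of 2; if none of the new network buffer options are set, it maps the deprecated NETWORK_NUM_BUFFERS to networkBufMin = networkBufMax = numNetworkBuffers * pageSize. A NettyConfig is created only when task manager communication is not local.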
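The checks and the legacy fallback described above can be sketched in isolation. This is a minimal, self-contained illustration rather than Flink's code: the class NetworkConfigChecks and its method names are hypothetical, and the value of MIN_PAGE_SIZE (4 KiB) is an assumption about MemoryManager.MIN_PAGE_SIZE that the listing above does not show.

```java
public class NetworkConfigChecks {

    // Assumption: stands in for MemoryManager.MIN_PAGE_SIZE, taken here to be 4 KiB.
    static final int MIN_PAGE_SIZE = 4 * 1024;

    // Local stand-in for IOManager.IOMode.
    enum IOMode { SYNC, ASYNC }

    // A positive value is a power of two if it has exactly one bit set.
    static boolean isPowerOf2(long value) {
        return value > 0 && (value & (value - 1)) == 0;
    }

    // Same two checks as in parseNetworkEnvironmentConfiguration: minimum size, then power of two.
    static void checkPageSize(int pageSize) {
        if (pageSize < MIN_PAGE_SIZE) {
            throw new IllegalArgumentException("Minimum memory segment size is " + MIN_PAGE_SIZE);
        }
        if (!isPowerOf2(pageSize)) {
            throw new IllegalArgumentException("Memory segment size must be a power of 2.");
        }
    }

    // Legacy mapping: a buffer count becomes a byte size that is used as both min and max.
    static long legacyBufferBytes(int numNetworkBuffers, int pageSize) {
        return ((long) numNetworkBuffers) * pageSize;
    }

    // Only the exact string "async" selects ASYNC; everything else falls back to SYNC.
    static IOMode parseIoMode(String syncOrAsync) {
        return "async".equals(syncOrAsync) ? IOMode.ASYNC : IOMode.SYNC;
    }

    public static void main(String[] args) {
        int pageSize = 32 * 1024; // the default segment size of 32kb
        checkPageSize(pageSize);
        System.out.println(legacyBufferBytes(2048, pageSize)); // 67108864 bytes
        System.out.println(parseIoMode("async"));
        System.out.println(parseIoMode("anything-else"));
    }
}
```

With the defaults quoted in this article (2048 buffers, 32kb segments), the legacy mapping yields 64 MiB, the same size as the default of taskmanager.network.memory.min.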
TaskManagerOptions
flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@PublicEvolving
public class TaskManagerOptions {
//......
/**
* Size of memory buffers used by the network stack and the memory manager.
*/
public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size")
.defaultValue("32kb")
.withDescription("Size of memory buffers used by the network stack and the memory manager.");
/**
* Fraction of JVM memory to use for network buffers.
*/
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction")
.defaultValue(0.1f)
.withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
" data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
" are. If a job is rejected or you get a warning that the system has not enough buffers available," +
" increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
"` and \"taskmanager.network.memory.max\" may override this fraction.");
/**
* Minimum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min")
.defaultValue("64mb")
.withDescription("Minimum memory size for network buffers.");
/**
* Maximum memory size for network buffers.
*/
public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max")
.defaultValue("1gb")
.withDescription("Maximum memory size for network buffers.");
/**
* Number of buffers used in the network stack. This defines the number of possible tasks and
* shuffles.
*
* @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
* and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
*/
@Deprecated
public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
key("taskmanager.network.numberOfBuffers")
.defaultValue(2048);
/**
* Minimum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.network.request-backoff.initial")
.defaultValue(100)
.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
.withDescription("Minimum backoff in milliseconds for partition requests of input channels.");
/**
* Maximum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.network.request-backoff.max")
.defaultValue(10000)
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription("Maximum backoff in milliseconds for partition requests of input channels.");
/**
* Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
*
* <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
*/
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
" for parallel serialization.");
/**
* Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
*/
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8)
.withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
" In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
" The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
" help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
" increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");
//......
}
- taskmanager.memory.segment-size sets the size of a memory segment and defaults to 32kb. taskmanager.network.memory.fraction sets the fraction of JVM memory used for network buffers and defaults to 0.1. taskmanager.network.memory.min sets the minimum memory for network buffers and defaults to 64mb. taskmanager.network.memory.max sets the maximum memory for network buffers and defaults to 1gb. taskmanager.network.numberOfBuffers sets the number of buffers used by the network stack and defaults to 2048; it is deprecated in favor of taskmanager.network.memory.fraction, taskmanager.network.memory.min and taskmanager.network.memory.max.
- taskmanager.network.request-backoff.initial sets the minimum backoff, in milliseconds, for partition requests of input channels and defaults to 100. taskmanager.network.request-backoff.max sets the maximum backoff, in milliseconds, for these requests and defaults to 10000.
- taskmanager.network.memory.buffers-per-channel sets the maximum number of buffers for each outgoing/incoming channel and defaults to 2. taskmanager.network.memory.floating-buffers-per-gate sets the number of extra buffers for each outgoing/incoming gate and defaults to 8.
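To make the interplay of these options concrete, here is a small arithmetic sketch. It is not taken from the Flink sources quoted in this article: the class NetworkBufferMath and its methods are hypothetical, and the clamping formula is an assumption based on the option description, which says that min and max "may override this fraction". The per-gate formula likewise assumes that every channel gets its exclusive buffers and the gate adds the floating ones.

```java
public class NetworkBufferMath {

    // Assumed combination of fraction, min and max: take the fraction of the
    // total memory, then clamp the result into [minBytes, maxBytes].
    static long networkMemoryBytes(long totalBytes, float fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (fraction * totalBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    // Each network buffer occupies one memory segment.
    static long numberOfBuffers(long networkBytes, int segmentSize) {
        return networkBytes / segmentSize;
    }

    // Assumed buffer demand of one input gate: exclusive buffers per channel plus floating buffers.
    static int buffersForInputGate(int channels, int buffersPerChannel, int floatingBuffersPerGate) {
        return channels * buffersPerChannel + floatingBuffersPerGate;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long gib = 1024L * mib;
        int segmentSize = 32 * 1024; // taskmanager.memory.segment-size default

        // Small JVM: 10% of 256 MiB is below the 64mb minimum, so the minimum wins.
        long small = networkMemoryBytes(256 * mib, 0.1f, 64 * mib, gib);
        System.out.println(small + " bytes, " + numberOfBuffers(small, segmentSize) + " buffers");

        // Large JVM: 10% of 100 GiB is above the 1gb maximum, so the maximum wins.
        long large = networkMemoryBytes(100 * gib, 0.1f, 64 * mib, gib);
        System.out.println(large + " bytes, " + numberOfBuffers(large, segmentSize) + " buffers");

        // An input gate with 100 channels under the defaults (2 per channel, 8 floating).
        System.out.println(buffersForInputGate(100, 2, 8) + " buffers for the gate");
    }
}
```

Under these assumptions a TaskManager with a small heap still reserves 64 MiB (2048 buffers of 32 KiB), which is exactly what the deprecated default of taskmanager.network.numberOfBuffers would give.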
NettyConfig
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
public class NettyConfig {
private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);
// - Config keys ----------------------------------------------------------
public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
.key("taskmanager.network.netty.num-arenas")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.num-arenas")
.withDescription("The number of Netty arenas.");
public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
.key("taskmanager.network.netty.server.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.server.numThreads")
.withDescription("The number of Netty server threads.");
public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
.key("taskmanager.network.netty.client.numThreads")
.defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.client.numThreads")
.withDescription("The number of Netty client threads.");
public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
.key("taskmanager.network.netty.server.backlog")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.server.backlog")
.withDescription("The netty server connection backlog.");
public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
.key("taskmanager.network.netty.client.connectTimeoutSec")
.defaultValue(120) // default: 120s = 2min
.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
.withDescription("The Netty client connection timeout.");
public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
.key("taskmanager.network.netty.sendReceiveBufferSize")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
.withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
" (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");
public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
.key("taskmanager.network.netty.transport")
.defaultValue("nio")
.withDeprecatedKeys("taskmanager.net.transport")
.withDescription("The Netty transport type, either \"nio\" or \"epoll\"");
// ------------------------------------------------------------------------
enum TransportType {
NIO, EPOLL, AUTO
}
static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";
static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";
private final InetAddress serverAddress;
private final int serverPort;
private final int memorySegmentSize;
private final int numberOfSlots;
private final Configuration config; // optional configuration
public NettyConfig(
InetAddress serverAddress,
int serverPort,
int memorySegmentSize,
int numberOfSlots,
Configuration config) {
this.serverAddress = checkNotNull(serverAddress);
checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");
this.serverPort = serverPort;
checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
this.memorySegmentSize = memorySegmentSize;
checkArgument(numberOfSlots > 0, "Number of slots");
this.numberOfSlots = numberOfSlots;
this.config = checkNotNull(config);
LOG.info(this.toString());
}
InetAddress getServerAddress() {
return serverAddress;
}
int getServerPort() {
return serverPort;
}
int getMemorySegmentSize() {
return memorySegmentSize;
}
public int getNumberOfSlots() {
return numberOfSlots;
}
// ------------------------------------------------------------------------
// Getters
// ------------------------------------------------------------------------
public int getServerConnectBacklog() {
return config.getInteger(CONNECT_BACKLOG);
}
public int getNumberOfArenas() {
// default: number of slots
final int configValue = config.getInteger(NUM_ARENAS);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getServerNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_SERVER);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientNumThreads() {
// default: number of task slots
final int configValue = config.getInteger(NUM_THREADS_CLIENT);
return configValue == -1 ? numberOfSlots : configValue;
}
public int getClientConnectTimeoutSeconds() {
return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
}
public int getSendAndReceiveBufferSize() {
return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
}
public TransportType getTransportType() {
String transport = config.getString(TRANSPORT_TYPE);
switch (transport) {
case "nio":
return TransportType.NIO;
case "epoll":
return TransportType.EPOLL;
default:
return TransportType.AUTO;
}
}
@Nullable
public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
return getSSLEnabled() ?
SSLUtils.createInternalClientSSLEngineFactory(config) :
null;
}
@Nullable
public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
return getSSLEnabled() ?
SSLUtils.createInternalServerSSLEngineFactory(config) :
null;
}
public boolean getSSLEnabled() {
return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
&& SSLUtils.isInternalSSLEnabled(config);
}
public boolean isCreditBasedEnabled() {
return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
}
public Configuration getConfig() {
return config;
}
@Override
public String toString() {
String format = "NettyConfig [" +
"server address: %s, " +
"server port: %d, " +
"ssl enabled: %s, " +
"memory segment size (bytes): %d, " +
"transport type: %s, " +
"number of server threads: %d (%s), " +
"number of client threads: %d (%s), " +
"server connect backlog: %d (%s), " +
"client connect timeout (sec): %d, " +
"send/receive buffer size (bytes): %d (%s)]";
String def = "use Netty's default";
String man = "manual";
return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
memorySegmentSize, getTransportType(), getServerNumThreads(),
getServerNumThreads() == 0 ? def : man,
getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
getSendAndReceiveBufferSize() == 0 ? def : man);
}
}
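- The NettyConfig constructor takes serverAddress, serverPort, memorySegmentSize, numberOfSlots and config. The getters read their values from config: getNumberOfArenas, getServerNumThreads and getClientNumThreads fall back to numberOfSlots when the configured value is -1; getTransportType maps "nio" to NIO, "epoll" to EPOLL and any other value to AUTO; getClientConnectTimeoutSeconds returns the client connection timeout in seconds (default 120).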
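The "-1 means use the number of slots" convention and the transport fallback can be tried out without Flink on the classpath. The sketch below is only an illustration: the class NettyDefaultsSketch is hypothetical and a plain Map<String, String> stands in for Flink's Configuration. The keys and default values are the ones from the NettyConfig listing above.

```java
import java.util.HashMap;
import java.util.Map;

public class NettyDefaultsSketch {

    enum TransportType { NIO, EPOLL, AUTO }

    // Mirrors getServerNumThreads / getClientNumThreads / getNumberOfArenas:
    // an unset or -1 value resolves to the number of task slots.
    static int resolveOrSlots(Map<String, String> config, String key, int numberOfSlots) {
        int configValue = Integer.parseInt(config.getOrDefault(key, "-1"));
        return configValue == -1 ? numberOfSlots : configValue;
    }

    // Mirrors getTransportType: "nio" and "epoll" are recognized, anything else means AUTO.
    static TransportType resolveTransport(Map<String, String> config) {
        String transport = config.getOrDefault("taskmanager.network.netty.transport", "nio");
        switch (transport) {
            case "nio":
                return TransportType.NIO;
            case "epoll":
                return TransportType.EPOLL;
            default:
                return TransportType.AUTO;
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        int slots = 4;

        // Nothing configured: thread count follows the slots, transport is the "nio" default.
        System.out.println(resolveOrSlots(config, "taskmanager.network.netty.server.numThreads", slots));
        System.out.println(resolveTransport(config));

        // Explicit values take precedence over the slot-based default.
        config.put("taskmanager.network.netty.server.numThreads", "8");
        config.put("taskmanager.network.netty.transport", "auto");
        System.out.println(resolveOrSlots(config, "taskmanager.network.netty.server.numThreads", slots));
        System.out.println(resolveTransport(config));
    }
}
```

Note that AUTO is reached through the default branch, so a misspelled transport value would also resolve to AUTO rather than fail.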
Summary
- NetworkEnvironmentConfiguration holds the configuration of Flink's network stack, with the fields networkBufFraction, networkBufMin, networkBufMax, networkBufferSize, ioMode, partitionRequestInitialBackoff, partitionRequestMaxBackoff, networkBuffersPerChannel, floatingNetworkBuffersPerGate and nettyConfig.
- TaskManagerServicesConfiguration has a private static method, parseNetworkEnvironmentConfiguration, that creates the NetworkEnvironmentConfiguration. It reads TaskManagerOptions.MEMORY_SEGMENT_SIZE, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, TaskManagerOptions.NETWORK_NUM_BUFFERS, ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL and TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.
- The NettyConfig constructor takes serverAddress, serverPort, memorySegmentSize, numberOfSlots and config. It also provides getServerConnectBacklog, getNumberOfArenas, getServerNumThreads, getClientNumThreads, getClientConnectTimeoutSeconds, getSendAndReceiveBufferSize and getTransportType, all of which read their values from config.