聊聊flink JobManager的heap大小设置

栏目: Java · 发布时间: 6年前

内容简介:本文主要研究一下flink JobManager的heap大小设置flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.javaflink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java

本文主要研究一下flink JobManager的heap大小设置

JobManagerOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * JVM heap size for the JobManager with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
        key("jobmanager.heap.size")
        .defaultValue("1024m")
        .withDescription("JVM heap size for the JobManager.");

    /**
     * JVM heap size (in megabytes) for the JobManager.
     * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
     */
    @Deprecated
    public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
        key("jobmanager.heap.mb")
        .defaultValue(1024)
        .withDescription("JVM heap size (in megabytes) for the JobManager.");

    //......
}
  • jobmanager.heap.size配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃

ConfigurationUtils

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java

public class ConfigurationUtils {

    private static final String[] EMPTY = new String[0];

    /**
     * Get job manager's heap memory. This method will check the new key
     * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
     * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
     *
     * @param configuration the configuration object
     * @return the memory size of job manager's heap memory.
     */
    public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
        if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
            return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
        } else {
            //use default value
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        }
    }

    //......
}
  • ConfigurationUtils的getJobManagerHeapMemory方法从Configuration中读取配置,然后解析为MemorySize

MemorySize

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

@PublicEvolving
public class MemorySize implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------

    /** The memory size, in bytes. */
    private final long bytes;

    /**
     * Constructs a new MemorySize.
     *
     * @param bytes The size, in bytes. Must be zero or larger.
     */
    public MemorySize(long bytes) {
        checkArgument(bytes >= 0, "bytes must be >= 0");
        this.bytes = bytes;
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the memory size in bytes.
     */
    public long getBytes() {
        return bytes;
    }

    /**
     * Gets the memory size in Kibibytes (= 1024 bytes).
     */
    public long getKibiBytes() {
        return bytes >> 10;
    }

    /**
     * Gets the memory size in Mebibytes (= 1024 Kibibytes).
     */
    public int getMebiBytes() {
        return (int) (bytes >> 20);
    }

    /**
     * Gets the memory size in Gibibytes (= 1024 Mebibytes).
     */
    public long getGibiBytes() {
        return bytes >> 30;
    }

    /**
     * Gets the memory size in Tebibytes (= 1024 Gibibytes).
     */
    public long getTebiBytes() {
        return bytes >> 40;
    }

    // ------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return (int) (bytes ^ (bytes >>> 32));
    }

    @Override
    public boolean equals(Object obj) {
        return obj == this ||
                (obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);
    }

    @Override
    public String toString() {
        return bytes + " bytes";
    }

    // ------------------------------------------------------------------------
    //  Parsing
    // ------------------------------------------------------------------------

    /**
     * Parses the given string as as MemorySize.
     *
     * @param text The string to parse
     * @return The parsed MemorySize
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text) throws IllegalArgumentException {
        return new MemorySize(parseBytes(text));
    }

    /**
     * Parses the given string with a default unit.
     *
     * @param text The string to parse.
     * @param defaultUnit specify the default unit.
     * @return The parsed MemorySize.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {
        if (!hasUnit(text)) {
            return parse(text + defaultUnit.getUnits()[0]);
        }

        return parse(text);
    }

    /**
     * Parses the given string as bytes.
     * The supported expressions are listed under {@link MemorySize}.
     *
     * @param text The string to parse
     * @return The parsed size, in bytes.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static long parseBytes(String text) throws IllegalArgumentException {
        checkNotNull(text, "text");

        final String trimmed = text.trim();
        checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");

        final int len = trimmed.length();
        int pos = 0;

        char current;
        while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
            pos++;
        }

        final String number = trimmed.substring(0, pos);
        final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

        if (number.isEmpty()) {
            throw new NumberFormatException("text does not start with a number");
        }

        final long value;
        try {
            value = Long.parseLong(number); // this throws a NumberFormatException on overflow
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("The value '" + number +
                    "' cannot be re represented as 64bit number (numeric overflow).");
        }

        final long multiplier;
        if (unit.isEmpty()) {
            multiplier = 1L;
        }
        else {
            if (matchesAny(unit, BYTES)) {
                multiplier = 1L;
            }
            else if (matchesAny(unit, KILO_BYTES)) {
                multiplier = 1024L;
            }
            else if (matchesAny(unit, MEGA_BYTES)) {
                multiplier = 1024L * 1024L;
            }
            else if (matchesAny(unit, GIGA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L;
            }
            else if (matchesAny(unit, TERA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L * 1024L;
            }
            else {
                throw new IllegalArgumentException("Memory size unit '" + unit +
                        "' does not match any of the recognized units: " + MemoryUnit.getAllUnits());
            }
        }

        final long result = value * multiplier;

        // check for overflow
        if (result / multiplier != value) {
            throw new IllegalArgumentException("The value '" + text +
                    "' cannot be re represented as 64bit number of bytes (numeric overflow).");
        }

        return result;
    }

    private static boolean matchesAny(String str, MemoryUnit unit) {
        for (String s : unit.getUnits()) {
            if (s.equals(str)) {
                return true;
            }
        }
        return false;
    }

    //......
}
  • MemorySize内部有个bytes字段,以bytes为单位,之后提供了getBytes、getKibiBytes、getMebiBytes、getGibiBytes、getTebiBytes方法用于快速换算;parse静态方法用于从文本中解析并创建MemorySize,其中parse方法可接收MemoryUnit参数用于文本中没有MemoryUnit时才使用的默认的MemoryUnit,最后都是调用的parseBytes方法

MemoryUnit

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

/**
     *  Enum which defines memory unit, mostly used to parse value from configuration file.
     *
     * <p>To make larger values more compact, the common size suffixes are supported:
     *
     * <ul>
     *     <li>q or 1b or 1bytes (bytes)
     *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
     *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
     *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
     *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
     * </ul>
     *
     */
    public enum MemoryUnit {

        BYTES(new String[] { "b", "bytes" }),
        KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
        MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
        GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
        TERA_BYTES(new String[] { "t", "tb", "tebibytes" });

        private String[] units;

        MemoryUnit(String[] units) {
            this.units = units;
        }

        public String[] getUnits() {
            return units;
        }

        public static String getAllUnits() {
            return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());
        }

        public static boolean hasUnit(String text) {
            checkNotNull(text, "text");

            final String trimmed = text.trim();
            checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");

            final int len = trimmed.length();
            int pos = 0;

            char current;
            while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
                pos++;
            }

            final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

            return unit.length() > 0;
        }

        private static String concatenateUnits(final String[]... allUnits) {
            final StringBuilder builder = new StringBuilder(128);

            for (String[] units : allUnits) {
                builder.append('(');

                for (String unit : units) {
                    builder.append(unit);
                    builder.append(" | ");
                }

                builder.setLength(builder.length() - 3);
                builder.append(") / ");
            }

            builder.setLength(builder.length() - 3);
            return builder.toString();
        }

    }
  • MemoryUnit枚举定义了BYTES、KILO_BYTES、MEGA_BYTES、GIGA_BYTES、TERA_BYTES;它有units属性,是一个string数组,用于指定每类单位的文本标识,最后匹配时都是转换为小写来匹配

FlinkYarnSessionCli

flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java

public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {
    //......

    private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
        if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
            LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
        }

        // TODO: The number of task manager should be deprecated soon
        final int numberTaskManagers;

        if (cmd.hasOption(container.getOpt())) {
            numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
        } else {
            numberTaskManagers = 1;
        }

        // JobManager Memory
        final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();

        // Task Managers memory
        final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();

        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

        return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(jobManagerMemoryMB)
            .setTaskManagerMemoryMB(taskManagerMemoryMB)
            .setNumberTaskManagers(numberTaskManagers)
            .setSlotsPerTaskManager(slotsPerTaskManager)
            .createClusterSpecification();
    }

    //......
}
  • FlinkYarnSessionCli的createClusterSpecification方法使用到了ConfigurationUtils.getJobManagerHeapMemory(configuration)来读取jobManagerMemoryMB

config.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh

//......

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

//......

# Define FLINK_JM_HEAP if it is not already set
if [ -z "${FLINK_JM_HEAP}" ]; then
    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_JM_HEAP}" == 0 ]; then
    FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi

//......

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

//......

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

//......
  • config.sh首先判断环境变量FLINK_JM_HEAP是否有设置,没有的话,则从flink-conf.yaml中读取jobmanager.heap.size配置到FLINK_JM_HEAP;如果FLINK_JM_HEAP为0,则读取jobmanager.heap.mb的配置到FLINK_JM_HEAP_MB
  • 如果没有设置FLINK_ENV_JAVA_OPTS,则从flink-conf.yaml中读取env.java.opts配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS,默认为空;如果没有设置FLINK_ENV_JAVA_OPTS_JM,则从flink-conf.yaml中读取env.java.opts.jobmanager配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS_JM,默认为空
  • JVM_ARGS变量会被job及task manager使用,如果没有设置,则初始化为空;注意不要设置内存相关参数到JVM_ARGS,要使用flink-conf.yaml中的jobmanager.heap.size、taskmanager.heap.size来配置

jobmanager.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink JobManager.
USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
        echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
    else
        flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
        FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
    fi

    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
    fi

    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"

    # Startup parameters
    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
  • jobmanager.sh首先调用config.sh来初始化相关变量( FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS )
  • 如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量;如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;然后将FLINK_ENV_JAVA_OPTS_JM( 依据env.java.opts.jobmanager配置 )追加到FLINK_ENV_JAVA_OPTS( 依据env.java.opts )中
  • jobmanager.sh最后调用flink-console.sh来启动相关类

flink-console.sh

flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.
USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

case $SERVICE in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;

    (*)
        echo "Unknown service '${SERVICE}'. $USAGE."
        exit 1
    ;;
esac

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi

echo "Starting $SERVICE as a console application on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
  • flink-console.sh在 java 小于8版本时会给JVM_ARGS追加-XX:MaxPermSize=256m;之后使用JVM_ARGS及FLINK_ENV_JAVA_OPTS作为jvm参数启动CLASS_TO_RUN

小结

  • jobmanager.heap.size配置用于指定JobManager的大小,默认是1024m;jobmanager.heap.mb配置已经被废弃;ConfigurationUtils的getJobManagerHeapMemory方法从Configuration中读取配置,然后解析为MemorySize;MemorySize内部有个bytes字段,以bytes为单位,之后提供了getBytes、getKibiBytes、getMebiBytes、getGibiBytes、getTebiBytes方法用于快速换算;parse静态方法用于从文本中解析并创建MemorySize,其中parse方法可接收MemoryUnit参数用于文本中没有MemoryUnit时才使用的默认的MemoryUnit,最后都是调用的parseBytes方法
  • FlinkYarnSessionCli的createClusterSpecification方法使用到了ConfigurationUtils.getJobManagerHeapMemory(configuration)来读取jobManagerMemoryMB
  • config.sh首先判断环境变量FLINK_JM_HEAP是否有设置,没有的话,则从flink-conf.yaml中读取jobmanager.heap.size配置到FLINK_JM_HEAP;如果FLINK_JM_HEAP为0,则读取jobmanager.heap.mb的配置到FLINK_JM_HEAP_MB;如果没有设置FLINK_ENV_JAVA_OPTS,则从flink-conf.yaml中读取env.java.opts配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS,默认为空;如果没有设置FLINK_ENV_JAVA_OPTS_JM,则从flink-conf.yaml中读取env.java.opts.jobmanager配置,如果没有该配置则使用DEFAULT_ENV_JAVA_OPTS_JM,默认为空;JVM_ARGS变量会被job及task manager使用,如果没有设置,则初始化为空;注意不要设置内存相关参数到JVM_ARGS,要使用flink-conf.yaml中的jobmanager.heap.size、taskmanager.heap.size来配置
  • jobmanager.sh首先调用config.sh来初始化相关变量( FLINK_JM_HEAP、FLINK_JM_HEAP_MB、FLINK_ENV_JAVA_OPTS、FLINK_ENV_JAVA_OPTS_JM、JVM_ARGS );如果FLINK_JM_HEAP值大于0,则解析到FLINK_JM_HEAP_MB变量,如果FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中;它会将FLINK_ENV_JAVA_OPTS_JM( 依据env.java.opts.jobmanager配置 )追加到FLINK_ENV_JAVA_OPTS( 依据env.java.opts )中;jobmanager.sh最后调用flink-console.sh来启动相关类
  • flink-console.sh在java小于8版本时会给JVM_ARGS追加-XX:MaxPermSize=256m;之后使用JVM_ARGS及FLINK_ENV_JAVA_OPTS作为jvm参数启动CLASS_TO_RUN

由此可见最后的jvm参数取决于JVM_ARGS及FLINK_ENV_JAVA_OPTS;其中注意不要设置内存相关参数到JVM_ARGS,因为jobmanager.sh在FLINK_JM_HEAP_MB大于0,则使用该值设置Xms及Xmx追加到JVM_ARGS变量中,而FLINK_JM_HEAP_MB则取决于FLINK_JM_HEAP或者jobmanager.heap.size配置;FLINK_ENV_JAVA_OPTS的配置则取决于env.java.opts以及env.java.opts.jobmanager;因而要配置jobmanager的heap大小的话,要么指定FLINK_JM_HEAP环境变量( 比如FLINK_JM_HEAP=512m ),要么在flink-conf.yaml中指定jobmanager.heap.size或者env.java.opts以及env.java.opts.jobmanager

doc


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Data Mining

Web Data Mining

Bing Liu / Springer / 2006-12-28 / USD 59.95

Web mining aims to discover useful information and knowledge from the Web hyperlink structure, page contents, and usage data. Although Web mining uses many conventional data mining techniques, it is n......一起来看看 《Web Data Mining》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具