A Look at Flink's Parallel Execution

Category: Programming Tools · Published: 5 years ago

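This article examines parallel execution in Flink: the levels at which parallelism can be configured, and how each setting is applied in the source code.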

Examples

Operator Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
  • Operators, data sources, and data sinks can each call setParallelism() to set their own parallelism.

Execution Environment Level

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
  • Calling setParallelism on the execution environment sets the default parallelism for all operators, data sources, and data sinks; any operator, data source, or data sink that sets its own parallelism overrides the value set on the execution environment.

Client Level

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

Or:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
  • With the CLI client, parallelism is specified on the command line with -p; when submitting from Java/Scala, it is passed as an argument to Client.run.

System Level

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
  • The parallelism.default option in flink-conf.yaml sets a system-wide default parallelism for all execution environments.
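
Taken together, the four levels form a precedence chain: an operator-level setting wins over the execution environment's, the environment's wins over the client's, and parallelism.default is the last fallback. The following is a minimal plain-Java sketch of that resolution order, not Flink code. The class ParallelismResolver, the method resolve, and the UNSET sentinel are hypothetical names introduced only for illustration.

```java
import java.util.stream.IntStream;

// Simplified model of the precedence between the four levels; not part of Flink.
final class ParallelismResolver {

    /** Hypothetical sentinel meaning "not configured at this level". */
    static final int UNSET = -1;

    /**
     * Returns the first positive value in precedence order:
     * operator, then execution environment, then client, then system default.
     */
    static int resolve(int operatorLevel, int environmentLevel, int clientLevel, int systemDefault) {
        return IntStream.of(operatorLevel, environmentLevel, clientLevel, systemDefault)
                .filter(p -> p > 0)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("No parallelism configured at any level"));
    }

    public static void main(String[] args) {
        // Values taken from the examples above: setParallelism(5), env.setParallelism(3), -p 10, parallelism.default: 1
        System.out.println(resolve(5, 3, 10, 1));              // operator level wins
        System.out.println(resolve(UNSET, 3, 10, 1));          // environment level wins
        System.out.println(resolve(UNSET, UNSET, 10, 1));      // client level wins
        System.out.println(resolve(UNSET, UNSET, UNSET, 1));   // system default applies
    }
}
```

In Flink itself this logic is not centralized in one method; it is spread across the classes examined in the rest of this article.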

ExecutionEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java

@Public
public abstract class ExecutionEnvironment {
    //......

    private final ExecutionConfig config = new ExecutionConfig();

    /**
     * Sets the parallelism for operations executed through this environment.
     * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
     * x parallel instances.
     *
     * <p>This method overrides the default parallelism for this environment.
     * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
     * contexts (CPU cores / threads). When executing the program via the command line client
     * from a JAR file, the default parallelism is the one configured for that setup.
     *
     * @param parallelism The parallelism
     */
    public void setParallelism(int parallelism) {
        config.setParallelism(parallelism);
    }

    @Internal
    public Plan createProgramPlan(String jobName, boolean clearSinks) {
        if (this.sinks.isEmpty()) {
            if (wasExecuted) {
                throw new RuntimeException("No new data sinks have been defined since the " +
                        "last execution. The last execution refers to the latest call to " +
                        "'execute()', 'count()', 'collect()', or 'print()'.");
            } else {
                throw new RuntimeException("No data sinks have been created yet. " +
                        "A program needs at least one sink that consumes data. " +
                        "Examples are writing the data set or printing it.");
            }
        }

        if (jobName == null) {
            jobName = getDefaultName();
        }

        OperatorTranslation translator = new OperatorTranslation();
        Plan plan = translator.translateToPlan(this.sinks, jobName);

        if (getParallelism() > 0) {
            plan.setDefaultParallelism(getParallelism());
        }
        plan.setExecutionConfig(getConfig());

        // Check plan for GenericTypeInfo's and register the types at the serializers.
        if (!config.isAutoTypeRegistrationDisabled()) {
            plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

                private final Set<Class<?>> registeredTypes = new HashSet<>();
                private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

                @Override
                public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
                    if (!visitedOperators.add(visitable)) {
                        return false;
                    }
                    OperatorInformation<?> opInfo = visitable.getOperatorInfo();
                    Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
                    return true;
                }

                @Override
                public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
            });
        }

        try {
            registerCachedFilesWithPlan(plan);
        } catch (Exception e) {
            throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
        }

        // clear all the sinks such that the next execution does not redo everything
        if (clearSinks) {
            this.sinks.clear();
            wasExecuted = true;
        }

        // All types are registered now. Print information.
        int registeredTypes = config.getRegisteredKryoTypes().size() +
                config.getRegisteredPojoTypes().size() +
                config.getRegisteredTypesWithKryoSerializerClasses().size() +
                config.getRegisteredTypesWithKryoSerializers().size();
        int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
                config.getDefaultKryoSerializerClasses().size();
        LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

        if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
            LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
        }
        if (config.isForceKryoEnabled()) {
            LOG.info("Using KryoSerializer for serializing POJOs");
        }
        if (config.isForceAvroEnabled()) {
            LOG.info("Using AvroSerializer for serializing POJOs");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
            LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

            // print information about static code analysis
            LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
        }

        return plan;
    }

    //......
}
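  • ExecutionEnvironment provides setParallelism, which stores the parallelism in its ExecutionConfig; after createProgramPlan builds the Plan, it reads the parallelism back from the ExecutionConfig and, if it is greater than 0, sets it as the Plan's defaultParallelism.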
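
The guard in createProgramPlan (getParallelism() > 0) means that an environment whose parallelism was never set leaves the Plan's default untouched. Below is a small stand-alone sketch of just that step. SimplePlan and PlanDefaultSketch are hypothetical stand-ins, and the -1 "unset" value is an assumption about how Flink represents a missing parallelism.

```java
// Simplified model of the "copy environment parallelism into the plan" step; not Flink code.
final class PlanDefaultSketch {

    /** Assumed "unset" value; any value <= 0 is treated as unset by the guard below. */
    static final int PARALLELISM_UNSET = -1;

    /** Hypothetical stand-in for Flink's Plan, holding only the default parallelism. */
    static final class SimplePlan {
        int defaultParallelism = PARALLELISM_UNSET;
    }

    /** Mirrors the guard shown above: only a positive environment parallelism is applied. */
    static SimplePlan createPlan(int environmentParallelism) {
        SimplePlan plan = new SimplePlan();
        if (environmentParallelism > 0) {
            plan.defaultParallelism = environmentParallelism;
        }
        return plan;
    }

    public static void main(String[] args) {
        System.out.println(createPlan(3).defaultParallelism);                 // 3
        System.out.println(createPlan(PARALLELISM_UNSET).defaultParallelism); // -1, left for later stages to fill in
    }
}
```

Leaving the default unset is what allows a later stage, such as the executor or the client, to supply a value.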

DataStreamSource

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java

@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    boolean isParallel;

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }

    public DataStreamSource(SingleOutputStreamOperator<T> operator) {
        super(operator.environment, operator.getTransformation());
        this.isParallel = true;
    }

    @Override
    public DataStreamSource<T> setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
        } else {
            super.setParallelism(parallelism);
            return this;
        }
    }
}
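  • DataStreamSource extends SingleOutputStreamOperator and overrides setParallelism: a non-parallel source rejects any parallelism other than 1 with an IllegalArgumentException; otherwise the call is delegated to the setParallelism of the parent class SingleOutputStreamOperator.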
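
The behaviour of a non-parallel source can be illustrated with a stripped-down model. This is a sketch under stated assumptions, not the Flink class: SourceParallelismSketch is a hypothetical name, and it keeps only the isParallel flag and the parallelism value from the code above.

```java
// Simplified model of DataStreamSource's parallelism handling; not Flink code.
final class SourceParallelismSketch {

    private final boolean isParallel;
    private int parallelism;

    SourceParallelismSketch(boolean isParallel, int environmentParallelism) {
        this.isParallel = isParallel;
        // As in the constructor above: start from the environment's parallelism,
        // but force a non-parallel source down to 1.
        this.parallelism = isParallel ? environmentParallelism : 1;
    }

    SourceParallelismSketch setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("Source is not a parallel source");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(new SourceParallelismSketch(true, 4).setParallelism(8).getParallelism()); // 8
        System.out.println(new SourceParallelismSketch(false, 4).getParallelism());                  // 1
    }
}
```

Calling setParallelism(2) on the non-parallel instance throws, while setParallelism(1) is accepted as a no-op.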

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    //......

    /**
     * Sets the parallelism for this operator.
     *
     * @param parallelism
     *            The parallelism for this operator.
     * @return The operator with set parallelism.
     */
    public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
        Preconditions.checkArgument(canBeParallel() || parallelism == 1,
                "The parallelism of non parallel operator must be 1.");

        transformation.setParallelism(parallelism);

        return this;
    }

    //......
}
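  • SingleOutputStreamOperator's setParallelism first checks that a non-parallel operator is only given a parallelism of 1, then applies the value to the StreamTransformation.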

DataStreamSink

flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java

@Public
public class DataStreamSink<T> {

    private final SinkTransformation<T> transformation;

    //......

    /**
     * Sets the parallelism for this sink. The degree must be higher than zero.
     *
     * @param parallelism The parallelism for this sink.
     * @return The sink with set parallelism.
     */
    public DataStreamSink<T> setParallelism(int parallelism) {
        transformation.setParallelism(parallelism);
        return this;
    }

    //......
}
  • DataStreamSink provides a setParallelism method, which applies the value to its SinkTransformation.

LocalEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java

@Public
public class LocalEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
  • LocalEnvironment's execute builds the Plan with createProgramPlan and passes it to LocalExecutor's executePlan.

LocalExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java

public class LocalExecutor extends PlanExecutor {
    
    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        synchronized (this.lock) {

            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (jobExecutorService == null) {
                shutDownAtEnd = true;

                // configure the number of local slots equal to the parallelism of the local plan
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }

                // start the cluster for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);

                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
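  • LocalExecutor's executePlan additionally sets the plan's defaultParallelism to slotsPerTaskManager * numTaskManagers; when it starts its own cluster and the slot count is still at its default, it first sets the number of local slots to the plan's maximum parallelism.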
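
The arithmetic in executePlan can be worked through with a small stand-alone sketch. It simplifies the real method: it ignores the session check and the configuration lookup, and the value -1 for DEFAULT_TASK_MANAGER_NUM_SLOTS is an assumption, as the constant's definition is not shown above. LocalSlotsSketch and its method names are hypothetical.

```java
// Simplified model of how LocalExecutor derives slots and the plan's default parallelism; not Flink code.
final class LocalSlotsSketch {

    /** Assumed value of the "slots not configured" marker. */
    static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;

    /** If slots were not configured, fall back to the plan's maximum parallelism (when positive). */
    static int effectiveSlots(int configuredSlots, int planMaxParallelism) {
        if (configuredSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS && planMaxParallelism > 0) {
            return planMaxParallelism;
        }
        return configuredSlots;
    }

    /** The plan's default parallelism is the total number of local slots. */
    static int defaultParallelism(int slotsPerTaskManager, int numTaskManagers) {
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        // Slots not configured, plan's maximum parallelism is 5, one task manager.
        int slots = effectiveSlots(DEFAULT_TASK_MANAGER_NUM_SLOTS, 5);
        System.out.println(defaultParallelism(slots, 1)); // 5

        // Slots explicitly configured as 4, two task managers.
        System.out.println(defaultParallelism(effectiveSlots(4, 5), 2)); // 8
    }
}
```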

RemoteEnvironment

flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java

@Public
public class RemoteEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        PlanExecutor executor = getExecutor();

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
  • RemoteEnvironment's execute calls RemoteExecutor's executePlan.

RemoteExecutor

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java

public class RemoteExecutor extends PlanExecutor {

    private final Object lock = new Object();

    private final List<URL> jarFiles;

    private final List<URL> globalClasspaths;

    private final Configuration clientConfiguration;

    private ClusterClient<?> client;

    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
        return executePlanWithJars(p);
    }

    public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
        if (program == null) {
            throw new IllegalArgumentException("The job may not be null.");
        }

        synchronized (this.lock) {
            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (client == null) {
                shutDownAtEnd = true;
                // start the executor for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                return client.run(program, defaultParallelism).getJobExecutionResult();
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
  • RemoteExecutor's executePlan calls executePlanWithJars, which in turn calls ClusterClient's run, passing defaultParallelism as an argument.

ClusterClient

flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java

public abstract class ClusterClient<T> {
    //......

    public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
        return run(program, parallelism, SavepointRestoreSettings.none());
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
            throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }

        OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
        return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
    }

    private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
            throws CompilerException, ProgramInvocationException {
        return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
    }

    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
        Logger log = LoggerFactory.getLogger(ClusterClient.class);

        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
            p.setDefaultParallelism(parallelism);
        }
        log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

        return compiler.compile(p);
    }

    //......
}
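  • The parallelism passed to ClusterClient's run ends up on the Plan: getOptimizedPlan sets it as the Plan's default parallelism only when it is greater than 0 and the Plan has no default parallelism of its own (p.getDefaultParallelism() <= 0).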
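
The condition in getOptimizedPlan is what makes a value set in the program take priority over the client's. The sketch below isolates that condition in plain Java; ClientParallelismSketch and effectivePlanDefault are hypothetical names, and the Plan is reduced to a single int.

```java
// Simplified model of the guard in ClusterClient.getOptimizedPlan; not Flink code.
final class ClientParallelismSketch {

    /**
     * Returns the plan's default parallelism after the client has had its say:
     * the client value is used only if it is positive and the plan has none of its own.
     */
    static int effectivePlanDefault(int planDefault, int clientParallelism) {
        if (clientParallelism > 0 && planDefault <= 0) {
            return clientParallelism;
        }
        return planDefault;
    }

    public static void main(String[] args) {
        System.out.println(effectivePlanDefault(-1, 10)); // 10: plan had no default, client value applies
        System.out.println(effectivePlanDefault(3, 10));  // 3: the plan's own default is kept
    }
}
```

So a program that sets a parallelism of 3 on its environment and is then submitted with a client parallelism of 10 keeps a plan default of 3 under this rule.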

Summary

With the CLI client, parallelism is specified on the command line with -p; when submitting from Java/Scala, it is passed as an argument to Client.run.
