A look at Flink's Parallel Execution
Preface
This article takes a look at Flink's Parallel Execution.
Examples
Operator Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
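- Operators, data sources and data sinks can all call setParallelism() to set their own parallelism; in the example above, only the windowed sum operator runs with a parallelism of 5.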
Execution Environment Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
- On the ExecutionEnvironment, setParallelism sets the default parallelism for operators, data sources and data sinks; if an operator, data source or data sink sets its own parallelism, that value overrides the one set on the ExecutionEnvironment.
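The override rule can be sketched as a simple lookup. This is a hypothetical model in plain Java, not Flink code: the class ParallelismResolver and the use of -1 to mean "not set" are assumptions made for illustration.

```java
// Hypothetical model of operator-level vs. environment-level parallelism; not Flink code.
final class ParallelismResolver {
    // Assumption for this sketch: -1 means "the operator did not call setParallelism()".
    static final int NOT_SET = -1;

    // An explicit operator setting wins; otherwise the environment default applies.
    static int effectiveParallelism(int operatorParallelism, int environmentParallelism) {
        return operatorParallelism > 0 ? operatorParallelism : environmentParallelism;
    }

    public static void main(String[] args) {
        int envParallelism = 3; // as if env.setParallelism(3) had been called
        System.out.println(effectiveParallelism(NOT_SET, envParallelism)); // 3
        System.out.println(effectiveParallelism(5, envParallelism));       // 5
    }
}
```

In the Word Count example, flatMap would fall back to the environment value while sum keeps its explicit 5.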
Client Level
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
or
try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
- With the CLI client, the parallelism can be given with -p on the command line; when calling from Java/Scala, it is passed as an argument to Client.run.
System Level
# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
- The parallelism.default option in flink-conf.yaml sets a system-wide default parallelism for all execution environments.
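A system-level default is simply a key in flink-conf.yaml. The sketch below is a deliberately simplified, hypothetical reader: SystemDefaultParallelism is not part of Flink and does not reproduce Flink's own configuration loader. It assumes plain `key: value` lines, skips `#` comments, and falls back to 1 (the value shown in the config snippet above) when the key is missing.

```java
// Hypothetical, simplified reader for a flink-conf.yaml-style file; not Flink's configuration loader.
final class SystemDefaultParallelism {
    // Assumed fallback when parallelism.default is absent (matches the snippet's value of 1).
    static final int FALLBACK = 1;

    static int readDefaultParallelism(String... confLines) {
        for (String raw : confLines) {
            String line = raw.trim();
            // Skip blank lines and comment lines
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue; // not a "key: value" line
            }
            String key = line.substring(0, colon).trim();
            if (key.equals("parallelism.default")) {
                return Integer.parseInt(line.substring(colon + 1).trim());
            }
        }
        return FALLBACK;
    }

    public static void main(String[] args) {
        System.out.println(readDefaultParallelism(
                "# The parallelism used for programs that did not specify and other parallelism.",
                "parallelism.default: 4")); // 4
        System.out.println(readDefaultParallelism("jobmanager.rpc.port: 6123")); // 1
    }
}
```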
ExecutionEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
@Public
public abstract class ExecutionEnvironment {

    //......

    private final ExecutionConfig config = new ExecutionConfig();

    /**
     * Sets the parallelism for operations executed through this environment.
     * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
     * x parallel instances.
     *
     * <p>This method overrides the default parallelism for this environment.
     * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
     * contexts (CPU cores / threads). When executing the program via the command line client
     * from a JAR file, the default parallelism is the one configured for that setup.
     *
     * @param parallelism The parallelism
     */
    public void setParallelism(int parallelism) {
        config.setParallelism(parallelism);
    }

    @Internal
    public Plan createProgramPlan(String jobName, boolean clearSinks) {
        if (this.sinks.isEmpty()) {
            if (wasExecuted) {
                throw new RuntimeException("No new data sinks have been defined since the " +
                        "last execution. The last execution refers to the latest call to " +
                        "'execute()', 'count()', 'collect()', or 'print()'.");
            } else {
                throw new RuntimeException("No data sinks have been created yet. " +
                        "A program needs at least one sink that consumes data. " +
                        "Examples are writing the data set or printing it.");
            }
        }

        if (jobName == null) {
            jobName = getDefaultName();
        }

        OperatorTranslation translator = new OperatorTranslation();
        Plan plan = translator.translateToPlan(this.sinks, jobName);

        if (getParallelism() > 0) {
            plan.setDefaultParallelism(getParallelism());
        }
        plan.setExecutionConfig(getConfig());

        // Check plan for GenericTypeInfo's and register the types at the serializers.
        if (!config.isAutoTypeRegistrationDisabled()) {
            plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {

                private final Set<Class<?>> registeredTypes = new HashSet<>();
                private final Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new HashSet<>();

                @Override
                public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
                    if (!visitedOperators.add(visitable)) {
                        return false;
                    }
                    OperatorInformation<?> opInfo = visitable.getOperatorInfo();
                    Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes);
                    return true;
                }

                @Override
                public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
            });
        }

        try {
            registerCachedFilesWithPlan(plan);
        } catch (Exception e) {
            throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
        }

        // clear all the sinks such that the next execution does not redo everything
        if (clearSinks) {
            this.sinks.clear();
            wasExecuted = true;
        }

        // All types are registered now. Print information.
        int registeredTypes = config.getRegisteredKryoTypes().size() +
                config.getRegisteredPojoTypes().size() +
                config.getRegisteredTypesWithKryoSerializerClasses().size() +
                config.getRegisteredTypesWithKryoSerializers().size();
        int defaultKryoSerializers = config.getDefaultKryoSerializers().size() +
                config.getDefaultKryoSerializerClasses().size();
        LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);

        if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
            LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
        }
        if (config.isForceKryoEnabled()) {
            LOG.info("Using KryoSerializer for serializing POJOs");
        }
        if (config.isForceAvroEnabled()) {
            LOG.info("Using AvroSerializer for serializing POJOs");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
            LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers: {}", config.getDefaultKryoSerializers().entrySet().toString());
            LOG.debug("Registered Kryo default Serializers Classes {}", config.getDefaultKryoSerializerClasses().entrySet().toString());
            LOG.debug("Registered POJO types: {}", config.getRegisteredPojoTypes().toString());

            // print information about static code analysis
            LOG.debug("Static code analysis mode: {}", config.getCodeAnalysisMode());
        }

        return plan;
    }

    //......
}
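- ExecutionEnvironment provides setParallelism, which stores the parallelism in its ExecutionConfig. After createProgramPlan builds the Plan, it reads that parallelism back from the ExecutionConfig and, if it is greater than 0, sets it as the Plan's defaultParallelism.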
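The conditional copy from ExecutionConfig to Plan in createProgramPlan can be modelled as follows. The nested Plan class is a hypothetical stand-in reduced to a single field, and representing "unset" as -1 is an assumption of this sketch rather than a statement about Flink's internals.

```java
// Hypothetical model of the parallelism step in createProgramPlan; not Flink code.
final class PlanDefaults {

    // Stand-in for Plan, reduced to the one field this step touches.
    static final class Plan {
        int defaultParallelism = -1; // assumed convention: -1 = not set
    }

    // Mirrors: if (getParallelism() > 0) { plan.setDefaultParallelism(getParallelism()); }
    static Plan createProgramPlan(int envParallelism) {
        Plan plan = new Plan();
        if (envParallelism > 0) {
            plan.defaultParallelism = envParallelism;
        }
        return plan;
    }

    public static void main(String[] args) {
        System.out.println(createProgramPlan(3).defaultParallelism);  // 3
        System.out.println(createProgramPlan(-1).defaultParallelism); // -1, left for later stages
    }
}
```

Leaving the plan default unset is what lets a later stage, such as the client, fill it in.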
DataStreamSource
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@Public
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    boolean isParallel;

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));

        this.isParallel = isParallel;
        if (!isParallel) {
            setParallelism(1);
        }
    }

    public DataStreamSource(SingleOutputStreamOperator<T> operator) {
        super(operator.environment, operator.getTransformation());
        this.isParallel = true;
    }

    @Override
    public DataStreamSource<T> setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("Source: " + transformation.getId() + " is not a parallel source");
        } else {
            super.setParallelism(parallelism);
            return this;
        }
    }
}
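- DataStreamSource extends SingleOutputStreamOperator and overrides setParallelism. A non-parallel source (isParallel is false) is pinned to a parallelism of 1 in the constructor and rejects any other value with an IllegalArgumentException; otherwise the call is delegated to the parent SingleOutputStreamOperator.setParallelism.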
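The rules enforced by DataStreamSource can be reproduced with a small hypothetical model. SourceParallelismCheck and its nested Source class are illustrations, not Flink classes: a source starts from the environment's parallelism, and a non-parallel source is immediately pinned to 1 and only ever accepts 1.

```java
// Hypothetical model of DataStreamSource's parallelism rules; not Flink code.
final class SourceParallelismCheck {

    static final class Source {
        final boolean isParallel;
        int parallelism;

        Source(boolean isParallel, int envParallelism) {
            this.isParallel = isParallel;
            // Start from the environment's parallelism, then pin non-parallel sources to 1
            this.parallelism = envParallelism;
            if (!isParallel) {
                setParallelism(1);
            }
        }

        Source setParallelism(int parallelism) {
            if (parallelism != 1 && !isParallel) {
                throw new IllegalArgumentException("Source is not a parallel source");
            }
            this.parallelism = parallelism;
            return this;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Source(false, 4).parallelism);                  // 1
        System.out.println(new Source(true, 4).setParallelism(8).parallelism); // 8
        try {
            new Source(false, 4).setParallelism(2);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```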
SingleOutputStreamOperator
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {

    //......

    /**
     * Sets the parallelism for this operator.
     *
     * @param parallelism
     *            The parallelism for this operator.
     * @return The operator with set parallelism.
     */
    public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
        Preconditions.checkArgument(canBeParallel() || parallelism == 1,
                "The parallelism of non parallel operator must be 1.");

        transformation.setParallelism(parallelism);

        return this;
    }

    //......
}
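- SingleOutputStreamOperator.setParallelism first checks that an operator which cannot run in parallel is only given a parallelism of 1, then applies the value to the underlying StreamTransformation.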
DataStreamSink
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@Public
public class DataStreamSink<T> {

    private final SinkTransformation<T> transformation;

    //......

    /**
     * Sets the parallelism for this sink. The degree must be higher than zero.
     *
     * @param parallelism The parallelism for this sink.
     * @return The sink with set parallelism.
     */
    public DataStreamSink<T> setParallelism(int parallelism) {
        transformation.setParallelism(parallelism);
        return this;
    }

    //......
}
- DataStreamSink provides setParallelism, which applies the value to its SinkTransformation.
LocalEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/LocalEnvironment.java
@Public
public class LocalEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        if (executor == null) {
            startNewSession();
        }

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
- LocalEnvironment.execute builds the Plan with createProgramPlan and then calls LocalExecutor.executePlan.
LocalExecutor
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/LocalExecutor.java
public class LocalExecutor extends PlanExecutor {

    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        synchronized (this.lock) {

            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (jobExecutorService == null) {
                shutDownAtEnd = true;

                // configure the number of local slots equal to the parallelism of the local plan
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }

                // start the cluster for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                // TODO: Set job's default parallelism to max number of slots
                final int slotsPerTaskManager = jobExecutorServiceConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, taskManagerNumSlots);
                final int numTaskManagers = jobExecutorServiceConfiguration.getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);

                Optimizer pc = new Optimizer(new DataStatistics(), jobExecutorServiceConfiguration);
                OptimizedPlan op = pc.compile(plan);

                JobGraphGenerator jgg = new JobGraphGenerator(jobExecutorServiceConfiguration);
                JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId());

                return jobExecutorService.executeJobBlocking(jobGraph);
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
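- LocalExecutor.executePlan also sets the plan's defaultParallelism to slotsPerTaskManager * numTaskManagers. slotsPerTaskManager is read from TaskManagerOptions.NUM_TASK_SLOTS, falling back to taskManagerNumSlots, and numTaskManagers from ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, defaulting to 1. When it starts a dedicated local cluster and the slot count is still at its default, taskManagerNumSlots is first set to the plan's maximum parallelism.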
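The slot arithmetic in LocalExecutor.executePlan can be sketched in isolation. This is a hypothetical model, not Flink code: the -1 sentinel standing in for DEFAULT_TASK_MANAGER_NUM_SLOTS is an assumption, and the two configuration lookups are modelled as nullable Integer arguments (null meaning the key is absent).

```java
// Hypothetical model of the slot arithmetic in LocalExecutor.executePlan; not Flink code.
final class LocalSlots {
    // Assumed sentinel meaning "slot count not configured" (stand-in for DEFAULT_TASK_MANAGER_NUM_SLOTS)
    static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;

    // When a dedicated local cluster is started and no slot count was set,
    // size each TaskManager to the plan's maximum parallelism.
    static int initialSlots(int taskManagerNumSlots, int planMaxParallelism) {
        if (taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS && planMaxParallelism > 0) {
            return planMaxParallelism;
        }
        return taskManagerNumSlots;
    }

    // configuredSlots models the NUM_TASK_SLOTS lookup (null = absent, fall back to taskManagerNumSlots);
    // configuredTaskManagers models LOCAL_NUMBER_TASK_MANAGER (null = absent, default 1).
    static int planDefaultParallelism(Integer configuredSlots, int taskManagerNumSlots, Integer configuredTaskManagers) {
        int slotsPerTaskManager = configuredSlots != null ? configuredSlots : taskManagerNumSlots;
        int numTaskManagers = configuredTaskManagers != null ? configuredTaskManagers : 1;
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        int slots = initialSlots(DEFAULT_TASK_MANAGER_NUM_SLOTS, 4); // plan's max parallelism is 4
        System.out.println(planDefaultParallelism(null, slots, null)); // 4 slots * 1 TaskManager = 4
        System.out.println(planDefaultParallelism(null, slots, 2));    // 4 slots * 2 TaskManagers = 8
    }
}
```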
RemoteEnvironment
flink-java-1.7.1-sources.jar!/org/apache/flink/api/java/RemoteEnvironment.java
@Public
public class RemoteEnvironment extends ExecutionEnvironment {

    //......

    public JobExecutionResult execute(String jobName) throws Exception {
        PlanExecutor executor = getExecutor();

        Plan p = createProgramPlan(jobName);

        // Session management is disabled, revert this commit to enable
        //p.setJobId(jobID);
        //p.setSessionTimeout(sessionTimeout);

        JobExecutionResult result = executor.executePlan(p);

        this.lastJobExecutionResult = result;
        return result;
    }

    //......
}
- RemoteEnvironment.execute builds the Plan with createProgramPlan and then calls RemoteExecutor.executePlan.
RemoteExecutor
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/RemoteExecutor.java
public class RemoteExecutor extends PlanExecutor {

    private final Object lock = new Object();

    private final List<URL> jarFiles;

    private final List<URL> globalClasspaths;

    private final Configuration clientConfiguration;

    private ClusterClient<?> client;

    //......

    @Override
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }

        JobWithJars p = new JobWithJars(plan, this.jarFiles, this.globalClasspaths);
        return executePlanWithJars(p);
    }

    public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception {
        if (program == null) {
            throw new IllegalArgumentException("The job may not be null.");
        }

        synchronized (this.lock) {
            // check if we start a session dedicated for this execution
            final boolean shutDownAtEnd;

            if (client == null) {
                shutDownAtEnd = true;
                // start the executor for us
                start();
            }
            else {
                // we use the existing session
                shutDownAtEnd = false;
            }

            try {
                return client.run(program, defaultParallelism).getJobExecutionResult();
            }
            finally {
                if (shutDownAtEnd) {
                    stop();
                }
            }
        }
    }

    //......
}
- RemoteExecutor.executePlan calls executePlanWithJars, which in turn calls ClusterClient.run and passes defaultParallelism as an argument.
ClusterClient
flink-clients_2.11-1.7.1-sources.jar!/org/apache/flink/client/program/ClusterClient.java
public abstract class ClusterClient<T> {

    //......

    public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
        return run(program, parallelism, SavepointRestoreSettings.none());
    }

    public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
            throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }

        OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
        return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
    }

    private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism)
            throws CompilerException, ProgramInvocationException {
        return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
    }

    public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException {
        Logger log = LoggerFactory.getLogger(ClusterClient.class);

        if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
            log.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
            p.setDefaultParallelism(parallelism);
        }
        log.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());

        return compiler.compile(p);
    }

    //......
}
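- The parallelism passed to ClusterClient.run is finally applied to the Plan in getOptimizedPlan, but only when it is greater than 0 and the plan does not already have a default parallelism (getDefaultParallelism() <= 0).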
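The condition in getOptimizedPlan can be isolated into one function. This is a hypothetical model (ClientParallelism is not a Flink class) that mirrors only the `parallelism > 0 && p.getDefaultParallelism() <= 0` check shown above.

```java
// Hypothetical model of the check in ClusterClient.getOptimizedPlan; not Flink code.
final class ClientParallelism {

    // The client-supplied parallelism only fills in a plan default that is still unset (<= 0).
    static int resolvePlanDefault(int planDefaultParallelism, int clientParallelism) {
        if (clientParallelism > 0 && planDefaultParallelism <= 0) {
            return clientParallelism;
        }
        return planDefaultParallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolvePlanDefault(-1, 10)); // 10: plan had no default, the client value is used
        System.out.println(resolvePlanDefault(3, 10));  // 3: the plan's existing default is kept
    }
}
```

In this model, a default already copied into the plan by createProgramPlan (from env.setParallelism) is kept, and the client value only fills the gap.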
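Summary
Flink lets you set the parallelism at four levels: operator, execution environment, client and system. Operators, data sources and data sinks can call setParallelism() directly, which overrides the default set on the ExecutionEnvironment. With the CLI client, the parallelism can be given with -p on the command line, or passed as an argument to Client.run when calling from Java/Scala. Finally, parallelism.default in flink-conf.yaml provides the system-wide default.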