RichParallelSourceFunction
    /**
     * Base class for implementing a parallel data source. Upon execution, the runtime will
     * execute as many parallel instances of this function function as configured parallelism
     * of the source.
     *
     * <p>The data source has access to context information (such as the number of parallel
     * instances of the source, and which parallel instance the current instance is)
     * via {@link #getRuntimeContext()}. It also provides additional life-cycle methods
     * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}.</p>
     *
     * @param <OUT> The type of the records produced by this source.
     */
    @Public
    public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
            implements ParallelSourceFunction<OUT> {

        private static final long serialVersionUID = 1L;
    }
- RichParallelSourceFunction implements the ParallelSourceFunction interface and extends AbstractRichFunction.
ParallelSourceFunction
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
    /**
     * A stream data source that is executed in parallel. Upon execution, the runtime will
     * execute as many parallel instances of this function function as configured parallelism
     * of the source.
     *
     * <p>This interface acts only as a marker to tell the system that this source may
     * be executed in parallel. When different parallel instances are required to perform
     * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
     * context, which reveals information like the number of parallel tasks, and which parallel
     * task the current instance is.
     *
     * @param <OUT> The type of the records produced by this source.
     */
    @Public
    public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
    }
- ParallelSourceFunction extends the SourceFunction interface and declares no additional methods. It is a pure marker interface: its name alone expresses the intent that this stream data source may be executed in parallel.
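To make the marker-interface idea concrete, here is a minimal JDK-only sketch. The names `DemoSource`, `DemoParallelSource` and `MarkerInterfaceDemo` are hypothetical stand-ins, not Flink classes; the only assumption borrowed from the article is that a source lacking the marker ends up with a parallelism of 1.

```java
import java.util.Objects;

/** Hypothetical stand-in for SourceFunction; not a Flink type. */
interface DemoSource {
    String name();
}

/** Marker interface: adds no methods, only signals "may run in parallel". */
interface DemoParallelSource extends DemoSource {
}

final class MarkerInterfaceDemo {

    /**
     * Returns the requested parallelism for sources carrying the marker,
     * and 1 for every other source.
     */
    static int effectiveParallelism(DemoSource source, int requested) {
        Objects.requireNonNull(source, "source");
        return (source instanceof DemoParallelSource) ? requested : 1;
    }
}
```

The framework side needs nothing but an `instanceof` check, which is why the interface can stay empty.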
AbstractRichFunction
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/AbstractRichFunction.java
    /**
     * An abstract stub implementation for rich user-defined functions.
     * Rich functions have additional methods for initialization ({@link #open(Configuration)}) and
     * teardown ({@link #close()}), as well as access to their runtime execution context via
     * {@link #getRuntimeContext()}.
     */
    @Public
    public abstract class AbstractRichFunction implements RichFunction, Serializable {

        private static final long serialVersionUID = 1L;

        // --------------------------------------------------------------------------------------------
        //  Runtime context access
        // --------------------------------------------------------------------------------------------

        private transient RuntimeContext runtimeContext;

        @Override
        public void setRuntimeContext(RuntimeContext t) {
            this.runtimeContext = t;
        }

        @Override
        public RuntimeContext getRuntimeContext() {
            if (this.runtimeContext != null) {
                return this.runtimeContext;
            } else {
                throw new IllegalStateException("The runtime context has not been initialized.");
            }
        }

        @Override
        public IterationRuntimeContext getIterationRuntimeContext() {
            if (this.runtimeContext == null) {
                throw new IllegalStateException("The runtime context has not been initialized.");
            } else if (this.runtimeContext instanceof IterationRuntimeContext) {
                return (IterationRuntimeContext) this.runtimeContext;
            } else {
                throw new IllegalStateException("This stub is not part of an iteration step function.");
            }
        }

        // --------------------------------------------------------------------------------------------
        //  Default life cycle methods
        // --------------------------------------------------------------------------------------------

        @Override
        public void open(Configuration parameters) throws Exception {}

        @Override
        public void close() throws Exception {}
    }
- AbstractRichFunction mainly implements the setRuntimeContext, getRuntimeContext and getIterationRuntimeContext methods of the RichFunction interface; its open and close methods are no-ops.
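The pattern can be reproduced in miniature with plain Java. The sketch below is not Flink code: `MiniRichFunction` and `RecordingFunction` are hypothetical names, and a `String` stands in for the real RuntimeContext. It only illustrates two behaviors visible in the source above: reading the context before it has been injected fails with an IllegalStateException, and the life-cycle hooks default to no-ops that subclasses may override.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the AbstractRichFunction pattern; not a Flink class. */
abstract class MiniRichFunction {

    // A String stands in for Flink's RuntimeContext in this sketch.
    private transient String runtimeContext;

    void setRuntimeContext(String ctx) {
        this.runtimeContext = ctx;
    }

    String getRuntimeContext() {
        if (runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
        return runtimeContext;
    }

    // Default life-cycle methods do nothing, as in the original.
    void open() throws Exception {}

    void close() throws Exception {}
}

/** Overrides the hooks and records what happened. */
final class RecordingFunction extends MiniRichFunction {

    final List<String> events = new ArrayList<>();

    @Override
    void open() {
        events.add("open on " + getRuntimeContext());
    }

    @Override
    void close() {
        events.add("close");
    }
}
```

In practice this means any code that depends on the context belongs in `open` or later, not in the constructor.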
RuntimeContext
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java
    /**
     * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
     * of the function will have a context through which it can access static contextual information (such as
     * the current parallelism) and other constructs like accumulators and broadcast variables.
     *
     * <p>A function can, during runtime, obtain the RuntimeContext via a call to
     * {@link AbstractRichFunction#getRuntimeContext()}.
     */
    @Public
    public interface RuntimeContext {

        /**
         * Returns the name of the task in which the UDF runs, as assigned during plan construction.
         *
         * @return The name of the task in which the UDF runs.
         */
        String getTaskName();

        /**
         * Returns the metric group for this parallel subtask.
         *
         * @return The metric group for this parallel subtask.
         */
        @PublicEvolving
        MetricGroup getMetricGroup();

        /**
         * Gets the parallelism with which the parallel task runs.
         *
         * @return The parallelism with which the parallel task runs.
         */
        int getNumberOfParallelSubtasks();

        /**
         * Gets the number of max-parallelism with which the parallel task runs.
         *
         * @return The max-parallelism with which the parallel task runs.
         */
        @PublicEvolving
        int getMaxNumberOfParallelSubtasks();

        /**
         * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
         * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
         *
         * @return The index of the parallel subtask.
         */
        int getIndexOfThisSubtask();

        /**
         * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
         *
         * @return Attempt number of the subtask.
         */
        int getAttemptNumber();

        /**
         * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
         * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
         * {@link #getNumberOfParallelSubtasks()}.
         *
         * @return The name of the task, with subtask indicator.
         */
        String getTaskNameWithSubtasks();

        /**
         * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
         * job.
         */
        ExecutionConfig getExecutionConfig();

        //.......
    }
- RuntimeContext defines many methods. Two matter here: getNumberOfParallelSubtasks returns the parallelism of the current task, and getIndexOfThisSubtask returns the index of the current parallel subtask. With these two values you can write a parallel source (a RichParallelSourceFunction) whose instances run side by side yet emit non-overlapping data.
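One common way to get non-overlapping output is to let each subtask stride through the input by the parallelism, starting at its own index. The sketch below is plain Java with a hypothetical helper name (`SubtaskPartitioner`). In a real RichParallelSourceFunction the `index` and `parallelism` arguments would presumably come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(); here they are passed in directly so the logic can run without Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; shows only the partitioning arithmetic, not a Flink source. */
final class SubtaskPartitioner {

    /**
     * Returns the values in [0, total) owned by subtask {@code index}
     * when {@code parallelism} subtasks share the range by striding.
     */
    static List<Long> valuesFor(int index, int parallelism, long total) {
        if (parallelism <= 0 || index < 0 || index >= parallelism) {
            throw new IllegalArgumentException(
                "index must be in [0, parallelism): index=" + index + ", parallelism=" + parallelism);
        }
        List<Long> owned = new ArrayList<>();
        // Start at this subtask's index and step by the parallelism,
        // so no two subtasks ever produce the same value.
        for (long v = index; v < total; v += parallelism) {
            owned.add(v);
        }
        return owned;
    }
}
```

With a parallelism of 3 and ten values, subtask 0 owns 0, 3, 6, 9, subtask 1 owns 1, 4, 7, and subtask 2 owns 2, 5, 8. Together they cover the range exactly once.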
JobMaster.startJobExecution
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java
    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        startJobMasterServices();

        log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());

        resetAndScheduleExecutionGraph();

        return Acknowledge.get();
    }

    private void resetAndScheduleExecutionGraph() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> executionGraphAssignedFuture;

        if (executionGraph.getState() == JobStatus.CREATED) {
            executionGraphAssignedFuture = CompletableFuture.completedFuture(null);
        } else {
            suspendAndClearExecutionGraphFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
            final ExecutionGraph newExecutionGraph = createAndRestoreExecutionGraph(newJobManagerJobMetricGroup);

            executionGraphAssignedFuture = executionGraph.getTerminationFuture().handleAsync(
                (JobStatus ignored, Throwable throwable) -> {
                    assignExecutionGraph(newExecutionGraph, newJobManagerJobMetricGroup);
                    return null;
                },
                getMainThreadExecutor());
        }

        executionGraphAssignedFuture.thenRun(this::scheduleExecutionGraph);
    }

    private void scheduleExecutionGraph() {
        checkState(jobStatusListener == null);
        // register self as job status change listener
        jobStatusListener = new JobManagerJobStatusListener();
        executionGraph.registerJobStatusListener(jobStatusListener);

        try {
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }
- startJobExecution calls resetAndScheduleExecutionGraph, which chains scheduleExecutionGraph onto executionGraphAssignedFuture via thenRun; scheduleExecutionGraph then calls executionGraph.scheduleForExecution() to schedule the job.
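The control flow of resetAndScheduleExecutionGraph is a small CompletableFuture idiom: pick an already-completed future when nothing needs resetting, otherwise a future that completes after the old graph has terminated and the new one is assigned, then attach the scheduling step with thenRun in both cases. The following JDK-only sketch imitates that shape. `ResetThenScheduleDemo` and the log strings are hypothetical, and an already-completed future stands in for the termination future so that the example runs synchronously.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical imitation of the branching in resetAndScheduleExecutionGraph. */
final class ResetThenScheduleDemo {

    /** Returns the ordered list of steps that were executed. */
    static List<String> run(boolean graphIsFresh) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        final CompletableFuture<Void> assigned;
        if (graphIsFresh) {
            // Nothing to reset: scheduling may start right away.
            assigned = CompletableFuture.completedFuture(null);
        } else {
            log.add("suspend old graph");
            // Stand-in for executionGraph.getTerminationFuture().
            CompletableFuture<String> termination = CompletableFuture.completedFuture("SUSPENDED");
            assigned = termination.handle((status, error) -> {
                log.add("assign new graph");
                return null;
            });
        }

        // The same continuation is attached regardless of the branch taken.
        assigned.thenRun(() -> log.add("schedule")).join();
        return new ArrayList<>(log);
    }
}
```

Calling `run(true)` performs only the scheduling step, while `run(false)` performs the suspend, assign and schedule steps in that order.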
ExecutionGraph.scheduleForExecution
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
    public void scheduleForExecution() throws JobException {

        final long currentGlobalModVersion = globalModVersion;

        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

            final CompletableFuture<Void> newSchedulingFuture;

            switch (scheduleMode) {

                case LAZY_FROM_SOURCES:
                    newSchedulingFuture = scheduleLazy(slotProvider);
                    break;

                case EAGER:
                    newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                    break;

                default:
                    throw new JobException("Schedule mode is invalid.");
            }

            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;

                newSchedulingFuture.whenCompleteAsync(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null && !(throwable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(ExceptionUtils.stripCompletionException(throwable));
                        }
                    },
                    futureExecutor);
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }
- The job examined here runs in EAGER mode, so scheduleEager is the method that gets called.
ExecutionGraph.scheduleEager
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
    /**
     *
     *
     * @param slotProvider  The resource provider from which the slots are allocated
     * @param timeout       The maximum time that the deployment may take, before a
     *                      TimeoutException is thrown.
     * @returns Future which is completed once the {@link ExecutionGraph} has been scheduled.
     * The future can also be completed exceptionally if an error happened.
     */
    private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
        checkState(state == JobStatus.RUNNING, "job is not running currently");

        // Important: reserve all the space we need up front.
        // that way we do not have any operation that can fail between allocating the slots
        // and adding them to the list. If we had a failure in between there, that would
        // cause the slots to get lost
        final boolean queued = allowQueuedScheduling;

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

        // allocate the slots (obtain all their futures
        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
            // these calls are not blocking, they only return futures
            Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
                slotProvider,
                queued,
                LocationPreferenceConstraint.ALL,
                allocationTimeout);

            allAllocationFutures.addAll(allocationFutures);
        }

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
            .thenAccept(
                (Collection<Execution> executionsToDeploy) -> {
                    for (Execution execution : executionsToDeploy) {
                        try {
                            execution.deploy();
                        } catch (Throwable t) {
                            throw new CompletionException(
                                new FlinkException(
                                    String.format("Could not deploy execution %s.", execution),
                                    t));
                        }
                    }
                })
            // Generate a more specific failure message for the eager scheduling
            .exceptionally(
                (Throwable throwable) -> {
                    final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                    final Throwable resultThrowable;

                    if (strippedThrowable instanceof TimeoutException) {
                        int numTotal = allAllocationsFuture.getNumFuturesTotal();
                        int numComplete = allAllocationsFuture.getNumFuturesCompleted();
                        String message = "Could not allocate all requires slots within timeout of " +
                            timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

                        resultThrowable = new NoResourceAvailableException(message);
                    } else {
                        resultThrowable = strippedThrowable;
                    }

                    throw new CompletionException(resultThrowable);
                });

        return currentSchedulingFuture;
    }
- scheduleEager first calls getVerticesTopologically to obtain the ExecutionJobVertex instances.
- For each of them it calls ExecutionJobVertex.allocateResourcesForAll to allocate resources, which yields a Collection<CompletableFuture<Execution>>.
- Finally it combines that batch of futures with FutureUtils.combineAll(allAllocationFutures) and, once all of them have completed, calls execution.deploy() on each Execution in turn.
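The "collect every slot future, then deploy once all are done" step can be approximated with the JDK alone. The sketch below uses CompletableFuture.allOf as a stand-in for Flink's FutureUtils.combineAll, which is an assumption made for illustration: the two are not identical, since the comment in the source says the combined future fails as soon as one slot future fails, while allOf waits for every input to finish. `EagerScheduleDemo` and the string-typed "slots" are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical JDK-only approximation of the eager scheduling step. */
final class EagerScheduleDemo {

    /**
     * Waits (without blocking the caller) until every slot future is done,
     * then "deploys" each slot in order. Fails if any slot future failed.
     */
    static CompletableFuture<List<String>> scheduleEager(List<CompletableFuture<String>> slotFutures) {
        CompletableFuture<Void> allSlots =
            CompletableFuture.allOf(slotFutures.toArray(new CompletableFuture<?>[0]));

        return allSlots.thenApply(ignored -> {
            List<String> deployed = new ArrayList<>();
            for (CompletableFuture<String> slot : slotFutures) {
                // join() cannot block here: allOf guarantees completion.
                deployed.add("deployed on " + slot.join());
            }
            return deployed;
        });
    }
}
```

The design point mirrored here is that nothing is deployed until every allocation has succeeded, so a single missing slot fails the whole scheduling attempt instead of leaving a partially deployed job.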
ExecutionJobVertex.allocateResourcesForAll
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
    /**
     * Acquires a slot for all the execution vertices of this ExecutionJobVertex. The method returns
     * pairs of the slots and execution attempts, to ease correlation between vertices and execution
     * attempts.
     *
     * <p>If this method throws an exception, it makes sure to release all so far requested slots.
     *
     * @param resourceProvider The resource provider from whom the slots are requested.
     * @param queued if the allocation can be queued
     * @param locationPreferenceConstraint constraint for the location preferences
     * @param allocationTimeout timeout for allocating the individual slots
     */
    public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
            SlotProvider resourceProvider,
            boolean queued,
            LocationPreferenceConstraint locationPreferenceConstraint,
            Time allocationTimeout) {
        final ExecutionVertex[] vertices = this.taskVertices;
        final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];

        // try to acquire a slot future for each execution.
        // we store the execution with the future just to be on the safe side
        for (int i = 0; i < vertices.length; i++) {
            // allocate the next slot (future)
            final Execution exec = vertices[i].getCurrentExecutionAttempt();
            final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
                resourceProvider,
                queued,
                locationPreferenceConstraint,
                allocationTimeout);
            slots[i] = allocationFuture;
        }

        // all good, we acquired all slots
        return Arrays.asList(slots);
    }
- This method walks the ExecutionJobVertex's taskVertices and calls exec.allocateAndAssignSlotForExecution for each one, so the effective parallelism is determined by the length of taskVertices.
Execution.deploy
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java
    /**
     * Deploys the execution to the previously assigned resource.
     *
     * @throws JobException if the execution cannot be deployed to the assigned resource
     */
    public void deploy() throws JobException {
        final LogicalSlot slot = assignedResource;

        checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

        //......

        try {

            //......

            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskRestore,
                attemptNumber);

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

            submitResultFuture.whenCompleteAsync(
                (ack, failure) -> {
                    // only respond to the failure case
                    if (failure != null) {
                        if (failure instanceof TimeoutException) {
                            String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                            markFailed(new Exception(
                                "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                executor);
        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }
- Execution.deploy creates a TaskDeploymentDescriptor and submits it through taskManagerGateway.submitTask; this in turn causes the TaskExecutor to start the Task's run method.
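The completion callback in deploy reacts only to failures and gives timeouts a more descriptive message than other errors. A JDK-only sketch of that branching follows. `DeployFailureDemo` is a hypothetical name, the message text is simplified, and `handle` replaces `whenCompleteAsync` so that the outcome can be returned as a value and inspected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Hypothetical imitation of the failure handling after submitTask. */
final class DeployFailureDemo {

    /** Maps the outcome of a submission to a human-readable result. */
    static String describe(Throwable failure, String taskName) {
        if (failure == null) {
            return "ok";
        }
        if (failure instanceof TimeoutException) {
            // A timeout means the TaskManager never answered.
            return "Cannot deploy task " + taskName + " - TaskManager not responding";
        }
        return "failed: " + failure.getMessage();
    }

    /** Attaches the handler directly to the acknowledgement future. */
    static String submit(CompletableFuture<String> acknowledgement, String taskName) {
        return acknowledgement
            .handle((ack, failure) -> describe(failure, taskName))
            .join();
    }
}
```

Because the handler is attached directly to the future that fails, it receives the original exception rather than a CompletionException wrapper, which is what makes the `instanceof TimeoutException` check work in this sketch.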
ExecutionJobVertex
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
    private final ExecutionVertex[] taskVertices;

    public ExecutionJobVertex(
            ExecutionGraph graph,
            JobVertex jobVertex,
            int defaultParallelism,
            Time timeout,
            long initialGlobalModVersion,
            long createTimestamp) throws JobException {

        if (graph == null || jobVertex == null) {
            throw new NullPointerException();
        }

        this.graph = graph;
        this.jobVertex = jobVertex;

        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

        final int configuredMaxParallelism = jobVertex.getMaxParallelism();

        this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);

        // if no max parallelism was configured by the user, we calculate and set a default
        setMaxParallelismInternal(maxParallelismConfigured ?
            configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));

        // verify that our parallelism is not higher than the maximum parallelism
        if (numTaskVertices > maxParallelism) {
            throw new JobException(
                String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                    jobVertex.getName(),
                    numTaskVertices,
                    maxParallelism));
        }

        this.parallelism = numTaskVertices;

        this.serializedTaskInformation = null;

        this.taskVertices = new ExecutionVertex[numTaskVertices];

        //......

        // create all task vertices
        for (int i = 0; i < numTaskVertices; i++) {
            ExecutionVertex vertex = new ExecutionVertex(
                this,
                i,
                producedDataSets,
                timeout,
                initialGlobalModVersion,
                createTimestamp,
                maxPriorAttemptsHistoryLength);

            this.taskVertices[i] = vertex;
        }

        //......
    }
- taskVertices is an ExecutionVertex[] whose size is determined by numTaskVertices.
- ExecutionJobVertex first checks whether jobVertex.getParallelism() is greater than 0 (it usually is). If so, that value becomes numTaskVertices; otherwise defaultParallelism is used (ExecutionGraph.attachJobGraph passes a defaultParallelism of 1 when it creates the ExecutionJobVertex).
- It then creates numTaskVertices ExecutionVertex instances one by one and stores them in the taskVertices array.
- The jobVertex's parallelism is set by StreamingJobGraphGenerator in its createJobVertex method from streamNode.getParallelism() (only when that value is greater than 0).
- If a streamNode's parallelism has not been set explicitly, it defaults to the parallelism of the StreamExecutionEnvironment (see the DataStreamSource constructor, the DataStream.transform method and the DataStreamSink constructor; DataStreamSource resets the parallelism of a non-parallel source to 1). For a LocalEnvironment, the default is Runtime.getRuntime().availableProcessors().
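The rule that decides how many ExecutionVertex instances are created can be restated as a few lines of plain Java. `ParallelismResolver` is a hypothetical name, and an IllegalArgumentException stands in for Flink's JobException. The fallback and the upper-bound check follow the constructor shown above.

```java
/** Hypothetical restatement of the parallelism rule in the ExecutionJobVertex constructor. */
final class ParallelismResolver {

    /**
     * Uses the vertex's own parallelism when it is positive, otherwise the default,
     * and rejects any result above the max parallelism.
     */
    static int resolve(int vertexParallelism, int defaultParallelism, int maxParallelism) {
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

        if (numTaskVertices > maxParallelism) {
            throw new IllegalArgumentException(String.format(
                "parallelism (%d) is higher than the max parallelism (%d)",
                numTaskVertices, maxParallelism));
        }
        return numTaskVertices;
    }
}
```

For example, `resolve(4, 1, 128)` gives 4, whereas a vertex with no parallelism set, as in `resolve(-1, 1, 128)`, falls back to the default of 1.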