聊聊flink的SourceFunction

栏目: 编程工具 · 发布时间: 7年前

内容简介:flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.javaflink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.javaflink-runtime_2.11-1.6.2-source
// set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

        dataStreamSource.map(new UpperCaseMapFunc()).print();

        env.execute("sourceFunctionDemo");
复制代码
  • 这里通过addSource方法来添加自定义的SourceFunction

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * <h3>CheckpointedFunction Sources</h3>
 *
 * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * <p>This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * <pre>{@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<T> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 * }
 * }</pre>
 *
 *
 * <h3>Timestamps and watermarks:</h3>
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * <h3>Gracefully Stopping Functions</h3>
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

	/**
	 * Starts the source. Implementations can use the {@link SourceContext} emit
	 * elements.
	 *
	 * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
	 * must lock on the checkpoint lock (using a synchronized block) before updating internal
	 * state and emitting elements, to make both an atomic operation:
	 *
	 * <pre>{@code
	 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
	 *      private long count = 0L;
	 *      private volatile boolean isRunning = true;
	 *
	 *      private transient ListState<Long> checkpointedCount;
	 *
	 *      public void run(SourceContext<T> ctx) {
	 *          while (isRunning && count < 1000) {
	 *              // this synchronized block ensures that state checkpointing,
	 *              // internal state updates and emission of elements are an atomic operation
	 *              synchronized (ctx.getCheckpointLock()) {
	 *                  ctx.collect(count);
	 *                  count++;
	 *              }
	 *          }
	 *      }
	 *
	 *      public void cancel() {
	 *          isRunning = false;
	 *      }
	 *
	 *      public void initializeState(FunctionInitializationContext context) {
	 *          this.checkpointedCount = context
	 *              .getOperatorStateStore()
	 *              .getListState(new ListStateDescriptor<>("count", Long.class));
	 *
	 *          if (context.isRestored()) {
	 *              for (Long count : this.checkpointedCount.get()) {
	 *                  this.count = count;
	 *              }
	 *          }
	 *      }
	 *
	 *      public void snapshotState(FunctionSnapshotContext context) {
	 *          this.checkpointedCount.clear();
	 *          this.checkpointedCount.add(count);
	 *      }
	 * }
	 * }</pre>
	 *
	 * @param ctx The context to emit elements to and for accessing locks.
	 */
	void run(SourceContext<T> ctx) throws Exception;

	/**
	 * Cancels the source. Most sources will have a while loop inside the
	 * {@link #run(SourceContext)} method. The implementation needs to ensure that the
	 * source will break out of that loop after this method is called.
	 *
	 * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
	 * {@code false} in this method. That flag is checked in the loop condition.
	 *
	 * <p>When a source is canceled, the executing thread will also be interrupted
	 * (via {@link Thread#interrupt()}). The interruption happens strictly after this
	 * method has been called, so any interruption handler can rely on the fact that
	 * this method has completed. It is good practice to make any flags altered by
	 * this method "volatile", in order to guarantee the visibility of the effects of
	 * this method to any interruption handler.
	 */
	void cancel();

	// ------------------------------------------------------------------------
	//  source context
	// ------------------------------------------------------------------------

	/**
	 * Interface that source functions use to emit elements, and possibly watermarks.
	 *
	 * @param <T> The type of the elements produced by the source.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface SourceContext<T> {

		//......
	}
}
复制代码
  • SourceFunction是flink stream data sources的基本接口,这里头定义了run方法以及cancel方法,同时定义了SourceContext接口

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
	 * Interface that source functions use to emit elements, and possibly watermarks.
	 *
	 * @param <T> The type of the elements produced by the source.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface SourceContext<T> {

		/**
		 * Emits one element from the source, without attaching a timestamp. In most cases,
		 * this is the default way of emitting elements.
		 *
		 * <p>The timestamp that the element will get assigned depends on the time characteristic of
		 * the streaming program:
		 * <ul>
		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
		 *         current time as the timestamp.</li>
		 *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
		 *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
		 *         operation (like time windows).</li>
		 * </ul>
		 *
		 * @param element The element to emit
		 */
		void collect(T element);

		/**
		 * Emits one element from the source, and attaches the given timestamp. This method
		 * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
		 * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
		 * on the stream.
		 *
		 * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
		 * This allows programs to switch between the different time characteristics and behaviors
		 * without changing the code of the source functions.
		 * <ul>
		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
		 *         because processing time never works with element timestamps.</li>
		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
		 *         system's current time, to realize proper ingestion time semantics.</li>
		 *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
		 * </ul>
		 *
		 * @param element The element to emit
		 * @param timestamp The timestamp in milliseconds since the Epoch
		 */
		@PublicEvolving
		void collectWithTimestamp(T element, long timestamp);

		/**
		 * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
		 * elements with a timestamp {@code t' <= t} will occur any more. If further such
		 * elements will be emitted, those elements are considered <i>late</i>.
		 *
		 * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
		 * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
		 * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
		 * automatic ingestion time watermarks.
		 *
		 * @param mark The Watermark to emit
		 */
		@PublicEvolving
		void emitWatermark(Watermark mark);

		/**
		 * Marks the source to be temporarily idle. This tells the system that this source will
		 * temporarily stop emitting records and watermarks for an indefinite amount of time. This
		 * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
		 * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
		 * watermarks without the need to wait for watermarks from this source while it is idle.
		 *
		 * <p>Source functions should make a best effort to call this method as soon as they
		 * acknowledge themselves to be idle. The system will consider the source to resume activity
		 * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
		 * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
		 */
		@PublicEvolving
		void markAsTemporarilyIdle();

		/**
		 * Returns the checkpoint lock. Please refer to the class-level comment in
		 * {@link SourceFunction} for details about how to write a consistent checkpointed
		 * source.
		 *
		 * @return The object to use as the lock
		 */
		Object getCheckpointLock();

		/**
		 * This method is called by the system to shut down the context.
		 */
		void close();
	}
复制代码
  • SourceContext主要定义了数据源发射数据的接口,这里是collect方法( 如果数据本身没有时间,则在使用TimeCharacteristic.EventTime的时候,可以使用TimestampAssigner在进行依赖时间的相关操作时指定timestamp;如果是配合TimeCharacteristic.IngestionTime,则无需指定,系统会自动生成timestamp );除了collect方法外,还有collectWithTimestamp发射数据同时指定timestamp( 配合TimeCharacteristic.EventTime使用 )
  • 此外还定义了emitWatermark方法,用于处理数据乱序时,只考虑哪些时间范围内的数据,这个只有在配合TimeCharacteristic.EventTime的时候才有效;如果是TimeCharacteristic.ProcessingTime则watermark会被忽略;如果是TimeCharacteristic.IngestionTime则watermark会被自动生成的ingestion time watermarks替代
  • 这里还定义了markAsTemporarilyIdle方法,用于告诉系统当前的source会暂停发射数据一段时间,这个只在配合使用TimeCharacteristic.IngestionTime或者TimeCharacteristic.EventTime的时候才有效;当SourceContext.collect(T)或者SourceContext.collectWithTimestamp(T, long)或者SourceContext.emitWatermark(Watermark)被调用时,系统会认为source又恢复回来继续生产数据
  • 这里还定义了getCheckpointLock方法,用于返回checkpoint的lock,方便source处理checkpoint相关的逻辑
  • close方法主要给系统来调用,用于关闭context相关的资源

Task.run( 上游 )

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
复制代码
  • Task的run方法会调用invokable.invoke(),这里的invokable为StreamTask

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
复制代码
  • StreamTask的invoke方法里头调用了子类的run方法,这里子类为SourceStreamTask

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

@Override
	protected void run() throws Exception {
		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
	}
复制代码
  • SourceStreamTask的run方法主要调用headOperator的run方法,这里的headOperator为SourceStream

SourceStream.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
		run(lockingObject, streamStatusMaintainer, output);
	}

	public void run(final Object lockingObject,
			final StreamStatusMaintainer streamStatusMaintainer,
			final Output<StreamRecord<OUT>> collector) throws Exception {

		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

		final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
		final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
			? getExecutionConfig().getLatencyTrackingInterval()
			: configuration.getLong(MetricOptions.LATENCY_INTERVAL);

		LatencyMarksEmitter<OUT> latencyEmitter = null;
		if (latencyTrackingInterval > 0) {
			latencyEmitter = new LatencyMarksEmitter<>(
				getProcessingTimeService(),
				collector,
				latencyTrackingInterval,
				this.getOperatorID(),
				getRuntimeContext().getIndexOfThisSubtask());
		}

		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

		this.ctx = StreamSourceContexts.getSourceContext(
			timeCharacteristic,
			getProcessingTimeService(),
			lockingObject,
			streamStatusMaintainer,
			collector,
			watermarkInterval,
			-1);

		try {
			userFunction.run(ctx);

			// if we get here, then the user function either exited after being done (finite source)
			// or the function was canceled or stopped. For the finite source case, we should emit
			// a final watermark that indicates that we reached the end of event-time
			if (!isCanceledOrStopped()) {
				ctx.emitWatermark(Watermark.MAX_WATERMARK);
			}
		} finally {
			// make sure that the context is closed in any case
			ctx.close();
			if (latencyEmitter != null) {
				latencyEmitter.close();
			}
		}
	}
复制代码
  • SourceStream的run方法,这里先通过StreamSourceContexts.getSourceContext构造SourceFunction.SourceContext,然后调用userFunction的run方法,这里的userFunction为RandomWordSource,即用户自定义的SourceFunction( 这里要注意在调用userFunction.run(ctx)之前,如果latencyTrackingInterval大于0,还创建了LatencyMarksEmitter )

RandomWordSource.run

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
            int rnd = (int) (Math.random() * 10 % words.length);
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
复制代码
  • RandomWordSource的run方法会一直循环发射数据

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

private static class LatencyMarksEmitter<OUT> {
		private final ScheduledFuture<?> latencyMarkTimer;

		public LatencyMarksEmitter(
				final ProcessingTimeService processingTimeService,
				final Output<StreamRecord<OUT>> output,
				long latencyTrackingInterval,
				final OperatorID operatorId,
				final int subtaskIndex) {

			latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
				new ProcessingTimeCallback() {
					@Override
					public void onProcessingTime(long timestamp) throws Exception {
						try {
							// ProcessingTimeService callbacks are executed under the checkpointing lock
							output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
						} catch (Throwable t) {
							// we catch the Throwables here so that we don't trigger the processing
							// timer services async exception handler
							LOG.warn("Error while emitting latency marker.", t);
						}
					}
				},
				0L,
				latencyTrackingInterval);
		}

		public void close() {
			latencyMarkTimer.cancel(true);
		}
	}
复制代码
  • LatencyMarksEmitter是在StreamSource的run方法里头,调用userFunction的run方法前创建的( 如果latencyTrackingInterval>0的话 ),这里的latencyTrackingInterval先调用getExecutionConfig().isLatencyTrackingConfigured()判断executionConfig是否有配置该值,有配置的话则使用getExecutionConfig().getLatencyTrackingInterval()返回的值,没有配置的话则使用configuration.getLong(MetricOptions.LATENCY_INTERVAL)返回的值,后者默认是2000L( 这里使用的是后者的配置,即为2000 )
  • LatencyMarksEmitter的构造器里头调用processingTimeService.scheduleAtFixedRate方法注册了一个fixedRate的定时任务,调度间隔为latencyTrackingInterval
  • 定时任务的处理内容在ProcessingTimeCallback的onProcessTime方法,里头调用了output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex))来发送LatencyMarker;这里的processingTimeService为SystemProcessingTimeService;这里的output为AbstractStreamOperator.CountingOutput

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

@Override
	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
		long nextTimestamp = getCurrentProcessingTime() + initialDelay;

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
			return timerService.scheduleAtFixedRate(
				new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
				initialDelay,
				period,
				TimeUnit.MILLISECONDS);
		} catch (RejectedExecutionException e) {
			final int status = this.status.get();
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(initialDelay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

	@Override
	public long getCurrentProcessingTime() {
		return System.currentTimeMillis();
	}
复制代码
  • SystemProcessingTimeService的scheduleAtFixedRate方法,实际是委托timerService的scheduleAtFixedRate来执行的,这里的timerService即ScheduledThreadPoolExecutor,它的corePoolSize为1,然后它调度的任务是RepeatedTriggerTask

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

/**
	 * Internal task which is repeatedly called by the processing time service.
	 */
	private static final class RepeatedTriggerTask implements Runnable {

		private final AtomicInteger serviceStatus;
		private final Object lock;
		private final ProcessingTimeCallback target;
		private final long period;
		private final AsyncExceptionHandler exceptionHandler;

		private long nextTimestamp;

		private RepeatedTriggerTask(
				final AtomicInteger serviceStatus,
				final AsyncExceptionHandler exceptionHandler,
				final Object lock,
				final ProcessingTimeCallback target,
				final long nextTimestamp,
				final long period) {

			this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
			this.lock = Preconditions.checkNotNull(lock);
			this.target = Preconditions.checkNotNull(target);
			this.period = period;
			this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

			this.nextTimestamp = nextTimestamp;
		}

		@Override
		public void run() {
			synchronized (lock) {
				try {
					if (serviceStatus.get() == STATUS_ALIVE) {
						target.onProcessingTime(nextTimestamp);
					}

					nextTimestamp += period;
				} catch (Throwable t) {
					TimerException asyncException = new TimerException(t);
					exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
				}
			}
		}
	}
复制代码
  • RepeatedTriggerTask会在serviceStatus为STATUS_ALIVE的时候,调用ProcessingTimeCallback的onProcessingTime;这里的nextTimestamp最初传进来的是依据getCurrentProcessingTime() + initialDelay来算的,之后不断累加period

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

/**
	 * Wrapping {@link Output} that updates metrics on the number of emitted elements.
	 */
	public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
		private final Output<StreamRecord<OUT>> output;
		private final Counter numRecordsOut;

		public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
			this.output = output;
			this.numRecordsOut = counter;
		}

		@Override
		public void emitWatermark(Watermark mark) {
			output.emitWatermark(mark);
		}

		@Override
		public void emitLatencyMarker(LatencyMarker latencyMarker) {
			output.emitLatencyMarker(latencyMarker);
		}

		@Override
		public void collect(StreamRecord<OUT> record) {
			numRecordsOut.inc();
			output.collect(record);
		}

		@Override
		public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
			numRecordsOut.inc();
			output.collect(outputTag, record);
		}

		@Override
		public void close() {
			output.close();
		}
	}
复制代码
  • 它实际包装的是RecordWriterOutput

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java

/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

	private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

	private SerializationDelegate<StreamElement> serializationDelegate;

	//......

	@Override
	public void emitLatencyMarker(LatencyMarker latencyMarker) {
		serializationDelegate.setInstance(latencyMarker);

		try {
			recordWriter.randomEmit(serializationDelegate);
		}
		catch (Exception e) {
			throw new RuntimeException(e.getMessage(), e);
		}
	}
}
复制代码
  • 这里的emitLatencyMarker主要调用了StreamRecordWriter的randomEmit( 它实际上是通过父类RecordWriter来发射 ),来发射LatencyMarker

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

/**
	 * This is used to send LatencyMarks to a random target channel.
	 */
	public void randomEmit(T record) throws IOException, InterruptedException {
		sendToTarget(record, rng.nextInt(numChannels));
	}

	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
		RecordSerializer<T> serializer = serializers[targetChannel];

		SerializationResult result = serializer.addRecord(record);

		while (result.isFullBuffer()) {
			if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
				// If this was a full record, we are done. Not breaking
				// out of the loop at this point will lead to another
				// buffer request before breaking out (that would not be
				// a problem per se, but it can lead to stalls in the
				// pipeline).
				if (result.isFullRecord()) {
					break;
				}
			}
			BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

			result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
		}
		checkState(!serializer.hasSerializedData(), "All data should be written at once");

		if (flushAlways) {
			targetPartition.flush(targetChannel);
		}
	}
复制代码
  • RecordWriter的randomEmit就是随机选择一个targetChannel,然后进行发送

Task.run( 下游 )

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
复制代码
  • 下游的Task的run方法会调用invokable.invoke(),这里的invokable为OneInputStreamTask

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}
复制代码
  • Task的run方法会调用StreamTask的invoke方法,而invoke方法会调用OneInputStreamTask的run方法这里主要是不断循环调用inputProcessor.processInput();这里的inputProcessor为StreamInputProcessor

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

public boolean processInput() throws Exception {
		if (isFinished) {
			return false;
		}
		if (numRecordsIn == null) {
			try {
				numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				numRecordsIn = new SimpleCounter();
			}
		}

		while (true) {
			if (currentRecordDeserializer != null) {
				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

				if (result.isBufferConsumed()) {
					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					StreamElement recordOrMark = deserializationDelegate.getInstance();

					if (recordOrMark.isWatermark()) {
						// handle watermark
						statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
						continue;
					} else if (recordOrMark.isStreamStatus()) {
						// handle stream status
						statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
						continue;
					} else if (recordOrMark.isLatencyMarker()) {
						// handle latency marker
						synchronized (lock) {
							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
						}
						continue;
					} else {
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							streamOperator.setKeyContextElement1(record);
							streamOperator.processElement(record);
						}
						return true;
					}
				}
			}

			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
			if (bufferOrEvent != null) {
				if (bufferOrEvent.isBuffer()) {
					currentChannel = bufferOrEvent.getChannelIndex();
					currentRecordDeserializer = recordDeserializers[currentChannel];
					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
				}
				else {
					// Event received
					final AbstractEvent event = bufferOrEvent.getEvent();
					if (event.getClass() != EndOfPartitionEvent.class) {
						throw new IOException("Unexpected event: " + event);
					}
				}
			}
			else {
				isFinished = true;
				if (!barrierHandler.isEmpty()) {
					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
				}
				return false;
			}
		}
	}
复制代码
  • processInput方法首先调用currentRecordDeserializer.getNextRecord(deserializationDelegate)获取nextRecord,然后只有当result.isFullRecord()的时候才进行处理
  • 处理的时候会根据StreamElement的不同类型进行不同处理,主要分为watermark、streamStatus、latencyMakrker及正常的数据这几类来处理
  • 如果是正常的数据,则调用streamOperator.processElement(record),这里的streamOperator为StreamMap

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java

/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private static final long serialVersionUID = 1L;

	public StreamMap(MapFunction<IN, OUT> mapper) {
		super(mapper);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		output.collect(element.replace(userFunction.map(element.getValue())));
	}
}
复制代码
  • 这里调用了userFunction.map(element.getValue())来进行map操作,这里的userFunction即为UpperCaseMapFunc

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Single Page Web Applications

Single Page Web Applications

Michael Mikowski、Josh Powell / Manning Publications / 2013-9-30 / USD 44.99

Code for most web sites mostly runs on the server. When a user clicks on a link, the site reacts slowly because the browser sends information to the server and the server sends it back again before di......一起来看看 《Single Page Web Applications》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具