内容简介:本文主要研究一下flink的TimerServiceflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.javaflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java
序
本文主要研究一下flink的TimerService
TimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java
@PublicEvolving public interface TimerService { String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams."; String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams."; long currentProcessingTime(); long currentWatermark(); void registerProcessingTimeTimer(long time); void registerEventTimeTimer(long time); void deleteProcessingTimeTimer(long time); void deleteEventTimeTimer(long time); }
- TimerService接口定义了currentProcessingTime、currentWatermark、registerProcessingTimeTimer、registerEventTimeTimer、deleteProcessingTimeTimer、deleteEventTimeTimer接口
SimpleTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java
@Internal public class SimpleTimerService implements TimerService { private final InternalTimerService<VoidNamespace> internalTimerService; public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) { this.internalTimerService = internalTimerService; } @Override public long currentProcessingTime() { return internalTimerService.currentProcessingTime(); } @Override public long currentWatermark() { return internalTimerService.currentWatermark(); } @Override public void registerProcessingTimeTimer(long time) { internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time); } @Override public void registerEventTimeTimer(long time) { internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time); } @Override public void deleteProcessingTimeTimer(long time) { internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time); } @Override public void deleteEventTimeTimer(long time) { internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time); } }
- SimpleTimerService实现了TimerService,它是委托InternalTimerService来实现
InternalTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java
@Internal public interface InternalTimerService<N> { long currentProcessingTime(); long currentWatermark(); void registerProcessingTimeTimer(N namespace, long time); void deleteProcessingTimeTimer(N namespace, long time); void registerEventTimeTimer(N namespace, long time); void deleteEventTimeTimer(N namespace, long time); }
- InternalTimerService是TimerService的internal版本的接口,比起TimerService它定义了namespace,在registerProcessingTimeTimer、deleteProcessingTimeTimer、registerEventTimeTimer、deleteEventTimeTimer的方法中均多了一个namesapce的参数
InternalTimerServiceImpl
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback { private final ProcessingTimeService processingTimeService; private final KeyContext keyContext; private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue; private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue; private final KeyGroupRange localKeyGroupRange; private final int localKeyGroupRangeStartIdx; private long currentWatermark = Long.MIN_VALUE; private ScheduledFuture<?> nextTimer; // Variables to be set when the service is started. private TypeSerializer<K> keySerializer; private TypeSerializer<N> namespaceSerializer; private Triggerable<K, N> triggerTarget; private volatile boolean isInitialized; private TypeSerializer<K> keyDeserializer; private TypeSerializer<N> namespaceDeserializer; private InternalTimersSnapshot<K, N> restoredTimersSnapshot; InternalTimerServiceImpl( KeyGroupRange localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) { this.keyContext = checkNotNull(keyContext); this.processingTimeService = checkNotNull(processingTimeService); this.localKeyGroupRange = checkNotNull(localKeyGroupRange); this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue); this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue); // find the starting index of the local key-group range int startIdx = Integer.MAX_VALUE; for (Integer keyGroupIdx : localKeyGroupRange) { startIdx = Math.min(keyGroupIdx, startIdx); } this.localKeyGroupRangeStartIdx = startIdx; } public void startTimerService( TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerTarget) { if (!isInitialized) { if (keySerializer == null || namespaceSerializer == null) { throw new IllegalArgumentException("The TimersService serializers cannot be null."); } if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) { throw new IllegalStateException("The TimerService has already been initialized."); } // the following is the case where we restore if (restoredTimersSnapshot != null) { CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( this.keyDeserializer, null, restoredTimersSnapshot.getKeySerializerConfigSnapshot(), keySerializer); CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult( this.namespaceDeserializer, null, restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(), namespaceSerializer); if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) { throw new IllegalStateException("Tried to initialize restored TimerService " + "with incompatible serializers than those used to snapshot its state."); } } this.keySerializer = keySerializer; this.namespaceSerializer = namespaceSerializer; this.keyDeserializer = null; this.namespaceDeserializer = null; this.triggerTarget = Preconditions.checkNotNull(triggerTarget); // re-register the restored timers (if any) final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek(); if (headTimer != null) { nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this); } this.isInitialized = true; } else { if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) { throw new IllegalArgumentException("Already initialized Timer Service " + "tried to be initialized with different key and namespace serializers."); } } } @Override public long currentProcessingTime() { return processingTimeService.getCurrentProcessingTime(); } @Override public long currentWatermark() { return currentWatermark; } @Override public void registerProcessingTimeTimer(N namespace, long time) { InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek(); if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) { long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE; // check if we need to re-schedule our timer to earlier if (time < nextTriggerTime) { if (nextTimer != null) { nextTimer.cancel(false); } nextTimer = processingTimeService.registerTimer(time, this); } } } @Override public void registerEventTimeTimer(N namespace, long time) { eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); } @Override public void deleteProcessingTimeTimer(N namespace, long time) { processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); } @Override public void deleteEventTimeTimer(N namespace, long time) { eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)); } @Override public void onProcessingTime(long time) throws Exception { // null out the timer in case the Triggerable calls registerProcessingTimeTimer() // inside the callback. nextTimer = null; InternalTimer<K, N> timer; while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { processingTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onProcessingTime(timer); } if (timer != null && nextTimer == null) { nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this); } } public void advanceWatermark(long time) throws Exception { currentWatermark = time; InternalTimer<K, N> timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } } public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) { return new InternalTimersSnapshot<>( keySerializer, keySerializer.snapshotConfiguration(), namespaceSerializer, namespaceSerializer.snapshotConfiguration(), eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx), processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx)); } @SuppressWarnings("unchecked") public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) { this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot; if (areSnapshotSerializersIncompatible(restoredSnapshot)) { throw new IllegalArgumentException("Tried to restore timers " + "for the same service with different serializers."); } this.keyDeserializer = restoredTimersSnapshot.getKeySerializer(); this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer(); checkArgument(localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range."); // restore the event time timers eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers()); // restore the processing time timers processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers()); } @VisibleForTesting public int numProcessingTimeTimers() { return this.processingTimeTimersQueue.size(); } @VisibleForTesting public int numEventTimeTimers() { return this.eventTimeTimersQueue.size(); } @VisibleForTesting public int numProcessingTimeTimers(N namespace) { return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue); } @VisibleForTesting public int numEventTimeTimers(N namespace) { return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue); } private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) { int count = 0; try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) { while (iterator.hasNext()) { final TimerHeapInternalTimer<K, N> timer = iterator.next(); if (timer.getNamespace().equals(namespace)) { count++; } } } catch (Exception e) { throw new FlinkRuntimeException("Exception when closing iterator.", e); } return count; } @VisibleForTesting int getLocalKeyGroupRangeStartIdx() { return this.localKeyGroupRangeStartIdx; } @VisibleForTesting List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() { return partitionElementsByKeyGroup(eventTimeTimersQueue); } @VisibleForTesting List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() { return partitionElementsByKeyGroup(processingTimeTimersQueue); } private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) { List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups()); for (int keyGroup : localKeyGroupRange) { result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup))); } return result; } private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) { return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) || (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer())); } }
-
InternalTimerServiceImpl实现了InternalTimerService及ProcessingTimeCallback(
定义了onProcessingTime方法
)接口 -
startTimerService方法主要是初始化keySerializer、namespaceSerializer、triggerTarget属性;registerEventTimeTimer及deleteEventTimeTimer方法使用的是eventTimeTimersQueue;registerProcessingTimeTimer及deleteProcessingTimeTimer方法使用的是processingTimeTimersQueue(
eventTimeTimersQueue及processingTimeTimersQueue的类型为KeyGroupedInternalPriorityQueue,queue的元素类型为TimerHeapInternalTimer
) -
eventTimerTimer的触发主要是在advanceWatermark方法中(
AbstractStreamOperator的processWatermark方法会调用InternalTimeServiceManager的advanceWatermark方法,而该方法调用的是InternalTimerServiceImpl的advanceWatermark方法
),它会移除timestamp小于等于指定time的eventTimerTimer,然后回调triggerTarget.onEventTime方法;而processingTimeTimer的触发则是在onProcessingTime方法中(SystemProcessingTimeService的TriggerTask及RepeatedTriggerTask的定时任务会回调ProcessingTimeCallback的onProcessingTime方法
),它会移除timestamp小于等于指定time的processingTimeTimer,然后回调triggerTarget.onProcessingTime方法
Triggerable
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java
@Internal public interface Triggerable<K, N> { /** * Invoked when an event-time timer fires. */ void onEventTime(InternalTimer<K, N> timer) throws Exception; /** * Invoked when a processing-time timer fires. */ void onProcessingTime(InternalTimer<K, N> timer) throws Exception; }
- Triggerable接口定义了InternalTimerService会调用的onEventTime及onProcessingTime方法;WindowOperator、IntervalJoinOperator、KeyedProcessOperator、KeyedCoProcessOperator等operator均实现了Triggerable接口,可以响应timer的onEventTime或onProcessingTime的回调
小结
- TimerService接口定义了currentProcessingTime、currentWatermark、registerProcessingTimeTimer、registerEventTimeTimer、deleteProcessingTimeTimer、deleteEventTimeTimer接口;它有一个实现类为SimpleTimerService,而SimpleTimerService主要是委托给InternalTimerService来实现
-
InternalTimerService是TimerService的internal版本的接口,比起TimerService它定义了namespace,在registerProcessingTimeTimer、deleteProcessingTimeTimer、registerEventTimeTimer、deleteEventTimeTimer的方法中均多了一个namesapce的参数;它的实现类为InternalTimerServiceImpl;InternalTimerServiceImpl实现了InternalTimerService及ProcessingTimeCallback(
定义了onProcessingTime方法
)接口,其registerEventTimeTimer及deleteEventTimeTimer方法使用的是eventTimeTimersQueue;registerProcessingTimeTimer及deleteProcessingTimeTimer方法使用的是processingTimeTimersQueue(eventTimeTimersQueue及processingTimeTimersQueue的类型为KeyGroupedInternalPriorityQueue,queue的元素类型为TimerHeapInternalTimer
) -
InternalTimerServiceImpl的eventTimerTimer的触发主要是在advanceWatermark方法中(
AbstractStreamOperator的processWatermark方法会调用InternalTimeServiceManager的advanceWatermark方法,而该方法调用的是InternalTimerServiceImpl的advanceWatermark方法
),它会移除timestamp小于等于指定time的eventTimerTimer,然后回调triggerTarget.onEventTime方法 -
InternalTimerServiceImpl的processingTimeTimer的触发则是在onProcessingTime方法中(
SystemProcessingTimeService的TriggerTask及RepeatedTriggerTask的定时任务会回调ProcessingTimeCallback的onProcessingTime方法
),它会移除timestamp小于等于指定time的processingTimeTimer,然后回调triggerTarget.onProcessingTime方法 - Triggerable接口定义了InternalTimerService会调用的onEventTime及onProcessingTime方法;WindowOperator、IntervalJoinOperator、KeyedProcessOperator、KeyedCoProcessOperator等operator均实现了Triggerable接口,可以响应timer的onEventTime或onProcessingTime的回调
doc
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
JSON 在线解析
在线 JSON 格式化工具
Base64 编码/解码
Base64 编码/解码