Preface
This article takes a look at Flink's TimerService.
TimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java
    @PublicEvolving
    public interface TimerService {

        String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

        String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

        long currentProcessingTime();

        long currentWatermark();

        void registerProcessingTimeTimer(long time);

        void registerEventTimeTimer(long time);

        void deleteProcessingTimeTimer(long time);

        void deleteEventTimeTimer(long time);
    }
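- The TimerService interface defines the methods currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. As the two message constants indicate, registering and deleting timers is only supported on keyed streams.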
SimpleTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java
    @Internal
    public class SimpleTimerService implements TimerService {

        private final InternalTimerService<VoidNamespace> internalTimerService;

        public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
            this.internalTimerService = internalTimerService;
        }

        @Override
        public long currentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return internalTimerService.currentWatermark();
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
        }

        @Override
        public void deleteProcessingTimeTimer(long time) {
            internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
        }
    }
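- SimpleTimerService implements TimerService by delegating every call to an InternalTimerService<VoidNamespace>, always passing VoidNamespace.INSTANCE as the namespace.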
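The delegation can be illustrated with a small, self-contained sketch. None of the types below are Flink classes: SketchTimerService, SketchInternalTimerService, SketchSimpleTimerService, and RecordingInternalTimerService are hypothetical stand-ins, and the string "VOID" merely plays the role of VoidNamespace.INSTANCE. Only one of the six methods is modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for InternalTimerService<N>: every call carries a namespace.
interface SketchInternalTimerService<N> {
    void registerEventTimeTimer(N namespace, long time);
}

// Hypothetical stand-in for TimerService: the user-facing view has no namespace.
interface SketchTimerService {
    void registerEventTimeTimer(long time);
}

// Mirrors the shape of SimpleTimerService: fix the namespace, forward the rest.
final class SketchSimpleTimerService implements SketchTimerService {
    // Assumption: a plain string plays the role of VoidNamespace.INSTANCE.
    static final String VOID_NAMESPACE = "VOID";

    private final SketchInternalTimerService<String> internal;

    SketchSimpleTimerService(SketchInternalTimerService<String> internal) {
        this.internal = internal;
    }

    @Override
    public void registerEventTimeTimer(long time) {
        internal.registerEventTimeTimer(VOID_NAMESPACE, time);
    }
}

// Records every call so that the delegation can be observed.
final class RecordingInternalTimerService implements SketchInternalTimerService<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public void registerEventTimeTimer(String namespace, long time) {
        calls.add(namespace + "@" + time);
    }
}

final class DelegationSketch {
    public static void main(String[] args) {
        RecordingInternalTimerService internal = new RecordingInternalTimerService();
        SketchTimerService timerService = new SketchSimpleTimerService(internal);
        timerService.registerEventTimeTimer(1000L);
        System.out.println(internal.calls); // prints [VOID@1000]
    }
}
```

The caller never sees a namespace; the wrapper supplies the same fixed one on every call, which is the only thing SimpleTimerService adds on top of the internal service.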
InternalTimerService
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java
    @Internal
    public interface InternalTimerService<N> {

        long currentProcessingTime();

        long currentWatermark();

        void registerProcessingTimeTimer(N namespace, long time);

        void deleteProcessingTimeTimer(N namespace, long time);

        void registerEventTimeTimer(N namespace, long time);

        void deleteEventTimeTimer(N namespace, long time);
    }
- InternalTimerService is the internal counterpart of TimerService. Unlike TimerService it is parameterized by a namespace type N: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace argument.
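What the extra namespace argument buys can be shown with a minimal sketch. It assumes that a timer is identified by the combination of timestamp, key, and namespace, which is what the remove(new TimerHeapInternalTimer<>(time, key, namespace)) calls in the implementation below suggest. SketchTimer and NamespaceSketch are hypothetical names, and a HashSet stands in for the real priority queue.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical simplified timer: identity is (timestamp, key, namespace).
final class SketchTimer {
    final long timestamp;
    final String key;
    final String namespace;

    SketchTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchTimer)) {
            return false;
        }
        SketchTimer other = (SketchTimer) o;
        return timestamp == other.timestamp
            && key.equals(other.key)
            && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }
}

final class NamespaceSketch {
    // Adds all timers to a set and reports how many distinct ones remain.
    static int countDistinct(SketchTimer... timers) {
        Set<SketchTimer> registered = new HashSet<>();
        for (SketchTimer timer : timers) {
            registered.add(timer);
        }
        return registered.size();
    }

    public static void main(String[] args) {
        int distinct = countDistinct(
            new SketchTimer(100L, "user-1", "window-A"),
            new SketchTimer(100L, "user-1", "window-A"),  // same identity: collapses
            new SketchTimer(100L, "user-1", "window-B")); // other namespace: kept
        System.out.println(distinct); // prints 2
    }
}
```

Under that assumption, two timers for the same key and timestamp stay separate as long as their namespaces differ, while registering the identical triple twice leaves a single timer.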
InternalTimerServiceImpl
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
    public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {

        private final ProcessingTimeService processingTimeService;
        private final KeyContext keyContext;
        private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
        private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
        private final KeyGroupRange localKeyGroupRange;
        private final int localKeyGroupRangeStartIdx;
        private long currentWatermark = Long.MIN_VALUE;
        private ScheduledFuture<?> nextTimer;

        // Variables to be set when the service is started.
        private TypeSerializer<K> keySerializer;
        private TypeSerializer<N> namespaceSerializer;
        private Triggerable<K, N> triggerTarget;
        private volatile boolean isInitialized;
        private TypeSerializer<K> keyDeserializer;
        private TypeSerializer<N> namespaceDeserializer;
        private InternalTimersSnapshot<K, N> restoredTimersSnapshot;

        InternalTimerServiceImpl(
            KeyGroupRange localKeyGroupRange,
            KeyContext keyContext,
            ProcessingTimeService processingTimeService,
            KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
            KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

            this.keyContext = checkNotNull(keyContext);
            this.processingTimeService = checkNotNull(processingTimeService);
            this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
            this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
            this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

            // find the starting index of the local key-group range
            int startIdx = Integer.MAX_VALUE;
            for (Integer keyGroupIdx : localKeyGroupRange) {
                startIdx = Math.min(keyGroupIdx, startIdx);
            }
            this.localKeyGroupRangeStartIdx = startIdx;
        }

        public void startTimerService(
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            Triggerable<K, N> triggerTarget) {

            if (!isInitialized) {

                if (keySerializer == null || namespaceSerializer == null) {
                    throw new IllegalArgumentException("The TimersService serializers cannot be null.");
                }

                if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
                    throw new IllegalStateException("The TimerService has already been initialized.");
                }

                // the following is the case where we restore
                if (restoredTimersSnapshot != null) {
                    CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                        this.keyDeserializer,
                        null,
                        restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
                        keySerializer);

                    CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                        this.namespaceDeserializer,
                        null,
                        restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
                        namespaceSerializer);

                    if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
                        throw new IllegalStateException("Tried to initialize restored TimerService " +
                            "with incompatible serializers than those used to snapshot its state.");
                    }
                }

                this.keySerializer = keySerializer;
                this.namespaceSerializer = namespaceSerializer;
                this.keyDeserializer = null;
                this.namespaceDeserializer = null;

                this.triggerTarget = Preconditions.checkNotNull(triggerTarget);

                // re-register the restored timers (if any)
                final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
                if (headTimer != null) {
                    nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
                }
                this.isInitialized = true;
            } else {
                if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) {
                    throw new IllegalArgumentException("Already initialized Timer Service " +
                        "tried to be initialized with different key and namespace serializers.");
                }
            }
        }

        @Override
        public long currentProcessingTime() {
            return processingTimeService.getCurrentProcessingTime();
        }

        @Override
        public long currentWatermark() {
            return currentWatermark;
        }

        @Override
        public void registerProcessingTimeTimer(N namespace, long time) {
            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
            if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
                long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
                // check if we need to re-schedule our timer to earlier
                if (time < nextTriggerTime) {
                    if (nextTimer != null) {
                        nextTimer.cancel(false);
                    }
                    nextTimer = processingTimeService.registerTimer(time, this);
                }
            }
        }

        @Override
        public void registerEventTimeTimer(N namespace, long time) {
            eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }

        @Override
        public void deleteProcessingTimeTimer(N namespace, long time) {
            processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }

        @Override
        public void deleteEventTimeTimer(N namespace, long time) {
            eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }

        @Override
        public void onProcessingTime(long time) throws Exception {
            // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
            // inside the callback.
            nextTimer = null;

            InternalTimer<K, N> timer;

            while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                processingTimeTimersQueue.poll();
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onProcessingTime(timer);
            }

            if (timer != null && nextTimer == null) {
                nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
            }
        }

        public void advanceWatermark(long time) throws Exception {
            currentWatermark = time;

            InternalTimer<K, N> timer;

            while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                eventTimeTimersQueue.poll();
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onEventTime(timer);
            }
        }

        public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
            return new InternalTimersSnapshot<>(
                keySerializer,
                keySerializer.snapshotConfiguration(),
                namespaceSerializer,
                namespaceSerializer.snapshotConfiguration(),
                eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
                processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
        }

        @SuppressWarnings("unchecked")
        public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
            this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;

            if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
                throw new IllegalArgumentException("Tried to restore timers " +
                    "for the same service with different serializers.");
            }

            this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
            this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();

            checkArgument(localKeyGroupRange.contains(keyGroupIdx),
                "Key Group " + keyGroupIdx + " does not belong to the local range.");

            // restore the event time timers
            eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());

            // restore the processing time timers
            processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
        }

        @VisibleForTesting
        public int numProcessingTimeTimers() {
            return this.processingTimeTimersQueue.size();
        }

        @VisibleForTesting
        public int numEventTimeTimers() {
            return this.eventTimeTimersQueue.size();
        }

        @VisibleForTesting
        public int numProcessingTimeTimers(N namespace) {
            return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue);
        }

        @VisibleForTesting
        public int numEventTimeTimers(N namespace) {
            return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue);
        }

        private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
            int count = 0;
            try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
                while (iterator.hasNext()) {
                    final TimerHeapInternalTimer<K, N> timer = iterator.next();
                    if (timer.getNamespace().equals(namespace)) {
                        count++;
                    }
                }
            } catch (Exception e) {
                throw new FlinkRuntimeException("Exception when closing iterator.", e);
            }
            return count;
        }

        @VisibleForTesting
        int getLocalKeyGroupRangeStartIdx() {
            return this.localKeyGroupRangeStartIdx;
        }

        @VisibleForTesting
        List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
            return partitionElementsByKeyGroup(eventTimeTimersQueue);
        }

        @VisibleForTesting
        List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
            return partitionElementsByKeyGroup(processingTimeTimersQueue);
        }

        private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) {
            List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
            for (int keyGroup : localKeyGroupRange) {
                result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup)));
            }
            return result;
        }

        private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
            return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
                (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
        }
    }
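- InternalTimerServiceImpl implements both InternalTimerService and ProcessingTimeCallback (the interface that defines the onProcessingTime method).
- startTimerService mainly initializes the keySerializer, namespaceSerializer, and triggerTarget fields; if the processing-time queue already holds timers (for example after a restore), it also registers the head timer with the ProcessingTimeService. registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue, while registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with TimerHeapInternalTimer as the element type.
- Event-time timers fire in advanceWatermark (AbstractStreamOperator's processWatermark calls InternalTimeServiceManager's advanceWatermark, which in turn calls InternalTimerServiceImpl's advanceWatermark): the method removes every event-time timer whose timestamp is less than or equal to the given time and invokes triggerTarget.onEventTime for each one. Processing-time timers fire in onProcessingTime (the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back ProcessingTimeCallback's onProcessingTime): the method removes every processing-time timer whose timestamp is less than or equal to the given time, invokes triggerTarget.onProcessingTime for each one, and then schedules the next pending timer, if any.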
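The event-time path described above can be sketched without any Flink dependency. The class below is a simplified model, not Flink code: EventTimeSketch and PendingTimer are hypothetical names, java.util.PriorityQueue stands in for KeyGroupedInternalPriorityQueue, keys are plain strings, and the callback to triggerTarget.onEventTime is replaced by appending to a list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class EventTimeSketch {
    // Hypothetical simplified timer: only a timestamp and a key.
    static final class PendingTimer {
        final long timestamp;
        final String key;

        PendingTimer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    // Timers ordered by timestamp, earliest first.
    private final PriorityQueue<PendingTimer> eventTimeTimers =
        new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.timestamp));

    private long currentWatermark = Long.MIN_VALUE;

    // Stands in for the calls to triggerTarget.onEventTime(timer).
    final List<String> fired = new ArrayList<>();

    void registerEventTimeTimer(String key, long time) {
        eventTimeTimers.add(new PendingTimer(time, key));
    }

    // Same loop shape as advanceWatermark: peek, compare, poll, call back.
    void advanceWatermark(long time) {
        currentWatermark = time;
        PendingTimer timer;
        while ((timer = eventTimeTimers.peek()) != null && timer.timestamp <= time) {
            eventTimeTimers.poll();
            fired.add(timer.key + "@" + timer.timestamp);
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }

    int pendingTimers() {
        return eventTimeTimers.size();
    }

    public static void main(String[] args) {
        EventTimeSketch service = new EventTimeSketch();
        service.registerEventTimeTimer("b", 30L);
        service.registerEventTimeTimer("a", 10L);
        service.registerEventTimeTimer("c", 20L);

        service.advanceWatermark(20L);
        System.out.println(service.fired);           // prints [a@10, c@20]
        System.out.println(service.pendingTimers()); // prints 1
    }
}
```

Timers fire in timestamp order regardless of registration order, and the timer at 30 stays queued until a later watermark reaches it. The real implementation additionally sets the current key on the KeyContext before each callback, which this sketch leaves out.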
Triggerable
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java
    @Internal
    public interface Triggerable<K, N> {

        /**
         * Invoked when an event-time timer fires.
         */
        void onEventTime(InternalTimer<K, N> timer) throws Exception;

        /**
         * Invoked when a processing-time timer fires.
         */
        void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
    }
- The Triggerable interface defines the onEventTime and onProcessingTime methods that InternalTimerService calls back. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable so that they can respond to a timer's onEventTime or onProcessingTime callback.
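The callback relationship can be pictured with a short sketch. Everything here is hypothetical and heavily simplified: SketchTriggerable reduces Triggerable<K, N> to a string key and a timestamp, SketchOperator stands in for an operator, and the static fire method plays the part of the timer service.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Triggerable<K, N>, reduced to a key and a timestamp.
interface SketchTriggerable {
    void onEventTime(String key, long timestamp);

    void onProcessingTime(String key, long timestamp);
}

// Hypothetical operator: records what it would do when a timer fires.
final class SketchOperator implements SketchTriggerable {
    final List<String> output = new ArrayList<>();

    @Override
    public void onEventTime(String key, long timestamp) {
        output.add("event:" + key + "@" + timestamp);
    }

    @Override
    public void onProcessingTime(String key, long timestamp) {
        output.add("processing:" + key + "@" + timestamp);
    }
}

final class TriggerableSketch {
    // Plays the role of the timer service: it only knows the callback interface.
    static void fire(SketchTriggerable target, boolean eventTime, String key, long timestamp) {
        if (eventTime) {
            target.onEventTime(key, timestamp);
        } else {
            target.onProcessingTime(key, timestamp);
        }
    }

    public static void main(String[] args) {
        SketchOperator operator = new SketchOperator();
        fire(operator, true, "user-1", 10L);
        fire(operator, false, "user-2", 20L);
        System.out.println(operator.output); // prints [event:user-1@10, processing:user-2@20]
    }
}
```

Because the timer service depends only on the interface, any operator that implements the two methods can be used as the trigger target.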
Summary
- The TimerService interface defines currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. Its implementation, SimpleTimerService, delegates to an InternalTimerService.
- InternalTimerService is the internal counterpart of TimerService; it adds a namespace, so registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an extra namespace argument. Its implementation is InternalTimerServiceImpl, which implements InternalTimerService and ProcessingTimeCallback (the interface that defines onProcessingTime). registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue; registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with TimerHeapInternalTimer as the element type.
- In InternalTimerServiceImpl, event-time timers fire in advanceWatermark (AbstractStreamOperator's processWatermark calls InternalTimeServiceManager's advanceWatermark, which in turn calls InternalTimerServiceImpl's advanceWatermark): it removes every event-time timer whose timestamp is less than or equal to the given time and invokes triggerTarget.onEventTime for each one.
- In InternalTimerServiceImpl, processing-time timers fire in onProcessingTime (the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back ProcessingTimeCallback's onProcessingTime): it removes every processing-time timer whose timestamp is less than or equal to the given time and invokes triggerTarget.onProcessingTime for each one.
- The Triggerable interface defines the onEventTime and onProcessingTime methods that InternalTimerService calls back. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable so that they can respond to a timer's onEventTime or onProcessingTime callback.