Preface
This article takes a look at Flink's InternalTimeServiceManager.
InternalTimeServiceManager
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@Internal
public class InternalTimeServiceManager<K> {

    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;

    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;

    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

    private final boolean useLegacySynchronousSnapshots;

    InternalTimeServiceManager(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        PriorityQueueSetFactory priorityQueueSetFactory,
        ProcessingTimeService processingTimeService,
        boolean useLegacySynchronousSnapshots) {

        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
        this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;

        this.timerServices = new HashMap<>();
    }

    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {

        InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);

        timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);

        return timerService;
    }

    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (timerService == null) {

            timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

            timerServices.put(name, timerService);
        }
        return timerService;
    }

    Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }

    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(
            name,
            timerSerializer);
    }

    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }

    //////////////////				Fault Tolerance Methods				///////////////////

    public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
        Preconditions.checkState(useLegacySynchronousSnapshots);
        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx);

        serializationProxy.write(stream);
    }

    public void restoreStateForKeyGroup(
        InputStream stream,
        int keyGroupIdx,
        ClassLoader userCodeClassLoader) throws IOException {

        InternalTimerServiceSerializationProxy<K> serializationProxy =
            new InternalTimerServiceSerializationProxy<>(
                this,
                userCodeClassLoader,
                keyGroupIdx);

        serializationProxy.read(stream);
    }

    ////////////////////			Methods used ONLY IN TESTS				////////////////////

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        int count = 0;
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            count += timerService.numEventTimeTimers();
        }
        return count;
    }
}
- InternalTimeServiceManager manages the timer services used by all keyed operators. It keeps an in-memory map (timerServices) from each timer service's name to its InternalTimerServiceImpl.
- getInternalTimerService first calls registerOrGetTimerService to get or create the InternalTimerServiceImpl for the given name, then calls timerService.startTimerService to initialize it, and finally returns it.
- registerOrGetTimerService looks up the InternalTimerServiceImpl for the given name in the timerServices map; if none exists, it creates one and puts it into the map. When creating the InternalTimerServiceImpl, it calls createTimerPriorityQueue twice to build the KeyGroupedInternalPriorityQueue instances that serve as processingTimeTimersQueue and eventTimeTimersQueue. createTimerPriorityQueue in turn delegates to priorityQueueSetFactory.
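The get-or-create flow described above can be modeled without any Flink dependency. The following is a minimal sketch, not the Flink implementation: SketchTimerService and SketchTimeServiceManager are hypothetical stand-ins, and the "queues" are reduced to the state names that the real code passes to createTimerPriorityQueue.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for InternalTimerServiceImpl; it only records the queue names it was given.
final class SketchTimerService {
    final String processingQueueName;
    final String eventQueueName;
    boolean started;

    SketchTimerService(String processingQueueName, String eventQueueName) {
        this.processingQueueName = processingQueueName;
        this.eventQueueName = eventQueueName;
    }
}

// Simplified model of the name -> timer service registry (an assumption-laden sketch, not the Flink class).
final class SketchTimeServiceManager {
    static final String TIMER_STATE_PREFIX = "_timer_state";
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final Map<String, SketchTimerService> timerServices = new HashMap<>();

    // Mirrors getInternalTimerService: get or create, then initialize, then return.
    SketchTimerService getTimerService(String name) {
        SketchTimerService service = registerOrGet(name);
        service.started = true; // stands in for startTimerService(...)
        return service;
    }

    // Mirrors registerOrGetTimerService: one instance per name, created lazily.
    private SketchTimerService registerOrGet(String name) {
        SketchTimerService service = timerServices.get(name);
        if (service == null) {
            service = new SketchTimerService(
                PROCESSING_TIMER_PREFIX + name,
                EVENT_TIMER_PREFIX + name);
            timerServices.put(name, service);
        }
        return service;
    }

    Map<String, SketchTimerService> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }
}
```

Requesting the same name twice returns the same instance, and the two queue names differ only in the processing or event prefix.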
PriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
public interface PriorityQueueSetFactory {

    @Nonnull
    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
- PriorityQueueSetFactory defines a create method that returns a KeyGroupedInternalPriorityQueue. The type parameter T is bounded by three interfaces at once: it must implement HeapPriorityQueueElement, PriorityComparable, and Keyed.
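The bound on T is a Java intersection type (`T extends A & B & C`). As a rough illustration of what such a bound allows, here is a self-contained sketch; the interfaces SketchIndexed, SketchPriorityComparable, and SketchKeyed are hypothetical miniatures, not the Flink types, and the sketch parameterizes them where the Flink signature uses raw types.

```java
// Hypothetical miniature versions of the three interfaces named in the bound.
interface SketchIndexed {
    int getInternalIndex();
    void setInternalIndex(int newIndex);
}

interface SketchPriorityComparable<T> {
    int comparePriorityTo(T other);
}

interface SketchKeyed<K> {
    K getKey();
}

// An element type that satisfies all three bounds.
final class SketchElement
        implements SketchIndexed, SketchPriorityComparable<SketchElement>, SketchKeyed<String> {
    private final String key;
    private final long priority;
    private int index = Integer.MIN_VALUE;

    SketchElement(String key, long priority) {
        this.key = key;
        this.priority = priority;
    }

    @Override public int getInternalIndex() { return index; }
    @Override public void setInternalIndex(int newIndex) { this.index = newIndex; }
    @Override public int comparePriorityTo(SketchElement other) { return Long.compare(priority, other.priority); }
    @Override public String getKey() { return key; }
}

final class IntersectionBoundDemo {
    // Inside the method, T can be used through any of the three interfaces.
    static <T extends SketchIndexed & SketchPriorityComparable<T> & SketchKeyed<?>> T higherPriority(T a, T b) {
        return a.comparePriorityTo(b) <= 0 ? a : b;
    }
}
```

A type that lacks any one of the three interfaces is rejected at compile time, which is how the factory guarantees that every queued element can be indexed, compared, and assigned to a key.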
HeapPriorityQueueElement
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
@Internal
public interface HeapPriorityQueueElement {

    /**
     * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
     * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
     * elements are removed from a {@link HeapPriorityQueue}.
     */
    int NOT_CONTAINED = Integer.MIN_VALUE;

    /**
     * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
     */
    int getInternalIndex();

    /**
     * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
     * {@link HeapPriorityQueue}.
     *
     * @param newIndex the new index in the timer heap.
     */
    void setInternalIndex(int newIndex);
}
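- The HeapPriorityQueueElement interface describes the element type that HeapPriorityQueue requires. It declares the getInternalIndex and setInternalIndex methods, which let each element carry its current position in the queue's internal array.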
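Storing the array index on the element itself is a common way to make removal of an arbitrary element cheap: the heap does not need a linear search to find it. The sketch below is a generic indexed binary min-heap written for illustration only; SketchHeapElement and SketchIndexedHeap are hypothetical names, and this is not a copy of Flink's HeapPriorityQueue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical element: a priority plus the heap slot it currently occupies.
final class SketchHeapElement {
    static final int NOT_CONTAINED = Integer.MIN_VALUE;
    final long priority;
    int internalIndex = NOT_CONTAINED;

    SketchHeapElement(long priority) {
        this.priority = priority;
    }
}

// Minimal binary min-heap that keeps each element's index up to date on every move.
final class SketchIndexedHeap {
    private final List<SketchHeapElement> heap = new ArrayList<>();

    int size() { return heap.size(); }

    SketchHeapElement peek() { return heap.isEmpty() ? null : heap.get(0); }

    void add(SketchHeapElement e) {
        heap.add(e);
        e.internalIndex = heap.size() - 1;
        siftUp(e.internalIndex);
    }

    // The element tells us where it sits, so no scan of the array is needed.
    boolean remove(SketchHeapElement e) {
        int idx = e.internalIndex;
        if (idx < 0 || idx >= heap.size() || heap.get(idx) != e) {
            return false; // not managed by this heap
        }
        int last = heap.size() - 1;
        swap(idx, last);
        heap.remove(last);
        e.internalIndex = SketchHeapElement.NOT_CONTAINED;
        if (idx < heap.size()) {
            // The element moved into idx may violate the heap order in either direction.
            siftUp(idx);
            siftDown(idx);
        }
        return true;
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(i).priority >= heap.get(parent).priority) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).priority < heap.get(smallest).priority) smallest = left;
            if (right < n && heap.get(right).priority < heap.get(smallest).priority) smallest = right;
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int i, int j) {
        SketchHeapElement a = heap.get(i);
        SketchHeapElement b = heap.get(j);
        heap.set(i, b);
        b.internalIndex = i;
        heap.set(j, a);
        a.internalIndex = j;
    }
}
```

Unlike the Flink interface, whose Javadoc says the reset is not strictly enforced, this sketch always resets the index to NOT_CONTAINED on removal.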
PriorityComparable
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java
public interface PriorityComparable<T> {

    int comparePriorityTo(@Nonnull T other);
}
- PriorityComparable defines the comparePriorityTo method, which compares two objects by priority.
Keyed
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java
public interface Keyed<K> {

    K getKey();
}
- The Keyed interface defines the getKey method, which returns the object's key.
InternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java
@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {

    /** Function to extract the key from a {@link InternalTimer}. */
    KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey;

    /** Function to compare instances of {@link InternalTimer}. */
    PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
        (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());

    /**
     * Returns the timestamp of the timer. This value determines the point in time when the timer will fire.
     */
    long getTimestamp();

    /**
     * Returns the key that is bound to this timer.
     */
    @Nonnull
    @Override
    K getKey();

    /**
     * Returns the namespace that is bound to this timer.
     */
    @Nonnull
    N getNamespace();
}
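- InternalTimer extends the PriorityComparable and Keyed interfaces. It declares the getTimestamp, getKey, and getNamespace methods, and it also provides two built-in constants: KEY_EXTRACTOR_FUNCTION and TIMER_COMPARATOR, the latter ordering timers by timestamp.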
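TIMER_COMPARATOR orders timers purely by timestamp, so the timer with the smallest timestamp sits at the head of the queue. The effect can be sketched with the JDK's own java.util.PriorityQueue; SketchOrderedTimer and TimerOrderingDemo are hypothetical names, and the JDK queue is only a stand-in for Flink's queue types.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical timer with only the fields needed to show the ordering.
final class SketchOrderedTimer {
    final long timestamp;
    final String key;

    SketchOrderedTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }
}

final class TimerOrderingDemo {
    // Same shape as TIMER_COMPARATOR: compare by timestamp only.
    static final Comparator<SketchOrderedTimer> BY_TIMESTAMP =
        (left, right) -> Long.compare(left.timestamp, right.timestamp);

    // Returns the keys concatenated in the order the timers come off the queue.
    static String firingOrder(SketchOrderedTimer... timers) {
        PriorityQueue<SketchOrderedTimer> queue = new PriorityQueue<>(BY_TIMESTAMP);
        for (SketchOrderedTimer timer : timers) {
            queue.add(timer);
        }
        StringBuilder order = new StringBuilder();
        while (!queue.isEmpty()) {
            order.append(queue.poll().key);
        }
        return order.toString();
    }
}
```

Timers inserted with timestamps 30, 10, and 20 are polled in the order 10, 20, 30, regardless of insertion order.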
TimerHeapInternalTimer
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

    /** The key for which the timer is scoped. */
    @Nonnull
    private final K key;

    /** The namespace for which the timer is scoped. */
    @Nonnull
    private final N namespace;

    /** The expiration timestamp. */
    private final long timestamp;

    private transient int timerHeapIndex;

    TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.timerHeapIndex = NOT_CONTAINED;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Nonnull
    @Override
    public K getKey() {
        return key;
    }

    @Nonnull
    @Override
    public N getNamespace() {
        return namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }

        if (o instanceof InternalTimer) {
            InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
            return timestamp == timer.getTimestamp()
                && key.equals(timer.getKey())
                && namespace.equals(timer.getNamespace());
        }

        return false;
    }

    @Override
    public int getInternalIndex() {
        return timerHeapIndex;
    }

    @Override
    public void setInternalIndex(int newIndex) {
        this.timerHeapIndex = newIndex;
    }

    void removedFromTimerQueue() {
        setInternalIndex(NOT_CONTAINED);
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "Timer{" +
            "timestamp=" + timestamp +
            ", key=" + key +
            ", namespace=" + namespace +
            '}';
    }

    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(timestamp, other.getTimestamp());
    }
}
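- TimerHeapInternalTimer implements both the InternalTimer and HeapPriorityQueueElement interfaces. Its removedFromTimerQueue method calls setInternalIndex(NOT_CONTAINED), that is, it resets the timer's index to NOT_CONTAINED to mark the timer as no longer held by any queue (a logical removal marker rather than an actual removal from the heap array).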
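Note that equals and hashCode in TimerHeapInternalTimer cover timestamp, key, and namespace but not timerHeapIndex, so two timers for the same key, namespace, and timestamp compare equal wherever they sit in a heap. The sketch below mirrors that equality contract on a hypothetical SketchDedupTimer and uses a plain java.util.HashSet to show the resulting deduplication; whether and how Flink's queue uses this for deduplication is an assumption here, suggested by the name HeapPriorityQueueSet.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical timer whose equality mirrors TimerHeapInternalTimer: timestamp, key, namespace.
final class SketchDedupTimer {
    final long timestamp;
    final String key;
    final String namespace;
    int heapIndex = Integer.MIN_VALUE; // deliberately excluded from equals/hashCode

    SketchDedupTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o instanceof SketchDedupTimer) {
            SketchDedupTimer other = (SketchDedupTimer) o;
            return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
        }
        return false;
    }

    @Override
    public int hashCode() {
        int result = (int) (timestamp ^ (timestamp >>> 32));
        result = 31 * result + key.hashCode();
        result = 31 * result + namespace.hashCode();
        return result;
    }
}

final class TimerDedupDemo {
    // Counts how many distinct timers remain after set-based deduplication.
    static int distinctCount(SketchDedupTimer... timers) {
        Set<SketchDedupTimer> set = new HashSet<>();
        for (SketchDedupTimer timer : timers) {
            set.add(timer);
        }
        return set.size();
    }
}
```

Registering the same (timestamp, key, namespace) triple twice therefore leaves a single entry in the set.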
HeapPriorityQueueSetFactory
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {

    @Nonnull
    private final KeyGroupRange keyGroupRange;

    @Nonnegative
    private final int totalKeyGroups;

    @Nonnegative
    private final int minimumCapacity;

    public HeapPriorityQueueSetFactory(
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnegative int minimumCapacity) {

        this.keyGroupRange = keyGroupRange;
        this.totalKeyGroups = totalKeyGroups;
        this.minimumCapacity = minimumCapacity;
    }

    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

        return new HeapPriorityQueueSet<>(
            PriorityComparator.forPriorityComparableObjects(),
            KeyExtractorFunction.forKeyedObjects(),
            minimumCapacity,
            keyGroupRange,
            totalKeyGroups);
    }
}
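- HeapPriorityQueueSetFactory implements the PriorityQueueSetFactory interface; its create method returns a HeapPriorityQueueSet. Note that this implementation uses neither stateName nor byteOrderedElementSerializer; the queue is built only from the comparator, the key extractor, minimumCapacity, keyGroupRange, and totalKeyGroups.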
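The factory indirection lets the caller ask for a queue by state name without knowing which backend builds it. A minimal sketch of that shape follows, using hypothetical SketchQueueFactory and SketchHeapQueueFactory types and the JDK's PriorityQueue in place of HeapPriorityQueueSet; like the heap factory above, the sketch accepts a state name but does not use it.

```java
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical factory abstraction: callers request a queue by state name.
interface SketchQueueFactory {
    <T extends Comparable<T>> Queue<T> create(String stateName);
}

// Heap-backed implementation; a different backend could implement the same interface.
final class SketchHeapQueueFactory implements SketchQueueFactory {
    private final int minimumCapacity;

    SketchHeapQueueFactory(int minimumCapacity) {
        this.minimumCapacity = minimumCapacity;
    }

    @Override
    public <T extends Comparable<T>> Queue<T> create(String stateName) {
        // stateName is intentionally unused here; an in-memory queue needs no named state.
        // java.util.PriorityQueue rejects an initial capacity below 1, hence the clamp.
        return new PriorityQueue<>(Math.max(1, minimumCapacity));
    }
}
```

Each call to create returns a fresh, independent queue, which matches how the manager requests one queue for processing-time timers and another for event-time timers.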
Summary
InternalTimer extends the PriorityComparable and Keyed interfaces, and TimerHeapInternalTimer implements both InternalTimer and HeapPriorityQueueElement.