A Look at Flink's TimerService

Category: Programming Tools · Published: 5 years ago


This article takes a look at Flink's TimerService.

TimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimerService.java

@PublicEvolving
public interface TimerService {

    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(long time);

    void registerEventTimeTimer(long time);

    void deleteProcessingTimeTimer(long time);

    void deleteEventTimeTimer(long time);
}
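  • The TimerService interface declares six methods: currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. It also defines two message constants stating that setting and deleting timers is only supported on keyed streams.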

SimpleTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/SimpleTimerService.java

@Internal
public class SimpleTimerService implements TimerService {

    private final InternalTimerService<VoidNamespace> internalTimerService;

    public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
        this.internalTimerService = internalTimerService;
    }

    @Override
    public long currentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return internalTimerService.currentWatermark();
    }

    @Override
    public void registerProcessingTimeTimer(long time) {
        internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void registerEventTimeTimer(long time) {
        internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteProcessingTimeTimer(long time) {
        internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteEventTimeTimer(long time) {
        internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
    }
}
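  • SimpleTimerService implements TimerService by delegating every call to an InternalTimerService<VoidNamespace>; the register and delete methods pass VoidNamespace.INSTANCE as the namespace.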
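
The delegation described above can be illustrated without any Flink dependency. The following is a minimal sketch, not Flink code: NoNamespace, NamespacedTimers, RecordingTimers, and SimpleTimers are hypothetical names that stand in for VoidNamespace, InternalTimerService, a test double, and SimpleTimerService, and only one method is modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for VoidNamespace: a singleton meaning "no namespace". */
final class NoNamespace {
    static final NoNamespace INSTANCE = new NoNamespace();

    private NoNamespace() {}
}

/** Hypothetical, trimmed-down counterpart of a namespaced timer service. */
interface NamespacedTimers<N> {
    void registerEventTimeTimer(N namespace, long time);
}

/** Test double that records each registration so the delegation can be observed. */
final class RecordingTimers<N> implements NamespacedTimers<N> {
    final List<N> namespaces = new ArrayList<>();
    final List<Long> times = new ArrayList<>();

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        namespaces.add(namespace);
        times.add(time);
    }
}

/** User-facing facade: no namespace parameter, always delegates with the singleton. */
final class SimpleTimers {
    private final NamespacedTimers<NoNamespace> delegate;

    SimpleTimers(NamespacedTimers<NoNamespace> delegate) {
        this.delegate = delegate;
    }

    void registerEventTimeTimer(long time) {
        // The caller never sees a namespace; the facade fills in the fixed one.
        delegate.registerEventTimeTimer(NoNamespace.INSTANCE, time);
    }
}
```

Calling `new SimpleTimers(recorder).registerEventTimeTimer(1000L)` leaves one entry in the recorder, with NoNamespace.INSTANCE as the namespace. The point of the design is that user code gets a namespace-free API while the internal service stays generic over N.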

InternalTimerService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerService.java

@Internal
public interface InternalTimerService<N> {

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);

    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);

    void deleteEventTimeTimer(N namespace, long time);
}
  • InternalTimerService is the internal counterpart of the TimerService interface. Compared with TimerService it adds the notion of a namespace: registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace parameter.

InternalTimerServiceImpl

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {

    private final ProcessingTimeService processingTimeService;

    private final KeyContext keyContext;

    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;

    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;

    private final KeyGroupRange localKeyGroupRange;

    private final int localKeyGroupRangeStartIdx;

    private long currentWatermark = Long.MIN_VALUE;

    private ScheduledFuture<?> nextTimer;

    // Variables to be set when the service is started.

    private TypeSerializer<K> keySerializer;

    private TypeSerializer<N> namespaceSerializer;

    private Triggerable<K, N> triggerTarget;

    private volatile boolean isInitialized;

    private TypeSerializer<K> keyDeserializer;

    private TypeSerializer<N> namespaceDeserializer;

    private InternalTimersSnapshot<K, N> restoredTimersSnapshot;

    InternalTimerServiceImpl(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        ProcessingTimeService processingTimeService,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

        this.keyContext = checkNotNull(keyContext);
        this.processingTimeService = checkNotNull(processingTimeService);
        this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
        this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
        this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

        // find the starting index of the local key-group range
        int startIdx = Integer.MAX_VALUE;
        for (Integer keyGroupIdx : localKeyGroupRange) {
            startIdx = Math.min(keyGroupIdx, startIdx);
        }
        this.localKeyGroupRangeStartIdx = startIdx;
    }

    public void startTimerService(
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            Triggerable<K, N> triggerTarget) {

        if (!isInitialized) {

            if (keySerializer == null || namespaceSerializer == null) {
                throw new IllegalArgumentException("The TimersService serializers cannot be null.");
            }

            if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
                throw new IllegalStateException("The TimerService has already been initialized.");
            }

            // the following is the case where we restore
            if (restoredTimersSnapshot != null) {
                CompatibilityResult<K> keySerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.keyDeserializer,
                    null,
                    restoredTimersSnapshot.getKeySerializerConfigSnapshot(),
                    keySerializer);

                CompatibilityResult<N> namespaceSerializerCompatibility = CompatibilityUtil.resolveCompatibilityResult(
                    this.namespaceDeserializer,
                    null,
                    restoredTimersSnapshot.getNamespaceSerializerConfigSnapshot(),
                    namespaceSerializer);

                if (keySerializerCompatibility.isRequiresMigration() || namespaceSerializerCompatibility.isRequiresMigration()) {
                    throw new IllegalStateException("Tried to initialize restored TimerService " +
                        "with incompatible serializers than those used to snapshot its state.");
                }
            }

            this.keySerializer = keySerializer;
            this.namespaceSerializer = namespaceSerializer;
            this.keyDeserializer = null;
            this.namespaceDeserializer = null;

            this.triggerTarget = Preconditions.checkNotNull(triggerTarget);

            // re-register the restored timers (if any)
            final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
            if (headTimer != null) {
                nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
            }
            this.isInitialized = true;
        } else {
            if (!(this.keySerializer.equals(keySerializer) && this.namespaceSerializer.equals(namespaceSerializer))) {
                throw new IllegalArgumentException("Already initialized Timer Service " +
                    "tried to be initialized with different key and namespace serializers.");
            }
        }
    }

    @Override
    public long currentProcessingTime() {
        return processingTimeService.getCurrentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return currentWatermark;
    }

    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;

        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }

    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

    public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
        return new InternalTimersSnapshot<>(
            keySerializer,
            keySerializer.snapshotConfiguration(),
            namespaceSerializer,
            namespaceSerializer.snapshotConfiguration(),
            eventTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx),
            processingTimeTimersQueue.getSubsetForKeyGroup(keyGroupIdx));
    }

    @SuppressWarnings("unchecked")
    public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
        this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;

        if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
            throw new IllegalArgumentException("Tried to restore timers " +
                "for the same service with different serializers.");
        }

        this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
        this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();

        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
            "Key Group " + keyGroupIdx + " does not belong to the local range.");

        // restore the event time timers
        eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());

        // restore the processing time timers
        processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
    }

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        return this.processingTimeTimersQueue.size();
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        return this.eventTimeTimersQueue.size();
    }

    @VisibleForTesting
    public int numProcessingTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, processingTimeTimersQueue);
    }

    @VisibleForTesting
    public int numEventTimeTimers(N namespace) {
        return countTimersInNamespaceInternal(namespace, eventTimeTimersQueue);
    }

    private int countTimersInNamespaceInternal(N namespace, InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
        int count = 0;
        try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
            while (iterator.hasNext()) {
                final TimerHeapInternalTimer<K, N> timer = iterator.next();
                if (timer.getNamespace().equals(namespace)) {
                    count++;
                }
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Exception when closing iterator.", e);
        }
        return count;
    }

    @VisibleForTesting
    int getLocalKeyGroupRangeStartIdx() {
        return this.localKeyGroupRangeStartIdx;
    }

    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(eventTimeTimersQueue);
    }

    @VisibleForTesting
    List<Set<TimerHeapInternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
        return partitionElementsByKeyGroup(processingTimeTimersQueue);
    }

    private <T> List<Set<T>> partitionElementsByKeyGroup(KeyGroupedInternalPriorityQueue<T> keyGroupedQueue) {
        List<Set<T>> result = new ArrayList<>(localKeyGroupRange.getNumberOfKeyGroups());
        for (int keyGroup : localKeyGroupRange) {
            result.add(Collections.unmodifiableSet(keyGroupedQueue.getSubsetForKeyGroup(keyGroup)));
        }
        return result;
    }

    private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
        return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
            (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
    }
}
  • InternalTimerServiceImpl implements InternalTimerService and ProcessingTimeCallback (the interface that declares the onProcessingTime method).
  • startTimerService mainly initializes the keySerializer, namespaceSerializer, and triggerTarget fields; if the processing-time queue already holds restored timers, it also registers the head timer with the processingTimeService. registerEventTimeTimer and deleteEventTimeTimer operate on eventTimeTimersQueue, while registerProcessingTimeTimer and deleteProcessingTimeTimer operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with elements of type TimerHeapInternalTimer.
  • Event-time timers fire in advanceWatermark (AbstractStreamOperator.processWatermark calls InternalTimeServiceManager.advanceWatermark, which in turn calls InternalTimerServiceImpl.advanceWatermark). The method removes every event-time timer whose timestamp is less than or equal to the given time, sets the current key, and calls triggerTarget.onEventTime for it. Processing-time timers fire in onProcessingTime (the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back ProcessingTimeCallback.onProcessingTime). That method removes every processing-time timer whose timestamp is less than or equal to the given time, calls triggerTarget.onProcessingTime for it, and then schedules the next head timer if one remains.
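
The event-time firing loop can be sketched in plain Java. This is a simplified model rather than Flink's implementation: SketchTimer and EventTimeTimersSketch are hypothetical names, a java.util.PriorityQueue replaces KeyGroupedInternalPriorityQueue, keys are plain strings, and fired timers are returned as a list instead of being passed to a trigger target.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a timer: a timestamp plus the key it was registered under. */
final class SketchTimer {
    final long timestamp;
    final String key;

    SketchTimer(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchTimer)) {
            return false;
        }
        SketchTimer other = (SketchTimer) o;
        return timestamp == other.timestamp && key.equals(other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key);
    }
}

/** Simplified event-time timer queue (assumption: a plain heap ordered by timestamp). */
final class EventTimeTimersSketch {
    private final PriorityQueue<SketchTimer> queue =
        new PriorityQueue<>(Comparator.comparingLong((SketchTimer t) -> t.timestamp));
    private long currentWatermark = Long.MIN_VALUE;

    void register(String key, long time) {
        queue.add(new SketchTimer(time, key));
    }

    /** Deletion builds an equal timer and removes it, relying on equals(). */
    boolean delete(String key, long time) {
        return queue.remove(new SketchTimer(time, key));
    }

    /** Fires, earliest first, every timer whose timestamp is <= the new watermark. */
    List<String> advanceWatermark(long time) {
        currentWatermark = time;
        List<String> fired = new ArrayList<>();
        SketchTimer timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.poll();
            fired.add(timer.key + "@" + timer.timestamp);
        }
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }
}
```

As in the listing above, deletion works by constructing an equal timer object and removing it from the queue, and a watermark that has not yet reached a timer's timestamp leaves that timer queued for a later call.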

Triggerable

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/Triggerable.java

@Internal
public interface Triggerable<K, N> {

    /**
     * Invoked when an event-time timer fires.
     */
    void onEventTime(InternalTimer<K, N> timer) throws Exception;

    /**
     * Invoked when a processing-time timer fires.
     */
    void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}
  • The Triggerable interface declares the onEventTime and onProcessingTime methods that the InternalTimerService calls. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable, so they can react to a timer's onEventTime or onProcessingTime callback.
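
A callback target may register further timers from inside the callback, which is why onProcessingTime in the listing above sets nextTimer to null before the loop. The following is a rough model of that interaction in plain Java. ProcessingTimeTarget and ProcessingTimersSketch are hypothetical names; keys and namespaces are omitted, and "scheduling" only records the requested wake-up time rather than using a real clock or cancelling a future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, simplified counterpart of the Triggerable callback (processing time only). */
interface ProcessingTimeTarget {
    void onProcessingTime(long timestamp, ProcessingTimersSketch timers);
}

/** Simplified processing-time side of a timer service. */
final class ProcessingTimersSketch {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    private final ProcessingTimeTarget target;

    /** Timestamp of the single pending wake-up, or null if none is scheduled. */
    Long nextWakeUp;

    /** Every wake-up that was ever requested, kept for inspection. */
    final List<Long> scheduled = new ArrayList<>();

    ProcessingTimersSketch(ProcessingTimeTarget target) {
        this.target = target;
    }

    void register(long time) {
        Long oldHead = queue.peek();
        queue.add(time);
        long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
        // Reschedule only when the new timer is earlier than the previous head.
        if (time < nextTriggerTime) {
            schedule(time);
        }
    }

    /** Invoked when the wake-up fires: drain due timers, then schedule the next head. */
    void onProcessingTime(long time) {
        // Cleared first, so a wake-up requested inside a callback is not mistaken for a stale one.
        nextWakeUp = null;
        Long head;
        while ((head = queue.peek()) != null && head <= time) {
            queue.poll();
            target.onProcessingTime(head, this);
        }
        // Skip scheduling if a callback already requested an earlier wake-up.
        if (head != null && nextWakeUp == null) {
            schedule(head);
        }
    }

    private void schedule(long time) {
        nextWakeUp = time;
        scheduled.add(time);
    }
}
```

A target such as `(ts, timers) -> timers.register(ts + 10)` re-arms itself on every firing. In this model at most one wake-up is pending at a time, and it always points at the earliest queued timer.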

Summary

  • The TimerService interface declares currentProcessingTime, currentWatermark, registerProcessingTimeTimer, registerEventTimeTimer, deleteProcessingTimeTimer, and deleteEventTimeTimer. Its implementation class is SimpleTimerService, which delegates to an InternalTimerService.
  • InternalTimerService is the internal counterpart of TimerService. It adds the notion of a namespace, so registerProcessingTimeTimer, deleteProcessingTimeTimer, registerEventTimeTimer, and deleteEventTimeTimer each take an additional namespace parameter. Its implementation class is InternalTimerServiceImpl, which implements InternalTimerService and ProcessingTimeCallback (the interface that declares onProcessingTime). Its registerEventTimeTimer and deleteEventTimeTimer methods operate on eventTimeTimersQueue, and its registerProcessingTimeTimer and deleteProcessingTimeTimer methods operate on processingTimeTimersQueue. Both queues are of type KeyGroupedInternalPriorityQueue, with elements of type TimerHeapInternalTimer.
  • In InternalTimerServiceImpl, event-time timers fire in advanceWatermark (AbstractStreamOperator.processWatermark calls InternalTimeServiceManager.advanceWatermark, which in turn calls InternalTimerServiceImpl.advanceWatermark). The method removes every event-time timer whose timestamp is less than or equal to the given time and calls triggerTarget.onEventTime for it.
  • In InternalTimerServiceImpl, processing-time timers fire in onProcessingTime (the TriggerTask and RepeatedTriggerTask scheduled by SystemProcessingTimeService call back ProcessingTimeCallback.onProcessingTime). The method removes every processing-time timer whose timestamp is less than or equal to the given time and calls triggerTarget.onProcessingTime for it.
  • The Triggerable interface declares the onEventTime and onProcessingTime methods that the InternalTimerService calls. Operators such as WindowOperator, IntervalJoinOperator, KeyedProcessOperator, and KeyedCoProcessOperator implement Triggerable, so they can react to a timer's onEventTime or onProcessingTime callback.
