A look at Flink's AsyncWaitOperator

This article takes a look at Flink's AsyncWaitOperator, based on the Flink 1.7.0 sources.

AsyncWaitOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java

@Internal
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {
    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_async_wait_operator_state_";

    /** Capacity of the stream element queue. */
    private final int capacity;

    /** Output mode for this operator. */
    private final AsyncDataStream.OutputMode outputMode;

    /** Timeout for the async collectors. */
    private final long timeout;

    protected transient Object checkpointingLock;

    /** {@link TypeSerializer} for inputs while making snapshots. */
    private transient StreamElementSerializer<IN> inStreamElementSerializer;

    /** Recovered input stream elements. */
    private transient ListState<StreamElement> recoveredStreamElements;

    /** Queue to store the currently in-flight stream elements into. */
    private transient StreamElementQueue queue;

    /** Pending stream element which could not yet added to the queue. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

    private transient ExecutorService executor;

    /** Emitter for the completed stream element queue entries. */
    private transient Emitter<OUT> emitter;

    /** Thread running the emitter. */
    private transient Thread emitterThread;

    public AsyncWaitOperator(
            AsyncFunction<IN, OUT> asyncFunction,
            long timeout,
            int capacity,
            AsyncDataStream.OutputMode outputMode) {
        super(asyncFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;

        Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
        this.capacity = capacity;

        this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

        this.timeout = timeout;
    }

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        this.checkpointingLock = getContainingTask().getCheckpointLock();

        this.inStreamElementSerializer = new StreamElementSerializer<>(
            getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

        // create the operators executor for the complete operations of the queue entries
        this.executor = Executors.newSingleThreadExecutor();

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue(
                    capacity,
                    executor,
                    this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
    }

    @Override
    public void open() throws Exception {
        super.open();

        // create the emitter
        this.emitter = new Emitter<>(checkpointingLock, output, queue, this);

        // start the emitter thread
        this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
        emitterThread.setDaemon(true);
        emitterThread.start();

        // process stream elements from state, since the Emit thread will start as soon as all
        // elements from previous state are in the StreamElementQueue, we have to make sure that the
        // order to open all operators in the operator chain proceeds from the tail operator to the
        // head operator.
        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                }
                else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                }
                else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                }
                else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() +
                        " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }

    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

        if (timeout > 0L) {
            // register a timeout for this AsyncStreamRecordBufferEntry
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });

            // Cancel the timer once we've completed the stream record buffer entry. This will remove
            // the register trigger task
            streamRecordBufferEntry.onComplete(
                (StreamElementQueueEntry<Collection<OUT>> value) -> {
                    timerFuture.cancel(true);
                },
                executor);
        }

        addAsyncBufferEntry(streamRecordBufferEntry);

        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        WatermarkQueueEntry watermarkBufferEntry = new WatermarkQueueEntry(mark);

        addAsyncBufferEntry(watermarkBufferEntry);
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);

        ListState<StreamElement> partitionableState =
            getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
        partitionableState.clear();

        Collection<StreamElementQueueEntry<?>> values = queue.values();

        try {
            for (StreamElementQueueEntry<?> value : values) {
                partitionableState.add(value.getStreamElement());
            }

            // add the pending stream element queue entry if the stream element queue is currently full
            if (pendingStreamElementQueueEntry != null) {
                partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            partitionableState.clear();

            throw new Exception("Could not add stream element queue entries to operator state " +
                "backend of operator " + getOperatorName() + '.', e);
        }
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        recoveredStreamElements = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));

    }

    @Override
    public void close() throws Exception {
        try {
            assert(Thread.holdsLock(checkpointingLock));

            while (!queue.isEmpty()) {
                // wait for the emitter thread to output the remaining elements
                // for that he needs the checkpointing lock and thus we have to free it
                checkpointingLock.wait();
            }
        }
        finally {
            Exception exception = null;

            try {
                super.close();
            } catch (InterruptedException interrupted) {
                exception = interrupted;

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = e;
            }

            try {
                // terminate the emitter, the emitter thread and the executor
                stopResources(true);
            } catch (InterruptedException interrupted) {
                exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

                Thread.currentThread().interrupt();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            if (exception != null) {
                LOG.warn("Errors occurred while closing the AsyncWaitOperator.", exception);
            }
        }
    }

    @Override
    public void dispose() throws Exception {
        Exception exception = null;

        try {
            super.dispose();
        } catch (InterruptedException interrupted) {
            exception = interrupted;

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = e;
        }

        try {
            stopResources(false);
        } catch (InterruptedException interrupted) {
            exception = ExceptionUtils.firstOrSuppressed(interrupted, exception);

            Thread.currentThread().interrupt();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (exception != null) {
            throw exception;
        }
    }

    private void stopResources(boolean waitForShutdown) throws InterruptedException {
        emitter.stop();
        emitterThread.interrupt();

        executor.shutdown();

        if (waitForShutdown) {
            try {
                if (!executor.awaitTermination(365L, TimeUnit.DAYS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();

                Thread.currentThread().interrupt();
            }

            /*
             * FLINK-5638: If we have the checkpoint lock we might have to free it for a while so
             * that the emitter thread can complete/react to the interrupt signal.
             */
            if (Thread.holdsLock(checkpointingLock)) {
                while (emitterThread.isAlive()) {
                    checkpointingLock.wait(100L);
                }
            }

            emitterThread.join();
        } else {
            executor.shutdownNow();
        }
    }

    private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        assert(Thread.holdsLock(checkpointingLock));

        pendingStreamElementQueueEntry = streamElementQueueEntry;

        while (!queue.tryPut(streamElementQueueEntry)) {
            // we wait for the emitter to notify us if the queue has space left again
            checkpointingLock.wait();
        }

        pendingStreamElementQueueEntry = null;
    }

    @Override
    public void failOperator(Throwable throwable) {
        getContainingTask().getEnvironment().failExternally(throwable);
    }
}
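  • In setup, AsyncWaitOperator creates an ExecutorService with Executors.newSingleThreadExecutor() and then, depending on outputMode, a StreamElementQueue that is either an OrderedStreamElementQueue or an UnorderedStreamElementQueue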
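
The back-pressure handshake in addAsyncBufferEntry (retry tryPut, wait on the checkpointing lock while the queue is full, get woken by the emitter's notifyAll) can be illustrated with a small stand-alone sketch. Everything here is hypothetical and uses only the JDK: BackpressureSketch, add, emitOne and run are names made up for this illustration, the queue is a plain ArrayDeque guarded by the same lock, and the emitter spins with Thread.yield() where the real one blocks in peekBlockingly().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the operator's back-pressure handshake. */
final class BackpressureSketch {
    private final Object checkpointLock = new Object();
    private final ArrayDeque<Integer> queue = new ArrayDeque<>();
    private final List<Integer> emitted = new ArrayList<>();
    private final int capacity;

    BackpressureSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Task-thread side: retry tryPut, waiting on the lock while the queue is full. */
    void add(int element) throws InterruptedException {
        // The real operator already holds the checkpoint lock when it gets here.
        synchronized (checkpointLock) {
            while (!tryPut(element)) {
                checkpointLock.wait(); // releases the lock so the emitter can drain
            }
        }
    }

    private boolean tryPut(int element) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(element);
        return true;
    }

    /** Emitter side: emit the head under the lock, remove it, wake the producer. */
    boolean emitOne() {
        synchronized (checkpointLock) {
            Integer head = queue.pollFirst();
            if (head == null) {
                return false;
            }
            emitted.add(head);
            checkpointLock.notifyAll();
            return true;
        }
    }

    /** Pushes count elements through a queue of the given capacity; returns the emitted order. */
    static List<Integer> run(int capacity, int count) {
        BackpressureSketch sketch = new BackpressureSketch(capacity);
        Thread emitter = new Thread(() -> {
            int done = 0;
            while (done < count) {
                if (sketch.emitOne()) {
                    done++;
                } else {
                    Thread.yield(); // simplification: the real emitter blocks instead
                }
            }
        });
        emitter.setDaemon(true);
        emitter.start();
        try {
            for (int i = 0; i < count; i++) {
                sketch.add(i);
            }
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sketch.emitted);
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5)); // prints [0, 1, 2, 3, 4]
    }
}
```

With a capacity of 2 the producer has to wait several times, yet all five elements come out in insertion order, because the producer only proceeds after the emitter has freed a slot and called notifyAll.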

Emitter

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/Emitter.java

@Internal
public class Emitter<OUT> implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Emitter.class);

    /** Lock to hold before outputting. */
    private final Object checkpointLock;

    /** Output for the watermark elements. */
    private final Output<StreamRecord<OUT>> output;

    /** Queue to consume the async results from. */
    private final StreamElementQueue streamElementQueue;

    private final OperatorActions operatorActions;

    /** Output for stream records. */
    private final TimestampedCollector<OUT> timestampedCollector;

    private volatile boolean running;

    public Emitter(
            final Object checkpointLock,
            final Output<StreamRecord<OUT>> output,
            final StreamElementQueue streamElementQueue,
            final OperatorActions operatorActions) {

        this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "checkpointLock");
        this.output = Preconditions.checkNotNull(output, "output");
        this.streamElementQueue = Preconditions.checkNotNull(streamElementQueue, "streamElementQueue");
        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.timestampedCollector = new TimestampedCollector<>(this.output);
        this.running = true;
    }

    @Override
    public void run() {
        try {
            while (running) {
                LOG.debug("Wait for next completed async stream element result.");
                AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();

                output(streamElementEntry);
            }
        } catch (InterruptedException e) {
            if (running) {
                operatorActions.failOperator(e);
            } else {
                // Thread got interrupted which means that it should shut down
                LOG.debug("Emitter thread got interrupted, shutting down.");
            }
        } catch (Throwable t) {
            operatorActions.failOperator(new Exception("AsyncWaitOperator's emitter caught an " +
                "unexpected throwable.", t));
        }
    }

    private void output(AsyncResult asyncResult) throws InterruptedException {
        if (asyncResult.isWatermark()) {
            synchronized (checkpointLock) {
                AsyncWatermarkResult asyncWatermarkResult = asyncResult.asWatermark();

                LOG.debug("Output async watermark.");
                output.emitWatermark(asyncWatermarkResult.getWatermark());

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        } else {
            AsyncCollectionResult<OUT> streamRecordResult = asyncResult.asResultCollection();

            if (streamRecordResult.hasTimestamp()) {
                timestampedCollector.setAbsoluteTimestamp(streamRecordResult.getTimestamp());
            } else {
                timestampedCollector.eraseTimestamp();
            }

            synchronized (checkpointLock) {
                LOG.debug("Output async stream element collection result.");

                try {
                    Collection<OUT> resultCollection = streamRecordResult.get();

                    if (resultCollection != null) {
                        for (OUT result : resultCollection) {
                            timestampedCollector.collect(result);
                        }
                    }
                } catch (Exception e) {
                    operatorActions.failOperator(
                        new Exception("An async function call terminated with an exception. " +
                            "Failing the AsyncWaitOperator.", e));
                }

                // remove the peeked element from the async collector buffer so that it is no longer
                // checkpointed
                streamElementQueue.poll();

                // notify the main thread that there is again space left in the async collector
                // buffer
                checkpointLock.notifyAll();
            }
        }
    }

    public void stop() {
        running = false;
    }
}
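  • Emitter implements Runnable. Its job is to take completed elements from the StreamElementQueue and emit them downstream: records through a TimestampedCollector, watermarks through output.emitWatermark
  • Emitter's run method loops, calling streamElementQueue.peekBlockingly() to block until an AsyncResult is available, and then hands that result to the output method
  • The output method branches on whether asyncResult is a watermark. A watermark is forwarded with output.emitWatermark; otherwise the results are emitted with timestampedCollector.collect, and any exception is reported through operatorActions.failOperator. In both branches, still holding checkpointLock, it then calls streamElementQueue.poll() to remove the head element and checkpointLock.notifyAll() to wake the task thread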
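
The stop protocol in run (a volatile running flag plus an interrupt, where an interrupt counts as a failure only while running is still true) can be sketched with JDK classes alone. This is a hypothetical illustration, not the Flink class: EmitterLoopSketch, demo and the failed flag are invented names, a LinkedBlockingQueue stands in for StreamElementQueue, and take() replaces the peek-emit-poll sequence of the real Emitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the emitter loop and its stop()/interrupt shutdown. */
final class EmitterLoopSketch implements Runnable {
    private final BlockingQueue<String> completed = new LinkedBlockingQueue<>();
    private final List<String> output = new ArrayList<>();
    private final CountDownLatch emittedLatch;
    private volatile boolean running = true;
    private volatile boolean failed = false;

    EmitterLoopSketch(int expectedElements) {
        this.emittedLatch = new CountDownLatch(expectedElements);
    }

    @Override
    public void run() {
        try {
            while (running) {
                String next = completed.take(); // blocks until a result is available
                synchronized (output) {
                    output.add(next);
                }
                emittedLatch.countDown();
            }
        } catch (InterruptedException e) {
            // Interrupted while still running: unexpected, treat as a failure.
            // Interrupted after stop(): a regular shutdown.
            if (running) {
                failed = true;
            }
        }
    }

    void stop() {
        running = false;
    }

    /** Emits two elements, shuts the loop down and returns what was emitted. */
    static List<String> demo() {
        EmitterLoopSketch emitter = new EmitterLoopSketch(2);
        Thread thread = new Thread(emitter, "emitter-sketch");
        thread.setDaemon(true);
        thread.start();
        try {
            emitter.completed.put("a");
            emitter.completed.put("b");
            emitter.emittedLatch.await(); // both elements have been emitted
            emitter.stop();               // clear the flag first ...
            thread.interrupt();           // ... then unblock a pending take()
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (emitter.failed) {
            throw new IllegalStateException("emitter reported a failure");
        }
        synchronized (emitter.output) {
            return new ArrayList<>(emitter.output);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b]
    }
}
```

Calling stop() before interrupt() matters: the loop thread reads the flag inside the catch block, so the order decides whether the interrupt is classified as a shutdown or as a failure.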

StreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueue.java

@Internal
public interface StreamElementQueue {

    <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException;

    AsyncResult peekBlockingly() throws InterruptedException;

    AsyncResult poll() throws InterruptedException;

    Collection<StreamElementQueueEntry<?>> values() throws InterruptedException;

    boolean isEmpty();

    int size();
}
  • The StreamElementQueue interface defines the blocking stream element queue that AsyncWaitOperator relies on. It declares the methods put, tryPut, peekBlockingly, poll, values, isEmpty and size. It has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue, and its element type is StreamElementQueueEntry

UnorderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java

@Internal
public class UnorderedStreamElementQueue implements StreamElementQueue {

    private static final Logger LOG = LoggerFactory.getLogger(UnorderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Executor to run the onComplete callbacks. */
    private final Executor executor;

    /** OperatorActions to signal the owning operator a failure. */
    private final OperatorActions operatorActions;

    /** Queue of uncompleted stream element queue entries segmented by watermarks. */
    private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;

    /** Queue of completed stream element queue entries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;

    /** First (chronologically oldest) uncompleted set of stream element queue entries. */
    private Set<StreamElementQueueEntry<?>> firstSet;

    // Last (chronologically youngest) uncompleted set of stream element queue entries. New
    // stream element queue entries are inserted into this set.
    private Set<StreamElementQueueEntry<?>> lastSet;
    private volatile int numberEntries;

    /** Locks and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;
    private final Condition hasCompletedEntries;

    public UnorderedStreamElementQueue(
            int capacity,
            Executor executor,
            OperatorActions operatorActions) {

        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;

        this.executor = Preconditions.checkNotNull(executor, "executor");

        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.uncompletedQueue = new ArrayDeque<>(capacity);
        this.completedQueue = new ArrayDeque<>(capacity);

        this.firstSet = new HashSet<>(capacity);
        this.lastSet = firstSet;

        this.numberEntries = 0;

        this.lock = new ReentrantLock();
        this.notFull = lock.newCondition();
        this.hasCompletedEntries = lock.newCondition();
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (numberEntries >= capacity) {
                notFull.await();
            }

            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (numberEntries < capacity) {
                addEntry(streamElementQueueEntry);

                LOG.debug("Put element into unordered stream element queue. New filling degree " +
                    "({}/{}).", numberEntries, capacity);

                return true;
            } else {
                LOG.debug("Failed to put element into unordered stream element queue because it " +
                    "was full ({}/{}).", numberEntries, capacity);

                return false;
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (completedQueue.isEmpty()) {
                hasCompletedEntries.await();
            }

            LOG.debug("Peeked head element from unordered stream element queue with filling degree " +
                "({}/{}).", numberEntries, capacity);

            return completedQueue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (completedQueue.isEmpty()) {
                hasCompletedEntries.await();
            }

            numberEntries--;
            notFull.signalAll();

            LOG.debug("Polled element from unordered stream element queue. New filling degree " +
                "({}/{}).", numberEntries, capacity);

            return completedQueue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[numberEntries];

            array = completedQueue.toArray(array);

            int counter = completedQueue.size();

            for (StreamElementQueueEntry<?> entry: firstSet) {
                array[counter] = entry;
                counter++;
            }

            for (Set<StreamElementQueueEntry<?>> asyncBufferEntries : uncompletedQueue) {

                for (StreamElementQueueEntry<?> streamElementQueueEntry : asyncBufferEntries) {
                    array[counter] = streamElementQueueEntry;
                    counter++;
                }
            }

            return Arrays.asList(array);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isEmpty() {
        return numberEntries == 0;
    }

    @Override
    public int size() {
        return numberEntries;
    }

    public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (firstSet.remove(streamElementQueueEntry)) {
                completedQueue.offer(streamElementQueueEntry);

                while (firstSet.isEmpty() && firstSet != lastSet) {
                    firstSet = uncompletedQueue.poll();

                    Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();

                    while (it.hasNext()) {
                        StreamElementQueueEntry<?> bufferEntry = it.next();

                        if (bufferEntry.isDone()) {
                            completedQueue.offer(bufferEntry);
                            it.remove();
                        }
                    }
                }

                LOG.debug("Signal unordered stream element queue has completed entries.");
                hasCompletedEntries.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert(lock.isHeldByCurrentThread());

        if (streamElementQueueEntry.isWatermark()) {
            lastSet = new HashSet<>(capacity);

            if (firstSet.isEmpty()) {
                firstSet.add(streamElementQueueEntry);
            } else {
                Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
                watermarkSet.add(streamElementQueueEntry);
                uncompletedQueue.offer(watermarkSet);
            }
            uncompletedQueue.offer(lastSet);
        } else {
            lastSet.add(streamElementQueueEntry);
        }

        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    // The accept executor thread got interrupted. This is probably cause by
                    // the shutdown of the executor.
                    LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                        "executor thread has been interrupted.", e);
                } catch (Throwable t) {
                    operatorActions.failOperator(new Exception("Could not complete the " +
                        "stream element queue entry: " + value + '.', t));
                }
            },
            executor);

        numberEntries++;
    }
}
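  • UnorderedStreamElementQueue implements StreamElementQueue and emits results without preserving input order, except that results never overtake a watermark. Internally it uses two ArrayDeques: uncompletedQueue, which holds sets of pending entries segmented by watermarks, and completedQueue
  • peekBlockingly waits on hasCompletedEntries.await() while completedQueue is empty and then returns completedQueue.peek(). Both put and tryPut call addEntry. For a record, addEntry adds the entry to lastSet. For a watermark, it starts a new lastSet and appends a set containing only the watermark, followed by the new lastSet, to uncompletedQueue (if firstSet is empty, the watermark goes into firstSet instead). addEntry also registers onCompleteHandler through each streamElementQueueEntry's onComplete method
  • onCompleteHandler removes a completed streamElementQueueEntry from firstSet and adds it to completedQueue. Once firstSet is empty and is not lastSet, it takes the next set from uncompletedQueue as the new firstSet and moves that set's already completed entries to completedQueue. Finally it signals hasCompletedEntries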
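
The watermark segmentation is easier to follow on a single-threaded model. The sketch below is a hypothetical reduction of the logic above: UnorderedSegmentsSketch, Entry, addRecord, addWatermark and complete are invented names, the locks, conditions and executor are left out, and a watermark is completed right after insertion on the assumption that it has nothing to wait for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical single-threaded model of the watermark-segmented unordered queue. */
final class UnorderedSegmentsSketch {
    static final class Entry {
        final String name;
        boolean done;

        Entry(String name) {
            this.name = name;
        }
    }

    private final ArrayDeque<Set<Entry>> uncompletedQueue = new ArrayDeque<>();
    private final ArrayDeque<Entry> completedQueue = new ArrayDeque<>();
    private Set<Entry> firstSet = new HashSet<>();
    private Set<Entry> lastSet = firstSet;

    /** Records always join the youngest segment. */
    Entry addRecord(String name) {
        Entry entry = new Entry(name);
        lastSet.add(entry);
        return entry;
    }

    /** A watermark closes the current segment and opens a new one. */
    void addWatermark(String name) {
        Entry watermark = new Entry(name);
        lastSet = new HashSet<>();
        if (firstSet.isEmpty()) {
            firstSet.add(watermark);
        } else {
            Set<Entry> watermarkSet = new HashSet<>();
            watermarkSet.add(watermark);
            uncompletedQueue.offer(watermarkSet);
        }
        uncompletedQueue.offer(lastSet);
        complete(watermark); // assumption: a watermark is complete immediately
    }

    /** Mirrors onCompleteHandler: only entries of the oldest segment may be released. */
    void complete(Entry entry) {
        entry.done = true;
        if (firstSet.remove(entry)) {
            completedQueue.offer(entry);
            while (firstSet.isEmpty() && firstSet != lastSet) {
                firstSet = uncompletedQueue.poll();
                Iterator<Entry> it = firstSet.iterator();
                while (it.hasNext()) {
                    Entry next = it.next();
                    if (next.done) {
                        completedQueue.offer(next);
                        it.remove();
                    }
                }
            }
        }
    }

    List<String> completedNames() {
        List<String> names = new ArrayList<>();
        for (Entry entry : completedQueue) {
            names.add(entry.name);
        }
        return names;
    }

    /** Inserts a, b, W1, c and completes them in the order c, b, a. */
    static List<String> demo() {
        UnorderedSegmentsSketch queue = new UnorderedSegmentsSketch();
        Entry a = queue.addRecord("a");
        Entry b = queue.addRecord("b");
        queue.addWatermark("W1");
        Entry c = queue.addRecord("c");
        queue.complete(c); // held back: c sits behind the watermark
        queue.complete(b); // released at once: b is in the oldest segment
        queue.complete(a); // empties the segment, which releases W1 and then c
        return queue.completedNames();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [b, a, W1, c]
    }
}
```

Although c finishes first, it is released last: b and a may swap places within their segment, but nothing crosses W1.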

OrderedStreamElementQueue

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java

@Internal
public class OrderedStreamElementQueue implements StreamElementQueue {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Executor to run the onCompletion callback. */
    private final Executor executor;

    /** Operator actions to signal a failure to the operator. */
    private final OperatorActions operatorActions;

    /** Lock and conditions for the blocking queue. */
    private final ReentrantLock lock;
    private final Condition notFull;
    private final Condition headIsCompleted;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final ArrayDeque<StreamElementQueueEntry<?>> queue;

    public OrderedStreamElementQueue(
            int capacity,
            Executor executor,
            OperatorActions operatorActions) {

        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");
        this.capacity = capacity;

        this.executor = Preconditions.checkNotNull(executor, "executor");

        this.operatorActions = Preconditions.checkNotNull(operatorActions, "operatorActions");

        this.lock = new ReentrantLock(false);
        this.headIsCompleted = lock.newCondition();
        this.notFull = lock.newCondition();

        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public AsyncResult peekBlockingly() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            LOG.debug("Peeked head element from ordered stream element queue with filling degree " +
                "({}/{}).", queue.size(), capacity);

            return queue.peek();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public AsyncResult poll() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }

            notFull.signalAll();

            LOG.debug("Polled head element from ordered stream element queue. New filling degree " +
                "({}/{}).", queue.size() - 1, capacity);

            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Collection<StreamElementQueueEntry<?>> values() throws InterruptedException {
        lock.lockInterruptibly();

        try {
            StreamElementQueueEntry<?>[] array = new StreamElementQueueEntry[queue.size()];

            array = queue.toArray(array);

            return Arrays.asList(array);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public <T> void put(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }

            addEntry(streamElementQueueEntry);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (queue.size() < capacity) {
                addEntry(streamElementQueueEntry);

                LOG.debug("Put element into ordered stream element queue. New filling degree " +
                    "({}/{}).", queue.size(), capacity);

                return true;
            } else {
                LOG.debug("Failed to put element into ordered stream element queue because it " +
                    "was full ({}/{}).", queue.size(), capacity);

                return false;
            }
        } finally {
            lock.unlock();
        }
    }

    private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
        assert(lock.isHeldByCurrentThread());

        queue.addLast(streamElementQueueEntry);

        streamElementQueueEntry.onComplete(
            (StreamElementQueueEntry<T> value) -> {
                try {
                    onCompleteHandler(value);
                } catch (InterruptedException e) {
                    // we got interrupted. This indicates a shutdown of the executor
                    LOG.debug("AsyncBufferEntry could not be properly completed because the " +
                        "executor thread has been interrupted.", e);
                } catch (Throwable t) {
                    operatorActions.failOperator(new Exception("Could not complete the " +
                        "stream element queue entry: " + value + '.', t));
                }
            },
            executor);
    }

    private void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
        lock.lockInterruptibly();

        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                LOG.debug("Signal ordered stream element queue has completed head element.");
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}
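  • OrderedStreamElementQueue implements the StreamElementQueue interface and emits results in the order in which the elements were added; internally it keeps a queue of type ArrayDeque
  • peekBlockingly first checks whether the queue is non-empty and its head entry is done; if not, it waits on headIsCompleted.await(), otherwise it returns queue.peek(). Both put and tryPut call addEntry, which runs queue.addLast(streamElementQueueEntry) and then registers onCompleteHandler as that entry's onComplete callback
  • onCompleteHandler does not inspect which entry has just completed; it checks whether the queue is non-empty and its head entry is done, and if so calls headIsCompleted.signalAll()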
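
The lock and condition interplay described above can be sketched with plain JDK classes. This is a minimal illustration, not Flink's implementation: the class OrderedQueueSketch and its methods put, takeHead and demo are hypothetical names, the capacity check is left out, and takeHead merges what Flink splits into peekBlockingly and poll.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified stand-in for an ordered queue; hypothetical, not Flink's class. */
class OrderedQueueSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition headIsCompleted = lock.newCondition();
    private final ArrayDeque<CompletableFuture<T>> queue = new ArrayDeque<>();

    /** Appends an entry and registers a completion callback on it. */
    void put(CompletableFuture<T> entry) {
        lock.lock();
        try {
            queue.addLast(entry);
            // The callback runs for normal and exceptional completion alike.
            entry.whenComplete((value, error) -> onComplete());
        } finally {
            lock.unlock();
        }
    }

    /** Signals waiters only if the head of the queue is done. */
    private void onComplete() {
        lock.lock();
        try {
            if (!queue.isEmpty() && queue.peek().isDone()) {
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until the head entry is done, then removes it and returns its value. */
    T takeHead() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peek().isDone()) {
                headIsCompleted.await();
            }
            return queue.poll().join();
        } finally {
            lock.unlock();
        }
    }

    /** Completes two entries in reverse order and collects them in queue order. */
    static List<String> demo() {
        OrderedQueueSketch<String> q = new OrderedQueueSketch<>();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        q.put(first);
        q.put(second);

        Thread completer = new Thread(() -> {
            second.complete("second"); // head not done yet, so nothing is signalled
            first.complete("first");   // head is done now, so waiters are woken up
        });
        completer.start();

        List<String> emitted = new ArrayList<>();
        try {
            emitted.add(q.takeHead());
            emitted.add(q.takeHead());
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```

Even though the second entry completes first, the consumer only sees it after the head entry has completed, which is the ordering guarantee the bullets describe.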

AsyncResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/AsyncResult.java

@Internal
public interface AsyncResult {

    boolean isWatermark();

    boolean isResultCollection();

    AsyncWatermarkResult asWatermark();

    <T> AsyncCollectionResult<T> asResultCollection();
}
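  • The AsyncResult interface defines the methods that the asynchronous result of a StreamElementQueue element must implement; such a result is either a watermark or a collection of actual output records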
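
As a rough illustration of how a consumer could branch on these methods, the sketch below mirrors the shape of the interface with stand-in types. Everything in it (AsyncResultDispatchSketch, Result, Entry, WatermarkEntry, RecordEntry, describe) is hypothetical and only imitates the is.../as... pattern; it is not Flink code.

```java
import java.util.Arrays;
import java.util.Collection;

class AsyncResultDispatchSketch {
    /** Stand-in with the same shape as the interface quoted above. */
    interface Result {
        boolean isWatermark();
        boolean isResultCollection();
        WatermarkResult asWatermark();
        CollectionResult asResultCollection();
    }

    interface WatermarkResult extends Result { long timestamp(); }

    interface CollectionResult extends Result { Collection<String> values(); }

    /** Base class that answers the type questions with class checks and casts. */
    abstract static class Entry implements Result {
        public final boolean isWatermark() { return this instanceof WatermarkResult; }
        public final boolean isResultCollection() { return this instanceof CollectionResult; }
        public final WatermarkResult asWatermark() { return (WatermarkResult) this; }
        public final CollectionResult asResultCollection() { return (CollectionResult) this; }
    }

    static final class WatermarkEntry extends Entry implements WatermarkResult {
        private final long timestamp;
        WatermarkEntry(long timestamp) { this.timestamp = timestamp; }
        public long timestamp() { return timestamp; }
    }

    static final class RecordEntry extends Entry implements CollectionResult {
        private final Collection<String> values;
        RecordEntry(Collection<String> values) { this.values = values; }
        public Collection<String> values() { return values; }
    }

    /** Branches on the result kind before casting, as a consumer would. */
    static String describe(Result result) {
        if (result.isWatermark()) {
            return "watermark@" + result.asWatermark().timestamp();
        } else {
            return "records" + result.asResultCollection().values();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new WatermarkEntry(100L)));
        System.out.println(describe(new RecordEntry(Arrays.asList("a", "b"))));
    }
}
```

Checking isWatermark() before calling asWatermark() matters because the as... methods are plain casts and would throw a ClassCastException on the wrong kind of entry.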

StreamElementQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueEntry.java

@Internal
public abstract class StreamElementQueueEntry<T> implements AsyncResult {

    private final StreamElement streamElement;

    public StreamElementQueueEntry(StreamElement streamElement) {
        this.streamElement = Preconditions.checkNotNull(streamElement);
    }

    public StreamElement getStreamElement() {
        return streamElement;
    }

    public boolean isDone() {
        return getFuture().isDone();
    }

    public void onComplete(
            final Consumer<StreamElementQueueEntry<T>> completeFunction,
            Executor executor) {
        final StreamElementQueueEntry<T> thisReference = this;

        getFuture().whenCompleteAsync(
            // call the complete function for normal completion as well as exceptional completion
            // see FLINK-6435
            (value, throwable) -> completeFunction.accept(thisReference),
            executor);
    }

    protected abstract CompletableFuture<T> getFuture();

    @Override
    public final boolean isWatermark() {
        return AsyncWatermarkResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final boolean isResultCollection() {
        return AsyncCollectionResult.class.isAssignableFrom(getClass());
    }

    @Override
    public final AsyncWatermarkResult asWatermark() {
        return (AsyncWatermarkResult) this;
    }

    @Override
    public final <T> AsyncCollectionResult<T> asResultCollection() {
        return (AsyncCollectionResult<T>) this;
    }
}
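  • StreamElementQueueEntry implements the AsyncResult interface. Its onComplete method uses getFuture().whenCompleteAsync to register a callback that runs on the given executor once the result completes, whether normally or exceptionally (see the FLINK-6435 note in the code). It also declares the abstract method getFuture for subclasses to implement; it has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry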
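
The point that the callback fires for both outcomes is a property of CompletableFuture.whenCompleteAsync itself and can be checked with the JDK alone. The sketch below is only an illustration; the class CompletionCallbackSketch and the method countCallbacks are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionCallbackSketch {
    /** Registers the same callback on a succeeding and a failing future. */
    static int countCallbacks() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger calls = new AtomicInteger();
        try {
            CompletableFuture<String> ok = new CompletableFuture<>();
            CompletableFuture<String> failed = new CompletableFuture<>();

            // The BiConsumer receives (value, throwable); exactly one of them is non-null
            // here, and the callback ignores both, just like onComplete in the listing.
            CompletableFuture<String> okStage =
                ok.whenCompleteAsync((value, error) -> calls.incrementAndGet(), executor);
            CompletableFuture<String> failedStage =
                failed.whenCompleteAsync((value, error) -> calls.incrementAndGet(), executor);

            ok.complete("result");
            failed.completeExceptionally(new RuntimeException("simulated failure"));

            // The returned stages complete only after their callbacks have run.
            okStage.join();
            failedStage.handle((value, error) -> null).join();
            return calls.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countCallbacks()); // prints 2
    }
}
```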

WatermarkQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/WatermarkQueueEntry.java

@Internal
public class WatermarkQueueEntry extends StreamElementQueueEntry<Watermark> implements AsyncWatermarkResult {

    private final CompletableFuture<Watermark> future;

    public WatermarkQueueEntry(Watermark watermark) {
        super(watermark);

        this.future = CompletableFuture.completedFuture(watermark);
    }

    @Override
    public Watermark getWatermark() {
        return (Watermark) getStreamElement();
    }

    @Override
    protected CompletableFuture<Watermark> getFuture() {
        return future;
    }
}
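  • WatermarkQueueEntry extends StreamElementQueueEntry with Watermark as its element type and also implements the AsyncWatermarkResult interface; its future is created with CompletableFuture.completedFuture(watermark), so the entry is already done when it is constructed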

StreamRecordQueueEntry

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/async/queue/StreamRecordQueueEntry.java

@Internal
public class StreamRecordQueueEntry<OUT> extends StreamElementQueueEntry<Collection<OUT>>
    implements AsyncCollectionResult<OUT>, ResultFuture<OUT> {

    /** Timestamp information. */
    private final boolean hasTimestamp;
    private final long timestamp;

    /** Future containing the collection result. */
    private final CompletableFuture<Collection<OUT>> resultFuture;

    public StreamRecordQueueEntry(StreamRecord<?> streamRecord) {
        super(streamRecord);

        hasTimestamp = streamRecord.hasTimestamp();
        timestamp = streamRecord.getTimestamp();

        resultFuture = new CompletableFuture<>();
    }

    @Override
    public boolean hasTimestamp() {
        return hasTimestamp;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public Collection<OUT> get() throws Exception {
        return resultFuture.get();
    }

    @Override
    protected CompletableFuture<Collection<OUT>> getFuture() {
        return resultFuture;
    }

    @Override
    public void complete(Collection<OUT> result) {
        resultFuture.complete(result);
    }

    @Override
    public void completeExceptionally(Throwable error) {
        resultFuture.completeExceptionally(error);
    }
}
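  • StreamRecordQueueEntry extends StreamElementQueueEntry and also implements the AsyncCollectionResult and ResultFuture interfaces; its resultFuture starts out incomplete and is completed later through complete or completeExceptionally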
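
The difference between the two entry types comes down to how their futures are created, which the following JDK-only sketch tries to make visible. The class EntryFutureSketch and its demo method are hypothetical; only the CompletableFuture calls are taken from the listings above.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

class EntryFutureSketch {
    /** Returns {watermarkDone, recordDoneBefore, recordDoneAfter, lateFailureAccepted}. */
    static boolean[] demo() {
        // Watermark-style entry: the future is born completed.
        CompletableFuture<Long> watermarkFuture = CompletableFuture.completedFuture(42L);
        boolean watermarkDone = watermarkFuture.isDone();

        // Record-style entry: the future stays pending until someone completes it.
        CompletableFuture<Collection<String>> recordFuture = new CompletableFuture<>();
        boolean recordDoneBefore = recordFuture.isDone();

        recordFuture.complete(Collections.singletonList("enriched"));
        boolean recordDoneAfter = recordFuture.isDone();

        // A CompletableFuture accepts only the first completion; later attempts return false.
        boolean lateFailureAccepted =
            recordFuture.completeExceptionally(new RuntimeException("too late"));

        return new boolean[] {watermarkDone, recordDoneBefore, recordDoneAfter, lateFailureAccepted};
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]); // prints true false true false
    }
}
```

Because a CompletableFuture keeps only its first completion, calling completeExceptionally after complete leaves the earlier result in place.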

Summary

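  • AsyncWaitOperator extends AbstractUdfStreamOperator and overrides its setup, open, initializeState, close and dispose methods; it implements the processElement, processWatermark and processLatencyMarker methods defined by the OneInputStreamOperator interface, as well as the failOperator method defined by OperatorActions; its open method creates an Emitter and starts it on the AsyncIO-Emitter-Thread
  • Emitter implements the Runnable interface and is mainly responsible for taking elements out of the StreamElementQueue and emitting them to the TimestampedCollector; its run method loops, calling streamElementQueue.peekBlockingly() to block until an AsyncResult is available and then calling the output method to emit that result
  • The StreamElementQueue interface defines the blocking stream element queue that AsyncWaitOperator relies on; it declares the put, tryPut, peekBlockingly, poll, values, isEmpty and size methods and has two implementations, UnorderedStreamElementQueue and OrderedStreamElementQueue. The queue elements are of type StreamElementQueueEntry, which implements the AsyncResult interface, defines the onComplete method for the callback that runs when a result completes, and declares the abstract method getFuture for subclasses to implement; it has two subclasses, WatermarkQueueEntry and StreamRecordQueueEntry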


