聊聊Elasticsearch的TaskScheduler

栏目: 后端 · 发布时间: 5年前

内容简介:本文主要研究一下Elasticsearch的TaskSchedulerelasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.javaelasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java

本文主要研究一下Elasticsearch的TaskScheduler

TaskScheduler

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java

public class TaskScheduler {

    private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));

    /**
     * Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a
     * relative nanotime after the scheduled time, the task will be returned. This method returns a
     * {@link Runnable} that can be run to cancel the scheduled task.
     *
     * @param task to schedule
     * @param relativeNanos defining when to execute the task
     * @return runnable that will cancel the task
     */
    public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
        DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
        tasks.offer(delayedTask);
        return delayedTask;
    }

    Runnable pollTask(long relativeNanos) {
        DelayedTask task;
        while ((task = tasks.peek()) != null) {
            if (relativeNanos - task.deadline >= 0) {
                tasks.remove();
                if (task.cancelled == false) {
                    return task.runnable;
                }
            } else {
                return null;
            }
        }
        return null;
    }

    long nanosUntilNextTask(long relativeNanos) {
        DelayedTask nextTask = tasks.peek();
        if (nextTask == null) {
            return Long.MAX_VALUE;
        } else {
            return Math.max(nextTask.deadline - relativeNanos, 0);
        }
    }

    private static class DelayedTask implements Runnable {

        private final long deadline;
        private final Runnable runnable;
        private boolean cancelled = false;

        private DelayedTask(long deadline, Runnable runnable) {
            this.deadline = deadline;
            this.runnable = runnable;
        }

        private long getDeadline() {
            return deadline;
        }

        @Override
        public void run() {
            cancelled = true;
        }
    }
}
  • TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
  • TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
  • scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable

SSLChannelContext

elasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java

public final class SSLChannelContext extends SocketChannelContext {
    //......

    @Override
    public void queueWriteOperation(WriteOperation writeOperation) {
        getSelector().assertOnSelectorThread();
        if (writeOperation instanceof CloseNotifyOperation) {
            sslDriver.initiateClose();
            long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
            closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
        } else {
            super.queueWriteOperation(writeOperation);
        }
    }

    private void channelCloseTimeout() {
        closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;
        setCloseNow();
        getSelector().queueChannelClose(channel);
    }

    //......
}
  • SSLChannelContext的queueWriteOperation方法会使用taskScheduler的scheduleAtRelativeTime注册一个channelCloseTimeout的延时任务

NioSelector

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

public class NioSelector implements Closeable {

    //......

    public void runLoop() {
        if (runLock.tryLock()) {
            isRunningFuture.complete(null);
            try {
                setThread();
                while (isOpen()) {
                    singleLoop();
                }
            } finally {
                try {
                    cleanupAndCloseChannels();
                } finally {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        eventHandler.selectorException(e);
                    } finally {
                        runLock.unlock();
                        exitedLoop.countDown();
                    }
                }
            }
        } else {
            throw new IllegalStateException("selector is already running");
        }
    }

    void singleLoop() {
        try {
            closePendingChannels();
            preSelect();
            long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
            int ready;
            if (nanosUntilNextTask == 0) {
                ready = selector.selectNow();
            } else {
                long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
                // Only select until the next task needs to be run. Do not select with a value of 0 because
                // that blocks without a timeout.
                ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
            }
            if (ready > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey sk = keyIterator.next();
                    keyIterator.remove();
                    if (sk.isValid()) {
                        try {
                            processKey(sk);
                        } catch (CancelledKeyException cke) {
                            eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(),  cke);
                        }
                    } else {
                        eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(),  new CancelledKeyException());
                    }
                }
            }

            handleScheduledTasks(System.nanoTime());
        } catch (ClosedSelectorException e) {
            if (isOpen()) {
                throw e;
            }
        } catch (IOException e) {
            eventHandler.selectorException(e);
        } catch (Exception e) {
            eventHandler.uncaughtException(e);
        }
    }

    private void handleScheduledTasks(long nanoTime) {
        Runnable task;
        while ((task = taskScheduler.pollTask(nanoTime)) != null) {
            try {
                task.run();
            } catch (Exception e) {
                eventHandler.taskException(e);
            }
        }
    }

    //......
}
  • NioSelector的runLoop方法调用了singleLoop方法,后者调用了handleScheduledTasks方法,而handleScheduledTasks方法则是从taskScheduler.pollTask,然后执行task.run()

小结

  • TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
  • TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
  • scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable

doc


以上所述就是小编给大家介绍的《聊聊Elasticsearch的TaskScheduler》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

产品经理全栈运营实战笔记

产品经理全栈运营实战笔记

林俊宇 / 化学工业出版社 / 49.8元

本书凝结作者多年的产品运营经验,读者会看到很多创业公司做运营的经验,书中列举了几十个互联网产品的运营案例去解析如何真正做好一个产品的冷启动到发展期再到平稳期。本书主要分为六篇:互联网运营的全面貌;我的运营生涯;后产品时代的运营之道;揭秘刷屏事件的背后运营;技能学习;深度思考。本书有很多关于产品运营的基础知识,会帮助你做好、做透。而且将理论和作者自己的案例以及其他人的运营案例结合起来,会让读者更容易......一起来看看 《产品经理全栈运营实战笔记》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

随机密码生成器
随机密码生成器

多种字符组合密码

URL 编码/解码
URL 编码/解码

URL 编码/解码