Preface
This article takes a look at Elasticsearch's TaskScheduler.
TaskScheduler
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java
```java
public class TaskScheduler {

    private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));

    /**
     * Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a
     * relative nanotime after the scheduled time, the task will be returned. This method returns a
     * {@link Runnable} that can be run to cancel the scheduled task.
     *
     * @param task to schedule
     * @param relativeNanos defining when to execute the task
     * @return runnable that will cancel the task
     */
    public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
        DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
        tasks.offer(delayedTask);
        return delayedTask;
    }

    Runnable pollTask(long relativeNanos) {
        DelayedTask task;
        while ((task = tasks.peek()) != null) {
            if (relativeNanos - task.deadline >= 0) {
                tasks.remove();
                if (task.cancelled == false) {
                    return task.runnable;
                }
            } else {
                return null;
            }
        }
        return null;
    }

    long nanosUntilNextTask(long relativeNanos) {
        DelayedTask nextTask = tasks.peek();
        if (nextTask == null) {
            return Long.MAX_VALUE;
        } else {
            return Math.max(nextTask.deadline - relativeNanos, 0);
        }
    }

    private static class DelayedTask implements Runnable {

        private final long deadline;
        private final Runnable runnable;
        private boolean cancelled = false;

        private DelayedTask(long deadline, Runnable runnable) {
            this.deadline = deadline;
            this.runnable = runnable;
        }

        private long getDeadline() {
            return deadline;
        }

        @Override
        public void run() {
            cancelled = true;
        }
    }
}
```
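- TaskScheduler defines a private nested class, DelayedTask, which implements Runnable and has three fields: deadline, runnable, and cancelled. Its run() method does not execute the wrapped runnable; it only sets cancelled to true, which is why the object returned by scheduleAtRelativeTime acts as a canceller.
- TaskScheduler keeps its DelayedTask instances in a PriorityQueue whose comparator is Comparator.comparingLong(DelayedTask::getDeadline), so the task with the earliest deadline is always at the head.
- scheduleAtRelativeTime wraps the runnable in a DelayedTask and offers it to the priority queue. pollTask peeks at the head of the queue; if a task is present and relativeNanos - task.deadline >= 0 (the deadline has been reached), it removes the task and returns task.runnable when cancelled is false. A cancelled task is simply dropped and the loop moves on to the next one. If the head task is not yet due, or the queue is empty, pollTask returns null.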
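Because pollTask is package-private, the behaviour described above cannot be exercised directly from outside org.elasticsearch.nio. The following is a minimal, self-contained sketch that mirrors the quoted logic so the schedule, cancel, and poll steps can be tried in isolation. The names MiniTaskScheduler and MiniTaskSchedulerDemo, the fake clock value, and the deadlines are all assumptions made for illustration; they are not part of Elasticsearch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in that mirrors the TaskScheduler logic quoted above.
class MiniTaskScheduler {

    private final PriorityQueue<DelayedTask> tasks =
        new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));

    // Returns a Runnable that cancels the scheduled task when run.
    Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
        DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
        tasks.offer(delayedTask);
        return delayedTask;
    }

    // Returns the next due, non-cancelled task, or null if none is due yet.
    Runnable pollTask(long relativeNanos) {
        DelayedTask task;
        while ((task = tasks.peek()) != null) {
            // Subtracting before comparing stays correct even if nanoTime values wrap around.
            if (relativeNanos - task.deadline >= 0) {
                tasks.remove();
                if (!task.cancelled) {
                    return task.runnable;
                }
            } else {
                return null;
            }
        }
        return null;
    }

    private static class DelayedTask implements Runnable {
        private final long deadline;
        private final Runnable runnable;
        private boolean cancelled = false;

        private DelayedTask(long deadline, Runnable runnable) {
            this.deadline = deadline;
            this.runnable = runnable;
        }

        private long getDeadline() {
            return deadline;
        }

        @Override
        public void run() {
            cancelled = true; // running the handle cancels; it never runs the wrapped task
        }
    }
}

class MiniTaskSchedulerDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniTaskScheduler scheduler = new MiniTaskScheduler();
        long now = 1_000L; // fake clock value, chosen only for this demo

        scheduler.scheduleAtRelativeTime(() -> log.add("late"), now + 300);
        Runnable cancelEarly = scheduler.scheduleAtRelativeTime(() -> log.add("early"), now + 100);
        scheduler.scheduleAtRelativeTime(() -> log.add("middle"), now + 200);
        cancelEarly.run(); // "early" will be skipped when it reaches the head of the queue

        Runnable task;
        while ((task = scheduler.pollTask(now + 250)) != null) {
            task.run(); // only "middle" is due and not cancelled
        }
        while ((task = scheduler.pollTask(now + 300)) != null) {
            task.run(); // now "late" is due
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [middle, late]
    }
}
```

Tasks come out in deadline order regardless of the order in which they were scheduled, and cancellation is lazy: a cancelled entry stays in the queue until pollTask reaches it.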
SSLChannelContext
elasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java
```java
public final class SSLChannelContext extends SocketChannelContext {

    //......

    @Override
    public void queueWriteOperation(WriteOperation writeOperation) {
        getSelector().assertOnSelectorThread();
        if (writeOperation instanceof CloseNotifyOperation) {
            sslDriver.initiateClose();
            long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
            closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
        } else {
            super.queueWriteOperation(writeOperation);
        }
    }

    private void channelCloseTimeout() {
        closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;
        setCloseNow();
        getSelector().queueChannelClose(channel);
    }

    //......
}
```
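- When SSLChannelContext's queueWriteOperation method receives a CloseNotifyOperation, it calls scheduleAtRelativeTime on the selector's TaskScheduler to register channelCloseTimeout as a delayed task due CLOSE_TIMEOUT_NANOS after the current System.nanoTime(), and stores the returned Runnable in closeTimeoutCanceller. If the timeout fires, channelCloseTimeout resets the canceller to DEFAULT_TIMEOUT_CANCELLER, calls setCloseNow(), and queues the channel for closing.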
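The pattern here, keeping the returned Runnable so a pending timeout can be cancelled later, can be sketched on its own. The example below is only an illustration under stated assumptions: CloseTimeoutSketch, its nested Scheduler interface, FakeScheduler, closeCompleted, and the 10-second timeout value are all hypothetical, and the excerpt above does not show where Elasticsearch actually runs closeTimeoutCanceller.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical channel-like class showing the "schedule a timeout, keep the canceller" pattern.
class CloseTimeoutSketch {

    // Assumed minimal scheduling contract, shaped like scheduleAtRelativeTime in the article.
    interface Scheduler {
        Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos);
    }

    static final long CLOSE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10); // assumed value
    static final Runnable NO_OP_CANCELLER = () -> {};

    private final Scheduler scheduler;
    private Runnable closeTimeoutCanceller = NO_OP_CANCELLER;
    private boolean forceClosed = false;

    CloseTimeoutSketch(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    // Start a graceful close and arm the timeout.
    void initiateClose(long nowNanos) {
        closeTimeoutCanceller =
            scheduler.scheduleAtRelativeTime(this::onCloseTimeout, nowNanos + CLOSE_TIMEOUT_NANOS);
    }

    // The graceful close finished in time, so the pending timeout is cancelled.
    void closeCompleted() {
        closeTimeoutCanceller.run();
        closeTimeoutCanceller = NO_OP_CANCELLER;
    }

    boolean isForceClosed() {
        return forceClosed;
    }

    private void onCloseTimeout() {
        closeTimeoutCanceller = NO_OP_CANCELLER;
        forceClosed = true; // stands in for setCloseNow() plus queueing the channel close
    }
}

// Single-slot fake scheduler driven by an explicit clock, for demonstration only.
class FakeScheduler implements CloseTimeoutSketch.Scheduler {
    private Runnable pending;
    private long deadline;
    private boolean cancelled;

    @Override
    public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
        pending = task;
        deadline = relativeNanos;
        cancelled = false;
        return () -> cancelled = true;
    }

    // Runs the pending task if it is due and has not been cancelled.
    void advanceTo(long nowNanos) {
        if (pending != null && !cancelled && nowNanos - deadline >= 0) {
            Runnable task = pending;
            pending = null;
            task.run();
        }
    }
}

class CloseTimeoutDemo {
    public static void main(String[] args) {
        FakeScheduler scheduler = new FakeScheduler();
        CloseTimeoutSketch channel = new CloseTimeoutSketch(scheduler);
        channel.initiateClose(0L);
        channel.closeCompleted();                                    // cancels the timeout
        scheduler.advanceTo(CloseTimeoutSketch.CLOSE_TIMEOUT_NANOS); // nothing fires
        System.out.println(channel.isForceClosed());                 // prints false
    }
}
```

Resetting the canceller to a no-op after use means later calls never need a null check, which matches the role DEFAULT_TIMEOUT_CANCELLER appears to play in the excerpt.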
NioSelector
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
```java
public class NioSelector implements Closeable {

    //......

    public void runLoop() {
        if (runLock.tryLock()) {
            isRunningFuture.complete(null);
            try {
                setThread();
                while (isOpen()) {
                    singleLoop();
                }
            } finally {
                try {
                    cleanupAndCloseChannels();
                } finally {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        eventHandler.selectorException(e);
                    } finally {
                        runLock.unlock();
                        exitedLoop.countDown();
                    }
                }
            }
        } else {
            throw new IllegalStateException("selector is already running");
        }
    }

    void singleLoop() {
        try {
            closePendingChannels();
            preSelect();
            long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
            int ready;
            if (nanosUntilNextTask == 0) {
                ready = selector.selectNow();
            } else {
                long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
                // Only select until the next task needs to be run. Do not select with a value of 0 because
                // that blocks without a timeout.
                ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
            }
            if (ready > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey sk = keyIterator.next();
                    keyIterator.remove();
                    if (sk.isValid()) {
                        try {
                            processKey(sk);
                        } catch (CancelledKeyException cke) {
                            eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), cke);
                        }
                    } else {
                        eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), new CancelledKeyException());
                    }
                }
            }

            handleScheduledTasks(System.nanoTime());
        } catch (ClosedSelectorException e) {
            if (isOpen()) {
                throw e;
            }
        } catch (IOException e) {
            eventHandler.selectorException(e);
        } catch (Exception e) {
            eventHandler.uncaughtException(e);
        }
    }

    private void handleScheduledTasks(long nanoTime) {
        Runnable task;
        while ((task = taskScheduler.pollTask(nanoTime)) != null) {
            try {
                task.run();
            } catch (Exception e) {
                eventHandler.taskException(e);
            }
        }
    }

    //......
}
```
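- NioSelector's runLoop method calls singleLoop repeatedly while the selector is open. singleLoop first asks taskScheduler.nanosUntilNextTask how long it may block: if the answer is 0 it calls selector.selectNow(), otherwise it calls selector.select with a timeout clamped to between 1 and 300 milliseconds. After processing the ready keys it calls handleScheduledTasks, which keeps calling taskScheduler.pollTask and running each returned task with task.run() until no due task remains; an exception thrown by a task is passed to eventHandler.taskException.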
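The timeout calculation in singleLoop is easy to misread, so here it is pulled out into a small pure function. This is a sketch only: SelectTimeoutSketch and selectTimeoutMillis are hypothetical names, and returning 0 to mean "call selectNow()" is a convention invented for this example, not something NioSelector does.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper that reproduces the timeout decision made in singleLoop.
class SelectTimeoutSketch {

    /**
     * Returns 0 when the caller should use selectNow(), otherwise the number of
     * milliseconds to pass to select(timeout). The result is never above 300.
     */
    static long selectTimeoutMillis(long nanosUntilNextTask) {
        if (nanosUntilNextTask == 0) {
            return 0; // a task is already due, so do not block at all
        }
        long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
        // A sub-millisecond wait truncates to 0, and select(0) would block without a timeout,
        // so the value is raised to at least 1 and capped at 300.
        return Math.min(300, Math.max(millisUntilNextTask, 1));
    }

    public static void main(String[] args) {
        System.out.println(selectTimeoutMillis(0L));                              // 0
        System.out.println(selectTimeoutMillis(500_000L));                        // 1
        System.out.println(selectTimeoutMillis(TimeUnit.MILLISECONDS.toNanos(50))); // 50
        System.out.println(selectTimeoutMillis(Long.MAX_VALUE));                  // 300
    }
}
```

The last case corresponds to an empty task queue, where nanosUntilNextTask returns Long.MAX_VALUE: the selector still wakes up at least every 300 milliseconds.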
Summary
- TaskScheduler defines DelayedTask, a Runnable with three fields: deadline, runnable, and cancelled. Running a DelayedTask cancels it rather than executing the wrapped runnable.
- TaskScheduler stores DelayedTask instances in a PriorityQueue ordered by Comparator.comparingLong(DelayedTask::getDeadline).
- scheduleAtRelativeTime wraps the runnable in a DelayedTask and offers it to the priority queue. pollTask peeks at the head task, removes it once relativeNanos has reached task.deadline, and returns task.runnable only if cancelled is false.