内容简介:本文主要研究一下Elasticsearch的TaskSchedulerelasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.javaelasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java
序
本文主要研究一下Elasticsearch的TaskScheduler
TaskScheduler
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java
public class TaskScheduler {
private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));
/**
* Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a
* relative nanotime after the scheduled time, the task will be returned. This method returns a
* {@link Runnable} that can be run to cancel the scheduled task.
*
* @param task to schedule
* @param relativeNanos defining when to execute the task
* @return runnable that will cancel the task
*/
public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
tasks.offer(delayedTask);
return delayedTask;
}
Runnable pollTask(long relativeNanos) {
DelayedTask task;
while ((task = tasks.peek()) != null) {
if (relativeNanos - task.deadline >= 0) {
tasks.remove();
if (task.cancelled == false) {
return task.runnable;
}
} else {
return null;
}
}
return null;
}
long nanosUntilNextTask(long relativeNanos) {
DelayedTask nextTask = tasks.peek();
if (nextTask == null) {
return Long.MAX_VALUE;
} else {
return Math.max(nextTask.deadline - relativeNanos, 0);
}
}
private static class DelayedTask implements Runnable {
private final long deadline;
private final Runnable runnable;
private boolean cancelled = false;
private DelayedTask(long deadline, Runnable runnable) {
this.deadline = deadline;
this.runnable = runnable;
}
private long getDeadline() {
return deadline;
}
@Override
public void run() {
cancelled = true;
}
}
}
- TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
- TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
- scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable
SSLChannelContext
elasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java
public final class SSLChannelContext extends SocketChannelContext {
//......
@Override
public void queueWriteOperation(WriteOperation writeOperation) {
getSelector().assertOnSelectorThread();
if (writeOperation instanceof CloseNotifyOperation) {
sslDriver.initiateClose();
long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
} else {
super.queueWriteOperation(writeOperation);
}
}
private void channelCloseTimeout() {
closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;
setCloseNow();
getSelector().queueChannelClose(channel);
}
//......
}
- SSLChannelContext的queueWriteOperation方法会使用taskScheduler的scheduleAtRelativeTime注册一个channelCloseTimeout的延时任务
NioSelector
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
public class NioSelector implements Closeable {
//......
public void runLoop() {
if (runLock.tryLock()) {
isRunningFuture.complete(null);
try {
setThread();
while (isOpen()) {
singleLoop();
}
} finally {
try {
cleanupAndCloseChannels();
} finally {
try {
selector.close();
} catch (IOException e) {
eventHandler.selectorException(e);
} finally {
runLock.unlock();
exitedLoop.countDown();
}
}
}
} else {
throw new IllegalStateException("selector is already running");
}
}
void singleLoop() {
try {
closePendingChannels();
preSelect();
long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
int ready;
if (nanosUntilNextTask == 0) {
ready = selector.selectNow();
} else {
long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
// Only select until the next task needs to be run. Do not select with a value of 0 because
// that blocks without a timeout.
ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
}
if (ready > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey sk = keyIterator.next();
keyIterator.remove();
if (sk.isValid()) {
try {
processKey(sk);
} catch (CancelledKeyException cke) {
eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), cke);
}
} else {
eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), new CancelledKeyException());
}
}
}
handleScheduledTasks(System.nanoTime());
} catch (ClosedSelectorException e) {
if (isOpen()) {
throw e;
}
} catch (IOException e) {
eventHandler.selectorException(e);
} catch (Exception e) {
eventHandler.uncaughtException(e);
}
}
private void handleScheduledTasks(long nanoTime) {
Runnable task;
while ((task = taskScheduler.pollTask(nanoTime)) != null) {
try {
task.run();
} catch (Exception e) {
eventHandler.taskException(e);
}
}
}
//......
}
- NioSelector的runLoop方法调用了singleLoop方法,后者调用了handleScheduledTasks方法,而handleScheduledTasks方法则是从taskScheduler.pollTask,然后执行task.run()
小结
- TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
- TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
- scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable
doc
以上所述就是小编给大家介绍的《聊聊Elasticsearch的TaskScheduler》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
美团机器学习实践
美团算法团队 / 人民邮电出版社 / 2018-8-1 / 79.00元
人工智能技术正以一种超快的速度深刻地改变着我们的生活,引导了第四次工业革命。美团作为国内O2O领域领 先的服务平台,结合自身的业务场景和数据,积极进行了人工智能领域的应用探索。在美团的搜索、推荐、计算广告、风控、图像处理等领域,相关的人工智能技术得到广泛的应用。本书包括通用流程、数据挖掘、搜索和推荐、计算广告、深度学习以及算法工程6大部分内容,全面介绍了美团在多个重要方面对机器学习的应用。 ......一起来看看 《美团机器学习实践》 这本书的介绍吧!