内容简介:HashedWheelTimer是采用一种定时轮的方式来管理和维护大量的Timer调度算法.Linux 内核中的定时器采用的就是这个方案。一个HashedWheelTimer是环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽又对应一个类似Map结构的对象,使用双向链表存储定时任务,指针周期性的跳动,跳动到一个槽位,就执行该槽位的定时任务。环形结构可以根据超时时间的 hash 值(这个 hash 值实际上就是ticks & mask)将 task 分布到不同的槽位中, 当 tick 到那个槽
1、原理
HashedWheelTimer是采用一种定时轮的方式来管理和维护大量的Timer调度算法.Linux 内核中的定时器采用的就是这个方案。
一个HashedWheelTimer是环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽又对应一个类似Map结构的对象,使用双向链表存储定时任务,指针周期性的跳动,跳动到一个槽位,就执行该槽位的定时任务。
环形结构可以根据超时时间的 hash 值(这个 hash 值实际上就是ticks & mask)将 task 分布到不同的槽位中, 当 tick 到那个槽位时, 只需要遍历那个槽位的 task 即可知道哪些任务会超时(而使用线性结构, 你每次 tick 都需要遍历所有 task), 所以, 我们任务量大的时候, 相应的增加 wheel 的 ticksPerWheel 值, 可以减少 tick 时遍历任务的个数.
2、结构图
3、效率
3.1优点
- 可以添加、删除、取消定时任务
- 能高效的处理大批定时任务
3.2缺点
- 对内存要求较高,占用较高的内存
- 时间精度要求不高
4、结合源码分析
首先来看HashedWheelTimer的构造函数,HashedWheelTimer有很多构造方法,但是最后都是调用一个:
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. // 构造时间轮的槽位数,槽位数只能是2的幂次方 wheel = createWheel(ticksPerWheel); // 时间轮槽位数 mask = wheel.length - 1; // Convert tickDuration to nanos. // 初始化时间周期 this.tickDuration = unit.toNanos(tickDuration); // Prevent overflow. if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } // 初始化轮询时间轮的线程,使用这个线程周期性的轮询时间轮 workerThread = threadFactory.newThread(worker); this.maxPendingTimeouts = maxPendingTimeouts; if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
时间轮实际就是一个HashedWeelBucket数组,上面这个构造方法就是在初始化这个数组,槽位数就是数组长度,tickDuration是时间周期,workerThread线程用来轮询数组;
private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } // HashedWheelBucket数组长度是2的幂次方,获取<=ticksPerWheel最大的2的幂次方 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i++) { wheel[i] = new HashedWheelBucket(); } return wheel; }
初始化的HashedWheelBucket数组的长度必须是2的幂次方。HashedWheelTimer初始化完了,记下来就是如何向时间轮里添加定时任务,其实很简单,只要调用newTimeOut()方法即可
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 开启时间轮轮询 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 将定时任务封装成HashedWheelTimeout HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); // 将定时任务存储到任务链表中 timeouts.add(timeout); return timeout; }
在newTimeOut()方法中会去开启轮询时间轮的线程(即workerThread),接下来在看如何轮询:
public void start() { // 判断HashedWheelTimer状态,如果状态开启,则开启轮询线程 switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { // 阻塞当前线程,目的是保证轮询线程workerThread开启 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
在这个方法中会去开启workerThread线程,执行workerThread线程中run()方法
public void run() { // Initialize the startTime. // 初始化开始时间 startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). // 唤醒阻塞的线程 startTimeInitialized.countDown(); do { // 根据周期时间tickDuration,进行周期性的tick下一个槽位 final long deadline = waitForNextTick(); if (deadline > 0) { // 获取下一个槽位的角标 int idx = (int) (tick & mask); processCancelledTasks(); // 获取该角标对应的HashedWheelBucket对象 HashedWheelBucket bucket = wheel[idx]; // 将存储在链表timeOuts中的定时任务存储到对应的槽位的HashedWheelBucket对象中 transferTimeoutsToBuckets(); // 执行槽位中定时任务 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket : wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (; ; ) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); }
在上面方法中,轮询时间轮,执行对应槽位的定时任务,在执行之前,会先将存储在链表中任务按照各自的时间放入对应的槽位中,接下来咱们来看如何根据周期时间进行tick
private long waitForNextTick() { // 获取下一个槽位的等待时间 long deadline = tickDuration * (tick + 1); for (; ; ) { // 获取当前时间间隔 final long currentTime = System.nanoTime() - startTime; // 计算tick到下一个槽位需要等待的时间 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; // 当前时间间隔大于等于下一个槽位周期时间,不需要等待,直接返回(从这个地方就可以得出HashedWheelTimer对时间精度要求不高,并不是严格按照延迟时间来执行的) if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } if (isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { // 当前时间间隔小于下一个槽位周期时间,则进行休眠 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } }
分析了如何实现时间间隔轮询,接下来分析如何将任务存储到HashedWheelBucket中
private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. // 遍历timeouts链表,默认遍历链表100000个任务 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } // 任务的状态等于取消,直接跳过 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } // 设置任务需要轮询的圈数,如:槽位=8,周期tickDuration=100ms,任务时间=900ms,则说明需要轮询一圈后,才能会执行到该任务,即remainingRounds= 1,槽位角标stopIndex=1 long calculated = timeout.deadline / tickDuration; timeout.remainingRounds = (calculated - tick) / wheel.length; // Ensure we don't schedule for past. final long ticks = Math.max(calculated, tick); int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; // 将定时任务存储到对应的HashedWheelBucket槽位中 bucket.addTimeout(timeout); } }
HashedWheelBucket是一个包含双向链表的对象,addTimeout将任务存储到链表的末端
void expireTimeouts(long deadline) { // 获取链表表头任务 HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { // 获取表头的下一个任务 HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { // 将要执行的任务从链表中删除 next = remove(timeout); // 任务的时间小于间隔时间,执行任务 if (timeout.deadline <= deadline) { // 执行任务 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds--; } timeout = next; } }
上面这个方法就是遍历槽位中链表中的任务进行执行
public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { // **这个地方就是真正执行封装的task任务,执行具体的任务逻辑** task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } }
以上就是HashedWheelTimer执行的整个过程,在分析的过程中最好还是结合具体的实例去分析,这样会更有利于自己的理解。
以上所述就是小编给大家介绍的《HashedWheelTimer定时任务算法解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Ruby Programming Language
David Flanagan、Yukihiro Matsumoto / O'Reilly Media, Inc. / 2008 / USD 39.99
Ruby has gained some attention through the popular Ruby on Rails web development framework, but the language alone is worthy of more consideration -- a lot more. This book offers a definition explanat......一起来看看 《The Ruby Programming Language》 这本书的介绍吧!