HashedWheelTimer定时任务算法解析

栏目: 编程工具 · 发布时间: 5年前

内容简介:HashedWheelTimer是采用一种定时轮的方式来管理和维护大量的Timer调度算法.Linux 内核中的定时器采用的就是这个方案。一个HashedWheelTimer是环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽又对应一个类似Map结构的对象,使用双向链表存储定时任务,指针周期性的跳动,跳动到一个槽位,就执行该槽位的定时任务。环形结构可以根据超时时间的 hash 值(这个 hash 值实际上就是ticks & mask)将 task 分布到不同的槽位中, 当 tick 到那个槽

1、原理

HashedWheelTimer是采用一种定时轮的方式来管理和维护大量的Timer调度算法.Linux 内核中的定时器采用的就是这个方案。

一个HashedWheelTimer是环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽又对应一个类似Map结构的对象,使用双向链表存储定时任务,指针周期性的跳动,跳动到一个槽位,就执行该槽位的定时任务。

环形结构可以根据超时时间的 hash 值(这个 hash 值实际上就是ticks & mask)将 task 分布到不同的槽位中, 当 tick 到那个槽位时, 只需要遍历那个槽位的 task 即可知道哪些任务会超时(而使用线性结构, 你每次 tick 都需要遍历所有 task), 所以, 我们任务量大的时候, 相应的增加 wheel 的 ticksPerWheel 值, 可以减少 tick 时遍历任务的个数.

2、结构图

HashedWheelTimer定时任务算法解析

3、效率

3.1优点

  1. 可以添加、删除、取消定时任务
  2. 能高效的处理大批定时任务

3.2缺点

  1. 对内存要求较高,占用较高的内存
  2. 时间精度要求不高

4、结合源码分析

首先来看HashedWheelTimer的构造函数,HashedWheelTimer有很多构造方法,但是最后都是调用一个:

public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
        // 构造时间轮的槽位数,槽位数只能是2的幂次方
        wheel = createWheel(ticksPerWheel);
        // 时间轮槽位数
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
        // 初始化时间周期
        this.tickDuration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
        // 初始化轮询时间轮的线程,使用这个线程周期性的轮询时间轮
        workerThread = threadFactory.newThread(worker);

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

时间轮实际就是一个HashedWeelBucket数组,上面这个构造方法就是在初始化这个数组,槽位数就是数组长度,tickDuration是时间周期,workerThread线程用来轮询数组;

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }
        // HashedWheelBucket数组长度是2的幂次方,获取<=ticksPerWheel最大的2的幂次方
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

初始化的HashedWheelBucket数组的长度必须是2的幂次方。HashedWheelTimer初始化完了,记下来就是如何向时间轮里添加定时任务,其实很简单,只要调用newTimeOut()方法即可

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }

        // 开启时间轮轮询
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 将定时任务封装成HashedWheelTimeout
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        // 将定时任务存储到任务链表中
        timeouts.add(timeout);
        return timeout;
    }

在newTimeOut()方法中会去开启轮询时间轮的线程(即workerThread),接下来在看如何轮询:

public void start() {
        // 判断HashedWheelTimer状态,如果状态开启,则开启轮询线程
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                // 阻塞当前线程,目的是保证轮询线程workerThread开启
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

在这个方法中会去开启workerThread线程,执行workerThread线程中run()方法

public void run() {
            // Initialize the startTime.
            // 初始化开始时间
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            // 唤醒阻塞的线程
            startTimeInitialized.countDown();

            do {
                // 根据周期时间tickDuration,进行周期性的tick下一个槽位
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    // 获取下一个槽位的角标
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    // 获取该角标对应的HashedWheelBucket对象
                    HashedWheelBucket bucket =
                            wheel[idx];
                    // 将存储在链表timeOuts中的定时任务存储到对应的槽位的HashedWheelBucket对象中        
                    transferTimeoutsToBuckets();
                    // 执行槽位中定时任务
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket : wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (; ; ) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

在上面方法中,轮询时间轮,执行对应槽位的定时任务,在执行之前,会先将存储在链表中任务按照各自的时间放入对应的槽位中,接下来咱们来看如何根据周期时间进行tick

private long waitForNextTick() {
            // 获取下一个槽位的等待时间
            long deadline = tickDuration * (tick + 1);

            for (; ; ) {
                // 获取当前时间间隔
                final long currentTime = System.nanoTime() - startTime;
                // 计算tick到下一个槽位需要等待的时间
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

                // 当前时间间隔大于等于下一个槽位周期时间,不需要等待,直接返回(从这个地方就可以得出HashedWheelTimer对时间精度要求不高,并不是严格按照延迟时间来执行的)
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }
                if (isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                }

                try {
                    // 当前时间间隔小于下一个槽位周期时间,则进行休眠
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

分析了如何实现时间间隔轮询,接下来分析如何将任务存储到HashedWheelBucket中

private void transferTimeoutsToBuckets() {
            // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
            // adds new timeouts in a loop.
            // 遍历timeouts链表,默认遍历链表100000个任务
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                // 任务的状态等于取消,直接跳过
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }

                // 设置任务需要轮询的圈数,如:槽位=8,周期tickDuration=100ms,任务时间=900ms,则说明需要轮询一圈后,才能会执行到该任务,即remainingRounds= 1,槽位角标stopIndex=1
                long calculated = timeout.deadline / tickDuration;
                timeout.remainingRounds = (calculated - tick) / wheel.length;

                // Ensure we don't schedule for past.
                final long ticks = Math.max(calculated, tick);
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                // 将定时任务存储到对应的HashedWheelBucket槽位中
                bucket.addTimeout(timeout);
            }
        }

HashedWheelBucket是一个包含双向链表的对象,addTimeout将任务存储到链表的末端

void expireTimeouts(long deadline) {
            // 获取链表表头任务
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                // 获取表头的下一个任务
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    // 将要执行的任务从链表中删除
                    next = remove(timeout);
                    // 任务的时间小于间隔时间,执行任务
                    if (timeout.deadline <= deadline) {
                        // 执行任务
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds--;
                }
                timeout = next;
            }
        }

上面这个方法就是遍历槽位中链表中的任务进行执行

public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                // **这个地方就是真正执行封装的task任务,执行具体的任务逻辑**
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

以上就是HashedWheelTimer执行的整个过程,在分析的过程中最好还是结合具体的实例去分析,这样会更有利于自己的理解。


以上所述就是小编给大家介绍的《HashedWheelTimer定时任务算法解析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Computer Age Statistical Inference

Computer Age Statistical Inference

Bradley Efron、Trevor Hastie / Cambridge University Press / 2016-7-21 / USD 74.99

The twenty-first century has seen a breathtaking expansion of statistical methodology, both in scope and in influence. 'Big data', 'data science', and 'machine learning' have become familiar terms in ......一起来看看 《Computer Age Statistical Inference》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具