【重回基础】线程池源码剖析:Worker工作线程

栏目: 编程工具 · 发布时间: 5年前

内容简介:本文目录:Worker 本身实现了 Runnable 接口,Worker 线程工作流程图:

本文目录:

一、前言
二、源码剖析
    2.1 worker 结构体
    2.2 runWorker:worker 工作主循环
    2.3 getTask() :worker 获取任务方法
    2.4 processWorkerExit:worker 工作结束处理方法
    2.5 addWorker:创建worker线程
复制代码

二、 源码剖析

2.1 worker 结构体

Worker 本身实现了 Runnable 接口,

/**
   * Worker 主要负责管理线程执行、中断。
   * 为防止任务执行时中断,每次执行任务时需要加锁。
   * 锁的实现通过通过继承AbstractQueuedSynchronizer简化。
   * 锁的机制为非重入互斥锁,防止通过 setCorePoolSize 等方法获取到锁,并执行中断等。
   * 另外,Worker初始化时,state设置为-1,防止线程未启动却执行中断。
   */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    ...
   
    // 该 worker 运行所在线程,便于执行 interrupt 等管理
    final Thread thread;
    // 初始化任务
    Runnable firstTask;
    // 完成任务数
    volatile long completedTasks;
    // 构造函数
    Worker(Runnable firstTask) {
        // 防止线程未开始就执行interrupt
        setState(-1);
        this.firstTask = firstTask;
        // 线程工厂创建线程
        this.thread = getThreadFactory().newThread(this);
    }
    // 工作线程的工作内容,包装在 runWorker 方法
    public void run() {
        runWorker(this);
    }
		// 是否持有独占锁,status 0:否,1:是
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
		// 采用CAS机制尝试将status由0变为1,即持有独占锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
		// 释放锁
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
		// 获取独占锁,若已被独占,则进入FIFO队列排队待锁,直到获取到锁
    public void lock()        { acquire(1); }
    // 尝试获取独占锁
    public boolean tryLock()  { return tryAcquire(1); }
    // 释放独占锁
    public void unlock()      { release(1); }
    // 判断是否持有独占锁
    public boolean isLocked() { return isHeldExclusively(); }
		// 将当前 worker 所在线程标记为中断状态
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
复制代码

2.2 runWorker:worker 工作主循环

Worker 线程工作流程图:

【重回基础】线程池源码剖析:Worker工作线程

下面是源码:

/**
 * Worker 线程的循环工作内容,就是重复不停地从队列中获取任务,并执行。
 *
 * 1. 初始化任务可带可不带。只要线程池状态为 RUNNING ,那么就循环调用 getTask() 获取任务。
 *    循环结果有两种:
 *    (1) getTask() 结果为 null,一般由于线程池状态的变更,或线程池配置参数限制。
 *    (2) task.run() 出现异常,completedAbruptly 会被标记为 true,当前线程中断。
 *     
 * 2. 在执行任何任务之前,会对当前 worker 加上互斥锁,防止 shutdown() 中断操作终止运行中的 worker。
 *    确保除非线程池状态为关闭中,否则线程不能别中断。
 *   
 * 3. 每个任务执行前会调用 beforeExecute(),该方法若抛出异常,会导致当前线程死亡,而没有执行任务。
 *
 * 4. task.run() 任务执行抛出来的任何 RuntimeException、Error、Throwable 都会被收集交给 
 * 	  afterExecute(task, thrown) 方法,并且上抛,导致当前线程的死亡。
 * 
 * 5. afterExecute(task, thrown) 方法若抛出异常,同样会引起当前线程的死亡。
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // runWorker 开始执行后,将 status 设置为0,允许 interrupt 中断
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 判断 worker 初始化任务是否为空
        // 若空,则 getTask() 方法从阻塞队列中尝试获取新任务,这里可能陷入长久阻塞
        // 若返回为 null,退出循环,执行 processWorkerExit() 方法处理线程终结逻辑
        while (task != null || (task = getTask()) != null) {
            // 任务执行前,会对当前 worker 进行加锁,当然,并不是为了防止当前线程执行多任务,
            // 因为任务的获取也要等当前任务执行完毕,到下一个循环。
            // 这里的锁是为了防止例如 shutdown() 等某些方法中断执行任务中的线程。
            w.lock();
            // 总体思想就是,若线程状态为 STOP 就中断线程,若不是 STOP,则确保线程不被中断。
            // 具体:
            // 1. 若线程池状态为关闭,且当前线程未中断,则当前线程标记中断。
            // 2. 若未关闭,则执行 Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 
            // 即获取当前线程状态,并清理状态,若获取得到状态为中断,再次重新检查线程池的状态,
            // 满足则重新设置为中断状态;不满足,则在 Thread.interrupted() 已清理线程状态,直接略过。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
          
            try {
                // 执行前调用,子类实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行后调用,子类实现,传递收集的 thrown
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
      	// 执行结束非中断标记
        completedAbruptly = false;
    } finally {
        // 工作线程结束处理
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

其中,中间的判断语句比较晦涩:

// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.  This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
     (Thread.interrupted() &&
      runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
  wt.interrupt();
复制代码

对语句进行拆解,方便阅读:

【重回基础】线程池源码剖析:Worker工作线程
  1. 首先,编号1若为true时,即线程池状态大于 STOP ,出于关闭状态。那么验证编号3,若当前线程为非中断状态,则中断,若中断则不用处理了。

  2. 若编号1为false,那么验证编号2.1和2.2,获取当前线程中断状态,并将中断状态清理为false:

    若编号2.1为true,则验证编号2.2,即二次检查线程池状态,若关闭状态,则验证编号3,这时编号3必然通过,因为在编号2.1已进行清理。

    若编号2.1位false,即线程池非关闭状态,且当前线程非中断状态,不处理。

总结起来,就是确保:线程池为关闭状态时,中断线程;若非关闭状态,线程不被中断。

2.3 getTask() :worker 获取任务方法

/**
 * 从阻塞队列中获取待执行任务,根据线程池的状态,可能限时或不限时阻塞。出现以下任何情况会返回 null:
 * 1. 当前线程数量大于最大线程数。
 * 2. 线程池状态为 STOP。
 * 3. 线程池状态为 SHUTDOWN,且阻塞队列为空。
 * 4. 在阻塞队列执行 poll 操作超时,且获取不到任务。
 * 可以注意到,方法若返回 null,runWorker 便不再循环,因此,这里返回 null 的地方,都对线程数量进行扣减。
 */
private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 2和3点:若线程池状态为STOP,或为SHUTDOWN且阻塞队列为空时,减少线程数计数,返回null待终结。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 不断尝试线程数量减一,直到成功
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 是否需要关注超时:允许核心线程超时回收,或线程数量大于核心线程数量
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
				// 1和4点:线程数量大于最大线程数,或执行 poll 超时。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 尝试线程数量减一,不成功则重试
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 若需要关注超时,则调用 poll,给予时限。若无需关注超时,则调用 take,长时间等待任务。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 若任务不为空,返回;若空,则标记超时
            if (r != null)
                return r;
            timedOut = true;
        // poll 和 take 上抛的等待中断异常
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码

2.4 processWorkerExit:worker 工作结束处理方法

/**
 * 主要做三件事情:
 * 1. 维护worker线程结束后的线程池状态,比如移出woker集合,统计完成任务数。
 * 2. 检测线程池是否满足 TIDYING 状态,满足则调整状态,触发 terminated()。
 * 3. 当线程池状态为RUNNING或SHUTDOWN时,检测以下三种情况重新创建新的worker:
 *    (1) 任务执行异常引起的worker线程死亡。
 *    (2) 线程数量为0且任务队列不为空。
 *    (3) 若不允许核心线程超时回收,线程数量少于核心线程时。
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
		// 若由于任务执行异常引起的线程终结,线程数量减一。
    // 非任务执行异常引起,说明是由于getTask()方法返回null,线程数量减一已在返回时处理。
    // 因此,这里只需要处理用户任务执行异常引起的线程终结。
  	if (completedAbruptly)
        decrementWorkerCount();
		// 操作线程池共享变量加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
		// 尝试进入 TIDYING 状态
    tryTerminate();
  
    int c = ctl.get();
    // 若线程池为 RUNNING 或 SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        // 若由于任务执行异常引起则直接跳过,创建新的worker代替
        if (!completedAbruptly) {
            // 若允许核心线程超时回收,则最低线程数量为0,否则为核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 若最低值为0,检测任务队列是否非空,非空最低改为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 若当前线程数量大于最低值则跳过,否则创建新的worker代替
            if (workerCountOf(c) >= min)
                return;
        }
        // 创建新 worker
        addWorker(null, false);
    }
}
复制代码

2.5 addWorker:创建worker线程

/**
 * 主要负责检查是否满足线程创建条件,若满足则新建worker线程。线程创建成功返回true;
 * 若线程池状态为STOP,或为不满足条件的SHUTDOWN时,或线程工厂创建失败时,返回false。
 * 线程创建失败也可能抛出异常,尤其是内存不足时。
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外圈循环,主要判断线程池状态
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 判断是否允许创建新的worker线程,看着比较拗口,实际主要拒绝以下三种场景下,进行创建线程:
        // 1. 线程池状态为STOP、TIDYING、TERMINATE。
        // 2. 线程池状态为SHUTDOWN,新任务试图进入线程池并创建新线程。
        // 3. 线程池状态为SHUTDOWN,任务队列为空。
        // 后继对该判断语句进行拆解解析
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        
        // 内圈循环,主要判断线程数量
        for (;;) {
            int wc = workerCountOf(c);
            // 若线程数量超越了ctl的bit数,或者核心线程数量已满时创建核心线程,或线程已达最大线程数
            // 则返回false,拒绝创建
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 采用CAS机制尝试线程数量加一,成功则不再进行retry外圈循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS操作线程数量加一失败,说明线程池ctl在当时已发生变化,因此重新获取
            c = ctl.get();
            // 若ctl变化的是线程池状态,则循环外圈,重新判断线程池状态
            // 若ctl变化的只是线程数量,则无需外圈循环重新判断线程池状态,只需要内圈循环,尝试线程数量加一
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
	  // 线程数成功加一,开始创建worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 调用worker构造方法,内部采用了线程工厂创建线程,可能返回null,也可能抛出异常,通常因为内存不足
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 线程创建成功
        if (t != null) {
            // 操作线程池共享变量时取锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 获取到锁后,重新检查线程池状态
                int rs = runStateOf(ctl.get());
								// 拿到锁后重新检查线程池状态,只允许为RUNNING或SHUTDOWN且非新建任务开辟线程时允许继续
                // 否则,释放锁,回滚线程数量
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 线程工厂创建出来的新线程已经start,则抛出线程状态异常
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 新worker进入集合
                  	workers.add(w);
                    int s = workers.size();
                    // 更新线程池最大线程数(区别于最大线程数,这个变量更多的是统计)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // worker入列成功,开启线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 若线程创建失败,则回滚
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回是否新线程启动成功
    return workerStarted;
}
复制代码

其中,对中间那句比较拗口的判断语句剖析一下:

// 判断是否允许创建新的worker线程,看着比较拗口,实际主要拒绝以下三种场景下,进行创建线程:
// 1. 线程池状态为STOP、TIDYING、TERMINATE。
// 2. 线程池状态为SHUTDOWN,新任务试图进入线程池并创建新线程。
// 3. 线程池状态为SHUTDOWN,任务队列为空。
// 后继对该判断语句进行拆解解析
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
		return false;
复制代码

语句可以转换成:

if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
		return false;
复制代码

即当线程池状态大于等于SHUTDOWN时,若后续条件 (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) 满足任意一个,则不允许创建。

  1. 先看 rs!=SHUTDOWN ,若为true,即意味着线程池状态为STOP、TIDYING、TERMINATE,那么皆不允许创建新线程。
  2. rs!=SHUTDOWN 为false,即 rs=SHUTDOWN 。从 addWorker 方法的调用可知,只有当任务提交新建线程时会带有 firstTask 参数。因此,第二个条件 firstTask!=null ,用来拒绝线程池状态为SHUTDOWN时,新任务想创建线程。
  3. 若前两个都不满足,即 rs=SHUTDOWNfirstTask=null ,那么验证第三个条件 workQueue.isEmpty() ,若任务线程为空,则满足拒绝创建;若非空则允许创建。

参考

  1. Java线程池和ThreadPoolExecutor使用和分析(一)
  2. Java线程池和ThreadPoolExecutor使用和分析(二)

以上所述就是小编给大家介绍的《【重回基础】线程池源码剖析:Worker工作线程》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Practical Django Projects, Second Edition

Practical Django Projects, Second Edition

James Bennett / Apress / 2009 / 44.99

Build a django content management system, blog, and social networking site with James Bennett as he introduces version 1.1 of the popular Django framework. You’ll work through the development of ea......一起来看看 《Practical Django Projects, Second Edition》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具