内容简介:本文目录:Worker 本身实现了 Runnable 接口,Worker 线程工作流程图:
本文目录:
一、前言 二、源码剖析 2.1 worker 结构体 2.2 runWorker:worker 工作主循环 2.3 getTask() :worker 获取任务方法 2.4 processWorkerExit:worker 工作结束处理方法 2.5 addWorker:创建worker线程 复制代码
二、 源码剖析
2.1 worker 结构体
Worker 本身实现了 Runnable 接口,
/** * Worker 主要负责管理线程执行、中断。 * 为防止任务执行时中断,每次执行任务时需要加锁。 * 锁的实现通过通过继承AbstractQueuedSynchronizer简化。 * 锁的机制为非重入互斥锁,防止通过 setCorePoolSize 等方法获取到锁,并执行中断等。 * 另外,Worker初始化时,state设置为-1,防止线程未启动却执行中断。 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... // 该 worker 运行所在线程,便于执行 interrupt 等管理 final Thread thread; // 初始化任务 Runnable firstTask; // 完成任务数 volatile long completedTasks; // 构造函数 Worker(Runnable firstTask) { // 防止线程未开始就执行interrupt setState(-1); this.firstTask = firstTask; // 线程工厂创建线程 this.thread = getThreadFactory().newThread(this); } // 工作线程的工作内容,包装在 runWorker 方法 public void run() { runWorker(this); } // 是否持有独占锁,status 0:否,1:是 protected boolean isHeldExclusively() { return getState() != 0; } // 采用CAS机制尝试将status由0变为1,即持有独占锁 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 释放锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 获取独占锁,若已被独占,则进入FIFO队列排队待锁,直到获取到锁 public void lock() { acquire(1); } // 尝试获取独占锁 public boolean tryLock() { return tryAcquire(1); } // 释放独占锁 public void unlock() { release(1); } // 判断是否持有独占锁 public boolean isLocked() { return isHeldExclusively(); } // 将当前 worker 所在线程标记为中断状态 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } } 复制代码
2.2 runWorker:worker 工作主循环
Worker 线程工作流程图:
下面是源码:
/** * Worker 线程的循环工作内容,就是重复不停地从队列中获取任务,并执行。 * * 1. 初始化任务可带可不带。只要线程池状态为 RUNNING ,那么就循环调用 getTask() 获取任务。 * 循环结果有两种: * (1) getTask() 结果为 null,一般由于线程池状态的变更,或线程池配置参数限制。 * (2) task.run() 出现异常,completedAbruptly 会被标记为 true,当前线程中断。 * * 2. 在执行任何任务之前,会对当前 worker 加上互斥锁,防止 shutdown() 中断操作终止运行中的 worker。 * 确保除非线程池状态为关闭中,否则线程不能别中断。 * * 3. 每个任务执行前会调用 beforeExecute(),该方法若抛出异常,会导致当前线程死亡,而没有执行任务。 * * 4. task.run() 任务执行抛出来的任何 RuntimeException、Error、Throwable 都会被收集交给 * afterExecute(task, thrown) 方法,并且上抛,导致当前线程的死亡。 * * 5. afterExecute(task, thrown) 方法若抛出异常,同样会引起当前线程的死亡。 */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // runWorker 开始执行后,将 status 设置为0,允许 interrupt 中断 w.unlock(); boolean completedAbruptly = true; try { // 判断 worker 初始化任务是否为空 // 若空,则 getTask() 方法从阻塞队列中尝试获取新任务,这里可能陷入长久阻塞 // 若返回为 null,退出循环,执行 processWorkerExit() 方法处理线程终结逻辑 while (task != null || (task = getTask()) != null) { // 任务执行前,会对当前 worker 进行加锁,当然,并不是为了防止当前线程执行多任务, // 因为任务的获取也要等当前任务执行完毕,到下一个循环。 // 这里的锁是为了防止例如 shutdown() 等某些方法中断执行任务中的线程。 w.lock(); // 总体思想就是,若线程状态为 STOP 就中断线程,若不是 STOP,则确保线程不被中断。 // 具体: // 1. 若线程池状态为关闭,且当前线程未中断,则当前线程标记中断。 // 2. 若未关闭,则执行 Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) // 即获取当前线程状态,并清理状态,若获取得到状态为中断,再次重新检查线程池的状态, // 满足则重新设置为中断状态;不满足,则在 Thread.interrupted() 已清理线程状态,直接略过。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行前调用,子类实现 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行后调用,子类实现,传递收集的 thrown afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } // 执行结束非中断标记 completedAbruptly = false; } finally { // 工作线程结束处理 processWorkerExit(w, completedAbruptly); } } 复制代码
其中,中间的判断语句比较晦涩:
// If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); 复制代码
对语句进行拆解,方便阅读:
-
首先,编号1若为true时,即线程池状态大于 STOP ,出于关闭状态。那么验证编号3,若当前线程为非中断状态,则中断,若中断则不用处理了。
-
若编号1为false,那么验证编号2.1和2.2,获取当前线程中断状态,并将中断状态清理为false:
若编号2.1为true,则验证编号2.2,即二次检查线程池状态,若关闭状态,则验证编号3,这时编号3必然通过,因为在编号2.1已进行清理。
若编号2.1位false,即线程池非关闭状态,且当前线程非中断状态,不处理。
总结起来,就是确保:线程池为关闭状态时,中断线程;若非关闭状态,线程不被中断。
2.3 getTask() :worker 获取任务方法
/** * 从阻塞队列中获取待执行任务,根据线程池的状态,可能限时或不限时阻塞。出现以下任何情况会返回 null: * 1. 当前线程数量大于最大线程数。 * 2. 线程池状态为 STOP。 * 3. 线程池状态为 SHUTDOWN,且阻塞队列为空。 * 4. 在阻塞队列执行 poll 操作超时,且获取不到任务。 * 可以注意到,方法若返回 null,runWorker 便不再循环,因此,这里返回 null 的地方,都对线程数量进行扣减。 */ private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 2和3点:若线程池状态为STOP,或为SHUTDOWN且阻塞队列为空时,减少线程数计数,返回null待终结。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 不断尝试线程数量减一,直到成功 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 是否需要关注超时:允许核心线程超时回收,或线程数量大于核心线程数量 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 1和4点:线程数量大于最大线程数,或执行 poll 超时。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 尝试线程数量减一,不成功则重试 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 若需要关注超时,则调用 poll,给予时限。若无需关注超时,则调用 take,长时间等待任务。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 若任务不为空,返回;若空,则标记超时 if (r != null) return r; timedOut = true; // poll 和 take 上抛的等待中断异常 } catch (InterruptedException retry) { timedOut = false; } } } 复制代码
2.4 processWorkerExit:worker 工作结束处理方法
/** * 主要做三件事情: * 1. 维护worker线程结束后的线程池状态,比如移出woker集合,统计完成任务数。 * 2. 检测线程池是否满足 TIDYING 状态,满足则调整状态,触发 terminated()。 * 3. 当线程池状态为RUNNING或SHUTDOWN时,检测以下三种情况重新创建新的worker: * (1) 任务执行异常引起的worker线程死亡。 * (2) 线程数量为0且任务队列不为空。 * (3) 若不允许核心线程超时回收,线程数量少于核心线程时。 */ private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若由于任务执行异常引起的线程终结,线程数量减一。 // 非任务执行异常引起,说明是由于getTask()方法返回null,线程数量减一已在返回时处理。 // 因此,这里只需要处理用户任务执行异常引起的线程终结。 if (completedAbruptly) decrementWorkerCount(); // 操作线程池共享变量加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 尝试进入 TIDYING 状态 tryTerminate(); int c = ctl.get(); // 若线程池为 RUNNING 或 SHUTDOWN if (runStateLessThan(c, STOP)) { // 若由于任务执行异常引起则直接跳过,创建新的worker代替 if (!completedAbruptly) { // 若允许核心线程超时回收,则最低线程数量为0,否则为核心线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 若最低值为0,检测任务队列是否非空,非空最低改为1 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 若当前线程数量大于最低值则跳过,否则创建新的worker代替 if (workerCountOf(c) >= min) return; } // 创建新 worker addWorker(null, false); } } 复制代码
2.5 addWorker:创建worker线程
/** * 主要负责检查是否满足线程创建条件,若满足则新建worker线程。线程创建成功返回true; * 若线程池状态为STOP,或为不满足条件的SHUTDOWN时,或线程工厂创建失败时,返回false。 * 线程创建失败也可能抛出异常,尤其是内存不足时。 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: // 外圈循环,主要判断线程池状态 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断是否允许创建新的worker线程,看着比较拗口,实际主要拒绝以下三种场景下,进行创建线程: // 1. 线程池状态为STOP、TIDYING、TERMINATE。 // 2. 线程池状态为SHUTDOWN,新任务试图进入线程池并创建新线程。 // 3. 线程池状态为SHUTDOWN,任务队列为空。 // 后继对该判断语句进行拆解解析 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 内圈循环,主要判断线程数量 for (;;) { int wc = workerCountOf(c); // 若线程数量超越了ctl的bit数,或者核心线程数量已满时创建核心线程,或线程已达最大线程数 // 则返回false,拒绝创建 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 采用CAS机制尝试线程数量加一,成功则不再进行retry外圈循环 if (compareAndIncrementWorkerCount(c)) break retry; // CAS操作线程数量加一失败,说明线程池ctl在当时已发生变化,因此重新获取 c = ctl.get(); // 若ctl变化的是线程池状态,则循环外圈,重新判断线程池状态 // 若ctl变化的只是线程数量,则无需外圈循环重新判断线程池状态,只需要内圈循环,尝试线程数量加一 if (runStateOf(c) != rs) continue retry; } } // 线程数成功加一,开始创建worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 调用worker构造方法,内部采用了线程工厂创建线程,可能返回null,也可能抛出异常,通常因为内存不足 w = new Worker(firstTask); final Thread t = w.thread; // 线程创建成功 if (t != null) { // 操作线程池共享变量时取锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 获取到锁后,重新检查线程池状态 int rs = runStateOf(ctl.get()); // 拿到锁后重新检查线程池状态,只允许为RUNNING或SHUTDOWN且非新建任务开辟线程时允许继续 // 否则,释放锁,回滚线程数量 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 线程工厂创建出来的新线程已经start,则抛出线程状态异常 if (t.isAlive()) throw new IllegalThreadStateException(); // 新worker进入集合 workers.add(w); int s = workers.size(); // 更新线程池最大线程数(区别于最大线程数,这个变量更多的是统计) if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // worker入列成功,开启线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 若线程创建失败,则回滚 if (! workerStarted) addWorkerFailed(w); } // 返回是否新线程启动成功 return workerStarted; } 复制代码
其中,对中间那句比较拗口的判断语句剖析一下:
// 判断是否允许创建新的worker线程,看着比较拗口,实际主要拒绝以下三种场景下,进行创建线程: // 1. 线程池状态为STOP、TIDYING、TERMINATE。 // 2. 线程池状态为SHUTDOWN,新任务试图进入线程池并创建新线程。 // 3. 线程池状态为SHUTDOWN,任务队列为空。 // 后继对该判断语句进行拆解解析 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; 复制代码
语句可以转换成:
if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())) return false; 复制代码
即当线程池状态大于等于SHUTDOWN时,若后续条件 (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
满足任意一个,则不允许创建。
- 先看
rs!=SHUTDOWN
,若为true,即意味着线程池状态为STOP、TIDYING、TERMINATE,那么皆不允许创建新线程。 - 若
rs!=SHUTDOWN
为false,即rs=SHUTDOWN
。从addWorker
方法的调用可知,只有当任务提交新建线程时会带有 firstTask 参数。因此,第二个条件firstTask!=null
,用来拒绝线程池状态为SHUTDOWN时,新任务想创建线程。 - 若前两个都不满足,即
rs=SHUTDOWN
且firstTask=null
,那么验证第三个条件workQueue.isEmpty()
,若任务线程为空,则满足拒绝创建;若非空则允许创建。
参考
以上所述就是小编给大家介绍的《【重回基础】线程池源码剖析:Worker工作线程》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 重回前端之Class
- 2020 开年,C 语言重回巅峰王座
- 敏捷读书之重回根本:《Scrum指南》100问
- 重回独立,SUSE 成全球最大独立开源公司
- Python之父重回决策层,社区未来如何发展?
- 码农口述:AI创业两年,积蓄花光,重回职场敲代码
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
MD5 加密
MD5 加密工具
RGB HSV 转换
RGB HSV 互转工具