内容简介:日常开发中, 或许不会直接new线程或线程池, 但这些线程相关的基础或思想是非常重要的, 参考就算没有直接用到, 可能间接也用到了类似的思想或原理, 例如tomcat, jetty, 数据库连接池, MQ;本文不会对线程的基础知识进行介绍, 所以最好已"进食"关于线程的基础知识, 再"食用"本文更佳;
小侃一下
日常开发中, 或许不会直接new线程或线程池, 但这些线程相关的基础或思想是非常重要的, 参考 林迪效应 ;
就算没有直接用到, 可能间接也用到了类似的思想或原理, 例如tomcat, jetty, 数据库连接池, MQ;
本文不会对线程的基础知识进行介绍, 所以最好已"进食"关于线程的基础知识, 再"食用"本文更佳;
由于在下的工作及其它原因, 前后花费了数月的时间才完成这篇博客, 希望能帮助到想要了解 ThreadPoolExecutor 线程池源码和原理的同学.
1. 使用线程池的好处. 为什么要使用线程池?
-
避免 频繁创建、销毁 线程的开销; 复用创建的线程.
-
及时响应提交的任务; 提交一个任务,不再是每次都需要创建新的线程.
-
避免每次提交的任务都新建线程, 造成 服务器资源耗尽 , 线程频繁上下文切换 等服务器资源开销.
-
更容易 监控、管理 线程; 可以统计出已完成的任务数, 活跃的线程数, 等待的任务数等, 可以重写hook方法
beforeExecute,afterExecute,terminated, 重写之后, 结合具体的业务进行处理.
2. 线程池核心参数介绍
| 参数 | 意义 |
|---|---|
| corePoolSize | 线程池中的核心线程数 |
| workQueue | 存放提交的task |
| maximumPoolSize | 线程池中允许的最大线程数 |
| threadFactory | 线程工厂, 用来创建线程, 由 Executors#defaultThreadFactory 实现 |
| keepAliveTime | 空闲线程存活时间(默认是临时线程, 也可设置为核心线程) |
| unit | 空闲线程存活时间单位枚举 |
下面将结合线程池中的任务提交流程加深理解.
3. 提交任务到线程池中的流程
3.1 ThreadPoolExecutor#execute方法整体流程
这里以 java.util.concurrent.ThreadPoolExecutor#execute 方法为例, 画一个简单的图:
上图中的worker可简单理解为线程池中的一个线程, workers.size() 即使 线程池中的线程数 ;
- 当
workers.size()小于corePoolSize时, 创建新的线程执行提交的task. - 当
workers.size()大于corePoolSize时, 并且workQueue没有满, 将task添加到workQueue. - 当
workers.size()大于corePoolSize时, 并且workQueue已经满了, 但是workers.size()<maximumPoolSize, 就创建一个临时线程处理task. - 当
workers.size()大于corePoolSize时, 并且workQueue已经满了, 但是workers.size()>=maximumPoolSize, 执行拒绝策略.
后续会有对 ThreadPoolExecutor#execute 方法的详细解读: execute方法源码: 提交task到线程池 .
4种默认的拒绝策略: ThreadPoolExecutor默认实现的4种拒绝策略 .
3.2 排队恰火锅的场景
这里我们可以想像一个场景: 去海底捞吃火锅;
下午4点晚市正式开始排队, 假如店内一共有16张桌子, 陆续光临的16组客人将店内坐满;
店外一共有20组客人座位, 则第17~36组客人坐在店外排队;
第37组客人来了, 启动临时餐桌供客人吃饭.
所以, 这里的店内16张桌子则是 corePoolSize , 店外一共有20组座位则为 BlockingQueue , 而临时餐桌数量即 maximumPoolSize-corePoolSize .
上面的例子并非绝对完美, 仅仅是为了便于我们理解线程池的各个参数, 以及加深印象.
4. ThreadPoolExecutor线程池源码及其原理
有了上面对线程池的总体了解后, 下面结合源码来看看线程池的底层原理吧!
4.1 从创建ThreadPoolExecutor开始: 线程池构造函数的源码
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
上面是 ThreadPoolExecutor 参数最少的一个构造方法, 默认的 ThreadFactory 是 Executors.defaultThreadFactory() , 默认的 RejectedExecutionHandler 是 defaultHandler = new AbortPolicy() ;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
上面是 ThreadPoolExecutor 参数最多的一个构造方法, 其他构造方法都是传入参数调用这个构造方法, 默认的线程工厂见 默认的线程工厂Executors#defaultThreadFactory , 各个参数在线程池核心参数介绍已经介绍.
4.2 ThreadPoolExecutor中的一些重要的属性
对一些重要属性有基础的认知, 有助于后面我们更容易看懂源码流程.
4.2.1 线程池的运行状态
private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
根据上面源码可知, COUNT_BITS 的值为29, CAPACITY 的值为2的29次方-1, 二进制表示为: "00011111111111111111111111111111"(明显29个1);
上面的源码中线程池的运行状态的二进制表示:
| 状态 | 二进制 | 意义 |
|---|---|---|
RUNNING |
11100000000000000000000000000000 | 接受 新execute 的task, 执行 已入队 的task |
SHUTDOWN |
0 | 不接受 新execute 的task, 但执行 已入队 的task, 中断所有空闲的线程 |
STOP |
00100000000000000000000000000000 | 不接受 新execute 的task, 不执行 已入队 的task, 中断所有的线程 |
TIDYING |
01000000000000000000000000000000 | 所有线程停止, workerCount 数量为0, 将执行hook方法: terminated() |
TERMINATED |
01100000000000000000000000000000 | terminated()方法执行完毕 |
可以看出, 线程池的状态由32位 int 整型的二进制的 前三位 表示.
下图根据 Javadoc 所画:
4.2.2 核心属性 ctl 源码(线程池状态和有效线程数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
核心属性 ctl , 数据类型是 AtomicInteger , 表示了两个含义:
runState workerCount
那是如何做到一个属性表示两个含义的呢? 那就要看看 ctlOf 方法
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctlOf 方法在线程池内部用来更新线程池的 ctl 属性,比如 ctl 初始化的时候: ctl = new AtomicInteger(ctlOf(RUNNING, 0)) , 调用 ThreadPoolExecutor#shutdown 方法等;
rs 表示 runState , wc 表示 workerCount ;
将 runState 和 workerCount 做 按位或 运算得到 ctl 的值;
而 runState 和 workerCount 的值由下面两个方法packing和unpacking, 这里的形参 c 就是 ctl.get() 的值;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
下面用表格更清晰理解:
| 方法 | 方法体 | 带入CAPACITY的值 |
|---|---|---|
runStateOf |
c & ~CAPACITY |
c & 11100000000000000000000000000000 |
workerCountOf |
c & CAPACITY |
c & 00011111111111111111111111111111 |
按位与运算, 相同位置, 同1才为1, 其余为0;
结合表格看, runStateOf 方法取 ctl 前3位表示 runState , workerCountOf 方法取第4~32位的值表示 workerCount ;
相信大家已经明白 runState 和 workerCount 如何被packing和unpacking, 这就是为什么 ctl 能即表示 runState 又能表示 wokerCount .
Note: 众所周知, 与2的整数次幂-1进行按位与运算结果等于取余运算的结果, 而位运算效率高于取余运算, 与 Java 8及其之后的 HashMap 的散列方式有同曲同工之妙, 见: https://www.cnblogs.com/theRhyme/p/9404082.html#_lab2_1_16 .
4.2.3 线程池中的mainLock锁
private final ReentrantLock mainLock = new ReentrantLock();
这把可重入锁, 在线程池的很多地方会被用到;
比如要对 workers (线程池中的线程集合)操作的时候(如添加一个worker到工作中), interrupt所有的 workers , 调用shutdown方法等.
4.2.4 线程池中的线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
用来保存当前线程池中的所有线程;
可通过该集合对线程池中的线程进行 中断 , 遍历 等;
创建新的线程时, 要添加到该集合, 移除线程, 也要从该集合中移除对应的线程;
对该集合操作都需要 mainLock 锁.
4.2.5 mainLock的Condition()对象
private final Condition termination = mainLock.newCondition();
主要是为了让tryTerminate方法与awaitTermination方法结合使用;
而 tryTerminate 又被 shutdown 、 shutdownNow 、 processWorkerExit 等方法调用;
Condition对象 termination 的作用就是当线程池中的状态表示的值小于 TERMINATED 的值3时, 当前调用了awaitTermination方法的线程就会wait对应的时间;
等到过了指定的wait时间, 或者线程池状态等于或大于TERMINATED, wait的线程被唤醒, 就继续执行;
如果不清楚 wait(long) 与 wait() 的区别可参考: Object#wait()与Object#wait(long)的区别 .
4.2.6 线程池中曾经达到的最大线程数
private int largestPoolSize;
用作监控, 查看当前线程池, 线程数最多的时候的数量是多少, 见方法 ThreadPoolExecutor#getLargestPoolSize ;
mainLock 保证其可见性和原子性.
4.2.7 线程池中已完成的任务数
private long completedTaskCount;
通过方法 ThreadPoolExecutor#getCompletedTaskCount 获取.
4.2.8 核心线程池中的空闲线程
private volatile boolean allowCoreThreadTimeOut;
默认情况下, 只有临时线程超过了 keepAliveTime 的时间会被回收;
allowCoreThreadTimeOut 默认为false, 如果设置为true, 则会通过 中断 或getTask的结果为 null 的方式停止超过 keepAliveTime 的 核心线程 , 具体见getTask方法, 后续会详细介绍.
5. ThreadPoolExecutor一些重要的方法源码及其原理解析
5.1 execute方法源码: 提交task到线程池
public void execute(Runnable command) {
// 如果task为null, 抛出NPE
if (command == null)
throw new NullPointerException();
// 获得ctl的int值
int c = ctl.get();
// workerCount小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 添加一个新的worker, 作为核心线程池的线程
if (addWorker(command, true))
// 添加worker作为核心线程成功, execute方法退出
return;
// 添加worker作为核心线程失败, 重新获取ctl的int值
c = ctl.get();
}
// 线程池是RUNNING状态并且task入阻塞队列成功
if (isRunning(c) && workQueue.offer(command)) {
// double-check, 再次获取ctl的值
int recheck = ctl.get();
// 线程池不是RUNNING状态并且当前task从workerQueue被移除成功
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 线程池中的workerCount为0
else if (workerCountOf(recheck) == 0)
// 启动一个非核心线程, 由于这里的task参数为null, 该线程会从workerQueue拉去任务
addWorker(null, false);
}
// 添加一个非核心线程执行提交的task
else if (!addWorker(command, false))
// 添加一个非核心线程失败, 执行拒绝策略
reject(command);
}
结合上面代码中的注释和 提交任务到线程池中的流程 , 相信我们已经对这个 execute 方法提交task到线程池的流程的源码更加清晰了.
5.2 addWorker方法源码: 创建线程并启动, 执行提交的task
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 线程池运行状态
int rs = runStateOf(c);
// 如果线程池运行状态大于等于SHUTDOWN, 提交的firstTask为null, workQueue为null,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// workerCount
int wc = workerCountOf(c);
// 线程数大于了2的29次方-1, 或者想要添加为核心线程但是核心线程池满, 或者想要添加为临时线程, 但是workerCount等于或大于了最大的线程池线程数maximumPoolSize, 返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS的方式让workerCount数量增加1,如果成功, 终止循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 再次检查runState, 如果被更改, 重头执行retry代码
if (runStateOf(c) != rs)
continue retry;
// 其他的, 上面的CAS如果由于workerCount被其他线程改变而失败, 继续内部的for循环
}
}
// 标志位workerStarted, workerAdded
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 传入task对象, 创建Worker对象
w = new Worker(firstTask);
// 从worker对象中回去Thread对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取mainLock锁
mainLock.lock();
try {
// 获取mainLock锁之后, 再次检查runState
int rs = runStateOf(ctl.get());
// 如果是RUNNING状态, 或者是SHUTDOWN状态并且传入的task为null(执行workQueue中的task)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程已经被启动, 抛出IllegalThreadStateException
if (t.isAlive())
throw new IllegalThreadStateException();
// 将worker对象添加到HashSet
workers.add(w);
int s = workers.size();
// 线程池中曾经达到的最大线程数(上面4.2.6提到过)
if (s > largestPoolSize)
largestPoolSize = s;
// worker被添加成功
workerAdded = true;
}
} finally {
// 释放mainLock锁
mainLock.unlock();
}
// 如果worker被添加成功, 启动线程, 执行对应的task
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程启动失败, 执行addWorkerFailed方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
每行代码都有详细的对应的注释, 相信我们已经明白了 addWorker 方法的过程.
5.3 Worker类源码: 线程是如何执行提交到线程池中的task?
上面的addWorker方法中, 获得 Worker 对象中的 Thread 对象( final Thread t = w.thread; ), 并调用线程的 start 方法启动线程执行 Worker中的run 方法.
5.3.1 Worker 的定义
继承了 AQS(AbstractQueuedSynchronizer) , 重写了部分方法, 这里的主要作用主要是通过tryLock或isLocked方法判断 当前线程是否正在执行Worker中的run方法 , 如果返回 false , 则线程没有正在执行或没有处于active, 反之, 处于;
结合 getActiveCount方法源码 理解;
实现了 Runnable 接口, 是一个线程可执行的任务.
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
...
}
5.3.2 Worker中的属性
| 属性 | 意义 |
|---|---|
final Thread thread |
线程对象, worker会被提交到该线程 |
Runnable firstTask |
提交到线程池中的task, 可能为null, 比如方法 ThreadPoolExecutor#prestartCoreThread |
volatile long completedTasks |
每个线程完成的任务数 |
5.3.3 Worker的构造方法
首先设置初始状态 state 为-1, 这里的 setState 方法是 AQS 中的方法;
提交的task赋值给 firstTask 属性;
利用 ThreadFactory , 传入当前Worker对象( 为了执行当前Worker中的run方法 ), 创建 Thread 对象.
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
5.3.4 Worker中的run方法
Worker对象的 run 方法, 直接调用了 ThreadPoolExecutor 的runWorker方法.
public void run() {
runWorker(this);
}
5.3.5 Worker中的重写AQS的方法tryAcquire, tryRelease, isHeldExclusively
5.3.5.1 tryAcquire方法
尝试将 state 从0设置为1, 成功后把当前持有锁的线程设置为 当前线程 ;
形参 unused 没有用到.
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
5.3.5.2 tryRelease方法
直接将当前持有锁的线程设置为null, 将 state 设置为1;
形参 unused 没有用到.
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
5.3.5.3 isHeldExclusively方法
判断当前线程是否已经获取了 Worker 的锁;
如果 getState() == 0 , 则没有线程获取了该锁, 可以尝试获取锁, 将 state 设置为1;
如果 getState() == 1 , 已经有线程获取了该锁, 互斥, 此时无法获取该锁.
protected boolean isHeldExclusively() {
return getState() != 0;
}
5.3.6 lock方法
获取锁, 直到获取到锁为止(具体见 AbstractQueuedSynchronizer#acquireQueued 方法);
public void lock() { acquire(1); }
5.3.7 tryLock方法
tryLock , 尝试获取锁, 获取到返回true, 否则返回false.
public boolean tryLock() { return tryAcquire(1); }
5.3.8 isLocked方法
isLocked 方法, 如果当前有线程持有该锁, 则返回true, 否则返回false.
public boolean isLocked() { return isHeldExclusively(); }
5.3.9 interruptIfStarted方法
线程启动会调用 unlock 方法(ThreadPoolExecutor.java第1131行), 将state设置为0;
如果线程已经启动, 并且没有被中断, 调用线程的中断方法.
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
5.3.10 unlock方法
底层调用worker的tryRelease方法, 设置state为0.
public void unlock() { release(1); }
5.4 runWorker方法源码: 线程池中线程被复用的关键
执行提交的task或死循环从 BlockingQueue 获取task.
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 当传入的task不为null, 或者task为null但是从BlockingQueue中获取的task不为null
while (task != null || (task = getTask()) != null) {
// 执行任务之前先获取锁
w.lock();
// 线程池状态如果为STOP, 或者当前线程是被中断并且线程池是STOP状态, 或者当前线程不是被中断;
// 则调用interrupt方法中断当前线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// beforeExecute hook方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正执行提交的task的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// afterExecute hook方法
afterExecute(task, thrown);
}
} finally {
// task赋值为null, 下次从BlockingQueue中获取task
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
5.5 getTask方法源码: 从BlockingQueue中获取task
private Runnable getTask() {
// BlockingQueue的poll方法是否已经超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池状态>=SHUTDOWN,并且BlockingQueue为null;
// 或者线程池状态>=STOP
// 以上两种情况都减少工作线程的数量, 返回的task为null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 当前线程是否需要被淘汰
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// BlockingQueue的poll方法超时会直接返回null
// BlockingQueue的take方法, 如果队列中没有元素, 当前线程会wait, 直到其他线程提交任务入队唤醒当前线程.
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
5.6 shutdown方法源码: 中断所有空闲的线程
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 死循环将线程池状态设置为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有空闲的线程
interruptIdleWorkers();
// hook函数, 比如ScheduledThreadPoolExecutor对该方法的重写
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
5.7 shutdownNow方法源码: 中断所有空闲的线程, 删除并返回BlockingQueue中所有的task
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 死循环将线程池状态设置为STOP
advanceRunState(STOP);
// 中断所有空闲的线程
interruptWorkers();
// 删除并返回BlockingQueue中所有的task
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
// 返回BlockingQueue中所有的task
return tasks;
}
6. ThreadPoolExecutor一些其他的方法和属性介绍
6.1 默认的线程工厂Executors#defaultThreadFactory
默认的线程工厂的两个重要作用就是 创建线程 和 初始化线程名前缀 .
创建 DefaultThreadFactory 对象.
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
DefaultThreadFactory 默认构造方法, 初始化 ThreadGroup 和创建出的 线程名前缀 namePrefix .
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
// 非daemon线程, 不会随父线程的消亡而消亡
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
6.2 ThreadPoolExecutor默认实现的4种拒绝策略
6.2.1 CallerRunsPolicy
如果线程池状态不是 SHUTDOWN , 由提交任务到线程池中(如调用 ThreadPoolExecutor#execute 方法)的线程执行该任务;
如果线程池状态是 SHUTDOWN , 则该任务会被直接丢弃掉, 不会再次入队 或 被任何线程执行 .
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
6.2.2 AbortPolicy
在调用提交任务到线程池中(如调用 ThreadPoolExecutor#execute 方法)的线程中直接抛出 RejectedExecutionException 异常, 当然任务也不会被执行, 提交任务的线程如果未捕获异常会因此停止.
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
6.2.3 DiscardPolicy
直接丢弃掉这个任务, 不做任何事情.
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
6.2.4 DiscardOldestPolicy
线程池如果不是 SHUTDOWN 状态, 丢弃最老的任务, 即 workQueue 队头的任务, 将当前任务 execute 提交到线程池;
与 CallerRunsPolicy 一样, 如果线程池状态是 SHUTDOWN , 则该任务会被直接丢弃掉, 不会再次入队或被任何线程执行.
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
6.3 addWorkerFailed方法源码: 移除启动线程失败的worker
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 获取mainLock锁
mainLock.lock();
try {
// 如果worker不为null, 从HashSet中移除worker
if (w != null)
workers.remove(w);
// 循环执行CAS操作直到让workerCount数量减少1
decrementWorkerCount();
// 执行tryTerminate方法
tryTerminate();
} finally {
mainLock.unlock();
}
}
6.4 tryTerminate方法源码: 尝试更改runState, workerCount, 尝试关闭线程池
final void tryTerminate() {
for (;;) {
// 获取ctl, runState和workerCount
int c = ctl.get();
// 当前线程池状态是否是RUNNING, 或者是否是TIDYING或TERMINATED状态, 或者是否是SHUTDOWN状态并且workQueue不为空(需要被线程执行), return结束方法
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// workerCount如果不为0, 随机中断一个空闲的线程, return结束方法
if (workerCount如果不为0,(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
// 获取mainLock锁
mainLock.lock();
try {
// CAS方式设置当前线程池状态为TIDYING, workerCount为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 执行hook方法terminated
terminated();
} finally {
// 设置当前线程池状态为TERMINATED, workerCount为0
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒调用了awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 当CAS失败, 循环重试
}
}
6.5 awaitTermination方法源码: 等待指定时间后, 线程池是否已经关闭
死循环判断, 如果当前线程池状态小于 TERMINATED , 则 wait 对应的时间;
如果过了 wait 的时间( nanos <= 0 ), 线程池状态大于等于 TERMINATED 则循环终止, 函数返回 true , 否则返回 false .
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
6.6 prestartCoreThread方法源码: 预启动一个核心线程
如果 当前线程池中的核心线程数 小于 corePoolSize , 则增加一个 核心线程 ( 提交的task为null ).
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
6.7 prestartAllCoreThreads方法源码: 预先启动线程池中的所有核心线程
启动所有的核心线程.
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
6.8 getActiveCount方法源码: 获得当前线程池中活跃的线程
获得当前线程池中活跃的线程(即正在执行task没有wait的线程, [runWorker](#5.4 runWorker方法源码: 线程池中线程被复用的关键)方法中的同步代码块).
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
总结
通过介绍 ThreadPoolExecutor 的构造方法, 重要属性, execute 方法, 引出 Worker 类, 以及真正的线程处理提交到线程池中的 task 的源码和流程, 对 ThreadPoolExecutor 整体结构有了清晰的认知;
线程池 ThreadPoolExecutor 使用 BlockingQueue 实现线程间的等待-通知机制, 当然也可以自己手动实现;
复用线程体现在runWorker方法中, 死循环+ BlockingQueue 的特性.
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Linux 进程、线程、文件描述符的底层原理
- 【好文推荐】黑莓OS手册是如何详细阐述底层的进程和线程模型的?
- 一步步动手实现高并发的Reactor模型 —— Kafka底层如何充分利用多线程优势去处理网络I/O与业务分发...
- avue 1.5.2 优化大量底层代码,crud 和 form 底层公用
- Synchronized 关键字使用、底层原理、JDK1.6 之后的底层优化以及 和ReenTrantLock 的对比
- Docker 底层原理浅析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Pattern Recognition and Machine Learning
Christopher Bishop / Springer / 2007-10-1 / USD 94.95
The dramatic growth in practical applications for machine learning over the last ten years has been accompanied by many important developments in the underlying algorithms and techniques. For example,......一起来看看 《Pattern Recognition and Machine Learning》 这本书的介绍吧!