线程池的使用和源码剖析

栏目: 编程工具 · 发布时间: 5年前

内容简介:线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自维基百科我们在Android或者Java开发中,日常所使用的

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自维基百科

我们在Android或者 Java 开发中,日常所使用的就是 ThreadPoolExecutor 了,我们先来看下如何使用一个线程池来代替多线程开发。

使用线程池

// 创建一个核心线程数为5,最大线程数为10,空闲线程存活时间为60s的线程池对象
val threadPoolExecutor = ThreadPoolExecutor(
    5, 10, 60,
    TimeUnit.MINUTES,
    ArrayBlockingQueue<Runnable>(100),
    RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") }
)

// 测试
for (i in 1..10) {
    threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") }
}

// 结果
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-4
// execute thread is:pool-1-thread-3
// execute thread is:pool-1-thread-2
// execute thread is:pool-1-thread-1
复制代码

从结果就可以看出来,执行时间操作,但是只创建了5个线程,另外5次都是复用线程的。这样就达到了复用存在的线程、减少对象的创建和销毁的额外开销;并且可以控制最大线程数,也就是控制了最大并发数。

知道如何使用一个线程池还不够,我们需要看看 ThreadPoolExecutor 是如何创建、复用这些线程的。下面我们看看创建 ThreadPoolExecutor 对象的几个参数:

构造方法

/**
     * 创建一个ThreadPoolExecutor对象
     *
     * @param corePoolSize 核心线程数,这些线程会一直在线程池中,除非设置了 allowCoreThreadTimeOut
     * @param maximumPoolSize 最大线程数,运行线程创建的最大值
     * @param keepAliveTime 当线程数>核心线程数的时候,这个值就是空闲且非核心线程存活的时间
     * @param unit keepAliveTime的单位
     * @param workQueue 保存task的队列,直到执行execute()方法执行
     * @param threadFactory ThreadFactory是一个接口,里面只有Thread newThread(Runnable r)方法,用来创建线程,
     *                      默认采用Executors.defaultThreadFactory()
     * @param handler 拒绝处理任务时的策略,如果线程池满了且所有线程都不处于空闲状态,
     *                通过RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)来处理传进来的Runnable
     *                系统提供了四种:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy()
     *                默认采用new AbortPolicy()
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码

我在方法头注释中我都一一解释了几个参数的作用,还有几点需要注意的就是:

  • 核心线程数不能小于0;
  • 最大线程数不能小于0;
  • 最大线程数不能小于核心线程数;
  • 空闲线程的存活时间不能小于0;

通过上面的解释我们很明白的知道前面几个参数的作用,但是最后两个参数我们并不能通过表面的解释通晓它,既然不能通过表象看懂他俩,那就看看默认的实现是如何做的,这样在接下来的源码分析中很有帮助。

ThreadFactory:线程工厂

ThreadFactory 是一个接口,里面只由唯一的 Thread newThread(Runnable r); 方法,此方法是用来创建线程的,从接口中我们得到的就只有这么多,下面我们看看 Executors 默认的 DefaultThreadFactory 类:

// 静态内部类
static class DefaultThreadFactory implements ThreadFactory {
    // 线程池的标识,从1开始没创建一个线程池+1
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    // 线程组
    private final ThreadGroup group;
    // 线程名中的结尾标识,从1开始每创建一个线程+1
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    // 线程名
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
复制代码

RejectedExecutionHandler:拒绝处理任务的策略

RejectedExecutionHandler 也是一个接口,并且也只提供了唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法。我们可以自定义策略,也可以用上面提到的封装好的四种策略,先看一下四种策略分别怎么拒绝任务的:

  • CallerRunsPolicy

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() {
        }
    
        /**
         * 如果线程池还没关闭,那么就再次执行这个Runnable
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    复制代码
  • AbortPolicy

    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() {
        }
    
        /**
         * 这个策略就是抛出异常,不做其他处理
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }
    复制代码
  • DiscardPolicy

    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() {
        }
    
        /**
         * 什么也不做,也就是抛弃了这个Runnable
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    复制代码
  • DiscardOldestPolicy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() {
        }
    
        /**
         * 1. 线程池未关闭
         * 2. 获取队列中的下一个Runnable
         * 3. 获取到了,但是不对它进行处理,也就是抛弃它
         * 4. 执行我们传过来的这个Runnable
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    复制代码

重要的参数

除了上述构造方法中的几个参数外,线程池还有几个比较核心的参数,如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

    // ctl 的低29位表示线程池中的线程数,高3位表示当前线程状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // (2^29) -1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 运行状态:接受新任务并处理排队的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 关闭状态:不接受新任务,但处理排队的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 停止状态:不接受新任务,不处理排队的任务,中断正在进行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 整理状态:整理状态,所有任务已终止,workerCount为零,线程将运行terminate()方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 终止状态:terminate()方法执行完成
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 表示线程是否允许或停止
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 线程的有效数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    ......后面的源码暂时省略
}
复制代码

execute:执行

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果运行中的线程数小于核心线程数,执行addWorker(command, true)创建新的核心Thread执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 1. 已经满足:运行中的线程数大于核心线程数,但是小于最大线程数
    // 2. 需要满足:线程池在运行状态
    // 3. 需要满足:添加到工作队列中成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程不在运行状态,就从工作队列中移除command
        // 并且执行拒绝策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 线程池处于运行状态,但是没有线程,则addWorker(null, false)
        // 至于这里为什么要传入一个null,因为在最外层的if条件中我们已经将Runnable添加到工作队列中了
        // 而且在runWorker()源码中也可以得到答案,如果传入的Runnable为空,就会去工作队列中取task。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 执行addWorker()创建新的非核心线程Thread执行任务
    // addWorker() 失败,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
复制代码

从上面源码中可以看出, execute() 一个新的任务,主要有以下这几种情况:

  1. 核心线程未满,直接新建核心线程并执行任务;
  2. 核心线程满了,工作队列未满,将任务添加到工作队列中;
  3. 核心线程和工作队列都满,但是最大线程数未达到,新建线程并执行任务;
  4. 上面条件都不满足,那么就执行拒绝策略。

更形象的可以看下方流程图:

线程池的使用和源码剖析

addWorker(Runnable , boolean):添加Worker

private boolean addWorker(Runnable firstTask, boolean core) {
    // 标记外循环,比如在内循环中break retry就直接跳出外循环
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 直接返回false有以下3种情况:
        // 1. 线程池状态为STOP、TIDYING、TERMINATED
        // 2. 线程池状态不是running状态,并且firstTask不为空
        // 3. 线程池状态不是running状态,并且工作队列为空
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (; ; ) {
            int wc = workerCountOf(c);
            // 如果添加的是核心线程,但是运行的线程数大于等于核心线程数,那么就不添加了,直接返回
            // 如果添加的是非核心线程,但是运行的线程数大于等于最大线程数,那么也不添加,直接返回
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加workerCount的值 +1
            if (compareAndIncrementWorkerCount(c))
                // 跳出外循环
                break retry;
            c = ctl.get();  // 重新检查线程池状态
            if (runStateOf(c) != rs)
                continue retry;
            // 重新检查的状态和之前不合,再次从外循环进入
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 线程池重入锁
            final ReentrantLock mainLock = this.mainLock;
            // 获得锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 线程池在运行状态或者是线程池关闭同时Runnable也为空
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 想Worker中添加新的Worker
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 如果添加成功,启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码

addWorker() 主要就是在满足种种条件(上述源码中解释了)后,新建一个 Worker 对象,并添加到 HashSet<Worker> workers 中去,最后调用新建 Worker 对象的 Thread 变量的 start() 方法。

Worker类

Worker 是一个继承了 AQS 并实现了 Runnable 的内部类,我们重点看看它的 run() 方法,因为上面 addWorker() 中, t.start() 触发的就是它的 run() 方法:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /**
     * Thread this worker is running in.  Null if factory fails.
     */
    final Thread thread;
    /**
     * Initial task to run.  Possibly null.
     */
    Runnable firstTask;
    /**
     * Per-thread task counter
     */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     *
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 这边是把Runnable传给了Thread,也就是说Thread.run()就是执行了下面的run()方法
        this.thread = getThreadFactory().newThread(this);
    }

    /**
     * Delegates main run loop to outer runWorker
     */
    public void run() {
        runWorker(this);
    }
}
复制代码

run() 方法实际调用了 runWorker(Worker) 方法

runWorker(Worker)方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 释放锁,允许中断
        boolean completedAbruptly = true;
        try {
            // 1. worker中的task不为空
            // 2. 如果worker的task为空,那么取WorkerQueue的task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 这是一个空方法,可由子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行task
                        task.run();
                    } 
                    .... 省略
                    // 这是一个空方法,可由子类实现
                    finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码

getTask():

``java
private Runnable getTask() {
    // 进入死循环
    for (; ; ) {
        try {
            // 为true的条件:
            // allowCoreThreadTimeOut=true: 核心线程需根据keepAliveTime超时等待
            // 核心线程数已满
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果timed为true,执行BlockQueue.poll(),这个操作在取不到task的时候会等待keepAliveTime,然后返回null
            // 如果timed为false,执行BlockQueue.take(),这个操作在队列为空的时候一直阻塞
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
        }
    }
}
```
复制代码

线程池的源码按照上述的几个方法( execute(runnable) -> addWorker(runnable,core) -> Worker -> runWorker(worker) -> getTask() )的顺序来分析,你就可以很清晰的将运作过程了解清楚,同事构造方法和几个重要的参数一定要懂,不然对于后面的源码分析很受阻碍,相信大家通过这篇文章可以加深对线程池的理解。

源码分析的文章还在不断的更新,如果本文章你发现有不正确或者不足之处,欢迎你在下方留言或者扫描下方的二维码留言也可!

线程池的使用和源码剖析

扫描关注公众号


以上所述就是小编给大家介绍的《线程池的使用和源码剖析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Learning Vue.js 2

Learning Vue.js 2

Olga Filipova / Packt Publishing / 2017-1-5 / USD 41.99

About This Book Learn how to propagate DOM changes across the website without writing extensive jQuery callbacks code.Learn how to achieve reactivity and easily compose views with Vue.js and unders......一起来看看 《Learning Vue.js 2》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码