线程池的使用和源码剖析

栏目: 编程工具 · 发布时间: 5年前

内容简介:线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自维基百科我们在Android或者Java开发中,日常所使用的

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自维基百科

我们在Android或者 Java 开发中,日常所使用的就是 ThreadPoolExecutor 了,我们先来看下如何使用一个线程池来代替多线程开发。

使用线程池

// 创建一个核心线程数为5,最大线程数为10,空闲线程存活时间为60s的线程池对象
val threadPoolExecutor = ThreadPoolExecutor(
    5, 10, 60,
    TimeUnit.MINUTES,
    ArrayBlockingQueue<Runnable>(100),
    RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") }
)

// 测试
for (i in 1..10) {
    threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") }
}

// 结果
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-1
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-5
// execute thread is:pool-1-thread-4
// execute thread is:pool-1-thread-3
// execute thread is:pool-1-thread-2
// execute thread is:pool-1-thread-1
复制代码

从结果就可以看出来,执行时间操作,但是只创建了5个线程,另外5次都是复用线程的。这样就达到了复用存在的线程、减少对象的创建和销毁的额外开销;并且可以控制最大线程数,也就是控制了最大并发数。

知道如何使用一个线程池还不够,我们需要看看 ThreadPoolExecutor 是如何创建、复用这些线程的。下面我们看看创建 ThreadPoolExecutor 对象的几个参数:

构造方法

/**
     * 创建一个ThreadPoolExecutor对象
     *
     * @param corePoolSize 核心线程数,这些线程会一直在线程池中,除非设置了 allowCoreThreadTimeOut
     * @param maximumPoolSize 最大线程数,运行线程创建的最大值
     * @param keepAliveTime 当线程数>核心线程数的时候,这个值就是空闲且非核心线程存活的时间
     * @param unit keepAliveTime的单位
     * @param workQueue 保存task的队列,直到执行execute()方法执行
     * @param threadFactory ThreadFactory是一个接口,里面只有Thread newThread(Runnable r)方法,用来创建线程,
     *                      默认采用Executors.defaultThreadFactory()
     * @param handler 拒绝处理任务时的策略,如果线程池满了且所有线程都不处于空闲状态,
     *                通过RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)来处理传进来的Runnable
     *                系统提供了四种:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy()
     *                默认采用new AbortPolicy()
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler){
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
复制代码

我在方法头注释中我都一一解释了几个参数的作用,还有几点需要注意的就是:

  • 核心线程数不能小于0;
  • 最大线程数不能小于0;
  • 最大线程数不能小于核心线程数;
  • 空闲线程的存活时间不能小于0;

通过上面的解释我们很明白的知道前面几个参数的作用,但是最后两个参数我们并不能通过表面的解释通晓它,既然不能通过表象看懂他俩,那就看看默认的实现是如何做的,这样在接下来的源码分析中很有帮助。

ThreadFactory:线程工厂

ThreadFactory 是一个接口,里面只由唯一的 Thread newThread(Runnable r); 方法,此方法是用来创建线程的,从接口中我们得到的就只有这么多,下面我们看看 Executors 默认的 DefaultThreadFactory 类:

// 静态内部类
static class DefaultThreadFactory implements ThreadFactory {
    // 线程池的标识,从1开始没创建一个线程池+1
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    // 线程组
    private final ThreadGroup group;
    // 线程名中的结尾标识,从1开始每创建一个线程+1
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    // 线程名
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}
复制代码

RejectedExecutionHandler:拒绝处理任务的策略

RejectedExecutionHandler 也是一个接口,并且也只提供了唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); 方法。我们可以自定义策略,也可以用上面提到的封装好的四种策略,先看一下四种策略分别怎么拒绝任务的:

  • CallerRunsPolicy

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() {
        }
    
        /**
         * 如果线程池还没关闭,那么就再次执行这个Runnable
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
    复制代码
  • AbortPolicy

    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() {
        }
    
        /**
         * 这个策略就是抛出异常,不做其他处理
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                    " rejected from " +
                    e.toString());
        }
    }
    复制代码
  • DiscardPolicy

    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() {
        }
    
        /**
         * 什么也不做,也就是抛弃了这个Runnable
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    复制代码
  • DiscardOldestPolicy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() {
        }
    
        /**
         * 1. 线程池未关闭
         * 2. 获取队列中的下一个Runnable
         * 3. 获取到了,但是不对它进行处理,也就是抛弃它
         * 4. 执行我们传过来的这个Runnable
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    复制代码

重要的参数

除了上述构造方法中的几个参数外,线程池还有几个比较核心的参数,如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

    // ctl 的低29位表示线程池中的线程数,高3位表示当前线程状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // (2^29) -1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 运行状态:接受新任务并处理排队的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 关闭状态:不接受新任务,但处理排队的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 停止状态:不接受新任务,不处理排队的任务,中断正在进行的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 整理状态:整理状态,所有任务已终止,workerCount为零,线程将运行terminate()方法
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 终止状态:terminate()方法执行完成
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 表示线程是否允许或停止
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 线程的有效数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    ......后面的源码暂时省略
}
复制代码

execute:执行

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果运行中的线程数小于核心线程数,执行addWorker(command, true)创建新的核心Thread执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 1. 已经满足:运行中的线程数大于核心线程数,但是小于最大线程数
    // 2. 需要满足:线程池在运行状态
    // 3. 需要满足:添加到工作队列中成功
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果线程不在运行状态,就从工作队列中移除command
        // 并且执行拒绝策略
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 线程池处于运行状态,但是没有线程,则addWorker(null, false)
        // 至于这里为什么要传入一个null,因为在最外层的if条件中我们已经将Runnable添加到工作队列中了
        // 而且在runWorker()源码中也可以得到答案,如果传入的Runnable为空,就会去工作队列中取task。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 执行addWorker()创建新的非核心线程Thread执行任务
    // addWorker() 失败,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
复制代码

从上面源码中可以看出, execute() 一个新的任务,主要有以下这几种情况:

  1. 核心线程未满,直接新建核心线程并执行任务;
  2. 核心线程满了,工作队列未满,将任务添加到工作队列中;
  3. 核心线程和工作队列都满,但是最大线程数未达到,新建线程并执行任务;
  4. 上面条件都不满足,那么就执行拒绝策略。

更形象的可以看下方流程图:

线程池的使用和源码剖析

addWorker(Runnable , boolean):添加Worker

private boolean addWorker(Runnable firstTask, boolean core) {
    // 标记外循环,比如在内循环中break retry就直接跳出外循环
    retry:
    for (; ; ) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 直接返回false有以下3种情况:
        // 1. 线程池状态为STOP、TIDYING、TERMINATED
        // 2. 线程池状态不是running状态,并且firstTask不为空
        // 3. 线程池状态不是running状态,并且工作队列为空
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (; ; ) {
            int wc = workerCountOf(c);
            // 如果添加的是核心线程,但是运行的线程数大于等于核心线程数,那么就不添加了,直接返回
            // 如果添加的是非核心线程,但是运行的线程数大于等于最大线程数,那么也不添加,直接返回
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加workerCount的值 +1
            if (compareAndIncrementWorkerCount(c))
                // 跳出外循环
                break retry;
            c = ctl.get();  // 重新检查线程池状态
            if (runStateOf(c) != rs)
                continue retry;
            // 重新检查的状态和之前不合,再次从外循环进入
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 线程池重入锁
            final ReentrantLock mainLock = this.mainLock;
            // 获得锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 线程池在运行状态或者是线程池关闭同时Runnable也为空
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 想Worker中添加新的Worker
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 如果添加成功,启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
复制代码

addWorker() 主要就是在满足种种条件(上述源码中解释了)后,新建一个 Worker 对象,并添加到 HashSet<Worker> workers 中去,最后调用新建 Worker 对象的 Thread 变量的 start() 方法。

Worker类

Worker 是一个继承了 AQS 并实现了 Runnable 的内部类,我们重点看看它的 run() 方法,因为上面 addWorker() 中, t.start() 触发的就是它的 run() 方法:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /**
     * Thread this worker is running in.  Null if factory fails.
     */
    final Thread thread;
    /**
     * Initial task to run.  Possibly null.
     */
    Runnable firstTask;
    /**
     * Per-thread task counter
     */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     *
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 这边是把Runnable传给了Thread,也就是说Thread.run()就是执行了下面的run()方法
        this.thread = getThreadFactory().newThread(this);
    }

    /**
     * Delegates main run loop to outer runWorker
     */
    public void run() {
        runWorker(this);
    }
}
复制代码

run() 方法实际调用了 runWorker(Worker) 方法

runWorker(Worker)方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 释放锁,允许中断
        boolean completedAbruptly = true;
        try {
            // 1. worker中的task不为空
            // 2. 如果worker的task为空,那么取WorkerQueue的task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 这是一个空方法,可由子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行task
                        task.run();
                    } 
                    .... 省略
                    // 这是一个空方法,可由子类实现
                    finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
复制代码

getTask():

``java
private Runnable getTask() {
    // 进入死循环
    for (; ; ) {
        try {
            // 为true的条件:
            // allowCoreThreadTimeOut=true: 核心线程需根据keepAliveTime超时等待
            // 核心线程数已满
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 如果timed为true,执行BlockQueue.poll(),这个操作在取不到task的时候会等待keepAliveTime,然后返回null
            // 如果timed为false,执行BlockQueue.take(),这个操作在队列为空的时候一直阻塞
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
        }
    }
}
```
复制代码

线程池的源码按照上述的几个方法( execute(runnable) -> addWorker(runnable,core) -> Worker -> runWorker(worker) -> getTask() )的顺序来分析,你就可以很清晰的将运作过程了解清楚,同事构造方法和几个重要的参数一定要懂,不然对于后面的源码分析很受阻碍,相信大家通过这篇文章可以加深对线程池的理解。

源码分析的文章还在不断的更新,如果本文章你发现有不正确或者不足之处,欢迎你在下方留言或者扫描下方的二维码留言也可!

线程池的使用和源码剖析

扫描关注公众号


以上所述就是小编给大家介绍的《线程池的使用和源码剖析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数字乌托邦

数字乌托邦

尼古拉斯•卡尔 / 姜忠伟 / 中信前沿出版社 / 2018-5 / 69.00

当下,技术与我们的关系变得越来越紧密不可分割,特别是智能手机等设备的出现,带给整个人类社会一场彻底的变革。的确,智能手机上的各种应用程序让我们的工作生活无比便利:社交媒体让我们能够和他人实时保持联络并传输信息,不再受时间、地点的限制;搜索引擎通过精准的算法将我们所需要的信息整合推送至屏幕上,让我们毫不费力就看到自己想要的;地图软件为我们的出行提供了更多路线选择,甚至可以使用语音导航,帮助我们顺利到......一起来看看 《数字乌托邦》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具