Java 并发编程(二):线程池总结

栏目: IT技术 · 发布时间: 4年前

概述

常规 new Thread 创建线程问题:

  • Thread线程 属于一个重量级的对象,通过 new Thread 创建一个线程,首先它是一个 Java对象 ,需要分配堆空间资源;同时 Thread 需要调用操作 系统内核API ,在 系统层面创建一个线程与此对应 ,操作系统还要为线程分配一系列资源。所以,创建线程的成本是很高的,应该 避免频繁创建和销毁线程

  • 另外,线程缺乏统一管理,可能无限制新建线程,相互之间竞争加剧,以及可能占用过多系统资源导致服务器宕机或 OOM 等。

如何去解决这个问题,就是采用经常使用到的 资源池 方案,比如数据库连接池等,将资源提前初始化后放入到池中进行管理,待需要使用时从池中获取一个空闲资源,使用完后再将资源放回到池中达到释放目的,这样其它任务就可以继续重复使用该资源,避免资源被不停创建、销毁。

由于 Thread API 在接口设计上的问题,线程池和一般的资源池在使用上是有些差异的,比如连接池:从连接池获取可用连接 --> 使用连接执行任务 --> 将连接放入到连接池。如果我们从线程池中获取到一个 Thread 对象,根本没法处理我们的任务,因为 Thread 线程在启动之前要么 重写 run() 、要么 传入 Runnable 方式将任务和 Thread 绑定在一起。所以, Java线程池 是没有提供 申请线程释放线程 的方法,而是采用一种 生产者/消费者模式 去构建线程池执行机制。

Java 线程 Thread 是被一对一映射到本地操作系统线程,即 Java 启动时会创建一个本地操作系统线程,当 Java 线程终止时,对应操作系统线程会被回收。

Executor体系

Java 5 之前,仅仅只能使用 ThreadRunnableThreadLocalsynchronized 等进行多线程开发,线程的使用及其简陋; Java 5 极大的改善并发编程,构建出了多线程开发 API 的基础体系,这些类主要位于 java.util.concurrent 包下,简称 J.U.C

Java 并发编程(二):线程池总结

Executor 就是 J.U.C 中比较重要的一块,用于构建多线程开发中使用最普遍的线程池:

  • Executor 【接口】:最顶层接口,该接口只定义了一个方法: void execute(Runnable command)
  • ExecutorService
    Executor
    Callable
    Future
    
  • ScheduledExecutorService 【接口】:继承了 ExecutorService ,增加了定时任务相关的方法
  • ThreadPoolExecutor 【实现类】: ExecutorService 的默认实现
  • ScheduledThreadPoolExecutor
    ThreadPoolExecutor
    ScheduledExecutorService
    ScheduledThreadPoolExecutor
    

ExecutorService

Java 并发编程(二):线程池总结

方法描述:

  • shutdown() :优雅关闭线程池,之前提交的任务将被执行,包括当前正在执行的和等待队列中的任务,但是线程池不会再接收新任务,提交新任务会抛出异常

  • shutdownNow() :调用 shutdownNow 方法后, 线程池就不会再接受新的任务了,并且会丢弃工作队列里面的任务,正在执行的任务会被中断, 该方法会立刻返回,返回值为这时候队列里面被丢弃的任务列表。

  • isShutdown() :如果此线程池关闭,则返回true

  • isTerminated() :如果关闭后所有任务都已完成,则返回true

  • awaitTermination() :当线程调用 awaitTermination 方法后,当前线程会被阻塞,直到线程池状态变为 TERMINATED 才返回, 或者等待时间超时才返回、或当前线程被中断

  • submit() 系列方法:

    • public Future<?> submit(Runnable task)
      Runnable
      Runnable
      submit
      Future
      Future.get()
      Runnable
      Future.get()
      null
      
    • public <T> Future<T> submit(Runnable task, T result)
      submit(Runnable task)
      Runnable
      Future.get()
      
    • public <T> Future<T> submit(Callable<T> task) :提交一个 Callable 任务,该任务是带有返回结果的
  • invokeAll() 系列方法:

    • invokeAll(Collection<? extends Callable<T>> tasks) :执行给定任务集合,执行完毕后返回结果
    • invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
      Future.get()
      CancellationException
      
  • invokeAny() 系列方法:参照 invokeAll() 方法

    • T invokeAny(Collection<? extends Callable<T>> tasks) :执行给定的任务集合,任意一个执行成功则返回结果,其他任务终止
    • T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) :执行给定的任务集合,任意一个执行成功或超时,则返回结果,其他任务终止

注意: invoke 方法是阻塞方法,即提交的任务执行完或超时才会返回结果,而 submit 系列方法是会立即返回 Future 结果。区分一个方法是不是阻塞方法可以通过方法签名是否抛出 InterruptedException

public Future<?> submit(Runnable task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException

ScheduledExecutorService

创建并执行一个一次性任务,可以指定延迟时间:

schedule(Runnable command, long delay, TimeUnit unit)

schedule(Callable<V> callable, long delay, TimeUnit unit)

创建并执行一个周期性任务:

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) :周期是以两个任务开始时间间隔为周期,初始延迟时间后会触发第一次执行,执行过程中发生异常,那么任务就停止,一次任务执行时长超过周期时间,那下一次任务会等到该任务执行结束后,立即执行。

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) :周期是以上一个任务结束到下一个任务开始时间间隔为周期,指的是 以固定的延时 执行, initialDelay 初次延迟时间, delay 指的是上一次执行终止和下一次执行开始之间的延迟。

线程池状态

  • RUNNING :接收新任务并处理排队任务;
  • SHUTDOWN :不接收新任务,但处理排队任务,调用 shutdown() 会处于该状态;
  • STOP :不接收新任务,也不处理排队任务,并中断正在运行的任务,调用 shutdownNow() 会处于该状态;
  • TIDYING
    workerCount
    TIDYING
    terminate()
    
  • TERMINATEDterminate() 运行完成;

参见 ThreadPoolExecutor

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态具体可参加线程池终止一节中源码分析。

线程池模型

Java 并发编程(二):线程池总结

线程池中主要有:

  • AtomicInteger ctl :二进制高3位表示线程池状态,后29位记录工作线程数量,所以线程池所能够支持的最大线程数: 2^29 - 1 = 536870911

线程池状态高3位表示:

RUNNING    = -1 << COUNT_BITS;//111
SHUTDOWN   =  0 << COUNT_BITS;//000
STOP       =  1 << COUNT_BITS;//010
TIDYING    =  2 << COUNT_BITS;//100
TERMINATED =  3 << COUNT_BITS;//110
  • Worker
    Worker
    Worker
    Thread
    Worker
    firstTask
    Worker
    getTask
    

ThreadPoolExecutor 定义:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) 

核心参数:

  • corePoolSize :核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
  • maximumPoolSize :最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大量 maxPoolSize
  • keepAliveTime
    工作线程数 > 核心线程数
    线程空闲超时
    allowCoreThreadTimeOut(true)
    工作线程数
    核心线程数
    

等待队列 workQueue

  • 有界队列 ArrayBlockingQueue :等待队列有固定大小;
  • LinkedBlockingQueue
    corePoolSize
    corePoolSize
    maximumPoolSize
    corePoolSize
    
  • SynchronousQueue
    Executors
    newCachedThreadPool()
    

拒绝策略 handler :如若使用拒绝策略,等待队列一定要设置成有界队列才行;若等待队列已满,则在总线程数不大于 maximumPoolSize 的前提下,创建新的线程;若线程数大于 maximumPoolSize ,则执行拒绝策略

  • AbortPolicy :拒绝任务,并且抛出异常

  • CallerRunsPolicy :只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务

  • DiscardOldestPolicy :丢弃最老的一个请求,然后把当前任务重新加入到队列中

  • DiscardPolicy :丢弃无法处理的任务,没有异常信息

  • 如果需要自定义拒绝策略可以实现 RejectedExecutionHandler 接口

增减线程时机:

  • 如果 工作线程数 < corePoolSize ,即使其它工作线程处于空闲状态,也会创建一个新线程来运行新任务;
  • 如果 工作线程数 >= corePoolSize ,但 工作线程数 < maximumPoolSize ,则提交的任务会被放入到等待队列中;
  • 如果 等待队列已满 ,并且 工作线程数 < maxPoolSize ,则创建一个新线程来运行新任务,这个机制可能很多人都会搞错,这里就容易出现之前提交的任务还在等待队列中阻塞,但是新提交任务却被执行情况,千万注意;
  • 如果队列已满,并且 工作线程数 >= maxPoolSize ,这时提交的新任务采用拒绝策略处理;
  • 等待队列
    corePoolSize
    LinkedBlockingQueue
    等待队列
    工作线程数
    corePoolSize
    maximumPoolSize
    
  • 线程池有个线程超时机制,超时的线程会被销毁回收,详见 keepAliveTime 参数;
  • shutdownshutdownNow 方法终止线程池时,最终会把所有的工作线程都销毁,具体参见后续源码分析;

Executors

类似于 CollectionCollections 关系, ExecutorsExecutor工具 类,提供了一些创建线程池方法,主要如下:

  • ExecutorService newFixedThreadPool(int nThreads) :创建一个固定大小、任务队列容量无界的线程池;
  • ExecutorService newCachedThreadPool()
    Integer.MAX_VALUE
    SynchronousQueue
    
  • ExecutorService newSingleThreadExecutor() :只有一个线程来执行无界任务队列的单一线程池,该线程池确保任务按照加入的顺序一个一个依次执行,当唯一的线程因任务异常终止时,将创建一个新的线程来继续执行后续的任务;
  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize) :能定时执行任务的线程池,线程池中核心线程数由参数指定,最大线程数= Integer.MAX_VALUE

FixedThreadPoolSingleThreadExecutorQueueLinkedBlockingQueue ,因为固定线程,线程满的话需要等待队列去缓存任务;

CachedThreadPool 使用的 QueueSynchronousQueue ,不需要缓存任务,因为最大线程数没有限制;

ScheduledThreadPool 来说,它使用的是延迟队列, DelayedWorkQueue

JDK1.8 新加入了一种线程池: workStealingPool

Executors 工具类方式创建线程池不建议在生产开发中直接使用,因为创建出来的线程池比较粗糙,或多或少都存在一些问题,一般只在测试用例中使用,阿里开发手册中也是规定不能在代码中直接通过 Executors 方式创建线程池。

CompletionService

ExecutorService + Future 在处理批量提交异步任务并获取结果时可能会存在一些问题,如下:

public void executorServiceTest() throws InterruptedException {
    List<Future<Integer>> futures = new ArrayList<>();
    for(int i=0;i<10;i++){
        Future<Integer> future = executor.submit(new Task());
        futures.add(future);
    }
    //Future.get获取处理结果时,按照顺序获取
    for (int i = 0, size = futures.size(); i < size; i++) {
        Future<Integer> f = futures.get(i);
        try {
            Integer ret = f.get();
            log.info("ret:{}", ret);
        } catch (InterruptedException | ExecutionException e) {
        }
    }
}

class Task implements Callable<Integer>{
    private Random random = new Random();

    @Override
    public Integer call() throws Exception {
        int v = random.nextInt(500);
        TimeUnit.MICROSECONDS.sleep(v);
        return v;
    }
}

如上述案例:通过 executor.submit() 批量提交 10个任务 ,然后获取这些任务的结果,只能顺序通过 Future.get() 获取,因为无法知道哪些任务先完成,这就造成即使有些任务先完成,由于前面任务没有完成依然被阻塞。

CompletionServiceExecutorService 进行包装,将结果集 按照完成时间的顺序放入到阻塞队列中,获取结果时只需要从阻塞队列中获取即可,即实现:先完成的任务先获取结果

Java 并发编程(二):线程池总结

CompletionService 逻辑如上图,其实现代码也很简单:

1、将任务封装成 FutureTask ,然后提交到内部线程池中执行任务;

2、 FutureTask#done() 方法会在任务被完成时进行回调的方法, CompletionService 就是重写该方法,在 FutureTask 任务完成时添加到阻塞队列中去,这样就可以从阻塞队列中获取已完成的任务,见下:

protected void done() { completionQueue.add(task); }

TipsExecutorCompletionService 类中使用 LinkedBlockingQueue 无界队列存储结果集,所以,一定要及时去取结果,不然完成的任务结果不停的堆积到阻塞队列中,可能会撑爆内存空间。

总结

在生产开发中建议更多的使用线程池技术,除去 性能 方面考虑外,从 软件设计 上线程池比之前介绍的创建线程方式更好:

首先,线程池更好的体现了把任务单元与执行机制分离开的思想,开发者更多关注的是任务单元,只需要把需要执行的任务封装到 RunnableCallable 中,通过 submit()invoke() 等方式提交给线程池;而多线程如何创建、运行等执行机制是由 Executor 框架提供,一般情况下对于开发者是不需要关心的。

另一点 Thread ( JDK1.0 就存在)可能是由于出现的比较早,本身在接口设计上有那么点不是很完善的地方,比如我们的任务封装到 Runnable 中需要和 Thread 绑定完成后才能使用 start 启动线程执行任务。执行完成这个线程就废弃了,而不能通过重新绑定 Runnable 实现执行新任务,这就导致 Thread 类接口设计上就将任务和执行很紧密的耦合在一起了,即使 RunnableCallable 接口的出现在代码层面将任务逻辑代码部分剥离出来,但是在执行时依然需要先绑定才能启动线程,而不能做到像线程池在运行时有新任务随时提交。

Java 并发编程(二):线程池总结


以上所述就是小编给大家介绍的《Java 并发编程(二):线程池总结》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Tagging

Tagging

Gene Smith / New Riders / 2007-12-27 / GBP 28.99

Tagging is fast becoming one of the primary ways people organize and manage digital information. Tagging complements traditional organizational tools like folders and search on users desktops as well ......一起来看看 《Tagging》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

html转js在线工具
html转js在线工具

html转js在线工具