内容简介:ExecutorCompletionService是线程池和队列的配合使用,它的内部封装了线程池(线程池需要在构造ExecutorCompletionService对象时传入),也可以自定义队列传入构造函数, 如果没传入,将构造无限的队列LinkedBlockingQueue,将提交的任务代理给线程池执行(任务是FutureTask的子类 QueueingFuture,QueueingFuture重写了done()方法,done方法在FutureTask类中是空实现,任务执行完会调用此方法,不清楚的可以看下
一、简介
ExecutorCompletionService是线程池和队列的配合使用,它的内部封装了线程池(线程池需要在构造ExecutorCompletionService对象时传入),也可以自定义队列传入构造函数, 如果没传入,将构造无限的队列LinkedBlockingQueue,将提交的任务代理给线程池执行(任务是FutureTask的子类 QueueingFuture,QueueingFuture重写了done()方法,done方法在FutureTask类中是空实现,任务执行完会调用此方法,不清楚的可以看下我的另一篇FutureTask源码分析,模板方法, 子类可以进行重写),因为提交的任务被转换为QueueingFuture对象,QueueingFuture任务对象处理完成之后,会调用重写的done方法主动将该执行完的QueueingFuture任务放到ExecutorCompletionService维护的阻塞队列中,因此执行完成的任务都会被放到阻塞队列中,如果想要获得任务的执行结果时,只需调用take()或者poll()方法获取即可。线程池下一篇会进行介绍。
二、属性
//线程池,执行任务 private final Executor executor; //如果在构造函数传入进来的线程池参数,是AbstractExecutorService的子类,也会将线程池赋值给该属性,该类主要用来将Callable和Runnable任务包装成FutureTask任务,下面会进行介绍 private final AbstractExecutorService aes; //存放执行完成的任务,如果任务执行出现异常,不会存放在队列中,队列如果没有自定义,在构造函数传入进来,ExecutorCompletionService会创建一个无限的链表队列,如果没有及时从队列获取执行完成的任务,有可能会导致内存溢出 private final BlockingQueue<Future<V>> completionQueue;复制代码
三、构造函数
//传入线程池构造ExecutorCompletionService实例 //@param executor 线程池对象,用来执行任务 public ExecutorCompletionService(Executor executor) { //如果传入的线程池对象为空,会抛出空指针异常 if (executor == null) //抛出空指针异常 throw new NullPointerException(); //将传入进来的线程池对象赋值给ExecutorCompletionService实例属性executor this.executor = executor; //如果传入进来的线程池对象是AbstractExecutorService的子类,也会将线程池赋值给ExecutorCompletionService实例属性aes,否则的话aes赋值为空 this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; //由于没有自定义队列,会创建一个无限的链表队列LinkedBlockingQueue,用来存放执行完成的任务,如果任务执行出现异常,不会存放在此队列中 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } //传入线程池和自定义队列构造ExecutorCompletionService实例 public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { //如果传入进来的线程池对象为空,或者传入进来的队列为空,抛出空指针异常 if (executor == null || completionQueue == null) //抛出空指针异常 throw new NullPointerException(); //将传入进来的线程池对象赋值给ExecutorCompletionService实例属性executor this.executor = executor; //如果传入进来的线程池对象是AbstractExecutorService的子类,也会将线程池赋值给ExecutorCompletionService实例属性aes,否则的话aes赋值为空 this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; //将传入进来的队列对象赋值给ExecutorCompletionService实例属性completionQueue this.completionQueue = completionQueue; } 复制代码
四、内部类
-
QueueingFuture内部类
//QueueingFuture是FutureTask的子类,重写了FutureTask中done方法,done方法在FutureTask中是个空方法,模板方法,子类可以进行重写 private class QueueingFuture extends FutureTask<Void> { //QueueingFuture构造函数,将RunnableFuture对象重新适配成QueueingFuture实例对象,对FutureTask不清楚的可以看下我的另一篇FutureTask源码分析https://juejin.im/post/5d08be8ce51d455d6c0ad925 QueueingFuture(RunnableFuture<V> task) { //调用父类FutureTask的构造函数,FutureTask不清楚的可以看下我的另一篇对FutureTask的源码分析 super(task, null); //将传入进来的任务赋值给task属性 this.task = task; } //done方法在FutureTask中是个空方法,模板方法,在任务被执行完时,设置任务执行结果中,finishCompletion会调用done()方法,对任务执行完成,做一些处理,QueueingFuture对done方法进行重写,将执行完成的任务加入到队列中,这样就可以从队列中获取任务执行完的结果 protected void done() { completionQueue.add(task); } //传入进来的任务 private final Future<V> task; }复制代码
五、封装任务
//将传入进来的Callable类型的任务封装成RunnableFuture任务 private RunnableFuture<V> newTaskFor(Callable<V> task) { //如果传入进来的线程池对象executor是AbstractExecutorService的子类,aes就赋值为线程池对象 //如果aes为空 if (aes == null) //直接使用FutureTask的构造函数将task封装成FutureTask return new FutureTask<V>(task); else //调用aes.newTaskFor方法将传入进来的task封装成FutureTask,newTaskFor方法内部也是使用FutureTask的构造函数将task封装成FutureTask return aes.newTaskFor(task); } //将传入进来的Runnable类型的任务封装成RunnableFuture任务 private RunnableFuture<V> newTaskFor(Runnable task, V result) { /如果传入进来的线程池对象executor是AbstractExecutorService的子类,aes就赋值为线程池对象 //如果aes为空 if (aes == null) //直接使用FutureTask的构造函数将task封装成FutureTask return new FutureTask<V>(task, result); else //调用aes.newTaskFor方法将传入进来的task封装成FutureTask,newTaskFor方法内部也是使用FutureTask的构造函数将task封装成FutureTask return aes.newTaskFor(task, result); } 复制代码
六、执行任务
//提交Callable类型的任务到线程池执行 public Future<V> submit(Callable<V> task) { //如果传入进来的任务为空直接抛出空指针异常 if (task == null) throw new NullPointerException(); //使用上面介绍的newTaskFor方法将传入进来的Callable类型的任务封装成RunnableFuture任务 RunnableFuture<V> f = newTaskFor(task); //QueueingFuture将RunnableFuture适配成QueueingFuture实例,QueueingFuture重写了FutureTask的done方法,任务执行完成调用done方法,将完成任务加入队列中,不清楚的可以看上面QueueingFuture内部类的介绍 executor.execute(new QueueingFuture(f)); //返回RunnableFuture类型的任务 return f; } //提交Runnable类型的任务到线程池执行,FutureTask也会调用Executors的callable方法将Runnable适配成Callable类型任务 public Future<V> submit(Runnable task, V result) { //如果传入进来的任务为空直接抛出空指针异常 if (task == null) throw new NullPointerException(); //使用上面介绍的newTaskFor方法将传入进来的Runnable类型的任务封装成RunnableFuture任务 RunnableFuture<V> f = newTaskFor(task, result); //QueueingFuture将RunnableFuture适配成QueueingFuture实例,QueueingFuture重写了FutureTask的done方法,任务执行完成调用done方法,将完成任务加入队列中,不清楚的可以看上面QueueingFuture内部类的介绍 executor.execute(new QueueingFuture(f)); //返回RunnableFuture类型的任务 return f; }复制代码
七、获取任务结果
//获取完成的任务,获取不到会阻塞,直到获取到或者等待线程被中断抛出中断异常,队列的源码等有空会进行分析 public Future<V> take() throws InterruptedException { //从队列中获取完成的任务 return completionQueue.take(); } //获取完成的任务,获取不到不会阻塞,不支持中断,从队列中获取不到完成的任务直接返回null public Future<V> poll() { //从队列中获取完成的任务 return completionQueue.poll(); } //获取完成的任务,循环的获取已完成的任务,直到获取到,或者超时,或者获取线程被中断,支持中断异常,从队列中获取不 public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { //从队列中获取完成的任务 return completionQueue.poll(timeout, unit); }复制代码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
互联网+ 战略版
刘润 / 中国华侨出版社 / 2015-5-1 / 49.8
1、“互联网+”上升为国家战略,“互联网+”成为下一个超级畅销书的热点话题在商业环境巨变的今天,传统企业该怎么走?传统企业转型是一个系统工程,如何定战略、抓主要矛盾? 2、首本“互联网+传统企业”的战略指导书。“我互联网+”时代到来了,传统企业的外部环境发生了哪些变化?了解商业新生代的新商业环境,跟之前工业时代的不同,从战略上指导传统企业转型,更安全也更大局把握游刃有余。一起来看看 《互联网+ 战略版》 这本书的介绍吧!