ExecutorCompletionService源码分析

栏目: 编程工具 · 发布时间: 6年前

内容简介:ExecutorCompletionService是线程池和队列的配合使用,它的内部封装了线程池(线程池需要在构造ExecutorCompletionService对象时传入),也可以自定义队列传入构造函数, 如果没传入,将构造无限的队列LinkedBlockingQueue,将提交的任务代理给线程池执行(任务是FutureTask的子类 QueueingFuture,QueueingFuture重写了done()方法,done方法在FutureTask类中是空实现,任务执行完会调用此方法,不清楚的可以看下

一、简介

ExecutorCompletionService是线程池和队列的配合使用,它的内部封装了线程池(线程池需要在构造ExecutorCompletionService对象时传入),也可以自定义队列传入构造函数, 如果没传入,将构造无限的队列LinkedBlockingQueue,将提交的任务代理给线程池执行(任务是FutureTask的子类 QueueingFuture,QueueingFuture重写了done()方法,done方法在FutureTask类中是空实现,任务执行完会调用此方法,不清楚的可以看下我的另一篇FutureTask源码分析,模板方法, 子类可以进行重写),因为提交的任务被转换为QueueingFuture对象,QueueingFuture任务对象处理完成之后,会调用重写的done方法主动将该执行完的QueueingFuture任务放到ExecutorCompletionService维护的阻塞队列中,因此执行完成的任务都会被放到阻塞队列中,如果想要获得任务的执行结果时,只需调用take()或者poll()方法获取即可。线程池下一篇会进行介绍。

二、属性

//线程池,执行任务
private final Executor executor;
//如果在构造函数传入进来的线程池参数,是AbstractExecutorService的子类,也会将线程池赋值给该属性,该类主要用来将Callable和Runnable任务包装成FutureTask任务,下面会进行介绍
private final AbstractExecutorService aes;
//存放执行完成的任务,如果任务执行出现异常,不会存放在队列中,队列如果没有自定义,在构造函数传入进来,ExecutorCompletionService会创建一个无限的链表队列,如果没有及时从队列获取执行完成的任务,有可能会导致内存溢出
private final BlockingQueue<Future<V>> completionQueue;复制代码

三、构造函数

//传入线程池构造ExecutorCompletionService实例
//@param executor 线程池对象,用来执行任务
public ExecutorCompletionService(Executor executor) {
        //如果传入的线程池对象为空,会抛出空指针异常
        if (executor == null)
            //抛出空指针异常  
            throw new NullPointerException();
        //将传入进来的线程池对象赋值给ExecutorCompletionService实例属性executor
        this.executor = executor;
        //如果传入进来的线程池对象是AbstractExecutorService的子类,也会将线程池赋值给ExecutorCompletionService实例属性aes,否则的话aes赋值为空  
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //由于没有自定义队列,会创建一个无限的链表队列LinkedBlockingQueue,用来存放执行完成的任务,如果任务执行出现异常,不会存放在此队列中
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

//传入线程池和自定义队列构造ExecutorCompletionService实例
public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        //如果传入进来的线程池对象为空,或者传入进来的队列为空,抛出空指针异常
        if (executor == null || completionQueue == null)
            //抛出空指针异常
            throw new NullPointerException();
        //将传入进来的线程池对象赋值给ExecutorCompletionService实例属性executor
        this.executor = executor;
        //如果传入进来的线程池对象是AbstractExecutorService的子类,也会将线程池赋值给ExecutorCompletionService实例属性aes,否则的话aes赋值为空  
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //将传入进来的队列对象赋值给ExecutorCompletionService实例属性completionQueue
        this.completionQueue = completionQueue;
}
复制代码

四、内部类

  1. QueueingFuture内部类
    //QueueingFuture是FutureTask的子类,重写了FutureTask中done方法,done方法在FutureTask中是个空方法,模板方法,子类可以进行重写
    private class QueueingFuture extends FutureTask<Void> {
           //QueueingFuture构造函数,将RunnableFuture对象重新适配成QueueingFuture实例对象,对FutureTask不清楚的可以看下我的另一篇FutureTask源码分析https://juejin.im/post/5d08be8ce51d455d6c0ad925 
           QueueingFuture(RunnableFuture<V> task) {
                //调用父类FutureTask的构造函数,FutureTask不清楚的可以看下我的另一篇对FutureTask的源码分析
                super(task, null);
                //将传入进来的任务赋值给task属性
                this.task = task;
            }
            //done方法在FutureTask中是个空方法,模板方法,在任务被执行完时,设置任务执行结果中,finishCompletion会调用done()方法,对任务执行完成,做一些处理,QueueingFuture对done方法进行重写,将执行完成的任务加入到队列中,这样就可以从队列中获取任务执行完的结果  
            protected void done() { completionQueue.add(task); }
            //传入进来的任务 
            private final Future<V> task;
    }复制代码

五、封装任务

//将传入进来的Callable类型的任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Callable<V> task) {
        //如果传入进来的线程池对象executor是AbstractExecutorService的子类,aes就赋值为线程池对象
        //如果aes为空
        if (aes == null)
            //直接使用FutureTask的构造函数将task封装成FutureTask
            return new FutureTask<V>(task);
        else
            //调用aes.newTaskFor方法将传入进来的task封装成FutureTask,newTaskFor方法内部也是使用FutureTask的构造函数将task封装成FutureTask
            return aes.newTaskFor(task);
}

//将传入进来的Runnable类型的任务封装成RunnableFuture任务
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
    /如果传入进来的线程池对象executor是AbstractExecutorService的子类,aes就赋值为线程池对象
    //如果aes为空
    if (aes == null)
        //直接使用FutureTask的构造函数将task封装成FutureTask
        return new FutureTask<V>(task, result);
    else
        //调用aes.newTaskFor方法将传入进来的task封装成FutureTask,newTaskFor方法内部也是使用FutureTask的构造函数将task封装成FutureTask
        return aes.newTaskFor(task, result);
}
复制代码

六、执行任务

//提交Callable类型的任务到线程池执行
public Future<V> submit(Callable<V> task) { 
        //如果传入进来的任务为空直接抛出空指针异常
        if (task == null) throw new NullPointerException();
        //使用上面介绍的newTaskFor方法将传入进来的Callable类型的任务封装成RunnableFuture任务
        RunnableFuture<V> f = newTaskFor(task);
        //QueueingFuture将RunnableFuture适配成QueueingFuture实例,QueueingFuture重写了FutureTask的done方法,任务执行完成调用done方法,将完成任务加入队列中,不清楚的可以看上面QueueingFuture内部类的介绍
        executor.execute(new QueueingFuture(f));
        //返回RunnableFuture类型的任务
        return f;
}

//提交Runnable类型的任务到线程池执行,FutureTask也会调用Executors的callable方法将Runnable适配成Callable类型任务
public Future<V> submit(Runnable task, V result) {
    //如果传入进来的任务为空直接抛出空指针异常
    if (task == null) throw new NullPointerException();
    //使用上面介绍的newTaskFor方法将传入进来的Runnable类型的任务封装成RunnableFuture任务 
    RunnableFuture<V> f = newTaskFor(task, result);
    //QueueingFuture将RunnableFuture适配成QueueingFuture实例,QueueingFuture重写了FutureTask的done方法,任务执行完成调用done方法,将完成任务加入队列中,不清楚的可以看上面QueueingFuture内部类的介绍
    executor.execute(new QueueingFuture(f));
    //返回RunnableFuture类型的任务
    return f;
}复制代码

七、获取任务结果

//获取完成的任务,获取不到会阻塞,直到获取到或者等待线程被中断抛出中断异常,队列的源码等有空会进行分析
public Future<V> take() throws InterruptedException {
        //从队列中获取完成的任务
        return completionQueue.take();
}

//获取完成的任务,获取不到不会阻塞,不支持中断,从队列中获取不到完成的任务直接返回null
public Future<V> poll() {
    //从队列中获取完成的任务  
    return completionQueue.poll();
}

//获取完成的任务,循环的获取已完成的任务,直到获取到,或者超时,或者获取线程被中断,支持中断异常,从队列中获取不
public Future<V> poll(long timeout, TimeUnit unit)
        throws InterruptedException {
    //从队列中获取完成的任务    
    return completionQueue.poll(timeout, unit);
}复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pattern Recognition and Machine Learning

Pattern Recognition and Machine Learning

Christopher Bishop / Springer / 2007-10-1 / USD 94.95

The dramatic growth in practical applications for machine learning over the last ten years has been accompanied by many important developments in the underlying algorithms and techniques. For example,......一起来看看 《Pattern Recognition and Machine Learning》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具