Java并发编程 - Callable、Future和FutureTask的实现

栏目: Java · 发布时间: 5年前

内容简介:启动线程执行任务,如果需要在任务执行完毕之后得到任务执行结果,可以使用从Java 1.5开始提供的Callable和Future下面就分析一下Callable、Future以及FutureTask的具体实现及使用方法源码分析基于JDK 1.7

启动线程执行任务,如果需要在任务执行完毕之后得到任务执行结果,可以使用从 Java 1.5开始提供的Callable和Future

下面就分析一下Callable、Future以及FutureTask的具体实现及使用方法

源码分析基于JDK 1.7

一、Callable 与 Runnable

java.lang.Runnable是一个接口,只有一个 run()方法

public interface Runnable {
    public abstract void run();
}

run()方法 的返回值是void,故在执行完任务后无法返回任何结果

Callable是java.util.concurrent包下的,也是一个接口,也只有一个 call()方法 ,类似于java.lang.Runnable的 run()方法 ,实现Callable接口的类和实现Runnable接口的类都是可以被其它线程执行的任务

public interface Callable<V> {
    V call() throws Exception;
}

可以看到call()方法是有返回值的,可以将执行的结果返回

Callable和Runnable的区别:

1、Callable中定义的是call()方法,Runnable中定义的是run()方法

2、Callable中的call()方法可以返回执行任务后的结果,Runnable中的run()方法无法获得返回值

3、Callable中的call()方法定义了throws Exception抛出异常,抛出的异常可以在主线程Future.get()时被主线程捕获;Runnable中的run()方法没有定义抛出异常,运行任务时发生异常时也会上抛,因为即使不加默认也会上抛RuntimeException,但异常无法被主线程获取

4、运行Callable任务可以拿到一个Future对象代表异步运算的结果

二、Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future是java.util.concurrent包下的一个接口,代表着一个异步计算的结果,可以通过 get() 获取线程执行的返回值, cancel() 取消任务执行, isCancelled()isDone() 获得任务执行的情况

boolean cancel(boolean mayInterruptIfRunning)

尝试取消任务的执行,取消成功返回true,取消失败返回false

mayInterruptIfRunning表示是否允许中断正在执行的任务

1、如果任务还未开始,cancel返回true,且任务永远不会被执行

2、如果任务正在执行,根据mayInterruptIfRunning的值判断是否需要中断执行中的任务,且如果mayInterruptIfRunning为true,会调用中断逻辑,返回true;如果mayInterruptIfRunning为false,不会调用线程中断,只是将任务取消

3、如果任务结束(可能是正常完成、异常终止、被取消),返回false

4、如果cancel()操作返回true,后续调用isDone()、isCancelled()都返回true

boolean isCancelled()

表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true

boolean isDone()

表示任务是否已经完成,则返回true,注意:正常完成、异常 或 取消操作都代表任务完成

V get() 和 V get(long timeout, TimeUnit unit)

get() 用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回

get(long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内还没获取到结果,会抛出TimeoutException

Future提供了三种功能:

1、获取任务执行的结果

2、取消任务

3、判断任务是否完成 或 是否取消

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask

三、FutureTask

public class FutureTask<V> implements RunnableFuture<V>

FutureTask实现了RunnableFuture接口,那么RunnableFuture又是什么呢?

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

RunnableFuture接口继承了 RunnableFuture ,所以它既是一个可以让线程执行的Runnable任务,又是一个可以获取Callable返回值的Future

FutureTask的属性

/** The run state of this task */
private volatile int state;
private static final int NEW          = 0; 
private static final int COMPLETING   = 1; 
private static final int NORMAL       = 2; 
private static final int EXCEPTIONAL  = 3; 
private static final int CANCELLED    = 4; 
private static final int INTERRUPTING = 5; 
private static final int INTERRUPTED  = 6; 

/** The underlying callable; nulled out after running */
private Callable<V> callable;

/** The result to return or exception to throw from get() */
private Object outcome;

/** The thread running the callable; CASed during run() */
private volatile Thread runner;

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

state 是任务的运行状态

  • 初始化时是NEW
  • 任务终止的状态有NORMAL(正常结束)、EXCEPTIONAL(异常结束)、CANCELLED(被取消)、INTERRUPTED(执行中被中断),这些状态是通过 set()setExceptioncancel() 方法触发的
  • COMPLETING 和 INTERRUPTING是两个中间状态,当正常结束设置outcome属性前是COMPLETING,设置后变成NORMAL;当中断运行中线程前是INTERRUPTING,调用thread.interrupt()后是INTERRUPTED

可能的状态转换:

NEW -> COMPLETING -> NORMAL

NEW -> COMPLETING -> EXCEPTIONAL

NEW -> CANCELLED

NEW -> INTERRUPTING -> INTERRUPTED

callable 是线程执行的有返回值的任务

outcome 是任务执行后的结果或异常

waiters 表示等待获取结果的阻塞线程,链表结构,后等待线程的会排在链表前面

FutureTask的构造方法

FutureTask有两个构造方法:

FutureTask(Callable callable)

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

构造方法参数是Callable定义的任务,并将state置为NEW,只有当state为NEW时,callable才能被执行

FutureTask(Runnable runnable, V result)

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

参数为Runnable和带泛型的result对象,由于Runnable本身是没有返回值的,故线程的执行结果通过result返回

可以看到通过runnable和result封装了个Callable,实际上是 new RunnableAdapter<T>(task, result) ,这个Adapter适配器将Runnable和result转换成Callable,并返回result

FutureTask.run()的实现

线程运行时真正执行的方法, Callable.call() 会在其中执行,并包含设置返回值或异常的逻辑

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

1、任务执行状态不是NEW,直接返回;将runner属性从null->当前线程不成功,��接返回

2、调用call()方法,调用成功,使用set()设置返回值

3、调用过程发生异常,使用setException()保存异常

set() 和 setException()

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

set()setException() 的实现基本一样,都是先将任务运行状态从 NEW->COMPLETING ,分别设置返回值或异常给outcome,再将状态分别置为 NORMAL和EXCEPTIONAL ,最后调用 finishCompletion() 依次唤醒等待获取结果的阻塞线程

finishCompletion()实现

/**
 * Removes and signals all waiting threads, invokes done(), and nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //将成员变量waiters置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            //循环唤醒WaitNode中的等待线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    //由子类实现的方法
    done();

    callable = null;        // to reduce footprint
}

1、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中

2、FutureTask任务执行完成后,通过UNSAFE设置waiters的值为null,并通过LockSupport.unpark方法依次唤醒等待获取结果的线程

FutureTask.get()的实现

get() 方法有两个实现,一个是一直等待获取结果,直到任务执行完;一个是等待指定时间,超时后任务还未完成会上抛TimeoutException

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    
    return report(s);
}

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
        
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
        
    return report(s);
}

内部通过 awaitDone() 对主线程进行阻塞,具体实现如下:

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L; //截止时间
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //如果主线程已经被中断,removeWaiter(),并上抛InterruptedException
        //注意:Thread.interrupted()后会导致线程的中断状态为false
        if (Thread.interrupted()) {
            removeWaiter(q); //线程被中断的情况下,从waiters链表中删除q
            throw new InterruptedException();
        }

        int s = state;
        //如果任务已经完成(可能是正常完成、异常、中断),直接返回,即还没有开始等待,任务已经完成了
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //s<COMPLETING 且 还没有创建WaitNode
        else if (q == null)
            q = new WaitNode();
        //s<COMPLETING 且 已经创建WaitNode,但还没有入队
        else if (!queued)
            /**
             * 1、将当前waiters赋值给q.next,即“q-->当前waiters”
             * 2、CAS,将waiters属性,从“当前waiters-->q”
             * 所以后等待的会排在链表的前面,而任务完成时会从链表前面开始依次唤醒等待线程
             */
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //所有准备工作完成,判断等待是否需要计时
        else if (timed) {
            nanos = deadline - System.nanoTime();
            //如果已经等待超时,remove当前WaiterNode
            if (nanos <= 0L) {
                removeWaiter(q); //等待超时的情况下,从waiters链表中删除q
                return state;
            }
            LockSupport.parkNanos(this, nanos); //挂起一段时间
        }
        else
            LockSupport.park(this); //一直挂起,等待唤醒
    }
}

1、判断主线程是否被中断,如果被中断,将当前WaitNode节点从waiters链表中删除,并上抛InterruptedException

2、如果任务已经完成(可能是正常完成、异常、中断),直接返回(即还没有开始等待,任务已经完成了,就返回了)

3、如果任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL

4、如果任务没有被中断,也没有完成,new WaitNode()

5、如果任务没有被中断,也没有完成,也创建了WaitNode,使用UNSAFE.CAS()操作将WaitNode加入waiters链表

6、所有准备工作完毕,通过LockSupport的park或parkNanos挂起线程

WaitNode 就是一个简单的链表节点,记录这等待的线程和下一个WaitNode

/**
 * Simple linked list nodes to record waiting threads in a Treiber
 * stack.  See other classes such as Phaser and SynchronousQueue
 * for more detailed explanation.
 */
static final class WaitNode {
    volatile Thread thread; //等待的线程
    volatile WaitNode next; //下一个WaitNode
    WaitNode() { thread = Thread.currentThread(); }
}

FutureTask.cancel()的实现

public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
        return false;
    
    if (mayInterruptIfRunning) {
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        
        Thread t = runner;
        if (t != null)
            t.interrupt(); //中断线程
        
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
    
    finishCompletion();
    
    return true;
}

1、如果任务不是运行状态,直接返回false失败

2、如果mayInterruptIfRunning==true,中断运行中的任务,使用CAS操作将状态 NEW-->INTERRUPTING ,再调用runner.interrupt(),最后将状态置为INTERRUPTED

3、如果mayInterruptIfRunning==false,将任务置为CANCELLED取消状态

4、调用 finishCompletion() 依次唤醒等待获取结果的线程,返回true取消成功

四、使用示例

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TestFuture {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        Task task = new Task(); //callable任务
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果:"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
    
    static class Task implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println("子线程在进行计算");
            Thread.sleep(3000);
            int sum = 0;
            for(int i=0;i<100;i++)
                sum += i;
            return sum;
        }
    }
}

运行结果:

子线程在进行计算
主线程在执行任务
task运行结果:4950
所有任务执行完毕

如果只是想控制在某些情况下可以将任务取消,可以使用 Future<?> future = executor.submit(runnable) ,这样返回结果肯定为null,但可以使用future.cancel()取消任务执行

五、总结

1、有了Runnable,为什么还需要Callable,它们的区别是什么?

Runnable和Callable都表示执行的任务,但不同的是Runnable.run()方法没有返回值,Callable.call()有返回值

但其实线程在执行任务时还是执行的Runnable.run()方法,所以在使用ThreadPoolExecutor.submit()时会将Callable封装为FutureTask,而FutureTask是Runnable和Future的实现类

所以在执行Callable的任务时,线程其实是执行FutureTask这个Runnable的run()方法,其中封装了调用Callable.call()并返回结果的逻辑

执行Runnable任务如果发生异常,主线程无法知晓;而执行Callable任务如果发生异常,在Future.get()时会抛出java.util.concurrent.ExecutionException,其中封装了真实异常

2、Future.get()是如何获取线程返回值的?

首先得益于Callable.call()方法定义了返回值,提交Callable任务后,Callable会被封装成FutureTask,其既可以作为Runnable被执行,也可以作为Future获取返回值,FutureTask.run()方法会调用Callable.call()中的任务代码

在任务执行完成前,如果主线程使用Future.get(),其实是调用FutureTask.get(),其中会判断任务状态尚未结束,将主线程加入waiters等待链表,并挂起主线程

待任务执行结束后,FutureTask会唤醒所有等待获取返回值的线程,此时主线程的FutureTask.get()就会返回了

所以,主线程和运行线程是通过FutureTask作为桥梁获取线程返回值的

3、Future.cancel()真的能取消任务的执行吗?

首先答案是“不一定”,根据JDK中的方法注释“Attempts to cancel execution of this task”,即尝试去取消执行的任务

如果任务正在执行,且调用cancel()时参数mayInterruptIfRunning传的是true,那么会对执行线程调用interrupt()方法

那么问题就变成了interrupt()方法能中断线程执行吗?

interrupt()方法不会中断正在运行的线程。这一方法实际上完成的是在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait()、Thread.join()、Thread.sleep()等阻塞,那么它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。

如果线程没有被阻塞,调用interrupt()将不起作用

那么即使线程正在阻塞状态,并抛出了InterruptedException,线程能否真的取消执行还要看代码中是否捕获了InterruptedException和有没有做相应的对中断标示的判断逻辑

Linux公社的RSS地址https://www.linuxidc.com/rssFeed.aspx

本文永久更新链接地址: https://www.linuxidc.com/Linux/2019-02/156799.htm


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Python语言及其应用

Python语言及其应用

[美] Bill Lubanovic / 丁嘉瑞、梁杰、禹常隆 / 人民邮电出版社 / 2016-1 / 79.00元

本书介绍Python 语言的基础知识及其在各个领域的具体应用,基于最新版本3.x。书中首先介绍了Python 语言的一些必备基本知识,然后介绍了在商业、科研以及艺术领域使用Python 开发各种应用的实例。文字简洁明了,案例丰富实用,是一本难得的Python 入门手册。一起来看看 《Python语言及其应用》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX CMYK 转换工具
HEX CMYK 转换工具

HEX CMYK 互转工具