手把手教你使用 CompletableFuture

栏目: 数据库 · 发布时间: 7年前

内容简介:在jdk5中,我们通过使用Future和Callable,可以在任务执行完毕后得到任务执行结果。可以使用isDone检测计算是否完成,使用cancle停止执行任务,使用阻塞方法get阻塞住调用线程来获取返回结果,使用阻塞方式获取执行结果,有违异步编程的初衷,而且Future的异常只能自己内部处理。jdk8中加入了实现类CompletableFuture<T>,用于异步编程。底层做任务使用的是ForkJoin, 顾名思义,是将任务的数据集分为多个子数据集,而每个子集,都可以由独立的子任务来处理,最后将每个子

背景

在jdk5中,我们通过使用Future和Callable,可以在任务执行完毕后得到任务执行结果。可以使用isDone检测计算是否完成,使用cancle停止执行任务,使用阻塞方法get阻塞住调用线程来获取返回结果,使用阻塞方式获取执行结果,有违异步编程的初衷,而且Future的异常只能自己内部处理。

jdk8中加入了实现类CompletableFuture<T>,用于异步编程。底层做任务使用的是ForkJoin, 顾名思义,是将任务的数据集分为多个子数据集,而每个子集,都可以由独立的子任务来处理,最后将每个子任务的结果汇集起来。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。 从api文档看,它实现了2个接口 CompletionStage<T>, Future<T>,CompletableFuture<T>拥有Future的所有特性。 CompletionStage支持lambda表达式,接口的方法的功能都是在某个阶段得到结果后要做的事情。 CompletableFuture内置lambda表达式,支持异步回调,结果转换等功能,它有以下Future实现不了的功能

  • 合并两个相互独立的异步计算的结果。

  • 等待异步任务的所有任务都完成。

  • 等待异步任务的其中一个任务完成就返回结果。

  • 任务完成后调用回调方法

  • 任务完成的结果可以用于下一个任务。

  • 任务完成时发出通知

  • 提供原生的异常处理api

CompletableFuture的使用方法

首先说下获取结果方式 CompletableFuture获取结果的方式有如下4个方法:

1:get 阻塞获取结果,实现Future的get接口,显式抛出异常

2:getNow(T valueIfAbsent) 获取执行结果,如果当前任务未执行完成,则返回valueIfAbsent

3: join 执行完成后返回执行结果,或者抛出unchecked异常

4: T get(long timeout, TimeUnit unit) 在有限时间内获取数据

以下是CompletableFuture的创建对象以及api的使用

1: 创建CompletableFuture 对象

public static <U> CompletableFuture<U> completedFuture(U value) 

静态方法,返回一个已经计算好的CompletableFuture 比如

@Testpublic void testStatic() {
	CompletableFuture<String> completableFuture =              CompletableFuture.completedFuture("test");	//判断cf是否 执行完毕
	assertTrue(completableFuture.isDone());	//getNow获取结果,如果获取不到,返回默认值null
	assertEquals("test", completableFuture.getNow(null));
 }

completableFuture 还能主动结束运算,并显示处理异常,如下是异步执行的代码

@Test
public void testActive() {
	CompletableFuture<String> completableFuture = new CompletableFuture();	new Thread(() -> {		try {
			String string = null;
			string.length();
			Thread.currentThread().sleep(2000);			
                        // 通知完成计算 ,并将结果complete返回
			completableFuture.complete("complete");
		} catch (Exception e) {
		    // 处理异常 在获取结果地方可以捕获到异常
		    completableFuture.completeExceptionally(e);
		}
	}).start();	
        try {
		// 同步等待返回结果  如果thread内部未发生异常并执行了complete方法,将得到字符串“complete”的结果
		System.out.println(completableFuture.join());
	} catch (Exception e) {
		//捕获线程内部的异常  捕获空指针异常
		System.out.println("发生异常了" + e.getMessage());
	}
}

2: 使用工厂方式创建cf对象

CompletableFuture主要有以下四个工厂方式创建对象的静态方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {  
    return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
  return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {    
   return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {   
   return asyncRunStage(screenExecutor(executor), runnable);
}

Supplier是 java 8函数式编程的一个接口,是一个生产者,可以不接收参数。只有一个get方法返回一个泛型实例。 很明显,Async结尾的都是可以异步执行,runAsync 接收一个Runnable函数式接口类型参数,不返回结算结果。supplyAsync接收一个函数式接口类型Supplier ,可以返回计算结果。以上方法如果不指定执行任务的线程池Executor ,则默认使用ForkJoinPool.commonPoolcommonPool执行任务。这些接口都支持lambda实现异步的操作。 以下是SupplyAsync异步执行的简单示例

@Testpublic void testSupplyAsync() {
	CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {		//执行耗时任务
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		return "glz";
	});	//获取结果
	System.out.println(cf.join());
}

3: CompletableFuture 的异步回调功能

上面的方法,执行任务是异步操作。但是调用线程还在等待结果。我们还可以给cf添加回调方法,在任务执行完成后使用cf的结果再做下一步操作,转换。所以 执行以下方法时,cf已经计算完毕。

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

从参数类型可以看到,这是接收一个cf计算的结果T,经过处理后返回参数类型为U的cf。 其中第一个方法是在cf完成的线程中调用。而带Async将在与调用者cf不同的线程中异步调用。

@Test
public void testThenApply() {
	CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {		// 执行耗时任务
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
               return "123";
	});
	// 这里cf的计算结果传个thenApply作为参数,执行字符串转int的方法,并返回一个cf对象cf1
	CompletableFuture<Integer> cf1 = cf.thenApply(Integer::parseInt);
	// cf1的计算结果作为参数x传给thenApply,返回一个心得cf对象 cf2.	CompletableFuture<Double> cf2 = cf1.thenApply(x -> x * 0.01);
	// 获取最终结果
	System.out.println(cf2.join());
	//如果回调函数比较耗时,可以使用异步的方法thenApplyAsync}

4: 运行完成时的代码,即对结果进行消耗

public CompletionStage<Void> thenAccept(Consumer<? super T> action); 
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

入参是Consumer ,执行Consumer 后没有返回结果,所以称为消耗。

@Test
public void thenAccept(){    
    CompletableFuture.supplyAsync(() -> "gong").thenAccept(x -> System.out.println(s+" lz"));
}

结果是 gong lz

5:上一步结果与下一步操作无关系

在执行cf后,如果得到的结果对下一步没有影响,也就是说下一步的操作并不关心上一步的结果,最终也不返回值,可以使用thenRun 参数传递一个Runnable.

public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
@Test
public void thenRun() {
	CompletableFuture.supplyAsync(() -> "hello").thenRun(() -> System.out.println("hello world"));
}

6: 对2个cf的结果进行组合thenCompose

public <U> CompletionStage<U> thenCompose  (Function<? super T, ? extends CompletionStage<U>> fn); 
 public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn); 
 public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);

thenCompose方法,可以将2个独立的任务进行流水线操作 。将当前cf的计算结果作为参数传递给后面的cf

@Test
public void testCompose() {
	CompletableFuture<String> cf = CompletableFuture.completedFuture("hello")
			.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
				System.out.println(result);				
                                return "result";
			}));	
        System.out.println(cf.join());
}

7: 结合2个cf的结果 thenCombine

可以将2个完全不相干的对象的结果整合起来,2项任务可以同时执行,比如一个对外的接口服务,既查询数据库中要查询数据的总量,也要返回具体某一页的数据,可以一个cf负责执行查询总条数count的sql,一个查询一页数据。BiFunction是合并结果数据的函数

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

其中T是调用thenCombine的cf的结果数据,U是other的结果,v就是合并的结果类型。

@Test
public void testCombine() {
	CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {		try {
			Thread.sleep(3000);
			System.out.println("cf1 is doning");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} // 返回结果
		return "hello";
	});
	CompletableFuture<String> result = cf1.thenCombine(CompletableFuture.supplyAsync(() -> {		try {
			Thread.sleep(500);
			System.out.println("cf2 is doning");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}	
return "world"; }), (x, y) -> x + y);//合并2个操作结果
S
ystem.out.println(result.join());
}

8:消耗两个cf的结果,不返回结果

<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block)
CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)

对于2个cf,我们只想在他们执行完成时,消耗执行结果,但是不做数据返回,,我们只是希望当完成时得到通知. 此方法与thenCombine相似,只不过返回 CompletableFuture<Void> ,只做消耗处理

@Test
public void testAcceptBoth(){
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
    CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));

    try {
        future.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

9:取计算速度最快的结果

针对两个CompletionStage,将计算最快的那个CompletionStage的结果用来作为下一步的消耗。 此方法接受Consumer只对结果进行消耗.

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); 
 public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); 
 public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
@Test
public void acceptEither() {
	CompletableFuture.supplyAsync(() -> {		
               try {	// 如果不加sleep,可能打印hello
			Thread.currentThread().sleep(10L);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		return "hello";
	}).acceptEither(CompletableFuture.supplyAsync(() -> "world"), result -> {
		System.out.println(result);
	});
}

10: 计算最快的cf的结果转换

针对两个CompletionStage,将计算的快的那个CompletionStage的结果用来作为下一步的转换操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

fn是对 调用applyToEither的调用者和 other 2个计算最快的那个结果进行处理,传入t类型数据,返回一个CompletionStage

@Test
public void applyToEither() {
    double result = CompletableFuture.supplyAsync(() -> {        
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        return "0.001";
    }).applyToEither(CompletableFuture.supplyAsync(() -> {       
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        return "0.002";
    }), s -> Double.valueOf(s)).join();   
     System.out.println(result);
}
    //由于返回0.002的cf睡眠时间比较短,先执行完毕,优先返回结果,所以2个cf最先返回0.002.最终result就是0.002

11:2个cf都执行完后执行操作

2个cf都执行完后,执行操作Runnable,Runnable不关心2个cf的执行结果

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
    @Test
    public void runAfterBoth() {
	 CompletableFuture.supplyAsync(() -> "m").runAfterBothAsync(CompletableFuture.supplyAsync(() ->		"n"), () -> System.out.println("hello world"));
    }

12:处理cf数组

以上介绍的都是2个future的组合使用。cf还提供allOf,参数是cf数组,当数组中所有的cf都执行完成时,返回一个CompletableFuture<Void>。调用返回的cf的join方法阻塞等待cf数组中所有cf执行完成。 anyOf是当cf数组中任意一个cf执行完成后,就返回一个cf。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

读者可自行编写示例代码

13: 异常处理

在上面手工创建cf对象中,介绍过异常的处理,同样使用工厂创建的cf也具有异常管理机制,读者可自行举一反三。

小结

本文简单介绍了cf的使用方法,读者可参阅java8实战这本书,更深入学习CompletableFuture的应用场景。

参考: java8实战

手把手教你使用 CompletableFuture


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Spring Into HTML and CSS

Spring Into HTML and CSS

Molly E. Holzschlag / Addison-Wesley Professional / 2005-5-2 / USD 34.99

The fastest route to true HTML/CSS mastery! Need to build a web site? Or update one? Or just create some effective new web content? Maybe you just need to update your skills, do the job better. Welco......一起来看看 《Spring Into HTML and CSS》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

URL 编码/解码
URL 编码/解码

URL 编码/解码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具