内容简介:原文:译者前言JDK1.5就增加了Future接口,但是接口使用不是很能满足异步开发的需求,使用起来不是那么友好。所以出现了很多第三方封装的
原文: Java 8 CompletableFutures Part I
- 作者:Bill Bejeck
- 译者:noONE
译者前言
JDK1.5就增加了Future接口,但是接口使用不是很能满足异步开发的需求,使用起来不是那么友好。所以出现了很多第三方封装的 Future
,Guava中就提供了一个更好的 ListenableFuture 类,Netty中则提供了一个自己的 Future
。所以,Java8中的 CompletableFuture
可以说是解决 Future
了一些痛点,可以优雅得进行组合式异步编程,同时也更加契合函数式编程。
Java8已经发布了很长一段时间,其中新增了一个很棒的并发控制工具,就是CompletableFuture类。 CompletableFuture
实现了Future接口,并且它可以显式地设定值,更有意思的是我们可以进行链式处理,并且支持依赖行为,这些行为由 CompletableFuture
完成所触发。 CompletableFuture
类似于Guava中的 ListenableFuture 类。它们两个提供了类似的功能,本文不会再对它们进行对比。我已经在之前的文章中介绍过 ListenableFutrue
。虽然对于 ListenableFutrue
的介绍有点过时,但是绝大数的知识仍然适用。 CompletableFuture
的文档已经非常全面了,但是缺少如何使用它们的具体示例 。本文意在通过单元测试中的一系列的简单示例来展示如何使用 CompletableFuture
。最初我想在一篇文章中介绍完 CompleteableFuture
,但是信息太多了,分成三部分似乎更好一些:
- 创建/组合任务以及为它们增加监听器。
- 处理错误以及错误恢复。
- 取消或者强制完成。
CompletableFuture 入门
在开始使用 CompletableFuture
之前, 我们需要了解一些背景知识。 CompletableFuture
实现了 CompletionStage 接口。javadoc中简明地介绍了 CompletionStage
:
一个可能的异步计算的阶段,当另外一个CompletionStage 完成时,它会执行一个操作或者计算一个值。一个阶段的完成取决于它本身结算的结果,同时也可能反过来触发其他依赖阶段。
CompletionStage 的全部文档的内容很多,所以,我们在这里总结几个关键点:
-
计算可以由Future,Consumer或者 Runnable 接口中的 apply , accept 或者 run 等方法表示。
-
计算的执行主要有以下
a. 默认执行(可能调用线程)
b. 使用默认的
CompletionStage
的异步执行提供者异步执行。这些方法名使用 someActionAsync 这种格式表示。c. 使用Executor 提供者异步执行。这些方法同样也是 someActionAsync 这种格式,但是会增加一个
Executor
参数。
接下来,我会在本文中直接引用 CompletableFuture
和 CompletionStage
。
创建一个CompleteableFuture
创建一个 CompleteableFuture
很简单,但是不是很清晰。最简单的方法就是使用 CompleteableFuture.completedFuture
方法,该方法返回一个新的且完结的 CompleteableFuture
:
@Test public void test_completed_future() throws Exception { String expectedValue = "the expected value"; CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue); assertThat(alreadyCompleted.get(), is(expectedValue)); } 复制代码
这样看起来有点乏味,稍后,我们就会看到如何创建一个已经完成的 CompleteableFuture
会派上用场。
现在,让我们看一下如何创建一个表示异步任务的 CompleteableFuture
:
private static ExecutorService service = Executors.newCachedThreadPool(); @Test public void test_run_async() throws Exception { CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("running async task"), service); //utility testing method pauseSeconds(1); assertThat(runAsync.isDone(), is(true)); } @Test public void test_supply_async() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(simulatedTask(1, "Final Result"), service); assertThat(completableFuture.get(), is("Final Result")); } 复制代码
在第一个方法中,我们看到了 runAsync
任务,在第二个方法中,则是 supplyAsync
的示例。这可能是显而易见的,然而使用 runAsync
还是使用 supplyAsync
,这取决于任务是否有返回值。在这两个例子中,我们都提供了一个自定义的 Executor
,它作为一个异步执行提供者。当使用 supplyAsync
方法时,我个人认为使用 Callable 而不是一个 Supplier
似乎更自然一些。因为它们都是函数式接口, Callable
与异步任务的关系更紧密一些,并且它还可以抛出受检异常,而 Supplier
则不会(尽管我们可以通过少量的代码
让 Supplier
抛出受检异常
)。
增加监听器
现在,我们可以创建 CompleteableFuture
对象去运行异步任务,让我们开始学习如何去“监听”任务的完成,并且执行随后的一些动作。这里重点提一下,当增加对 CompletionStage
对象的追随时,之前的任务需要彻底成功,后续的任务和阶段才能运行。本文会介绍介绍一些处理失败任务的方法,而在 CompleteableFuture
中链式处理错误的方案会在后续的文章中介绍。
@Test public void test_then_run_async() throws Exception { Map<String,String> cache = new HashMap<>(); cache.put("key","value"); CompletableFuture<String> taskUsingCache = CompletableFuture.supplyAsync(simulatedTask(1,cache.get("key")),service); CompletableFuture<Void> cleanUp = taskUsingCache.thenRunAsync(cache::clear,service); cleanUp.get(); String theValue = taskUsingCache.get(); assertThat(cache.isEmpty(),is(true)); assertThat(theValue,is("value")); } 复制代码
这个例子主要展示在第一个 CompletableFuture
成功结束后,运行一个清理的任务。 在之前的例子中,当最初的任务成功结束后,我们使用 Runnable
任务执行。我们也可以定义一个后续任务,它可以直接获取之前任务的成功结果。
@Test public void test_accept_result() throws Exception { CompletableFuture<String> task = CompletableFuture.supplyAsync(simulatedTask(1, "add when done"), service); CompletableFuture<Void> acceptingTask = task.thenAccept(results::add); pauseSeconds(2); assertThat(acceptingTask.isDone(), is(true)); assertThat(results.size(), is(1)); assertThat(results.contains("add when done"), is(true)); } 复制代码
这是一个使用 Accept
方法的例子,该方法会获取 CompletableFuture
的结果,然后将结果传给一个 Consumer
对象。在 Java 8中, Consumer
实例是没有返回值的 ,如果想得到运行的副作用,需要把结果放到一个列表中。
组合与构成任务
除了增加监听器去运行后续任务或者接受 CompletableFuture
的成功结果,我们还可以组合或者构成任务。
构成任务
构成意味着获取一个成功的 CompletableFuture
结果作为输入,通过 一个Function 返回另外一个 CompletableFuture
。下面是一个使用 CompletableFuture.thenComposeAsync
的例子:
@Test public void test_then_compose() throws Exception { Function<Integer,Supplier<List<Integer>>> getFirstTenMultiples = num -> ()->Stream.iterate(num, i -> i + num).limit(10).collect(Collectors.toList()); Supplier<List<Integer>> multiplesSupplier = getFirstTenMultiples.apply(13); //Original CompletionStage CompletableFuture<List<Integer>> getMultiples = CompletableFuture.supplyAsync(multiplesSupplier, service); //Function that takes input from orignal CompletionStage Function<List<Integer>, CompletableFuture<Integer>> sumNumbers = multiples -> CompletableFuture.supplyAsync(() -> multiples.stream().mapToInt(Integer::intValue).sum()); //The final CompletableFuture composed of previous two. CompletableFuture<Integer> summedMultiples = getMultiples.thenComposeAsync(sumNumbers, service); assertThat(summedMultiples.get(), is(715)); } 复制代码
在这个列子中,第一个 CompletionStage
提供了一个列表,该列表包含10个数字,每个数字都乘以13。这个提供的 Function
获取这些结果,并且创建另外一个 CompletionStage
,它将对列表中的数字求和。
组合任务
组合任务的完成是通过获取两个成功的 CompletionStages
,并且从中获取BiFunction类型的参数,进而产出另外的结果。以下是一个非常简单的例子用来说明从组合的 CompletionStages
中获取结果。
@Test public void test_then_combine_async() throws Exception { CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(simulatedTask(3, "combine all"), service); CompletableFuture<String> secondTask = CompletableFuture.supplyAsync(simulatedTask(2, "task results"), service); CompletableFuture<String> combined = firstTask.thenCombineAsync(secondTask, (f, s) -> f + " " + s, service); assertThat(combined.get(), is("combine all task results")); } 复制代码
这个例子展示了如何组合两个异步任务的 CompletionStage
,然而,我们也可以组合已经完成的 CompletableFuture
的异步任务。 组合一个已知的需要计算的值,也是一种很好的处理方式:
@Test public void test_then_combine_with_one_supplied_value() throws Exception { CompletableFuture<String> asyncComputedValue = CompletableFuture.supplyAsync(simulatedTask(2, "calculated value"), service); CompletableFuture<String> knowValueToCombine = CompletableFuture.completedFuture("known value"); BinaryOperator<String> calcResults = (f, s) -> "taking a " + f + " then adding a " + s; CompletableFuture<String> combined = asyncComputedValue.thenCombine(knowValueToCombine, calcResults); assertThat(combined.get(), is("taking a calculated value then adding a known value")); } 复制代码
最后,是一个使用 CompletableFuture.runAfterbothAsync
的例子
@Test public void test_run_after_both() throws Exception { CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> { pauseSeconds(2); results.add("first task"); }, service); CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> { pauseSeconds(3); results.add("second task"); }, service); CompletableFuture<Void> finisher = run1.runAfterBothAsync(run2,() -> results. add(results.get(0)+ "&"+results.get(1)),service); pauseSeconds(4); assertThat(finisher.isDone(),is(true)); assertThat(results.get(2),is("first task&second task")); } 复制代码
监听第一个结束的任务
在之前所有的例子中,所有的结果需要等待所有的 CompletionStage
结束,然而,需求并不总是这样的。我们可能需要获取第一个完成的任务的结果。下面的例子展示使用 Consumer
接受第一个完成的结果:
@Test public void test_accept_either_async_nested_finishes_first() throws Exception { CompletableFuture<String> callingCompletable = CompletableFuture.supplyAsync(simulatedTask(2, "calling"), service); CompletableFuture<String> nestedCompletable = CompletableFuture.supplyAsync(simulatedTask(1, "nested"), service); CompletableFuture<Void> collector = callingCompletable.acceptEither(nestedCompletable, results::add); pauseSeconds(2); assertThat(collector.isDone(), is(true)); assertThat(results.size(), is(1)); assertThat(results.contains("nested"), is(true)); } 复制代码
类似功能的 CompletableFuture.runAfterEither
@Test public void test_run_after_either() throws Exception { CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> { pauseSeconds(2); results.add("should be first"); }, service); CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> { pauseSeconds(3); results.add("should be second"); }, service); CompletableFuture<Void> finisher = run1.runAfterEitherAsync(run2,() -> results.add(results.get(0).toUpperCase()),service); pauseSeconds(4); assertThat(finisher.isDone(),is(true)); assertThat(results.get(1),is("SHOULD BE FIRST")); } 复制代码
多重组合
到目前为止,所有的组合/构成的例子都只有两个 CompletableFuture
对象。这里是有意为之,为了让例子尽量的简单明了。我们可以组合任意数量的 CompletionStage
。请注意,下面例子仅仅是为了说明而已!
@Test public void test_several_stage_combinations() throws Exception { Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase()); CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick "); CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox "); CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2); CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction); CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over")); CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service); CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into")); CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service); CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service); CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service); assertThat(finalStage.get(),is("THE QUICK BROWN FOX JUMPED OVER the lazy dog")); } 复制代码
需要注意的是,组合CompletionStage的时候并不保证顺序。在这些单元测试中,提供了一个时间去模拟任务以确保完成顺序。
小结
本文主要是使用 CompletableFuture
类的第一部分。在后续文章中,将主要介绍错误处理及恢复,强制完成或取消。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。