内容简介:原文:译者前言JDK1.5就增加了Future接口,但是接口使用不是很能满足异步开发的需求,使用起来不是那么友好。所以出现了很多第三方封装的
原文: Java 8 CompletableFutures Part I
- 作者:Bill Bejeck
- 译者:noONE
译者前言
JDK1.5就增加了Future接口,但是接口使用不是很能满足异步开发的需求,使用起来不是那么友好。所以出现了很多第三方封装的 Future
,Guava中就提供了一个更好的 ListenableFuture 类,Netty中则提供了一个自己的 Future
。所以,Java8中的 CompletableFuture
可以说是解决 Future
了一些痛点,可以优雅得进行组合式异步编程,同时也更加契合函数式编程。
Java8已经发布了很长一段时间,其中新增了一个很棒的并发控制工具,就是CompletableFuture类。 CompletableFuture
实现了Future接口,并且它可以显式地设定值,更有意思的是我们可以进行链式处理,并且支持依赖行为,这些行为由 CompletableFuture
完成所触发。 CompletableFuture
类似于Guava中的 ListenableFuture 类。它们两个提供了类似的功能,本文不会再对它们进行对比。我已经在之前的文章中介绍过 ListenableFutrue
。虽然对于 ListenableFutrue
的介绍有点过时,但是绝大数的知识仍然适用。 CompletableFuture
的文档已经非常全面了,但是缺少如何使用它们的具体示例 。本文意在通过单元测试中的一系列的简单示例来展示如何使用 CompletableFuture
。最初我想在一篇文章中介绍完 CompleteableFuture
,但是信息太多了,分成三部分似乎更好一些:
- 创建/组合任务以及为它们增加监听器。
- 处理错误以及错误恢复。
- 取消或者强制完成。
CompletableFuture 入门
在开始使用 CompletableFuture
之前, 我们需要了解一些背景知识。 CompletableFuture
实现了 CompletionStage 接口。javadoc中简明地介绍了 CompletionStage
:
一个可能的异步计算的阶段,当另外一个CompletionStage 完成时,它会执行一个操作或者计算一个值。一个阶段的完成取决于它本身结算的结果,同时也可能反过来触发其他依赖阶段。
CompletionStage 的全部文档的内容很多,所以,我们在这里总结几个关键点:
-
计算可以由Future,Consumer或者 Runnable 接口中的 apply , accept 或者 run 等方法表示。
-
计算的执行主要有以下
a. 默认执行(可能调用线程)
b. 使用默认的
CompletionStage
的异步执行提供者异步执行。这些方法名使用 someActionAsync 这种格式表示。c. 使用Executor 提供者异步执行。这些方法同样也是 someActionAsync 这种格式,但是会增加一个
Executor
参数。
接下来,我会在本文中直接引用 CompletableFuture
和 CompletionStage
。
创建一个CompleteableFuture
创建一个 CompleteableFuture
很简单,但是不是很清晰。最简单的方法就是使用 CompleteableFuture.completedFuture
方法,该方法返回一个新的且完结的 CompleteableFuture
:
@Test public void test_completed_future() throws Exception { String expectedValue = "the expected value"; CompletableFuture<String> alreadyCompleted = CompletableFuture.completedFuture(expectedValue); assertThat(alreadyCompleted.get(), is(expectedValue)); } 复制代码
这样看起来有点乏味,稍后,我们就会看到如何创建一个已经完成的 CompleteableFuture
会派上用场。
现在,让我们看一下如何创建一个表示异步任务的 CompleteableFuture
:
private static ExecutorService service = Executors.newCachedThreadPool(); @Test public void test_run_async() throws Exception { CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("running async task"), service); //utility testing method pauseSeconds(1); assertThat(runAsync.isDone(), is(true)); } @Test public void test_supply_async() throws Exception { CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(simulatedTask(1, "Final Result"), service); assertThat(completableFuture.get(), is("Final Result")); } 复制代码
在第一个方法中,我们看到了 runAsync
任务,在第二个方法中,则是 supplyAsync
的示例。这可能是显而易见的,然而使用 runAsync
还是使用 supplyAsync
,这取决于任务是否有返回值。在这两个例子中,我们都提供了一个自定义的 Executor
,它作为一个异步执行提供者。当使用 supplyAsync
方法时,我个人认为使用 Callable 而不是一个 Supplier
似乎更自然一些。因为它们都是函数式接口, Callable
与异步任务的关系更紧密一些,并且它还可以抛出受检异常,而 Supplier
则不会(尽管我们可以通过少量的代码
让 Supplier
抛出受检异常
)。
增加监听器
现在,我们可以创建 CompleteableFuture
对象去运行异步任务,让我们开始学习如何去“监听”任务的完成,并且执行随后的一些动作。这里重点提一下,当增加对 CompletionStage
对象的追随时,之前的任务需要彻底成功,后续的任务和阶段才能运行。本文会介绍介绍一些处理失败任务的方法,而在 CompleteableFuture
中链式处理错误的方案会在后续的文章中介绍。
@Test public void test_then_run_async() throws Exception { Map<String,String> cache = new HashMap<>(); cache.put("key","value"); CompletableFuture<String> taskUsingCache = CompletableFuture.supplyAsync(simulatedTask(1,cache.get("key")),service); CompletableFuture<Void> cleanUp = taskUsingCache.thenRunAsync(cache::clear,service); cleanUp.get(); String theValue = taskUsingCache.get(); assertThat(cache.isEmpty(),is(true)); assertThat(theValue,is("value")); } 复制代码
这个例子主要展示在第一个 CompletableFuture
成功结束后,运行一个清理的任务。 在之前的例子中,当最初的任务成功结束后,我们使用 Runnable
任务执行。我们也可以定义一个后续任务,它可以直接获取之前任务的成功结果。
@Test public void test_accept_result() throws Exception { CompletableFuture<String> task = CompletableFuture.supplyAsync(simulatedTask(1, "add when done"), service); CompletableFuture<Void> acceptingTask = task.thenAccept(results::add); pauseSeconds(2); assertThat(acceptingTask.isDone(), is(true)); assertThat(results.size(), is(1)); assertThat(results.contains("add when done"), is(true)); } 复制代码
这是一个使用 Accept
方法的例子,该方法会获取 CompletableFuture
的结果,然后将结果传给一个 Consumer
对象。在 Java 8中, Consumer
实例是没有返回值的 ,如果想得到运行的副作用,需要把结果放到一个列表中。
组合与构成任务
除了增加监听器去运行后续任务或者接受 CompletableFuture
的成功结果,我们还可以组合或者构成任务。
构成任务
构成意味着获取一个成功的 CompletableFuture
结果作为输入,通过 一个Function 返回另外一个 CompletableFuture
。下面是一个使用 CompletableFuture.thenComposeAsync
的例子:
@Test public void test_then_compose() throws Exception { Function<Integer,Supplier<List<Integer>>> getFirstTenMultiples = num -> ()->Stream.iterate(num, i -> i + num).limit(10).collect(Collectors.toList()); Supplier<List<Integer>> multiplesSupplier = getFirstTenMultiples.apply(13); //Original CompletionStage CompletableFuture<List<Integer>> getMultiples = CompletableFuture.supplyAsync(multiplesSupplier, service); //Function that takes input from orignal CompletionStage Function<List<Integer>, CompletableFuture<Integer>> sumNumbers = multiples -> CompletableFuture.supplyAsync(() -> multiples.stream().mapToInt(Integer::intValue).sum()); //The final CompletableFuture composed of previous two. CompletableFuture<Integer> summedMultiples = getMultiples.thenComposeAsync(sumNumbers, service); assertThat(summedMultiples.get(), is(715)); } 复制代码
在这个列子中,第一个 CompletionStage
提供了一个列表,该列表包含10个数字,每个数字都乘以13。这个提供的 Function
获取这些结果,并且创建另外一个 CompletionStage
,它将对列表中的数字求和。
组合任务
组合任务的完成是通过获取两个成功的 CompletionStages
,并且从中获取BiFunction类型的参数,进而产出另外的结果。以下是一个非常简单的例子用来说明从组合的 CompletionStages
中获取结果。
@Test public void test_then_combine_async() throws Exception { CompletableFuture<String> firstTask = CompletableFuture.supplyAsync(simulatedTask(3, "combine all"), service); CompletableFuture<String> secondTask = CompletableFuture.supplyAsync(simulatedTask(2, "task results"), service); CompletableFuture<String> combined = firstTask.thenCombineAsync(secondTask, (f, s) -> f + " " + s, service); assertThat(combined.get(), is("combine all task results")); } 复制代码
这个例子展示了如何组合两个异步任务的 CompletionStage
,然而,我们也可以组合已经完成的 CompletableFuture
的异步任务。 组合一个已知的需要计算的值,也是一种很好的处理方式:
@Test public void test_then_combine_with_one_supplied_value() throws Exception { CompletableFuture<String> asyncComputedValue = CompletableFuture.supplyAsync(simulatedTask(2, "calculated value"), service); CompletableFuture<String> knowValueToCombine = CompletableFuture.completedFuture("known value"); BinaryOperator<String> calcResults = (f, s) -> "taking a " + f + " then adding a " + s; CompletableFuture<String> combined = asyncComputedValue.thenCombine(knowValueToCombine, calcResults); assertThat(combined.get(), is("taking a calculated value then adding a known value")); } 复制代码
最后,是一个使用 CompletableFuture.runAfterbothAsync
的例子
@Test public void test_run_after_both() throws Exception { CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> { pauseSeconds(2); results.add("first task"); }, service); CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> { pauseSeconds(3); results.add("second task"); }, service); CompletableFuture<Void> finisher = run1.runAfterBothAsync(run2,() -> results. add(results.get(0)+ "&"+results.get(1)),service); pauseSeconds(4); assertThat(finisher.isDone(),is(true)); assertThat(results.get(2),is("first task&second task")); } 复制代码
监听第一个结束的任务
在之前所有的例子中,所有的结果需要等待所有的 CompletionStage
结束,然而,需求并不总是这样的。我们可能需要获取第一个完成的任务的结果。下面的例子展示使用 Consumer
接受第一个完成的结果:
@Test public void test_accept_either_async_nested_finishes_first() throws Exception { CompletableFuture<String> callingCompletable = CompletableFuture.supplyAsync(simulatedTask(2, "calling"), service); CompletableFuture<String> nestedCompletable = CompletableFuture.supplyAsync(simulatedTask(1, "nested"), service); CompletableFuture<Void> collector = callingCompletable.acceptEither(nestedCompletable, results::add); pauseSeconds(2); assertThat(collector.isDone(), is(true)); assertThat(results.size(), is(1)); assertThat(results.contains("nested"), is(true)); } 复制代码
类似功能的 CompletableFuture.runAfterEither
@Test public void test_run_after_either() throws Exception { CompletableFuture<Void> run1 = CompletableFuture.runAsync(() -> { pauseSeconds(2); results.add("should be first"); }, service); CompletableFuture<Void> run2 = CompletableFuture.runAsync(() -> { pauseSeconds(3); results.add("should be second"); }, service); CompletableFuture<Void> finisher = run1.runAfterEitherAsync(run2,() -> results.add(results.get(0).toUpperCase()),service); pauseSeconds(4); assertThat(finisher.isDone(),is(true)); assertThat(results.get(1),is("SHOULD BE FIRST")); } 复制代码
多重组合
到目前为止,所有的组合/构成的例子都只有两个 CompletableFuture
对象。这里是有意为之,为了让例子尽量的简单明了。我们可以组合任意数量的 CompletionStage
。请注意,下面例子仅仅是为了说明而已!
@Test public void test_several_stage_combinations() throws Exception { Function<String,CompletableFuture<String>> upperCaseFunction = s -> CompletableFuture.completedFuture(s.toUpperCase()); CompletableFuture<String> stage1 = CompletableFuture.completedFuture("the quick "); CompletableFuture<String> stage2 = CompletableFuture.completedFuture("brown fox "); CompletableFuture<String> stage3 = stage1.thenCombine(stage2,(s1,s2) -> s1+s2); CompletableFuture<String> stage4 = stage3.thenCompose(upperCaseFunction); CompletableFuture<String> stage5 = CompletableFuture.supplyAsync(simulatedTask(2,"jumped over")); CompletableFuture<String> stage6 = stage4.thenCombineAsync(stage5,(s1,s2)-> s1+s2,service); CompletableFuture<String> stage6_sub_1_slow = CompletableFuture.supplyAsync(simulatedTask(4,"fell into")); CompletableFuture<String> stage7 = stage6.applyToEitherAsync(stage6_sub_1_slow,String::toUpperCase,service); CompletableFuture<String> stage8 = CompletableFuture.supplyAsync(simulatedTask(3," the lazy dog"),service); CompletableFuture<String> finalStage = stage7.thenCombineAsync(stage8,(s1,s2)-> s1+s2,service); assertThat(finalStage.get(),is("THE QUICK BROWN FOX JUMPED OVER the lazy dog")); } 复制代码
需要注意的是,组合CompletionStage的时候并不保证顺序。在这些单元测试中,提供了一个时间去模拟任务以确保完成顺序。
小结
本文主要是使用 CompletableFuture
类的第一部分。在后续文章中,将主要介绍错误处理及恢复,强制完成或取消。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Web Design Handbook
Baeck, Philippe de 编 / 2009-12 / $ 22.54
This non-technical book brings together contemporary web design's latest and most original creative examples in the areas of services, media, blogs, contacts, links and jobs. It also traces the latest......一起来看看 《Web Design Handbook》 这本书的介绍吧!
图片转BASE64编码
在线图片转Base64编码工具
html转js在线工具
html转js在线工具