Please credit the source when reposting: http://zhongmingmao.me/2019/05/15/java-concurrent-completable-future/
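    // sleep(long, TimeUnit) is a helper the post does not define; it is assumed to pause the current thread.
    // T1: wash the kettle, then boil the water
    CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
        System.out.println("T1: wash the kettle");
        sleep(1, TimeUnit.SECONDS);
        System.out.println("T1: boil the water");
        sleep(15, TimeUnit.SECONDS);
    });
    // T2: wash the teapot and teacups, then fetch the tea leaves
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        System.out.println("T2: wash the teapot");
        sleep(1, TimeUnit.SECONDS);
        System.out.println("T2: wash the teacups");
        sleep(2, TimeUnit.SECONDS);
        System.out.println("T2: fetch the tea leaves");
        sleep(1, TimeUnit.SECONDS);
        return "Longjing";
    });
    // T3: runs only after both T1 and T2 have finished
    CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tea) -> {
        System.out.println("T3: got the tea leaves: " + tea);
        System.out.println("T3: brew the tea");
        return "Serve tea: " + tea;
    });
    System.out.println(f3.join());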
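The tea example above, and the OR example later in this post, call `sleep` and `getRandom` without showing them. The following is one possible sketch of those helpers, not the author's code: the class name `DemoHelpers`, the interrupt handling, and the 1 to 10 range of `getRandom` are all assumptions made for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

class DemoHelpers {
    // Assumed helper: pause the current thread without forcing callers
    // to handle the checked InterruptedException.
    static void sleep(long duration, TimeUnit unit) {
        try {
            unit.sleep(duration);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    // Assumed helper: a random delay between 1 and 10 inclusive.
    // The original post does not state the range.
    static int getRandom() {
        return ThreadLocalRandom.current().nextInt(1, 11);
    }

    public static void main(String[] args) {
        sleep(10, TimeUnit.MILLISECONDS);
        System.out.println("random delay: " + getRandom());
    }
}
```

With helpers like these statically imported, the snippets in this post should compile inside any method body.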
Creating a CompletableFuture
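    // By default, tasks run on the common ForkJoinPool, whose default parallelism is the number of CPU cores minus one
    // (configurable through the java.util.concurrent.ForkJoinPool.common.parallelism system property).
    // If every CompletableFuture shares the same pool, a single task doing slow I/O can starve the others and hurt system performance,
    // so create a separate pool for each kind of workload to keep them from interfering with one another.
    public static CompletableFuture<Void> runAsync(Runnable runnable);
    // Runnable.run returns nothing; Supplier.get returns a value
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
    // Variants that take an explicit thread pool
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
    // Once the CompletableFuture is created, Runnable.run or Supplier.get is executed asynchronously without any further call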
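The advice above about isolating workloads can be sketched as follows. This is only an illustration: the class name `DedicatedPoolDemo`, the pool size of 4, and the thread name `io-pool-worker` are hypothetical choices, and the "slow I/O" task is reduced to reporting which thread ran it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedPoolDemo {
    // Runs a task on a dedicated pool and returns the name of the
    // thread that executed it, to show the common pool is not used.
    static String runOnDedicatedPool() {
        // Hypothetical pool reserved for blocking I/O; size and name are arbitrary.
        ExecutorService ioPool = Executors.newFixedThreadPool(4, r -> {
            Thread t = new Thread(r, "io-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> f = CompletableFuture.supplyAsync(
                    () -> Thread.currentThread().getName(), ioPool);
            return f.join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnDedicatedPool()); // io-pool-worker
    }
}
```

In a real service the pool would typically be created once and shared by all tasks of the same kind, rather than created and shut down per call as in this sketch.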
CompletionStage
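    // The CompletionStage interface clearly describes the ordering relationships between tasks: serial, parallel, and convergence (AND, OR)
    // It also provides a convenient way to describe exception handling
    public class CompletableFuture<T> implements Future<T>, CompletionStage<T>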
Serial relationships
    // Signatures as declared on CompletionStage; CompletableFuture overrides them to return CompletableFuture
    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

    public CompletionStage<Void> thenRun(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action);
    public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);

    // fn starts a new sub-flow by returning a CompletionStage; thenCompose flattens it,
    // so the final result is the same as what thenApply yields for a plain value
    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
    // supplyAsync starts an asynchronous flow; steps 1, 2, and 3 run serially
    CompletableFuture<String> f = CompletableFuture
            .supplyAsync(() -> "Hello World")  // 1
            .thenApply(s -> s + " QQ")         // 2
            .thenApply(String::toUpperCase);   // 3
    System.out.println(f.join()); // HELLO WORLD QQ
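To make the difference between `thenApply` and `thenCompose` more concrete, here is a small sketch. The class `ComposeDemo` and the lookup method `findUserName` are hypothetical and stand in for any function that itself returns a future.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {
    // Hypothetical asynchronous lookup, used only for illustration.
    static CompletableFuture<String> findUserName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // thenCompose flattens the future returned by findUserName.
    static String viaThenCompose() {
        CompletableFuture<String> flat = CompletableFuture
                .supplyAsync(() -> 42)
                .thenCompose(ComposeDemo::findUserName);
        return flat.join();
    }

    // thenApply with the same function leaves a nested future behind.
    static String viaThenApply() {
        CompletableFuture<CompletableFuture<String>> nested = CompletableFuture
                .supplyAsync(() -> 42)
                .thenApply(ComposeDemo::findUserName);
        return nested.join().join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenCompose()); // user-42
        System.out.println(viaThenApply());   // user-42
    }
}
```

Both paths arrive at the same value, but only `thenCompose` keeps the chain's type flat, which is why it tends to be the better fit when a step is itself asynchronous.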
AND convergence
    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

    public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);

    public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);
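The three AND families above differ only in what the callback receives and returns. A minimal sketch of that difference follows; the class `AndDemo` and the `price` and `shipping` futures are made-up names for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class AndDemo {
    // Two hypothetical independent tasks.
    static CompletableFuture<Integer> price() {
        return CompletableFuture.supplyAsync(() -> 100);
    }

    static CompletableFuture<Integer> shipping() {
        return CompletableFuture.supplyAsync(() -> 15);
    }

    // thenCombine: receives both results and returns a new value.
    static int combined() {
        return price().thenCombine(shipping(), Integer::sum).join();
    }

    // thenAcceptBoth: receives both results but returns nothing.
    static int acceptedBoth() {
        AtomicInteger seen = new AtomicInteger();
        price().thenAcceptBoth(shipping(), (p, s) -> seen.set(p + s)).join();
        return seen.get();
    }

    // runAfterBoth: receives no results; it only runs after both finish.
    static boolean ranAfterBoth() {
        AtomicBoolean ran = new AtomicBoolean();
        price().runAfterBoth(shipping(), () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combined());     // 115
        System.out.println(acceptedBoth()); // 115
        System.out.println(ranAfterBoth()); // true
    }
}
```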
OR convergence
    public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);

    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);

    public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);
    // getRandom() and sleep(long, TimeUnit) are helpers the post does not define
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        int t = getRandom();
        System.out.println("f1 need " + t);
        sleep(t, TimeUnit.SECONDS);
        System.out.println("f1 done");
        return "f1 takes " + t;
    });
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
        int t = getRandom();
        System.out.println("f2 need " + t);
        sleep(t, TimeUnit.SECONDS);
        System.out.println("f2 done");
        return "f2 takes " + t;
    });
    // f3 completes with the result of whichever of f1 and f2 finishes first
    CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s);
    f3.join();
    // Output from one run:
    // f1 need 9
    // f2 need 1
    // f2 done
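Because the run above depends on random delays, a deterministic variant may make the OR semantics easier to check. In this sketch (the class `EitherDemo` and the variable names are hypothetical) one future is never completed and the other is already complete, so the winner is known in advance.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {
    static String firstToFinish() {
        // Never completed in this sketch, so it can never win.
        CompletableFuture<String> slow = new CompletableFuture<>();
        // Already completed, so it always wins.
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        // The function is applied to whichever result arrives first.
        return slow.applyToEither(fast, s -> "winner: " + s).join();
    }

    public static void main(String[] args) {
        System.out.println(firstToFinish()); // winner: fast
    }
}
```

Note that `join()` returns even though `slow` never completes, since only one of the two stages needs to finish.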
Exception handling
    public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

    public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
    CompletableFuture<Integer> f = CompletableFuture
            .supplyAsync(() -> 1 / 0)        // throws ArithmeticException
            .thenApply(i -> i * 10)          // skipped because the previous stage failed
            .exceptionally(t -> 0)           // similar to catch {}
            .whenComplete((i, t) -> {
                // similar to finally; whenComplete cannot return a result, whereas handle can
            });
    System.out.println(f.join()); // 0
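Since the comment above notes that `handle` can return a result while `whenComplete` cannot, a short sketch of `handle` may help. The class `HandleDemo` and the message format are illustrative assumptions. The exception thrown inside `supplyAsync` reaches the callback wrapped in a `CompletionException`, so the sketch unwraps it before reporting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleDemo {
    // Converts either outcome of the source future into a message.
    static String describe(CompletableFuture<Integer> source) {
        return source.handle((value, error) -> {
            if (error == null) {
                return "ok: " + value;
            }
            // Unwrap the CompletionException to reach the original cause.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
            return "failed: " + cause.getClass().getSimpleName();
        }).join();
    }

    static String failing() {
        return describe(CompletableFuture.supplyAsync(() -> 1 / 0));
    }

    static String succeeding() {
        return describe(CompletableFuture.supplyAsync(() -> 4 / 2));
    }

    public static void main(String[] args) {
        System.out.println(failing());    // failed: ArithmeticException
        System.out.println(succeeding()); // ok: 2
    }
}
```

Unlike `exceptionally`, the `handle` callback runs on both the success and the failure path, which makes it closer to a combined try/catch that also maps the result.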
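See the original post, "Java并发 -- CompletableFuture" (Java Concurrency: CompletableFuture), for the best reading experience and to join the discussion.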