内容简介:经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。由此可以扩展,在很多任务下,我们需要执行两个任务但这两个任务并没有前后的关联关系,我们也希望两个任务能够同时执行,然后再将执行结果汇聚就可以了。future 通过提交一个 callable 任务给线程池,线程池后台启动其他线程去执行,然后再调用 get() 方法获取结果
- 创建异步计算,并获取计算结果
- 使用非阻塞操作提升吞量
- 设计和实现异步 API
- 以异步的方式使用同步的 API
- 对两个或多个异步操作进行流水线和合并操作
- 处理异步操作的完成状态
现状
经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。
由此可以扩展,在很多任务下,我们需要执行两个任务但这两个任务并没有前后的关联关系,我们也希望两个任务能够同时执行,然后再将执行结果汇聚就可以了。
Future
Future 的功能
future 通过提交一个 callable 任务给线程池,线程池后台启动其他线程去执行,然后再调用 get() 方法获取结果
private void test() { ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> future = executor.submit(() -> sleep(1)); try { Integer integer = future.get(3, TimeUnit.SECONDS); System.out.println(integer); } catch (InterruptedException e) { // 当前线在等待中被中断 e.printStackTrace(); } catch (ExecutionException e) { // 任务执行中的异常 e.printStackTrace(); } catch (TimeoutException e) { // 超时 e.printStackTrace(); } } private int sleep(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } return 1; } 复制代码
该方式存在的问题,如果 sleep 执行超过 3 秒钟,future 将无法拿到返回结果。当然,Future 提供了一个无参的get 方法,可以一直等待结果。不过还是建议使用带超时参数的 get 方法,同时定义好超时的处理方法。
Future 不具备的功能
- 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
- 等待 Future 集合中的所有任务都完成。
- 仅等待 Future 集合中最快结束的任务完成,并返回它的结果。
- 通过编程方式完成一个 Future 任务的执行。
- 应对 Future 的完成事件,即当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果。
CompletableFuture
- 提供异步 API
- 同步变异步
- 以响应式方式处理异步操作的完成事件
同步
调用某个方法,调用方在被调用方运行的过程中会等待,直到被调用方运行结束后返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方不在同一个线程中运行,调用方还是需要等待被调用方结束才运行,这就是阻塞式调用。
异步
异步 API 调用后会直接返回,将计算任务交给其他线程来进行。其他线程执行完成后,再将结果返回给调用方。
使用异步 API
public void test(){ CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); new Thread(() -> { int sleep = sleep(1); completableFuture.complete(sleep); }).start(); CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>(); new Thread(() -> { int sleep = sleep(2); completableFuture1.complete(sleep); }).start(); Integer integer = null; Integer integer1 = null; try { integer = completableFuture.get(); integer1 = completableFuture1.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println(integer + "....CompletableFuture.." + integer1); Instant end = Instant.now(); Duration duration = Duration.between(start, end); long l = duration.toMillis(); System.err.println(l); } private int sleep(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } return timeout; } 复制代码
异步处理
上面代码的问题是,如果在线程内发生了异常,如何在外部的调用中被发现,同时去处理呢?正常的情况是,线程内发生异常,会直接被封锁在线程内,而最终线程会被杀死,那么 get 方法一直会阻塞。
此时就不应该使用 get() 方法,而是使用带有超时参数的 get 方法,并且在线程内,将异常传递回调用方。
new Thread(() -> { try { int sleep = sleep(2); completableFuture1.complete(sleep); } catch (Exception e) { completableFuture1.completeExceptionally(e); } }).start(); 复制代码
completableFuture1.completeExceptionally(e);
将异常传递出来,在 ExecutionException
中会被捕获,然后对其进行处理即可。
try { integer = completableFuture.get(); integer1 = completableFuture1.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } 复制代码
示例:
public void test(){ CompletableFuture<Integer> completableFuture = new CompletableFuture<>(); new Thread(() -> { try { throw new RuntimeException("故意抛出的异常..."); } catch (Exception e) { completableFuture.completeExceptionally(e); } }).start(); Integer integer = null; try { integer = completableFuture.get(3, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } System.out.println(integer + "....CompletableFuture.." ); Instant end = Instant.now(); Duration duration = Duration.between(start, end); long l = duration.toMillis(); System.err.println(l); } 复制代码
此时会收到的异常:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 故意抛出的异常... at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at com.example.demo.me.sjl.service.UserService.test(UserService.java:92) at com.example.demo.me.sjl.controller.UserController.test(UserController.java:20) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at .... 复制代码
使用工厂方法 supplyAsync
创建 CompletableFuture
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> sleep(1)); 复制代码
相比于 new 的方式,更加优雅、简洁,并且不用显式的创建线程(new Thread) 操作。默认会交由 ForkJoinPoll
池中的某个执行线程运行,同时也提供了重载的方法,指定 Executor 。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } 复制代码
如何确定默认的线程数量:
-
如果配置了系统属性
java.util.concurrent.ForkJoinPool.common.parallelism
则取该值,转换成 int 作为线程数量String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); if (pp != null) parallelism = Integer.parseInt(pp); 复制代码
-
没有配置该值,则取
Runtime.getRuntime().availableProcessors()
值作为线程数量if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; 复制代码
parallelism 初始值为 -1
调整线程池的大小
-
其中:$$N_{cppu}$$ 是处理器的核的数目,可以通过
Runtime.getRuntime().availableProcessors
得到
上面是一个参考公式《Java并发编程实战》(mng.bz/979c )
这是计算出的理论值,不过我们在使用时,需要考虑实际情况,比如我有 5 个并行任务,那么我需要开启 5 个线程来分别进行执行,多了会千万浪费,少了达不到并发的效果。此时我们需要 5 个线程。
插入数据库时
public void save(){ CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> { UserEntity entity = UserEntity.builder() .id(1112) .userName("施杰灵") .password("abc1213") .birthday("2018-08-08") .createUser("1") .createTime(LocalDateTime.now()) .updateUser("2") .updateTime(LocalDateTime.now()) .build(); return userRepository.save(entity); }); CompletableFuture<UserEntity> completableFuture1 = CompletableFuture.supplyAsync(() -> { UserEntity entity = UserEntity.builder() .id(223) .userName("施杰灵1") .password("abc12131") .birthday("2018-08-18") .createUser("11") .createTime(LocalDateTime.now()) .updateUser("21") .updateTime(LocalDateTime.now()) .build(); if (true) { throw new RuntimeException("故意抛出的异常..."); } return userRepository.save(entity); }); System.out.println(completableFuture.join()); System.out.println(completableFuture1.join()); } 复制代码
测试结果,上面那条数据正常插入到数据库中,下面的数据插入失败。事务并没有回滚。
将两个异步计算合并为一个,依赖 (thenCompose)
将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
public void test(){ CompletableFuture<Integer> compose = CompletableFuture.supplyAsync(() -> sleep(2)) .thenCompose( (x) -> CompletableFuture.supplyAsync(() -> sleep(x)) ); } private int sleep(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } return timeout; } 复制代码
从上面的代码中,可以看到在进行计算的时候,是使用到了前面的返回值 x
,整个任务的运行时间是 4 秒。
将两个异步计算合并为一个,无论是否依赖 (thenCombine)
public void test() { CompletableFuture<Integer> combine = CompletableFuture.supplyAsync(() -> sleep(2)) .thenCombine( CompletableFuture.supplyAsync(() -> sleep(1)), (t1, t2) -> t1 + t2 ); } private int sleep(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } return timeout; } 复制代码
- thenCombine
- thenCombineAsync
两个方法接收的参数是一致的,区别在于他们接收的第二个参数: BiFunction
是否会在提交到线程池中,由另外一个任务以异步的方式执行。 thenCombine
不会以异步方式执行 BiFunction
而 thenCombineAsync
会以异步的方式执行。
何时使用 Async 后缀的方法?
当我们进行合并的方法是一个耗时的方法时,就尽可能的考虑使用 Async 后缀的方法。
在插入数据库时
我们平常的操作是,插入数据库时,如果两个操作中,其中一个操作发生异常,是否会回滚?
@Transactional(rollbackFor = Exception.class) public void save() { CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> { UserEntity entity = UserEntity.builder() .id(111) .userName("施杰灵") .password("abc1213") .birthday("2018-08-08") .createUser("1") .createTime(LocalDateTime.now()) .updateUser("2") .updateTime(LocalDateTime.now()) .build(); return userRepository.save(entity); }).thenCombine(CompletableFuture.supplyAsync(() -> { UserEntity entity = UserEntity.builder() .id(222) .userName("施杰灵1") .password("abc12131") .birthday("2018-08-18") .createUser("11") .createTime(LocalDateTime.now()) .updateUser("21") .updateTime(LocalDateTime.now()) .build(); return userRepository.save(entity); }), (a, b) -> { System.out.println(a); System.out.println(b); return a; }); UserEntity join = completableFuture.join(); System.out.println(join); } 复制代码
经过实际测试,第二个任务抛出异常,是会回滚的。
CompletableFuture 的 Completion 事件
Java 8的CompletableFuture 通过thenAccept 方法提供了这一功能,它接收CompletableFuture 执行完毕后的返回值做参数。
public void test() { CompletableFuture.supplyAsync(() -> sleep(2)) .thenCombineAsync( CompletableFuture.supplyAsync(() -> sleep(1)), (t1, t2) -> t1 + t2 ).thenAccept((t) -> System.out.println(t + "------")); } private int sleep(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); } return timeout; } 复制代码
线程池(ThreadPoolExecutor)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ; 复制代码
-
int corePoolSize : 核心池的大小,这个参数与后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。 默认情况下 ,在创建了线程池后,线程池中的线程数为 0 ,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
-
int maximumPoolSize : 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
-
long keepAliveTime : 表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize:即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize;但是如果调用了**allowCoreThreadTimeOut(boolean)**方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
-
TimeUnit unit : 参数keepAliveTime的时间单位
-
BlockingQueue<Runnable> workQueue : 一个阻塞队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择
-
ArrayBlockingQueue
-
LinkedBlockingQueue
-
-
PriorityBlockingQueue
- SynchronousQueue
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
-
ThreadFactory threadFactory
-
RejectedExecutionHandler handler : 实现RejectedExecutionHandler接口,可自定义处理器
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- CompletableFuture 应用实践
- Java 8 CompletableFuture
- CompletableFuture
- Java并发 -- CompletableFuture
- CompletableFuture之灭霸的故事
- [Java并发-15] CompletableFuture: 异步编程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
计算机网络(第5版)
Andrew S. Tanenbaum、David J. Wetherall / 严伟、潘爱民 / 清华大学出版社 / 2012-3-1 / 89.50元
本书是国内外使用最广泛、最权威的计算机网络经典教材。全书按照网络协议模型自下而上(物理层、数据链路层、介质访问控制层、网络层、传输层和应用层)有系统地介绍了计算机网络的基本原理,并结合Internet给出了大量的协议实例。在讲述网络各层次内容的同时,还与时俱进地引入了最新的网络技术,包括无线网络、3G蜂窝网络、RFID与传感器网络、内容分发与P2P网络、流媒体传输与IP语音,以及延迟容忍网络等。另......一起来看看 《计算机网络(第5版)》 这本书的介绍吧!