内容简介:经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。由此可以扩展,在很多任务下,我们需要执行两个任务但这两个任务并没有前后的关联关系,我们也希望两个任务能够同时执行,然后再将执行结果汇聚就可以了。future 通过提交一个 callable 任务给线程池,线程池后台启动其他线程去执行,然后再调用 get() 方法获取结果
- 创建异步计算,并获取计算结果
- 使用非阻塞操作提升吞量
- 设计和实现异步 API
- 以异步的方式使用同步的 API
- 对两个或多个异步操作进行流水线和合并操作
- 处理异步操作的完成状态
现状
经常出现在等待某条 SQL 执行完成后,再继续执行下一条 SQL ,而这两条 SQL 本身是并无关系的,可以同时进行执行的。我们希望能够两条 SQL 同时进行处理,而不是等待其中的某一条 SQL 完成后,再继续下一条。
由此可以扩展,在很多任务下,我们需要执行两个任务但这两个任务并没有前后的关联关系,我们也希望两个任务能够同时执行,然后再将执行结果汇聚就可以了。
Future
Future 的功能
future 通过提交一个 callable 任务给线程池,线程池后台启动其他线程去执行,然后再调用 get() 方法获取结果
private void test() {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> sleep(1));
try {
Integer integer = future.get(3, TimeUnit.SECONDS);
System.out.println(integer);
} catch (InterruptedException e) {
// 当前线在等待中被中断
e.printStackTrace();
} catch (ExecutionException e) {
// 任务执行中的异常
e.printStackTrace();
} catch (TimeoutException e) {
// 超时
e.printStackTrace();
}
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
复制代码
该方式存在的问题,如果 sleep 执行超过 3 秒钟,future 将无法拿到返回结果。当然,Future 提供了一个无参的get 方法,可以一直等待结果。不过还是建议使用带超时参数的 get 方法,同时定义好超时的处理方法。
Future 不具备的功能
- 将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
- 等待 Future 集合中的所有任务都完成。
- 仅等待 Future 集合中最快结束的任务完成,并返回它的结果。
- 通过编程方式完成一个 Future 任务的执行。
- 应对 Future 的完成事件,即当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果。
CompletableFuture
- 提供异步 API
- 同步变异步
- 以响应式方式处理异步操作的完成事件
同步
调用某个方法,调用方在被调用方运行的过程中会等待,直到被调用方运行结束后返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方不在同一个线程中运行,调用方还是需要等待被调用方结束才运行,这就是阻塞式调用。
异步
异步 API 调用后会直接返回,将计算任务交给其他线程来进行。其他线程执行完成后,再将结果返回给调用方。
使用异步 API
public void test(){
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
new Thread(() -> {
int sleep = sleep(1);
completableFuture.complete(sleep);
}).start();
CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>();
new Thread(() -> {
int sleep = sleep(2);
completableFuture1.complete(sleep);
}).start();
Integer integer = null;
Integer integer1 = null;
try {
integer = completableFuture.get();
integer1 = completableFuture1.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(integer + "....CompletableFuture.." + integer1);
Instant end = Instant.now();
Duration duration = Duration.between(start, end);
long l = duration.toMillis();
System.err.println(l);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
异步处理
上面代码的问题是,如果在线程内发生了异常,如何在外部的调用中被发现,同时去处理呢?正常的情况是,线程内发生异常,会直接被封锁在线程内,而最终线程会被杀死,那么 get 方法一直会阻塞。
此时就不应该使用 get() 方法,而是使用带有超时参数的 get 方法,并且在线程内,将异常传递回调用方。
new Thread(() -> {
try {
int sleep = sleep(2);
completableFuture1.complete(sleep);
} catch (Exception e) {
completableFuture1.completeExceptionally(e);
}
}).start();
复制代码
completableFuture1.completeExceptionally(e); 将异常传递出来,在 ExecutionException 中会被捕获,然后对其进行处理即可。
try {
integer = completableFuture.get();
integer1 = completableFuture1.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
复制代码
示例:
public void test(){
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
throw new RuntimeException("故意抛出的异常...");
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
}).start();
Integer integer = null;
try {
integer = completableFuture.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println(integer + "....CompletableFuture.." );
Instant end = Instant.now();
Duration duration = Duration.between(start, end);
long l = duration.toMillis();
System.err.println(l);
}
复制代码
此时会收到的异常:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 故意抛出的异常... at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at com.example.demo.me.sjl.service.UserService.test(UserService.java:92) at com.example.demo.me.sjl.controller.UserController.test(UserController.java:20) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at .... 复制代码
使用工厂方法 supplyAsync 创建 CompletableFuture
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> sleep(1)); 复制代码
相比于 new 的方式,更加优雅、简洁,并且不用显式的创建线程(new Thread) 操作。默认会交由 ForkJoinPoll 池中的某个执行线程运行,同时也提供了重载的方法,指定 Executor 。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
复制代码
如何确定默认的线程数量:
-
如果配置了系统属性
java.util.concurrent.ForkJoinPool.common.parallelism则取该值,转换成 int 作为线程数量String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); if (pp != null) parallelism = Integer.parseInt(pp); 复制代码 -
没有配置该值,则取
Runtime.getRuntime().availableProcessors()值作为线程数量if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; 复制代码parallelism 初始值为 -1
调整线程池的大小
-
其中:$$N_{cppu}$$ 是处理器的核的数目,可以通过
Runtime.getRuntime().availableProcessors得到 -
上面是一个参考公式《Java并发编程实战》(mng.bz/979c )
这是计算出的理论值,不过我们在使用时,需要考虑实际情况,比如我有 5 个并行任务,那么我需要开启 5 个线程来分别进行执行,多了会千万浪费,少了达不到并发的效果。此时我们需要 5 个线程。
插入数据库时
public void save(){
CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(1112)
.userName("施杰灵")
.password("abc1213")
.birthday("2018-08-08")
.createUser("1")
.createTime(LocalDateTime.now())
.updateUser("2")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
});
CompletableFuture<UserEntity> completableFuture1 = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(223)
.userName("施杰灵1")
.password("abc12131")
.birthday("2018-08-18")
.createUser("11")
.createTime(LocalDateTime.now())
.updateUser("21")
.updateTime(LocalDateTime.now())
.build();
if (true) {
throw new RuntimeException("故意抛出的异常...");
}
return userRepository.save(entity);
});
System.out.println(completableFuture.join());
System.out.println(completableFuture1.join());
}
复制代码
测试结果,上面那条数据正常插入到数据库中,下面的数据插入失败。事务并没有回滚。
将两个异步计算合并为一个,依赖 (thenCompose)
将两个异步计算合并为一个,这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果
public void test(){
CompletableFuture<Integer> compose = CompletableFuture.supplyAsync(() -> sleep(2))
.thenCompose(
(x) -> CompletableFuture.supplyAsync(() -> sleep(x))
);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
从上面的代码中,可以看到在进行计算的时候,是使用到了前面的返回值 x ,整个任务的运行时间是 4 秒。
将两个异步计算合并为一个,无论是否依赖 (thenCombine)
public void test() {
CompletableFuture<Integer> combine = CompletableFuture.supplyAsync(() -> sleep(2))
.thenCombine(
CompletableFuture.supplyAsync(() -> sleep(1)),
(t1, t2) -> t1 + t2
);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
- thenCombine
- thenCombineAsync
两个方法接收的参数是一致的,区别在于他们接收的第二个参数: BiFunction 是否会在提交到线程池中,由另外一个任务以异步的方式执行。 thenCombine 不会以异步方式执行 BiFunction 而 thenCombineAsync 会以异步的方式执行。
何时使用 Async 后缀的方法?
当我们进行合并的方法是一个耗时的方法时,就尽可能的考虑使用 Async 后缀的方法。
在插入数据库时
我们平常的操作是,插入数据库时,如果两个操作中,其中一个操作发生异常,是否会回滚?
@Transactional(rollbackFor = Exception.class)
public void save() {
CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(111)
.userName("施杰灵")
.password("abc1213")
.birthday("2018-08-08")
.createUser("1")
.createTime(LocalDateTime.now())
.updateUser("2")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
}).thenCombine(CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(222)
.userName("施杰灵1")
.password("abc12131")
.birthday("2018-08-18")
.createUser("11")
.createTime(LocalDateTime.now())
.updateUser("21")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
}), (a, b) -> {
System.out.println(a);
System.out.println(b);
return a;
});
UserEntity join = completableFuture.join();
System.out.println(join);
}
复制代码
经过实际测试,第二个任务抛出异常,是会回滚的。
CompletableFuture 的 Completion 事件
Java 8的CompletableFuture 通过thenAccept 方法提供了这一功能,它接收CompletableFuture 执行完毕后的返回值做参数。
public void test() {
CompletableFuture.supplyAsync(() -> sleep(2))
.thenCombineAsync(
CompletableFuture.supplyAsync(() -> sleep(1)),
(t1, t2) -> t1 + t2
).thenAccept((t) -> System.out.println(t + "------"));
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
复制代码
线程池(ThreadPoolExecutor)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) ;
复制代码
-
int corePoolSize : 核心池的大小,这个参数与后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了 prestartAllCoreThreads() 或者 prestartCoreThread() 方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。 默认情况下 ,在创建了线程池后,线程池中的线程数为 0 ,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
-
int maximumPoolSize : 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
-
long keepAliveTime : 表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize:即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize;但是如果调用了**allowCoreThreadTimeOut(boolean)**方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
-
TimeUnit unit : 参数keepAliveTime的时间单位
-
BlockingQueue<Runnable> workQueue : 一个阻塞队列,用来存储等待执行的任务,这个参数的选择会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择
-
ArrayBlockingQueue
-
LinkedBlockingQueue
-
-
PriorityBlockingQueue
- SynchronousQueue
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
-
ThreadFactory threadFactory
-
RejectedExecutionHandler handler : 实现RejectedExecutionHandler接口,可自定义处理器
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- CompletableFuture 应用实践
- Java 8 CompletableFuture
- CompletableFuture
- Java并发 -- CompletableFuture
- CompletableFuture之灭霸的故事
- [Java并发-15] CompletableFuture: 异步编程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Data Mining
Jiawei Han、Micheline Kamber、Jian Pei / Morgan Kaufmann / 2011-7-6 / USD 74.95
The increasing volume of data in modern business and science calls for more complex and sophisticated tools. Although advances in data mining technology have made extensive data collection much easier......一起来看看 《Data Mining》 这本书的介绍吧!