内容简介:熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略
摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-second-isolation-strategy/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
- 1. 概述
- 2. HystrixThreadPoolProperties
- 3. HystrixThreadPoolKey
- 4. HystrixConcurrencyStrategy
- 5. HystrixThreadPool
- 6. HystrixScheduler
- 666. 彩蛋
关注 微信公众号:【芋道源码】 有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言 都 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢 。
- 新的 源码解析文章 实时 收到通知。 每周更新一篇左右 。
- 认真的 源码交流微信群。
1. 概述
本文主要分享 Hystrix 命令执行(二)之执行隔离策略 。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
Hystrix 提供两种执行隔离策略( ExecutionIsolationStrategy ) :
-
SEMAPHORE:信号量,命令在 调用线程 执行。在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 已经详细解析。 -
THREAD:线程池,命令在 线程池 执行。在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「5. #executeCommandWithSpecifiedIsolation(…)」 的#executeCommandWithSpecifiedIsolation(...)方法中,调用Observable#subscribeOn(Scheduler)方法,指定在 RxJava Scheduler 执行。- 如果你暂时不了解 Scheduler ,可以阅读 《RxJava 源码解析 —— Scheduler》 。
- 如果你暂时不了解
Observable#subscribeOn(Scheduler),可以阅读 《RxJava 源码解析 —— Observable#subscribeOn(Scheduler)》 。
两种方式的 优缺点比较 ,推荐阅读 《【翻译】Hystrix文档-实现原理》「依赖隔离」 。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版, 等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与 Docker 微服务架构实战》
- 两书齐买,京东包邮。
2. HystrixThreadPoolProperties
com.netflix.hystrix.HystrixThreadPoolProperties ,Hystrix 线程池属性配置 抽象类 ,点击 链接 查看,已添加中文注释说明。
com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault ,Hystrix 线程池配置 实现类 ,点击 链接 查看。实际上没什么内容,官方如是说 :
Default implementation of {@link HystrixThreadPoolProperties} using Archaius ( https://github.com/Netflix/archaius )
3. HystrixThreadPoolKey
com.netflix.hystrix.HystrixThreadPoolKey ,Hystrix 线程池标识 接口 。
FROM HystrixThreadPoolKey 接口注释
A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.
This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.
- 直白的说 ,希望通过相同的
name( 标识 ) 获得同 HystrixThreadPoolKey 对象。通过在内部维持一个name与 HystrixThreadPoolKey 对象的映射,以达到 枚举 的效果。
HystrixThreadPoolKey 代码如下 :
1: public interface HystrixThreadPoolKey extends HystrixKey{
2: class Factory{
3: private Factory(){
4: }
5:
6: // used to intern instances so we don't keep re-creating them millions of times for the same key
7: private static final InternMap<String, HystrixThreadPoolKey> intern
8: = new InternMap<String, HystrixThreadPoolKey>(
9: new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
10: @Override
11: public HystrixThreadPoolKey create(String key){
12: return new HystrixThreadPoolKeyDefault(key);
13: }
14: });
15:
16: public static HystrixThreadPoolKey asKey(String name){
17: return intern.interned(name);
18: }
19:
20: private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey{
21: public HystrixThreadPoolKeyDefault(String name){
22: super(name);
23: }
24: }
25:
26: /* package-private */ static int getThreadPoolCount(){
27: return intern.size();
28: }
29: }
30: }
- HystrixThreadPoolKey 实现
com.netflix.hystrix.HystrixKey接口 ,点击 链接 查看。该接口定义的#name()方法,即是上文我们所说的标识( Key )。 -
intern属性,name与 HystrixThreadPoolKey 对象的映射,以达到 枚举 的效果。-
com.netflix.hystrix.util.InternMap,点击 链接 查看带中文注释的代码。
-
-
#asKey(name)方法,从intern获得 HystrixThreadPoolKey 对象。 -
#getThreadPoolCount()方法,获得 HystrixThreadPoolKey 数量。
在 AbstractCommand 构造方法 里,初始化命令的 threadPoolKey 属性,代码如下 :
protected final HystrixThreadPoolKey threadPoolKey;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook){
// ... 省略无关代码
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
}
-
调用
#initThreadPoolKey(...)方法,创建最终的threadPoolKey属性。代码如下 :private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride){ if (threadPoolKeyOverride == null) { // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup if (threadPoolKey == null) { /* use HystrixCommandGroup if HystrixThreadPoolKey is null */ return HystrixThreadPoolKey.Factory.asKey(groupKey.name()); } else { return threadPoolKey; } } else { // threadPoolKeyOverride 可覆盖属性 // we have a property defining the thread-pool so use it instead return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride); } }- 优先级 :
threadPoolKeyOverride>threadPoolKey>groupKey
- 优先级 :
4. HystrixConcurrencyStrategy
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy ,Hystrix 并发策略 抽象类 。
HystrixConcurrencyStrategy#getThreadPool(...) 方法,代码如下 :
1: public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties){
2: final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
3:
4: final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
5: final int dynamicCoreSize = threadPoolProperties.coreSize().get();
6: final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
7: final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
8:
9: final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
10:
11: if (allowMaximumSizeToDivergeFromCoreSize) {
12: final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
13: if (dynamicCoreSize > dynamicMaximumSize) {
14: logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
15: dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
16: dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
17: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
18: } else {
19: return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
20: }
21: } else {
22: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
23: }
24: }
- 第 2 行 :调用
#getThreadFactory(...)方法,获得 ThreadFactory 。点击 链接 查看方法代码。-
PlatformSpecific#getAppEngineThreadFactory()方法,无需细看,适用于 Google App Engine 场景。
-
- 第 4 至 7 行 :「2. HystrixThreadPoolProperties」有详细解析。
- 第 9 行 :调用
#getBlockingQueue()方法,获得线程池的阻塞队列。点击 链接 查看方法代码。- 当
maxQueueSize <= 0时( 默认值 :-1) 时,使用 SynchronousQueue 。超过线程池的maximumPoolSize时,提交任务 被拒绝 。 - 当
SynchronousQueue > 0时,使用 LinkedBlockingQueue 。超过线程池的maximumPoolSize时,任务被拒绝。超过线程池的maximumPoolSize+ 线程池队列的maxQueueSize时,提交任务 被阻塞等待 。
- 当
- 第 11 至 23 行 :创建 ThreadPoolExecutor 。看起来代码比较多,根据
allowMaximumSizeToDivergeFromCoreSize的情况,计算线程池的maximumPoolSize属性。计算的方式和HystrixThreadPoolProperties#actualMaximumSize()方法是一致的。
com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault ,Hystrix 并发策略 实现类 。代码如下( 基本没做啥 ) :
public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy{
/**
* 单例
*/
private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();
public static HystrixConcurrencyStrategy getInstance(){
return INSTANCE;
}
private HystrixConcurrencyStrategyDefault(){
}
}
在 AbstractCommand 构造方法 里,初始化命令的 threadPoolKey 属性,代码如下 :
protected final HystrixConcurrencyStrategy concurrencyStrategy;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook){
// ... 省略无关代码
// 初始化 并发策略
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
}
- HystrixPlugins ,Hystrix 插件 体系, https://github.com/Netflix/Hystrix/wiki/Plugins 有详细解析。
- 调用
HystrixPlugins#getConcurrencyStrategy()获得 HystrixConcurrencyStrategy 对象。默认情况下,使用 HystrixConcurrencyStrategyDefault 。当然你也可以参考 Hystrix 插件体系,实现 自定义 的 HystrixConcurrencyStrategy 实现,以达到 覆写#getThreadPool(),#getBlockingQueue()等方法。点击 链接 查看该方法代码。
5. HystrixThreadPool
com.netflix.hystrix.HystrixThreadPool ,Hystrix 线程池 接口 。当 Hystrix 命令使用 THREAD 执行隔离策略时, HystrixCommand#run() 方法在 线程池执行 。点击 链接 查看。HystrixThreadPool 定义接口如下 :
-
#getExecutor():获得 ExecutorService 。 -
#getScheduler()/#getScheduler(Func0<Boolean>):获得 RxJava Scheduler 。 -
#isQueueSpaceAvailable():线程池队列是否有 空余 。 -
#markThreadExecution()/#markThreadCompletion()/#markThreadRejection():TODO 【2002】【metrics】
5.1 HystrixThreadPoolDefault
com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault ,Hystrix 线程池 实现类 。
构造方法,代码如下 :
1: private final HystrixThreadPoolProperties properties;
2: private final BlockingQueue<Runnable> queue;
3: private final ThreadPoolExecutor threadPool;
4: private final HystrixThreadPoolMetrics metrics;
5: private final int queueSize;
6:
7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults){
8: // 初始化 HystrixThreadPoolProperties
9: this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10: // 获得 HystrixConcurrencyStrategy
11: HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12: // 队列大小
13: this.queueSize = properties.maxQueueSize().get();
14:
15: // TODO 【2002】【metrics】
16: this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17: concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 初始化 ThreadPoolExecutor
18: properties);
19:
20: // 获得 ThreadPoolExecutor
21: this.threadPool = this.metrics.getThreadPool();
22: this.queue = this.threadPool.getQueue(); // 队列
23:
24: // TODO 【2002】【metrics】
25: /* strategy: HystrixMetricsPublisherThreadPool */
26: HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
27: }
- 第 9 行 :初始化 HystrixThreadPoolProperties 。
- 第 11 行 :初始化 HystrixConcurrencyStrategy 。
- 第 13 行 :初始化
queueSize。 - 第 16 至 18 行 :TODO 【2002】【metrics】
- 第 17 行 :调用
HystrixConcurrencyStrategy#getThreadPool(...)方法,初始化 ThreadPoolExecutor 。
- 第 17 行 :调用
- 第 21 行 : 获得 ThreadPoolExecutor 。
- 第 22 行 : 获得 ThreadPoolExecutor 的队列。
- 第 26 行 :TODO 【2002】【metrics】
#getExecutor() 方法,代码如下 :
@Override
public ThreadPoolExecutor getExecutor(){
touchConfig();
return threadPool;
}
- 调用
#touchConfig()方法, 动态 调整threadPool的coreSize/maximumSize/keepAliveTime参数。点击 链接 查看该方法。
#getScheduler() / #getScheduler(Func0<Boolean>) 方法,代码如下 :
@Override
public Scheduler getScheduler(){
//by default, interrupt underlying threads on timeout
return getScheduler(new Func0<Boolean>() {
@Override
public Boolean call(){
return true;
}
});
}
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread){
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
- HystrixContextScheduler 和
shouldInterruptThread都在「6. HystrixContextScheduler」详细解析。
#isQueueSpaceAvailable() 方法,代码如下 :
@Override
public boolean isQueueSpaceAvailable(){
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
- 由于线程池的队列大小不能 动态 调整,该方法的 实现 通过
HystrixThreadPoolProperties.queueSizeRejectionThreshold属性控制。 - 注意
queueSize属性,决定了线程池的队列类型。-
queueSize <= 0时,#isQueueSpaceAvailable()都返回true的原因是,线程池使用 SynchronousQueue 作为队列,不支持 新 任务排队,任务超过线程池的maximumPoolSize时,新任务被拒绝。 -
queueSize > 0时,#isQueueSpaceAvailable()根据情况true/false的原因是,线程池使用 LinkedBlockingQueue 作为队列,支持 一定数量 的 阻塞 排队,但是这个数量无法调整。通过#isQueueSpaceAvailable()方法的判断, 动态 调整。另外,初始 配置 的queueSize要 相对大 ,否则即使queueSizeRejectionThreshold配置的大于queueSize,实际提交任务到线程池,也会被 拒绝 。
-
5.2 Factory
com.netflix.hystrix.HystrixThreadPool.Factory ,HystrixThreadPool 工厂类,不仅限于 HystrixThreadPool 的创建,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。
threadPools 属性,维护创建的 HystrixThreadPool 对应的映射,代码如下 :
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
- Key 为
HystrixThreadPoolKey#name(),每个 HystrixThreadPoolKey 对应一个 HystrixThreadPool 对象。
#getInstance(...) 方法,获得 HystrixThreadPool 对象,代码如下 :
/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder){
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
- 根据
threadPoolKey先从threadPool获取已创建的 HystrixThreadPool ;获取不到,创建对应的 HystrixThreadPool 返回,并添加到threadPool。
#shutdown() / #shutdown(timeout, unit) 方法,比较易懂,点击 链接 查看。
5.3 初始化
在 AbstractCommand 构造方法 里,初始化命令的 threadPool 属性,代码如下 :
protected final HystrixThreadPool threadPool;
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook){
// ... 省略其他代码
// 初始化 threadPoolKey
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
// 初始化 threadPool
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
}
- 调用
#initThreadPool(...)方法,获得 HystrixThreadPool ,点击 链接 查看。
6. HystrixScheduler
Hystrix 实现了 自定义的 RxJava Scheduler ,整体类图如下 :
- HystrixContextScheduler ( 实现 RxJava Scheduler 抽象类 ),内嵌类型为 ThreadPoolScheduler ( 实现 RxJava Scheduler 抽象类 )的
actualScheduler属性。 - HystrixContextWorker ( 实现 RxJava Worker 抽象类 ),内嵌类型为 ThreadPoolWorker ( 实现 RxJava Worker 抽象类 )的
worker属性。
6.1 HystrixContextScheduler
构造方法,代码如下 :
public class HystrixContextScheduler extends Scheduler{
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scheduler actualScheduler;
private final HystrixThreadPool threadPool;
// ... 省略无关代码
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread){
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
}
-
actualScheduler属性,类型为 ThreadPoolScheduler 。
#createWorker() 方法,代码如下 :
@Override
public Worker createWorker(){
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
- 使用
actualScheduler创建 ThreadPoolWorker ,传参给 HystrixContextSchedulerWorker 。
6.2 HystrixContextSchedulerWorker
构造方法,代码如下 :
private class HystrixContextSchedulerWorker extends Worker{
private final Worker worker;
// ... 省略无关代码
private HystrixContextSchedulerWorker(Worker actualWorker){
this.worker = actualWorker;
}
}
-
worker属性,类型为 ThreadPoolWorker 。
#schedule(Action0) 方法,代码如下 :
@Override
public Subscription schedule(Action0 action){
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
- 调用
ThreadPool#isQueueSpaceAvailable()方法,判断线程池队列是否有 空余 。这个就是 HystrixContextScheduler 的 实际 用途。
#unsubscribe() / #isUnsubscribed() 方法,使用 worker 判断,点击 链接 查看。
6.3 ThreadPoolScheduler
ThreadPoolScheduler 比较简单,点击 链接 查看。
6.4 ThreadPoolWorker
构造方法,代码如下 :
private static class ThreadPoolWorker extends Worker{
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
// ... 省略无关代码
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread){
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
}
-
subscription属性,订阅信息。
#schedule(Action0) 方法,代码如下 :
1: @Override
2: public Subscription schedule(final Action0 action){
3: // 未订阅,返回
4: if (subscription.isUnsubscribed()) {
5: // don't schedule, we are unsubscribed
6: return Subscriptions.unsubscribed();
7: }
8:
9: // 创建 ScheduledAction
10: // This is internal RxJava API but it is too useful.
11: ScheduledAction sa = new ScheduledAction(action);
12:
13: // 添加到 订阅
14: subscription.add(sa);
15: sa.addParent(subscription);
16:
17: // 提交 任务
18: ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19: FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20: sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21:
22: return sa;
23: }
- 第 4 至 7 行 :未订阅,返回。
- 第 11 行 : 创建 ScheduledAction 。在 TODO 【2013】【ScheduledAction】 详细解析。
- 第 14 至 15 行 :添加到订阅(
subscription)。 - 第 18 至 20 行 :使用
threadPool,提交任务,并创建 FutureCompleterWithConfigurableInterrupt 添加到订阅(sa)。 - 第 22 行 :返回订阅(
sa)。整体订阅关系如下 :
#unsubscribe() / #isUnsubscribed() 方法,使用 subscription 判断,点击 链接 查看。
6.5 FutureCompleterWithConfigurableInterrupt
com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt ,实现类似 rx.internal.schedulers.ScheduledAction.FutureCompleter ,在它的基础上,支持配置 FutureTask#cancel(Boolean) 是否可 打断 运行( mayInterruptIfRunning )。
构造方法,代码如下 :
private static class FutureCompleterWithConfigurableInterrupt implements Subscription{
private final FutureTask<?> f;
private final Func0<Boolean> shouldInterruptThread;
private final ThreadPoolExecutor executor;
// ... 省略无关代码
private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor){
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
this.executor = executor;
}
}
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行 。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行 。
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行 。
#unsubscribe() 方法,代码如下 :
@Override
public void unsubscribe(){
// 从 线程池 移除 任务
executor.remove(f);
// 根据 shouldInterruptThread 配置,是否强制取消
if (shouldInterruptThread.call()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
- 根据
shouldInterruptThread方法,判断是否 强制 取消。 -
shouldInterruptThread对应的方法,实现代码如下 :
subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call(){
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
- 当
executionIsolationThreadInterruptOnTimeout = true时,命令可执行 超时 。当命令可执行 超时 时, 强制 取消。 - 当使用
HystrixCommand.queue()返回的 Future ,可以使用Future#cancel(Boolean)取消命令执行。从shouldInterruptThread对应的方法可以看到,如果此时不满足命令执行 超时 的条件,命令执行取消的方式是 非强制 的。此时当executionIsolationThreadInterruptOnFutureCancel = true时,并且调用Future#cancel(Boolean)传递mayInterruptIfRunning = true,强制取消命令执行。- 模拟测试用例 :
CommandHelloWorld#testAsynchronous3() -
HystrixCommand#queue():点击 链接 查看Future#cancel(Boolean)方法。
- 模拟测试用例 :
666. 彩蛋
一边写一边想明白了 RxJava 的一些东西,挺舒服的赶脚。
继续 Go On ~ 周末嗨不停。
胖友,分享一波朋友圈可好!
以上所述就是小编给大家介绍的《熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 熔断器 Hystrix 源码解析 —— 执行命令方式
- [译] Istio 熔断器解析
- Hystrix系列之熔断器实现原理
- [译] 3 分钟简述熔断器使用方法
- 不只是容错-从熔断器看有限状态机
- 服务容错模式:舱壁模式、熔断器的异同点
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
代码大全(第2版)
[美] 史蒂夫·迈克康奈尔 / 金戈、汤凌、陈硕、张菲 译、裘宗燕 审校 / 电子工业出版社 / 2006-3 / 128.00元
第2版的《代码大全》是著名IT畅销书作者史蒂夫·迈克康奈尔11年前的经典著作的全新演绎:第2版不是第一版的简单修订增补,而是完全进行了重写;增加了很多与时俱进的内容。这也是一本完整的软件构建手册,涵盖了软件构建过程中的所有细节。它从软件质量和编程思想等方面论述了软件构建的各个问题,并详细论述了紧跟潮流的新技术、高屋建瓴的观点、通用的概念,还含有丰富而典型的程序示例。这本书中所论述的技术不仅填补了初......一起来看看 《代码大全(第2版)》 这本书的介绍吧!
JS 压缩/解压工具
在线压缩/解压 JS 代码
URL 编码/解码
URL 编码/解码