熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

栏目: Java · 发布时间: 6年前

内容简介:熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-second-isolation-strategy/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本

熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

关注 微信公众号:【芋道源码】 有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 Hystrix 命令执行(二)之执行隔离策略

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 提供两种执行隔离策略( ExecutionIsolationStrategy ) :

两种方式的 优缺点比较 ,推荐阅读 《【翻译】Hystrix文档-实现原理》「依赖隔离」

推荐 Spring Cloud 书籍:

2. HystrixThreadPoolProperties

com.netflix.hystrix.HystrixThreadPoolProperties ,Hystrix 线程池属性配置 抽象类 ,点击 链接 查看,已添加中文注释说明。

com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault ,Hystrix 线程池配置 实现类 ,点击 链接 查看。实际上没什么内容,官方如是说 :

Default implementation of {@link HystrixThreadPoolProperties} using Archaius ( https://github.com/Netflix/archaius )

3. HystrixThreadPoolKey

com.netflix.hystrix.HystrixThreadPoolKey ,Hystrix 线程池标识 接口

FROM HystrixThreadPoolKey 接口注释

A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.

This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.

  • 直白的说 ,希望通过相同的 name ( 标识 ) 获得同 HystrixThreadPoolKey 对象。通过在内部维持一个 name 与 HystrixThreadPoolKey 对象的映射,以达到 枚举 的效果。

HystrixThreadPoolKey 代码如下 :

 1: public interface HystrixThreadPoolKey extends HystrixKey{
 2: class Factory{
 3: private Factory(){
 4: }
 5: 
 6: // used to intern instances so we don't keep re-creating them millions of times for the same key
 7: private static final InternMap<String, HystrixThreadPoolKey> intern
 8: = new InternMap<String, HystrixThreadPoolKey>(
 9: new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {
10: @Override
11: public HystrixThreadPoolKey create(String key){
12: return new HystrixThreadPoolKeyDefault(key);
13: }
14: });
15: 
16: public static HystrixThreadPoolKey asKey(String name){
17: return intern.interned(name);
18: }
19: 
20: private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey{
21: public HystrixThreadPoolKeyDefault(String name){
22: super(name);
23: }
24: }
25: 
26: /* package-private */ static int getThreadPoolCount(){
27: return intern.size();
28: }
29: }
30: }
  • HystrixThreadPoolKey 实现 com.netflix.hystrix.HystrixKey 接口 ,点击 链接 查看。该接口定义的 #name() 方法,即是上文我们所说的标识( Key )。
  • intern 属性, name 与 HystrixThreadPoolKey 对象的映射,以达到 枚举 的效果。
    • com.netflix.hystrix.util.InternMap ,点击 链接 查看带中文注释的代码。
  • #asKey(name) 方法,从 intern 获得 HystrixThreadPoolKey 对象。
  • #getThreadPoolCount() 方法,获得 HystrixThreadPoolKey 数量。

在 AbstractCommand 构造方法 里,初始化命令的 threadPoolKey 属性,代码如下 :

protected final HystrixThreadPoolKey threadPoolKey;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook){

 // ... 省略无关代码

 this.commandGroup = initGroupKey(group);
 this.commandKey = initCommandKey(key, getClass());
 this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
 // 初始化 threadPoolKey
 this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());

}
  • 调用 #initThreadPoolKey(...) 方法,创建最终的 threadPoolKey 属性。代码如下 :

    private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride){
     if (threadPoolKeyOverride == null) {
     // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup
     if (threadPoolKey == null) {
     /* use HystrixCommandGroup if HystrixThreadPoolKey is null */
     return HystrixThreadPoolKey.Factory.asKey(groupKey.name());
     } else {
     return threadPoolKey;
     }
     } else { // threadPoolKeyOverride 可覆盖属性
     // we have a property defining the thread-pool so use it instead
     return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);
     }
    }
    
    • 优先级 : threadPoolKeyOverride > threadPoolKey > groupKey

4. HystrixConcurrencyStrategy

com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy ,Hystrix 并发策略 抽象类

HystrixConcurrencyStrategy#getThreadPool(...) 方法,代码如下 :

 1: public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties){
 2: final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
 3: 
 4: final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
 5: final int dynamicCoreSize = threadPoolProperties.coreSize().get();
 6: final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
 7: final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
 8: 
 9: final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
10: 
11: if (allowMaximumSizeToDivergeFromCoreSize) {
12: final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
13: if (dynamicCoreSize > dynamicMaximumSize) {
14: logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
15: dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
16: dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
17: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
18: } else {
19: return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
20: }
21: } else {
22: return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
23: }
24: }

com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault ,Hystrix 并发策略 实现类 。代码如下( 基本没做啥 ) :

public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy{

 /**
* 单例
*/
 private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();

 public static HystrixConcurrencyStrategy getInstance(){
 return INSTANCE;
 }

 private HystrixConcurrencyStrategyDefault(){
 }

}

在 AbstractCommand 构造方法 里,初始化命令的 threadPoolKey 属性,代码如下 :

protected final HystrixConcurrencyStrategy concurrencyStrategy;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook){

 // ... 省略无关代码
 
 // 初始化 并发策略
 this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
 
}
  • HystrixPlugins ,Hystrix 插件 体系, https://github.com/Netflix/Hystrix/wiki/Plugins 有详细解析。
  • 调用 HystrixPlugins#getConcurrencyStrategy() 获得 HystrixConcurrencyStrategy 对象。默认情况下,使用 HystrixConcurrencyStrategyDefault 。当然你也可以参考 Hystrix 插件体系,实现 自定义 的 HystrixConcurrencyStrategy 实现,以达到 覆写 #getThreadPool()#getBlockingQueue() 等方法。点击 链接 查看该方法代码。

5. HystrixThreadPool

com.netflix.hystrix.HystrixThreadPool ,Hystrix 线程池 接口 。当 Hystrix 命令使用 THREAD 执行隔离策略时, HystrixCommand#run() 方法在 线程池执行 。点击 链接 查看。HystrixThreadPool 定义接口如下 :

  • #getExecutor() :获得 ExecutorService 。
  • #getScheduler() / #getScheduler(Func0<Boolean>) :获得 RxJava Scheduler 。
  • #isQueueSpaceAvailable() :线程池队列是否有 空余
  • #markThreadExecution() / #markThreadCompletion() / #markThreadRejection() :TODO 【2002】【metrics】

5.1 HystrixThreadPoolDefault

com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault ,Hystrix 线程池 实现类

构造方法,代码如下 :

 1: private final HystrixThreadPoolProperties properties;
 2: private final BlockingQueue<Runnable> queue;
 3: private final ThreadPoolExecutor threadPool;
 4: private final HystrixThreadPoolMetrics metrics;
 5: private final int queueSize;
 6: 
 7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults){
 8: // 初始化 HystrixThreadPoolProperties
 9: this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
10: // 获得 HystrixConcurrencyStrategy
11: HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
12: // 队列大小
13: this.queueSize = properties.maxQueueSize().get();
14: 
15: // TODO 【2002】【metrics】
16: this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
17: concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 初始化 ThreadPoolExecutor
18: properties);
19: 
20: // 获得 ThreadPoolExecutor
21: this.threadPool = this.metrics.getThreadPool();
22: this.queue = this.threadPool.getQueue(); // 队列
23: 
24: // TODO 【2002】【metrics】
25: /* strategy: HystrixMetricsPublisherThreadPool */
26: HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
27: }
  • 第 9 行 :初始化 HystrixThreadPoolProperties 。
  • 第 11 行 :初始化 HystrixConcurrencyStrategy 。
  • 第 13 行 :初始化 queueSize
  • 第 16 至 18 行 :TODO 【2002】【metrics】
    • 第 17 行 :调用 HystrixConcurrencyStrategy#getThreadPool(...) 方法,初始化 ThreadPoolExecutor 。
  • 第 21 行 : 获得 ThreadPoolExecutor 。
  • 第 22 行 : 获得 ThreadPoolExecutor 的队列。
  • 第 26 行 :TODO 【2002】【metrics】

#getExecutor() 方法,代码如下 :

@Override
public ThreadPoolExecutor getExecutor(){
 touchConfig();
 return threadPool;
}
  • 调用 #touchConfig() 方法, 动态 调整 threadPoolcoreSize / maximumSize / keepAliveTime 参数。点击 链接 查看该方法。

#getScheduler() / #getScheduler(Func0<Boolean>) 方法,代码如下 :

@Override
public Scheduler getScheduler(){
 //by default, interrupt underlying threads on timeout
 return getScheduler(new Func0<Boolean>() {
 @Override
 public Boolean call(){
 return true;
 }
 });
}

@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread){
 touchConfig();
 return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
  • HystrixContextScheduler 和 shouldInterruptThread 都在「6. HystrixContextScheduler」详细解析。

#isQueueSpaceAvailable() 方法,代码如下 :

@Override
public boolean isQueueSpaceAvailable(){
 if (queueSize <= 0) {
 // we don't have a queue so we won't look for space but instead
 // let the thread-pool reject or not
 return true;
 } else {
 return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
 }
}
  • 由于线程池的队列大小不能 动态 调整,该方法的 实现 通过 HystrixThreadPoolProperties.queueSizeRejectionThreshold 属性控制。
  • 注意 queueSize 属性,决定了线程池的队列类型。
    • queueSize <= 0 时, #isQueueSpaceAvailable() 都返回 true 的原因是,线程池使用 SynchronousQueue 作为队列,不支持 任务排队,任务超过线程池的 maximumPoolSize 时,新任务被拒绝。
    • queueSize > 0 时, #isQueueSpaceAvailable() 根据情况 true / false 的原因是,线程池使用 LinkedBlockingQueue 作为队列,支持 一定数量阻塞 排队,但是这个数量无法调整。通过 #isQueueSpaceAvailable() 方法的判断, 动态 调整。另外,初始 配置queueSize相对大 ,否则即使 queueSizeRejectionThreshold 配置的大于 queueSize ,实际提交任务到线程池,也会被 拒绝

5.2 Factory

com.netflix.hystrix.HystrixThreadPool.Factory ,HystrixThreadPool 工厂类,不仅限于 HystrixThreadPool 的创建,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。

threadPools 属性,维护创建的 HystrixThreadPool 对应的映射,代码如下 :

final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
  • Key 为 HystrixThreadPoolKey#name() ,每个 HystrixThreadPoolKey 对应一个 HystrixThreadPool 对象。

#getInstance(...) 方法,获得 HystrixThreadPool 对象,代码如下 :

/* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder){
 // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
 String key = threadPoolKey.name();

 // this should find it for all but the first time
 HystrixThreadPool previouslyCached = threadPools.get(key);
 if (previouslyCached != null) {
 return previouslyCached;
 }

 // if we get here this is the first time so we need to initialize
 synchronized (HystrixThreadPool.class) {
 if (!threadPools.containsKey(key)) {
 threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
 }
 }
 return threadPools.get(key);
}
  • 根据 threadPoolKey 先从 threadPool 获取已创建的 HystrixThreadPool ;获取不到,创建对应的 HystrixThreadPool 返回,并添加到 threadPool

#shutdown() / #shutdown(timeout, unit) 方法,比较易懂,点击 链接 查看。

5.3 初始化

在 AbstractCommand 构造方法 里,初始化命令的 threadPool 属性,代码如下 :

protected final HystrixThreadPool threadPool;

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook){

 // ... 省略其他代码

 // 初始化 threadPoolKey
 this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
 // 初始化 threadPool
 this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

}
  • 调用 #initThreadPool(...) 方法,获得 HystrixThreadPool ,点击 链接 查看。

6. HystrixScheduler

Hystrix 实现了 自定义的 RxJava Scheduler ,整体类图如下 :

熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

  • HystrixContextScheduler ( 实现 RxJava Scheduler 抽象类 ),内嵌类型为 ThreadPoolScheduler ( 实现 RxJava Scheduler 抽象类 )的 actualScheduler 属性。
  • HystrixContextWorker ( 实现 RxJava Worker 抽象类 ),内嵌类型为 ThreadPoolWorker ( 实现 RxJava Worker 抽象类 )的 worker 属性。

6.1 HystrixContextScheduler

构造方法,代码如下 :

public class HystrixContextScheduler extends Scheduler{

 private final HystrixConcurrencyStrategy concurrencyStrategy;
 private final Scheduler actualScheduler;
 private final HystrixThreadPool threadPool;
 
 // ... 省略无关代码
 
 public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread){
 this.concurrencyStrategy = concurrencyStrategy;
 this.threadPool = threadPool;
 this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
 }
}
  • actualScheduler 属性,类型为 ThreadPoolScheduler 。

#createWorker() 方法,代码如下 :

@Override
public Worker createWorker(){
 return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
  • 使用 actualScheduler 创建 ThreadPoolWorker ,传参给 HystrixContextSchedulerWorker 。

6.2 HystrixContextSchedulerWorker

构造方法,代码如下 :

private class HystrixContextSchedulerWorker extends Worker{

 private final Worker worker;

 // ... 省略无关代码

 private HystrixContextSchedulerWorker(Worker actualWorker){
 this.worker = actualWorker;
 }
 
}
  • worker 属性,类型为 ThreadPoolWorker 。

#schedule(Action0) 方法,代码如下 :

@Override
public Subscription schedule(Action0 action){
 if (threadPool != null) {
 if (!threadPool.isQueueSpaceAvailable()) {
 throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
 }
 }
 return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
  • 调用 ThreadPool#isQueueSpaceAvailable() 方法,判断线程池队列是否有 空余 。这个就是 HystrixContextScheduler 的 实际 用途。

#unsubscribe() / #isUnsubscribed() 方法,使用 worker 判断,点击 链接 查看。

6.3 ThreadPoolScheduler

ThreadPoolScheduler 比较简单,点击 链接 查看。

6.4 ThreadPoolWorker

构造方法,代码如下 :

private static class ThreadPoolWorker extends Worker{

 private final HystrixThreadPool threadPool;
 private final CompositeSubscription subscription = new CompositeSubscription();
 private final Func0<Boolean> shouldInterruptThread;

 // ... 省略无关代码

 public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread){
 this.threadPool = threadPool;
 this.shouldInterruptThread = shouldInterruptThread;
 }
}
  • subscription 属性,订阅信息。

#schedule(Action0) 方法,代码如下 :

 1: @Override
 2: public Subscription schedule(final Action0 action){
 3: // 未订阅,返回
 4: if (subscription.isUnsubscribed()) {
 5: // don't schedule, we are unsubscribed
 6: return Subscriptions.unsubscribed();
 7: }
 8: 
 9: // 创建 ScheduledAction
10: // This is internal RxJava API but it is too useful.
11: ScheduledAction sa = new ScheduledAction(action);
12: 
13: // 添加到 订阅
14: subscription.add(sa);
15: sa.addParent(subscription);
16: 
17: // 提交 任务
18: ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
19: FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
20: sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
21: 
22: return sa;
23: }
  • 第 4 至 7 行 :未订阅,返回。
  • 第 11 行 : 创建 ScheduledAction 。在 TODO 【2013】【ScheduledAction】 详细解析。
  • 第 14 至 15 行 :添加到订阅( subscription )。
  • 第 18 至 20 行 :使用 threadPool ,提交任务,并创建 FutureCompleterWithConfigurableInterrupt 添加到订阅( sa )。
  • 第 22 行 :返回订阅( sa )。整体订阅关系如下 : 熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

#unsubscribe() / #isUnsubscribed() 方法,使用 subscription 判断,点击 链接 查看。

6.5 FutureCompleterWithConfigurableInterrupt

com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt ,实现类似 rx.internal.schedulers.ScheduledAction.FutureCompleter ,在它的基础上,支持配置 FutureTask#cancel(Boolean) 是否可 打断 运行( mayInterruptIfRunning )。

构造方法,代码如下 :

private static class FutureCompleterWithConfigurableInterrupt implements Subscription{
 private final FutureTask<?> f;
 private final Func0<Boolean> shouldInterruptThread;
 private final ThreadPoolExecutor executor;

 // ... 省略无关代码

 private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor){
 this.f = f;
 this.shouldInterruptThread = shouldInterruptThread;
 this.executor = executor;
 }
}

当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行 。

当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行 。

当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行 。

#unsubscribe() 方法,代码如下 :

@Override
public void unsubscribe(){
 // 从 线程池 移除 任务
 executor.remove(f);
 // 根据 shouldInterruptThread 配置,是否强制取消
 if (shouldInterruptThread.call()) {
 f.cancel(true);
 } else {
 f.cancel(false);
 }
}
  • 根据 shouldInterruptThread 方法,判断是否 强制 取消。
  • shouldInterruptThread 对应的方法,实现代码如下 :
subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
 @Override
 public Boolean call(){
 return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
 }
}));
  • executionIsolationThreadInterruptOnTimeout = true 时,命令可执行 超时 。当命令可执行 超时 时, 强制 取消。
  • 当使用 HystrixCommand.queue() 返回的 Future ,可以使用 Future#cancel(Boolean) 取消命令执行。从 shouldInterruptThread 对应的方法可以看到,如果此时不满足命令执行 超时 的条件,命令执行取消的方式是 非强制 的。此时当 executionIsolationThreadInterruptOnFutureCancel = true 时,并且调用 Future#cancel(Boolean) 传递 mayInterruptIfRunning = true ,强制取消命令执行。

666. 彩蛋

一边写一边想明白了 RxJava 的一些东西,挺舒服的赶脚。

继续 Go On ~ 周末嗨不停。

胖友,分享一波朋友圈可好!


以上所述就是小编给大家介绍的《熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

WWW信息体系结构(影印版第2版)

WWW信息体系结构(影印版第2版)

Louis Rosenfeld / 清华大学出版社 / 2003-6 / 49.8

如今的网站和内联网已经变得比以前越来越大,越来越有价值,而且越来越复杂,同时其用户也变得更忙,也更加不能容忍错误的发生。数目庞大的信息、快速的变化、新兴的技术和公司策略是设计师、信息体系结构构建师和网站管理员必须面对的事情,而这些已经让某些网让看起来像是个快速增长却规划很差的城市——到处都是路,却无法导航。规划精良的信息体系结构当前正是最关键性的。 本书介绍的是如何使用美学和机械学的理念创建......一起来看看 《WWW信息体系结构(影印版第2版)》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具