Spring Cloud 源码学习之 Hystrix 工作原理

栏目: Java · 发布时间: 5年前

内容简介:参考信息与文中链接请点击 阅读原文,感兴趣可移步PC端阅读,移动端阅读源码很不友好(截图字太小,贴代码排版太乱)。本文学习了 Hystrix 工作原理及源码,关注点在整体处理流程,不涉及具体的实现细节。后续将逐渐写Metrics收集、断路器、隔离、请求缓存等,有兴趣可以关注奥。下面 流程图 来源于 Hystrix Wiki,展现了 Hystrix 工作原理,官方 Wiki 中对每一步都做了详细的描述,可以直接参考。

参考信息与文中链接请点击 阅读原文,感兴趣可移步PC端阅读,移动端阅读源码很不友好(截图字太小,贴代码排版太乱)。

本文学习了 Hystrix 工作原理及源码,关注点在整体处理流程,不涉及具体的实现细节。后续将逐渐写Metrics收集、断路器、隔离、请求缓存等,有兴趣可以关注奥。

下面 流程图 来源于 Hystrix Wiki,展现了 Hystrix 工作原理,官方 Wiki 中对每一步都做了详细的描述,可以直接参考。

Spring Cloud 源码学习之 Hystrix 工作原理

文中源码基于 Spring Cloud  Finchley.SR1 、Spring Boot  2.0.6.RELEASE .

工作原理简述

当需要完成某项任务时,通过 Hystrix 将任务包裹起来,交由 Hystrix 来完成任务,从而享受 Hystrix 带来保护。这和古代镖局生意有点类似,将任务委托给镖局,以期安全完成任务。

上图展示了 Hystrix 完成任务的处理流程,下面对1到9步骤进行简述:

1.构建命令

Hystrix 提供了两个Command,  HystrixCommand  和  HystrixObservableCommand ,可以使用这两个对象来包裹待执行的任务。

例如使用  @HystrixCommand 注解标记方法,Hystrix 将利用AOP自动将目标方法包装成HystrixCommand来执行。

@HystrixCommand
public String hello() {
   ...
}

也可以继承HystrixCommand或HystrixObservableCommand来创建Command,例如:

public class MyCommand extends HystrixCommand {
 
   public MyCommand(HystrixCommandGroupKey group) {
       super(group);
   }
 
   @Override
   protected Object run() throws Exception {
       // 需要做的事情及需要返回的结果
       return null;
   }
}

任务委托给 Hystrix 后,Hystrix 可以应用自己的一系列保护机制,在执行用户任务的各节点(执行前、执行后、异常、超时等)做一系列的事情。

2.执行命令

有四种方式执行command。

  • R execute():同步执行,从依赖服务得到单一结果对象

  • Future  queue() :异步执行,返回一个 Future 以便获取执行结果,也是单一结果对象

  • Observable  observe() :hot observable,创建Observable后会订阅Observable,可以返回多个结果

  • Observable  toObservable() :cold observable,返回一个Observable,只有订阅时才会执行,可以返回多个结果

execute()的实现为  queue().get() ;  queue() 的实现为  toObservable().toBlocking().toFuture()

最后Obserable都由toObservable()来创建,本文的主要内容就是toObservable()。

// 利用queue()拿到Future, 执行 get()同步等待拿到执行结果
public R execute() {
   ...
   return queue().get();
}
 
// 利用toObservable()得到Observable最后转成Future
public Future<R> queue() {
   final Future<R> delegate = toObservable().toBlocking().toFuture();
   ...
}
 
// 利用toObservable()得到Observable并直接订阅它,立即执行命令
public Observable<R> observe() {
   ReplaySubject<R> subject = ReplaySubject.create();
   final Subscription sourceSubscription = toObservable().subscribe(subject);
   ...
}

3.检查缓存

第3到9步骤构成了 Hystrix 的保护能力,通过这一些列步骤来执行任务,从而起到保护作用。

如果启用了 Hystrix Cache,任务执行前将先判断是否有相同命令执行的缓存。如果有则直接返回缓存的结果;如果没有缓存的结果,但启动了缓存,将缓存本次执行结果以供后续使用。

4.检查断路器是否打开

断路器(circuit-breaker)和保险丝类似,保险丝在发生危险时将会烧断以保护电路,而断路器可以在达到我们设定的阀值时触发短路(比如请求失败率达到50%),拒绝执行任何请求。

如果断路器被打开,Hystrix 将不会执行命令,直接进入Fallback处理逻辑。

5.检查线程池/信号量情况

Hystrix 隔离方式有线程池隔离和信号量隔离。当使用Hystrix线程池时,Hystrix 默认为每个依赖服务分配10个线程,当10个线程都繁忙时,将拒绝执行命令。信号量同理。

6.执行具体的任务

通过 HystrixObservableCommand.construct()  或者  HystrixCommand.run()  来运行用户真正的任务。

7.计算链路健康情况

每次开始执行command、结束执行command以及发生异常等情况时,都会记录执行情况,例如:成功、失败、拒绝以及超时等情况,会定期处理这些数据,再根据设定的条件来判断是否开启断路器。

8.命令失败时执行 Fallback 逻辑

在命令失败时执行用户指定的 Fallback 逻辑。上图中的断路、线程池拒绝、信号量拒绝、执行执行、执行超时都会进入 Fallback 处理。

9.返回执行结果

原始结果将以Observable形式返回,在返回给用户之前,会根据调用方式的不同做一些处理。

下面是 Hystrix Return flow。

Spring Cloud 源码学习之 Hystrix 工作原理

源码学习

小故事

由于最终入口都是  toObservable() ,就从 AbstractCommand的  Observable<R> toObservable()  方法开始。

Hystrix 使用观察者模式, Observable 即被观察者,被观察者状态变更时,观察者可以做出各项响应。举个例子:大厅中一位演讲者正在分享,厅中有观众和工作人员,可能发生如下事情:

被观察者 事件            观察者
-----------------------------------
演讲者 分享到精彩处 -> 观众鼓掌
演讲者 讲的口干舌燥 -> 工作人员递上一瓶水
演讲者 放出自己的二维码 -> 观众扫描

因为 Hystrix 基于RxJava,RxJava 初次看会比较复杂。为了便于下文理解,可以将Observable理解为数据源、数据发射器,上面例子中,演讲者各种行为都可以抽象为数据源在发射数据,而各种接收者可以做出各种响应。

toObservable()

toObservable()主要源码如下:

public Observable<R> toObservable() {
   final AbstractCommand<R> _cmd = this;
   // 命令执行结束后的清理者
   final Action0 terminateCommandCleanup = new Action0() {...};
   // 取消订阅时处理者
   final Action0 unsubscribeCommandCleanup = new Action0() {...};
   // 重点:Hystrix 核心逻辑: 断路器、隔离
   final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {...};
   // 发射数据(OnNext表示发射数据)时的Hook
   final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {...};
   // 命令执行完成的Hook
   final Action0 fireOnCompletedHook = new Action0() {...};
 
   // 通过Observable.defer()创建一个Observable
   return Observable.defer(new Func0<Observable<R>>() {
       @Override
       public Observable<R> call() {
           final boolean requestCacheEnabled = isRequestCachingEnabled();
           final String cacheKey = getCacheKey();
 
           // 首先尝试从请求缓存中获取结果
           if (requestCacheEnabled) {
               HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
               if (fromCache != null) {
                   isResponseFromCache = true;
                   return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
               }
           }
 
           // 使用上面的Func0:applyHystrixSemantics 来创建Observable
           Observable<R> hystrixObservable =
                   Observable.defer(applyHystrixSemantics)
                           .map(wrapWithAllOnNextHooks);
 
           Observable<R> afterCache;
 
           // 如果启用请求缓存,将Observable包装成HystrixCachedObservable并进行相关处理
           if (requestCacheEnabled && cacheKey != null) {
               HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
               ...
           } else {
               afterCache = hystrixObservable;
           }
 
           // 返回Observable
           return afterCache
                   .doOnTerminate(terminateCommandCleanup)  
                   .doOnUnsubscribe(unsubscribeCommandCleanup)
                   .doOnCompleted(fireOnCompletedHook);
       }
   });
}

上面的代码可以换种思维方式来理解。平时开发时都是下面这种模式,按顺序不断的做事情,是一个很好的执行者。

public void methodA{
   try {
       // 1. 做第一件事情
       // 2. 调用methodB()做第二件事情
       // 3. 做第三件事情
       ...
   } catch (Exception e) {
       // 处理错误
   } finally {
       // 最后一定要做的事情
   }
}

用一张图来看  toObservable() 方法。这种方式是“军师型”,排兵布阵,先创造了各个处理者,然后创造被观察者,再设置Observable发生各种情况时由谁来处理,完全掌控全局。

Spring Cloud 源码学习之 Hystrix 工作原理

解释下Action0、Func1这种对象。Action、Func和Runnable、Callable类似,是一个可以被执行的实体。Action没有返回值,Action0…ActionN表示有0..N个参数,Action0就表示没有参数;Func有返值,0..N一样表示参数。

public interface Action0 extends Action {
   void call();
}
public interface Func1<T, R> extends Function {
   R call(T t);
}

下面用核心的  applyHystrixSemantics 来阐述一下。

// applyHystrixSemantics 是一个Func0(理解为执行实体或处理者),表示没有参数,返回值是Observable。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
   // Func0 做的事情如下
   @Override
   public Observable<R> call() {
       // 如果未订阅,返回一个"哑炮" Observable, 即一个不会发射任何数据的Observable
       if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
           return Observable.never();
       }
       // 调用applyHystrixSemantics()来创建Observable
       return applyHystrixSemantics(_cmd);
   }
};

因此,当执行Func0: applyHystrixSemantics时,可以得到一个Observable。 toObservable() 大量代码在准备处理者(观察者),实际使用时是方法最后的  Observable.defer(new Func0<observable >(){…}</observable

Observable.defer

defer译为延迟,表示演讲者会等有观众来时才开始分享。Observable.defer() 就是说:必须有观察者订阅时, Observable 才开始发射数据。而defer()的参数是个Func0,是一个会返回Observable的执行实体。下面看看defer():

return Observable.defer(new Func0<Observable<R>>() {
   @Override
   public Observable<R> call() {
       // 再一次使用Observable.defer()技能,这次用的是applyHystrixSemantics这个Func0
       Observable<R> hystrixObservable =
               Observable.defer(applyHystrixSemantics)
                       .map(wrapWithAllOnNextHooks);
       ... // 此处忽略了请求缓存处理,上面已有提及
       Observable<R> afterCache;
       ...
       // 为Observable绑定几个特定事件的处理者,这都是上门创建的Action0
       return afterCache
               .doOnTerminate(terminateCommandCleanup)
               .doOnUnsubscribe(unsubscribeCommandCleanup)
               .doOnCompleted(fireOnCompletedHook);
   }
});

applyHystrixSemantics()

接着看applyHystrixSemantics这个Func0,Func0的call()中调用的是applyHystrixSemantics()函数。

// Semantics 译为语义, 应用Hystrix语义很拗口,其实就是应用Hystrix的断路器、隔离特性
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
   // 源码中有很多executionHook、eventNotifier的操作,这是Hystrix拓展性的一种体现。这里面啥事也没做,留了个口子,开发人员可以拓展
   executionHook.onStart(_cmd);
 
   // 判断断路器是否开启
   if (circuitBreaker.attemptExecution()) {
       // 获取执行信号
       final TryableSemaphore executionSemaphore = getExecutionSemaphore();
       final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
       final Action0 singleSemaphoreRelease = new Action0() {...};
       final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {...};
 
       // 判断是否信号量拒绝
       if (executionSemaphore.tryAcquire()) {
           try {
               // 重点:处理隔离策略和Fallback策略
               return executeCommandAndObserve(_cmd)
                       .doOnError(markExceptionThrown)
                       .doOnTerminate(singleSemaphoreRelease)
                       .doOnUnsubscribe(singleSemaphoreRelease);
           } catch (RuntimeException e) {
               return Observable.error(e);
           }
       } else {
           return handleSemaphoreRejectionViaFallback();
       }
   }
   // 开启了断路器,执行Fallback
   else {
       return handleShortCircuitViaFallback();
   }
}

executeCommandAndObserve()

下面看 executeCommandAndObserve() 方法,处理隔离策略和各种Fallback.

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
   final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
 
   final Action1<R> markEmits = new Action1<R>() {...};
   final Action0 markOnCompleted = new Action0() {...};
 
   // 利用Func1获取处理Fallback的 Observable
   final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
       @Override
       public Observable<R> call(Throwable t) {
           circuitBreaker.markNonSuccess();
           Exception e = getExceptionFromThrowable(t);
           executionResult = executionResult.setExecutionException(e);
           // 拒绝处理
           if (e instanceof RejectedExecutionException) {
               return handleThreadPoolRejectionViaFallback(e);
           // 超时处理    
           } else if (t instanceof HystrixTimeoutException) {
               return handleTimeoutViaFallback();
           } else if (t instanceof HystrixBadRequestException) {
               return handleBadRequestByEmittingError(e);
           } else {
               ...
               return handleFailureViaFallback(e);
           }
       }
   };
 
   final Action1<Notification<? super R>> setRequestContext ...
 
   Observable<R> execution;
   // 利用特定的隔离策略来处理
   if (properties.executionTimeoutEnabled().get()) {
       execution = executeCommandWithSpecifiedIsolation(_cmd)
               .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
   } else {
       execution = executeCommandWithSpecifiedIsolation(_cmd);
   }
 
   return execution.doOnNext(markEmits)
           .doOnCompleted(markOnCompleted)
           // 绑定Fallback的处理者
           .onErrorResumeNext(handleFallback)
           .doOnEach(setRequestContext);
}

executeCommandWithSpecifiedIsolation()

接着看隔离特性的处理:executeCommandWithSpecifiedIsolation()

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
   // 线程池隔离
   if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
       // 再次使用 Observable.defer(), 通过执行Func0来得到Observable
       return Observable.defer(new Func0<Observable<R>>() {
           @Override
           public Observable<R> call() {
               // 收集metric信息
               metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
               ...
               try {
                     ... // 获取真正的用户Task
                   return getUserExecutionObservable(_cmd);
               } catch (Throwable ex) {
                   return Observable.error(ex);
               }
               ...
           }
           // 绑定各种处理者
       }).doOnTerminate(new Action0() {...})
           .doOnUnsubscribe(new Action0() {...})
           // 绑定超时处理者
           .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
           @Override
           public Boolean call() {
               return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
           }
       }));
   }
   // 信号量隔离,和线程池大同小异,全部省略了
   else {
       return Observable.defer(new Func0<Observable<R>>() {...}
   }
}

getUserExecutionObservable()就不接着写了,可以自己看下,就是拿到用户真正要执行的任务。这个任务就是这样被Hystrix包裹着,置于层层防护之下。

倒过来看

上面方法层层调用,倒过来看,就是先创建一个Observable,然后绑定各种事件对应的处理者,如下图:

Spring Cloud 源码学习之 Hystrix 工作原理

各类doOnXXXX,表示发生XXX事件时做什么事情。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

交易系统

交易系统

武剑锋 / 上海人民出版社 / 2011-1 / 32.00元

《交易系统:更新与跨越》是中国第一部研究证券交易系统的专业著作,填补了这一领域的学术空白。既回顾和总结了系统规划、建设和上线过程中,技术管理、架构设计、应用调优、切换部署、运行维护等方面的经验和教训,也从较为宏观的角度描述了独具中国特色的交易技术支撑体系,特别是,通过分析其他资本市场交易系统的近年来发展历程和后续的技术发展规划,探索了未来业务创新和技术创新的大致框架和可能的模式。相信《交易系统:更......一起来看看 《交易系统》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试