内容简介:作者 | 赵云翔杏仁后端工程师,关注如何优雅写 bug。
作者 | 赵云翔
杏仁后端工程师,关注如何优雅写 bug。
前言
Hystrix 是非常优秀的一款熔断限流框架,由 Netflix 开源,很多公司都在使用,我司也如此,其上手简单且具备可视化监控平台。 目前 Hystrix 已经不再维护,但还是值得学习借鉴。 虽然 Hystrix 是使用 RxJava 进行编写的,但并不妨碍理解逻辑。 本文主要介绍 HystrixCommand 的执行过程。
Hystrix 简单介绍
-
什么是 Hystrix?
Hystrix 是一个容错库,旨在隔离对远程系统、服务和第三方库的访问点,停止级联故障,并在错误不可避免的复杂分布式系统中能够弹性恢复。
-
核心概念
-
Command 命令
Command 是 Hystrix 的入口,对用户来说,我们只需要创建对应的 command,将需要保护的接口包装起来就可以。 可以无需关注再之后的逻辑。 与 Spring 深度集成后还可以通过注解的方式,就更加对开发友好了。
-
Circuit Breaker 断路器
断路器 ,是从电气领域引申过来的概念,具有 过载 、 短路 和 欠电压保护 功能,有保护线路和电源的能力。 在 Hystrix 中即为当请求超过一定比例响应失败时,hystrix 会对请求进行拦截处理,保证服务的稳定性,以及防止出现服务之间级联雪崩的可能性。
-
Isolation 隔离策略
隔离策略是 Hystrix 的设计亮点所在,利用
舱壁模式
的思想来对访问的资源进行隔离,每个资源是独立的依赖,单个资源的异常不应该影响到其他。 Hystrix 的隔离策略目前有两种: 线程池隔离 , 信号量隔离 。 -
Hystrix 的运行流程
官方的
How it Works
对流程有很详细的介绍,图示清晰,相信看完流程图就能对运行流程有一定的了解。
一次 Command 执行
HystrixCommand
是标准的 命令模式
实现,每一次请求即为一次命令的创建执行经历的过程。 从上 Hystrix 流程图
可以看出创建流程最终会指向 toObservable
, Observable
即为被观察者,作用是发送数据给观察者进行相应的,因此可以知道这个方法应该是较为关键的。
UML
-
HystrixInvokable 标记这个一个可执行的接口,没有任何抽象方法或常量。
-
HystrixExecutable 是为
HystrixCommand
设计的接口,主要提供执行命令的抽象方法,例如:execute()
,queue()
,observe()
。 -
HystrixObservable 是为
Observable
设计的接口,主要提供自动订阅 (observe()
) 和生成 Observable (toObservable()
) 的抽象方法。 -
HystrixInvokableInfo 提供大量的状态查询 (获取属性配置,是否开启断路器等)。
-
AbstractCommand 核心逻辑 的实现。
-
HystrixCommand 定制逻辑实现以及留给用户实现的接口 (比如:
run()
)。
样例代码
通过新建一个 command 来看 Hystrix 是如何创建并执行的。 HystrixCommand 是一个抽象类,其中有一个 run
方法需要我们实现自己的业务逻辑,以下是偷懒采用匿名内部类的形式呈现。 构造方法的内部实现我们就不关注了,直接看下执行的逻辑吧。
HystrixCommand demo = new HystrixCommand<String>(HystrixCommandGroupKey.Factory.asKey ("demo-group")) { @Override protected String run() { return "Hello World~"; } }; demo.execute();
执行过程
流程图
这是官方给出的一次完整调用的链路。 上述的 demo 中我们直接调用了 execute
方法,所以调用的路径为 execute()
-> queue()
-> toObservable()
-> toBlocking()
-> toFuture()
-> get()
。 核心的逻辑其实就在 toObservable()
中。
HystrixCommand.java
execute()
execute
方法为同步调用返回结果,并对异常作处理。 内部会调用 queue
。
// 同步调用执行 public R execute() { try { //queue() 返回的是 Future 类型的对象,所以这里是阻塞 get return queue().get(); } catch (Exception e) { throw decomposeException(e); } }
queue()
queue
的第一行代码完成了核心的订阅逻辑.
-
toObservable()
生成了 Hystrix 的 Observable 对象。 -
将
Observable
转换为BlockingObservable
可以阻塞控制数据发送。 -
toFuture
实现对BlockingObservable
的订阅。
public Future<R> queue() { // 着重关注的是这行代码 // 完成了 Observable 的创建及订阅 //toBlocking() 是将 Observable 转为 BlockingObservable, 转换后的 Observable 可以阻塞数据的发送 final Future<R> delegate = toObservable().toBlocking().toFuture(); final Future<R> f = new Future<R>() { // 由于 toObservable().toBlocking().toFuture() 返回的 Future 如果中断了, // 不会对当前线程进行中断,所以这里将返回的 Future 进行了再次包装,处理异常逻辑 ... } // 判断是否已经结束了,有异常则直接抛出 if (f.isDone()) { try { f.get(); return f; } catch (Exception e) { // 省略这段判断 } } return f; }
BlockingObservable.java
// 被包装的 Observable private final Observable<? extends T> o; //toBlocking() 会调用该静态方法将 源 Observable 简单包装成 BlockingObservable public static <T> BlockingObservable<T> from (final Observable<? extends T> o) { return new BlockingObservable<T>(o); } public Future<T> toFuture() { return BlockingOperatorToFuture.toFuture ((Observable<T>) o); }
BlockingOperatorToFuture.java
ReactiveX 关于 toFuture 的解读
The toFuture
operator applies to the BlockingObservable
subclass, so in order to use it, you must first convert your source Observable into a BlockingObservable
by means of either the BlockingObservable.from
method or the Observable.toBlocking
operator.
toFuture
只能作用于 BlockingObservable
所以也才会有上文想要转换为 BlockingObservable 的操作。
// 该操作将源 Observable 转换为返回单个数据项的 Future public static <T> Future<T> toFuture (Observable<? extends T> that) { // CountDownLatch 判断是否完成 final CountDownLatch finished = new CountDownLatch (1); // 存储执行结果 final AtomicReference<T> value = new AtomicReference<T>(); // 存储错误结果 final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); //single() 方法可以限制 Observable 只发送单条数据 // 如果有多条数据 会抛 IllegalArgumentException // 如果没有数据可以发送 会抛 NoSuchElementException @SuppressWarnings ("unchecked") final Subscription s = ((Observable<T>) that).single().subscribe (new Subscriber<T>() { //single() 返回的 Observable 就可以对其进行标准的处理了 @Override public void onCompleted() { finished.countDown(); } @Override public void onError (Throwable e) { error.compareAndSet (null, e); finished.countDown(); } @Override public void onNext (T v) { // "single" guarantees there is only one "onNext" value.set (v); } }); // 最后将 Subscription 返回的数据封装成 Future, 实现对应的逻辑 return new Future<T>() { // 可以查看源码 }; }
AbstractCommand.java
AbstractCommand
是 toObservable
实现的地方,属于 Hystrix 的核心逻辑,代码较长,可以和方法调用的流程图一起食用。 toObservable
主要是完成缓存和创建 Observable,requestLog 的逻辑,当第一次创建 Observable 时, applyHystrixSemantics
方法是 Hystrix 的最最核心的实现,可以跳着看。
tips : 下文中有很多 Action 和 Function,他们很相似,都有 call 方法,但是区别在于 Function 有返回值,而 Action 没有,方法后跟着的数字代表有几个入参。 Func0/Func3 即没有入参和有三个入参。
toObservable
toObservable
代码较长且分层还是清晰的,所以下面一块一块写。 其逻辑和文章开始提到的 Hystrix 流程图
是完全一致的。
public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; // 此处省略掉了很多个 Action 和 Function, 大部分是来做扫尾清理的函数,所以用到的时候再说 //defer 在上篇 rxjava 入门中提到过,是一种创建型的操作符,每次订阅时会产生新的 Observable, 回调方法中所实现的才是真正我们需要的 Observable return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { // 校验命令的状态,保证其只执行一次 if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance."); //TODO make a new error type for this throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + "command executed multiple times - this is not permitted.", ex, null); } commandStartTimestamp = System.currentTimeMillis(); //properties 为当前 command 的所有属性 // 允许记录请求 log 时会保存当前执行的 command if (properties.requestLogEnabled().get()) { //log this command execution regardless of what happened if (currentRequestLog != null) { currentRequestLog.addExecutedCommand(_cmd); } } // 是否开启了请求缓存 final boolean requestCacheEnabled = isRequestCachingEnabled(); // 获取缓存 key final String cacheKey = getCacheKey(); // 开启缓存后,尝试从缓存中取 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } // 没有开启请求缓存时,就执行正常的逻辑 Observable<R> hystrixObservable = // 这里又通过 defer 创建了我们需要的 Observable Observable.defer(applyHystrixSemantics) // 发送前会先走一遍 hook, 默认 executionHook 是空实现的,所以这里就跳过了 .map(wrapWithAllOnNextHooks); // 得到最后的封装好的 Observable 后,将其放入缓存 if (requestCacheEnabled && cacheKey != null) { //wrap it for caching HystrixCachedObservable<R> toCache = HystrixCachedObservable.from (hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null) { //another thread beat us so we'll use the cached value instead toCache.unsubscribe(); isResponseFromCache = true; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { //we just created an ObservableCommand so we cast and return it afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache // 终止时的操作 .doOnTerminate(terminateCommandCleanup) //perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line)) // 取消订阅时的操作 .doOnUnsubscribe(unsubscribeCommandCleanup) //perform cleanup once // 完成时的操作 .doOnCompleted(fireOnCompletedHook); } }
handleRequestCacheHitAndEmitValues
缓存击中时的处理
private Observable<R> handleRequestCacheHitAndEmitValues(final HystrixCommandResponseFromCache<R> fromCache, final AbstractCommand<R> _cmd) { try { // Hystrix 中有大量的 hook 如果有心做二次开发的,可以利用这些 hook 做到很完善的监控 executionHook.onCacheHit(this); } catch (Throwable hookEx) { logger.warn ("Error calling HystrixCommandExecutionHook.onCacheHit", hookEx); } // 将缓存的结果赋给当前 command return fromCache.toObservableWithStateCopiedInto (this) //doOnTerminate 或者是后面看到的 doOnUnsubscribe,doOnError, 都指的是在响应 onTerminate/onUnsubscribe/onError 后的操作,即在 Observable 的生命周期上注册一个动作优雅的处理逻辑 .doOnTerminate (new Action0() { @Override public void call() { // 命令最终状态的不同进行不同处理 if (commandState.compareAndSet (CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { cleanUpAfterResponseFromCache (false); //user code never ran } else if (commandState.compareAndSet (CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { cleanUpAfterResponseFromCache (true); //user code did run } } }) .doOnUnsubscribe (new Action0() { @Override public void call() { // 命令最终状态的不同进行不同处理 if (commandState.compareAndSet (CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { cleanUpAfterResponseFromCache (false); //user code never ran } else if (commandState.compareAndSet (CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { cleanUpAfterResponseFromCache (true); //user code did run } } }); }
applyHystrixSemantics
因为本片文章的主要目的是在讲执行流程,所以失败回退和断路器相关的就留到以后的文章中再写。
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call() { // 不再订阅了就返回不发送数据的 Observable if (commandState.get().equals (CommandState.UNSUBSCRIBED)) { // 不发送任何数据或通知 return Observable.never(); } return applyHystrixSemantics (_cmd); } }; private Observable<R> applyHystrixSemantics (final AbstractCommand<R> _cmd) { // 标记开始执行的 hook // 如果 hook 内抛异常了,会快速失败且没有 fallback 处理 executionHook.onStart (_cmd); /* determine if we're allowed to execute */ // 断路器核心逻辑:判断是否允许执行 (TODO) if (circuitBreaker.allowRequest()) { // Hystrix 自己造的信号量轮子,之所以不用 juc 下,官方解释为 juc 的 Semphore 实现太复杂,而且没有动态调节的信号量大小的能力,简而言之,不满足需求! // 根据不同隔离策略 (线程池隔离 / 信号量隔离) 获取不同的 TryableSemphore final TryableSemaphore executionSemaphore = getExecutionSemaphore(); // Semaphore 释放标志 final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean (false); // 释放信号量的 Action final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet (false, true)) { executionSemaphore.release(); } } }; // 异常处理 final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call (Throwable t) { // HystrixEventNotifier 是 hystrix 的插件,不同的事件发送不同的通知,默认是空实现. eventNotifier.markEvent (HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 线程池隔离的 TryableSemphore 始终为 true if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ //executionResult 是一次命令执行的结果信息封装 // 这里设置起始时间是为了记录命令的生命周期,执行过程中会 set 其他属性进去 executionResult = executionResult.setInvocationStartTime (System.currentTimeMillis()); return executeCommandAndObserve (_cmd) // 报错时的处理 .doOnError (markExceptionThrown) // 终止时释放 .doOnTerminate (singleSemaphoreRelease) // 取消订阅时释放 .doOnUnsubscribe (singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error (e); } } else { //tryAcquire 失败后会做 fallback 处理,TODO return handleSemaphoreRejectionViaFallback(); } } else { // 断路器短路 (拒绝请求) fallback 处理 TODO return handleShortCircuitViaFallback(); } }
executeCommandAndObserve
/** * 执行 run 方法的地方 */ private Observable<R> executeCommandAndObserve (final AbstractCommand<R> _cmd) { // 获取当前上下文 final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); // 发送数据时的 Action 响应 final Action1<R> markEmits = new Action1<R>() { @Override public void call (R r) { // 如果 onNext 时需要上报时,做以下处理 if (shouldOutputOnNextEvents()) { //result 标记 executionResult = executionResult.addEvent (HystrixEventType.EMIT); // 通知 eventNotifier.markEvent (HystrixEventType.EMIT, commandKey); } //commandIsScalar 是一个我不解的地方,在网上也没有查到好的解释 // 该方法为抽象方法,有 HystrixCommand 实现返回 true.HystrixObservableCommand 返回 false if (commandIsScalar()) { // 耗时 long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); // 通知 eventNotifier.markCommandExecution (getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList()); eventNotifier.markEvent (HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent ((int) latency, HystrixEventType.SUCCESS); // 断路器标记成功 (断路器半开时的反馈,决定是否关闭断路器) circuitBreaker.markSuccess(); } } }; final Action0 markOnCompleted = new Action0() { @Override public void call() { if (!commandIsScalar()) { // 同 markEmits 类似处理 } } }; // 失败回退的逻辑 final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call (Throwable t) { // 不是重点略过了 } }; // 请求上下文的处理 final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call (Notification<? super R> rNotification) { setRequestContextIfNeeded (currentRequestContext); } }; Observable<R> execution; // 如果有执行超时限制,会将包装后的 Observable 再转变为支持 TimeOut 的 if (properties.executionTimeoutEnabled().get()) { // 根据不同的隔离策略包装为不同的 Observable execution = executeCommandWithSpecifiedIsolation (_cmd) //lift 是 rxjava 中一种基本操作符 可以将 Observable 转换成另一种 Observable // 包装为带有超时限制的 Observable .lift (new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation (_cmd); } return execution.doOnNext (markEmits) .doOnCompleted (markOnCompleted) .onErrorResumeNext (handleFallback) .doOnEach (setRequestContext); }
executeCommandWithSpecifiedIsolation
根据不同的隔离策略创建不同的执行 Observable
private Observable<R> executeCommandWithSpecifiedIsolation (final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { //mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE) return Observable.defer (new Func0<Observable<R>>() { @Override public Observable<R> call() { // 由于源码太长,这里只关注正常的流程,需要详细了解可以去看看源码 if (threadState.compareAndSet (ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { try { return getUserExecutionObservable (_cmd); } catch (Throwable ex) { return Observable.error (ex); } } else { //command has already been unsubscribed, so return immediately return Observable.error (new RuntimeException ("unsubscribed before executing run()")); } }}) .doOnTerminate (new Action0() {}) .doOnUnsubscribe (new Action0() {}) // 指定在某一个线程上执行,是 rxjava 中很重要的线程调度的概念 .subscribeOn (threadPool.getScheduler (new Func0<Boolean>() { })); } else { // 信号量隔离策略 return Observable.defer (new Func0<Observable<R>>() { // 逻辑与线程池大致相同 }); } }
getUserExecutionObservable
获取用户执行的逻辑
private Observable<R> getUserExecutionObservable (final AbstractCommand<R> _cmd) { Observable<R> userObservable; try { //getExecutionObservable 是抽象方法,有 HystrixCommand 自行实现 userObservable = getExecutionObservable(); } catch (Throwable ex) { //the run() method is a user provided implementation so can throw instead of using Observable.onError //so we catch it here and turn it into Observable.error userObservable = Observable.error (ex); } // 将 Observable 作其他中转 return userObservable .lift (new ExecutionHookApplication (_cmd)) .lift (new DeprecatedOnRunHookApplication (_cmd)); }
lift 操作符
lift 可以转换成一个新的 Observable, 它很像一个代理,将原来的 Observable 代理到自己这里,订阅时通知原来的 Observable 发送数据,经自己这里流转加工处理再返回给订阅者。 Map/FlatMap
操作符底层其实就是用的 lift
进行实现的。
getExecutionObservable
@Override final protected Observable<R> getExecutionObservable() { return Observable.defer (new Func0<Observable<R>>() { @Override public Observable<R> call() { try { //just 操作符就是直接执行的 Observable //run 方法就是我们实现的业务逻辑: Hello World~ return Observable.just (run()); } catch (Throwable ex) { return Observable.error (ex); } } }).doOnSubscribe (new Action0() { @Override public void call() { // 执行订阅时将执行线程记为当前线程,必要时我们可以 interrupt executionThread.set (Thread.currentThread()); } }); }
总结
本文主要对 Hystrix 进行了简单介绍,并通过一次请求的执行结合源码分析了其执行过程。 篇幅所限省略了断路器(CircuitBreaker),失败回退(Fallback)等组件的逻辑,有兴趣的同学可以去拉源码看下。
参考资料:
-
https://github.com/Netflix/Hystrix/wiki/How-it-Works
-
http://reactivex.io/documentation/observable.html
-
https://github.com/ruanyf/document-style-guide
-
https://blog.csdn.net/qq_24530405/article/details/66969886
全文完
以下文章您可能也会感兴趣:
我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Java程序员修炼之道
[英] Benjamin J. Evans、[荷兰] Martijn Verburg / 吴海星 / 人民邮电出版社 / 2013-7 / 89.00元
本书分为四部分,第一部分全面介绍Java 7 的新特性,第二部分探讨Java 关键编程知识和技术,第三部分讨论JVM 上的新语言和多语言编程,第四部分将平台和多语言编程知识付诸实践。从介绍Java 7 的新特性入手,本书涵盖了Java 开发中最重要的技术,比如依赖注入、测试驱动的开发和持续集成,探索了JVM 上的非Java 语言,并详细讲解了多语言项目, 特别是涉及Groovy、Scala 和Cl......一起来看看 《Java程序员修炼之道》 这本书的介绍吧!
CSS 压缩/解压工具
在线压缩/解压 CSS 代码
XML、JSON 在线转换
在线XML、JSON转换工具