内容简介:熔断器 Hystrix 源码解析 —— 执行命令方式
摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-mode/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Hystrix 1.5.X 版本
关注 微信公众号:【芋道源码】 有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言 都 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢 。
- 新的 源码解析文章 实时 收到通知。 每周更新一篇左右 。
- 认真的 源码交流微信群。
1. 概述
本文主要分享 Hystrix 执行命令方法 。
建议 :对 RxJava 已经有一定的了解的基础上阅读本文。
在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:
| 方法 | ||
|---|---|---|
#execute() |
同步 调用,返回 直接 结果 | |
#queue() |
异步 调用,返回 java.util.concurrent.Future |
|
#observe() |
异步 调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
|
#toObservable() |
未调用 ,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果 |
- 第四种方式,点击
#testToObservable()查看笔者补充的示例。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版, 等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与 Docker 微服务架构实战》
- 两书齐买,京东包邮。
2. 实现
// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R>{
// ... 省略无关属性与方法
public Observable<R> toObservable(){
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(){
// ....
}
}
}
public Observable<R> observe(){
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call(){
sourceSubscription.unsubscribe();
}
});
}
}
// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R>{
// ... 省略无关属性与方法
public Future<R> queue(){
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// ... 包装 delegate
}
// ...
return f;
}
public R execute(){
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
protected abstract R run() throws Exception;
}
-
#toObservable()方法 : 未 做订阅,返回干净的 Observable 。 这就是为什么上文说“未调用” 。 -
#observe()方法 :调用#toObservable()方法的基础上,向 Observable 注册rx.subjects.ReplaySubject发起订阅 。- ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。感兴趣的同学可以阅读 《ReactiveX/RxJava文档中文版 —— Subject》 。
-
#queue()方法 :调用#toObservable()方法的基础上,调用:-
Observable#toBlocking()方法 :将 Observable 转换成 阻塞 的rx.observables.BlockingObservable。 -
BlockingObservable#toFuture()方法 :返回可获得#run()抽象方法 执行结果的 Future 。-
#run()方法 :子类实现该方法,执行 正常的业务逻辑 。- BlockingObservable 在「3. BlockingObservable」详细解析。
-
-
-
#execute()方法 :调用#queue()方法的基础上,调用Future#get()方法, 同步 返回#run()的执行结果。 -
整理四种调用方式如下:
FROM 《【翻译】Hystrix文档-实现原理》
3. BlockingObservable
本小节为 拓展内容 ,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的实现,所以你可以选择:
- 1 ) 跳过本小节,不影响对本文的理解。
- 2 ) 选择阅读 《ReactiveX/RxJava文档中文版 —— 阻塞操作》 ,理解 BlockingObservable 的原理。
- 3 ) 选择阅读本小节,理解 BlockingObservable 的原理以及实现。
《RxJava 源码解析 —— BlockingObservable》
666. 彩蛋
第一篇 Hystrix 正式的源码解析。
梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。
胖友,分享一波朋友圈可好!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- [译] Istio 熔断器解析
- Hystrix系列之熔断器实现原理
- [译] 3 分钟简述熔断器使用方法
- 不只是容错-从熔断器看有限状态机
- 服务容错模式:舱壁模式、熔断器的异同点
- [译] Grab 熔断器设计:如何应对突发打车峰值
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
JS 压缩/解压工具
在线压缩/解压 JS 代码
Markdown 在线编辑器
Markdown 在线编辑器