熔断器 Hystrix 源码解析 —— 执行命令方式

栏目: Java · 发布时间: 6年前

内容简介:熔断器 Hystrix 源码解析 —— 执行命令方式

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-mode/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本

熔断器 Hystrix 源码解析 —— 执行命令方式

关注 微信公众号:【芋道源码】 有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 Hystrix 执行命令方法

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

在官方提供的示例中,我们看到 CommandHelloWorld 通过继承 HystrixCommand 抽象类,有四种调用方式:

方法
#execute() 同步 调用,返回 直接 结果
#queue() 异步 调用,返回 java.util.concurrent.Future
#observe() 异步 调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
#toObservable() 未调用 ,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果

熔断器 Hystrix 源码解析 —— 执行命令方式

推荐 Spring Cloud 书籍:

2. 实现

// AbstractCommand.java
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R>{
 
 // ... 省略无关属性与方法
 
 public Observable<R> toObservable(){
 
 return Observable.defer(new Func0<Observable<R>>() {
 @Override
 public Observable<R> call(){
 // ....
 }
 }
 
 }
 
 public Observable<R> observe(){
 // us a ReplaySubject to buffer the eagerly subscribed-to Observable
 ReplaySubject<R> subject = ReplaySubject.create();
 // eagerly kick off subscription
 final Subscription sourceSubscription = toObservable().subscribe(subject);
 // return the subject that can be subscribed to later while the execution has already started
 return subject.doOnUnsubscribe(new Action0() {
 @Override
 public void call(){
 sourceSubscription.unsubscribe();
 }
 });
 }

}

// HystrixCommand.java
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R>{

 // ... 省略无关属性与方法
 
 public Future<R> queue(){
 final Future<R> delegate = toObservable().toBlocking().toFuture();
 final Future<R> f = new Future<R>() {
 // ... 包装 delegate
 }
 // ...
 return f;
 }

 public R execute(){
 try {
 return queue().get();
 } catch (Exception e) {
 throw Exceptions.sneakyThrow(decomposeException(e));
 }
 }
 
 protected abstract R run() throws Exception;

}
  • #toObservable() 方法 : 做订阅,返回干净的 Observable 。 这就是为什么上文说“未调用”
  • #observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅
  • #queue() 方法 :调用 #toObservable() 方法的基础上,调用:
    • Observable#toBlocking() 方法 :将 Observable 转换成 阻塞rx.observables.BlockingObservable
    • BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法 执行结果的 Future 。
      • #run() 方法 :子类实现该方法,执行 正常的业务逻辑
        • BlockingObservable 在「3. BlockingObservable」详细解析。
  • #execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法, 同步 返回 #run() 的执行结果。
  • 整理四种调用方式如下:

    熔断器 Hystrix 源码解析 —— 执行命令方式

3. BlockingObservable

本小节为 拓展内容 ,源码解析 RxJava ( 非 Hystrix ) 的 rx.observables.BlockingObservable 的实现,所以你可以选择:

《RxJava 源码解析 —— BlockingObservable》

666. 彩蛋

第一篇 Hystrix 正式的源码解析。

梳理 Hystrix 的源码还是蛮痛苦的,主要是因为对 RxJava 不够熟悉。

胖友,分享一波朋友圈可好!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

高扩展性网站的50条原则

高扩展性网站的50条原则

[美] Martin L. Abbott、[美]Michael T. Fisher / 张欣、杨海玲 / 人民邮电出版社 / 2012-6-3 / 35.00元

《高扩展性网站的50条原则》给出了设计高扩展网站的50条原则,如不要过度设计、设计时就考虑扩展性、把方案简化3倍以上、减少DNS查找、尽可能减少对象等,每个原则都与不同的主题绑定在一起。大部分原则是面向技术的,只有少量原则解决的是与关键习惯和方法有关的问题,当然,每个原则都对构建可扩展的产品至关重要。 主要内容包括: 通过克隆、复制、分离功能和拆分数据集提高网站扩展性; 采用横向......一起来看看 《高扩展性网站的50条原则》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

MD5 加密
MD5 加密

MD5 加密工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具