spring cloud hystrix实践

栏目: Java · 发布时间: 5年前

内容简介:快递业务涉及很多外部对接,为了各个对接接口的隔离和失败的降级防止雪崩,所以引入了hystrix作为降级组件。 使用非常方便1.引入依赖:2.启动了增加annotation @EnableCircuitBreaker

介绍

hystrix是spring cloud的熔断降级组件,由netflix公司开源,通过命令模式结合rxjava框架实现,命令模式封装了用户具体业务,使用rxjava对命令的执行结果进行统计,根据统计结果按一定策略执行熔断降级,避免造成应用失败雪崩。

执行流程如下图: spring cloud hystrix实践

流程说明:

1.每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.

2:执行execute()/queue做同步或异步调用.

3:判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入后续步骤.

4:判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.

5:调用HystrixCommand的run方法.运行依赖逻辑

5a:依赖逻辑调用超时,进入步骤8.

6:判断逻辑是否调用成功

6a:返回成功调用结果

6b:调用出错,进入步骤8.

7:计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.

8:getFallback()降级逻辑.

以下四种情况将触发getFallback调用:

(1):run()方法抛出非HystrixBadRequestException异常。

(2):run()方法调用超时

(3):熔断器开启拦截调用

(4):线程池/队列/信号量是否跑满

8a:没有实现getFallback的Command将直接抛出异常

8b:fallback降级逻辑调用成功直接返回

8c:降级逻辑调用失败抛出异常

9:返回执行成功结果

官方文档有详细的使用示例:

https://github.com/Netflix/Hystrix/wiki/How-To-Use

使用

快递业务涉及很多外部对接,为了各个对接接口的隔离和失败的降级防止雪崩,所以引入了hystrix作为降级组件。 使用非常方便

1.引入依赖:

<dependency>  
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-hystrix</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>

2.启动了增加annotation @EnableCircuitBreaker

3.需要隔离降级的方法增加注解:

@HystrixCommand(groupKey= "yunda",commandKey="scanningOrder",threadPoolKey="scanningOrder-thread",
            threadPoolProperties = {@HystrixProperty(name="maximumSize",value = "20")},
            commandProperties = {@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value = "3000")},
            fallbackMethod="scanningOrderFallback",
            ignoreExceptions={BusinessException.class}
    )

spring cloud使用aop方式将包含@HystrixCommand注解的方法进行代理,包装了一层HystrixCommond并做了扩展。 该注解上可以配置hystrix各项参数

配置说明:

groupKey指定命令分组,同一个分组使用一个线程池。

commandKey指定命令名称。

threadPoolKey指定代码执行的具体线程名称。

commandProperties 可配置命令执行及指标统计相关参数:

execution.isolation.thread.timeoutInMilliseconds 该参数配置业务代码超时时间(默认1000ms),执行超过该时间会触发降级方法,没有降级方法将抛出HystrixRuntimeException

metrics.rollingStats.timeInMilliseconds参数可配置执行结果统计的滑动窗口时间 默认10000ms

metrics.rollingStats.numBuckets参数可配置滑动窗口包含多少段 默认10

circuitBreaker.requestVolumeThreshold参数可配置触发熔断的请求量阈值 默认20

circuitBreaker.errorThresholdPercentage可配置触发熔断的失败比率,默认50%

circuitBreaker.sleepWindowInMilliseconds可配置触发熔断到恢复的时间窗,默认5s

threadPoolProperties 可配置线程池相关参数,hystrix默认使用线程池进行业务隔离,核心线程数和最大线程数默认都是10个线程,并且使用SynchronizedQueue,即默认限制了最大并发数为10

fallbackMethod指定了降级的方法。

ignoreExceptions指定哪些异常是不需要降级的,比如我们需要给前端返回一个BusinessException,就不需要降级。

核心的配置上面都介绍了,更具体的可以看代码里的HystrixCommandProperties和HystrixThreadPoolProperties两个类。

使用过程中遇到一个问题: 当代码执行异常触发降级之后,降级的方法也是返回一个具体Exception,最终抛出的是对应的Exception 而当方法超时触发降级并且降级的方法也是返回一个具体Exception,最终抛出的却是HystrixRuntimeException 查看代码之后发现commond都是返回了HystrixRuntimeException,而实现aop的HystrixCommandAspect中对HystrixRuntimeException 做了处理,如果是命令执行失败类型(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION),将抛出具体异常 即降级中的Exception,失败类型不是命令执行失败类型(比如超时),将直接抛出HystrixRuntimeException,这样就不能返回我们需要返回的降级方法的异常了,只能通过修改具体实现来解决了。

指标统计

hystrix熔断决策及dashboard展示都是依赖于指标的统计,基于不同的HystrixEventStream实现发射不同的事件流。

类图: spring cloud hystrix实践

HystrixCommandMetrics中的指标流如下:

//统计滚动窗口总请求数 失败请求数及失败比率用于熔断判断
private HealthCountsStream healthCountsStream;  
//统计滚动窗口时间内各分段时间的命令执行结果
private final RollingCommandEventCounterStream rollingCommandEventCounterStream;  
//汇总命令所有时间执行结果
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;  
//滚动窗口不同占比请求的命令耗时统计
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;  
//滚动窗口不同占比请求的用户代码耗时统计
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;  
//滚动窗口最大并发统计
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

以上的统计除了rollingCommandMaxConcurrencyStream是基于HystrixCommandStartStream,其他都是基于HystrixCommandCompletionStream,选择一个统计流看看具体实现,比如RollingCommandEventCounterStream。 类图:

spring cloud hystrix实践

看下基类的BucketedCounterStream的Observable实现

this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {  
    @Override
    public Observable<Bucket> call() {
        return inputEventStream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
    }
});

inputEventStream是上面的HystrixCommandCompletionStream命令完成事件流,通过rxjava的window api按每秒打开一个窗口做处理,

rxjava window api:

https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Window.html 处理方法是:

this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {  
    @Override
    public Observable<Bucket> call(Observable<Event> eventBucket) {
        return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
    }
};

appendRawEventToBucket在HystrixCommandMetrics定义:

public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {  
    @Override
    public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
        ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
                default:
                    initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
                    break;
            }
        }
        return initialCountArray;
    }
};

做的处理就是对命令的执行结果转换为了Long数组,不同执行结果次数存于不同下标。 而BucketedRollingCounterStream继承于BucketedCounterStream,实现的Observable如下:

this.sourceStream = bucketedStream      //stream broken up into buckets  
    .window(numBuckets, 1)          //emit overlapping windows of buckets
    .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            isSourceCurrentlySubscribed.set(true);
        }
    })
    .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            isSourceCurrentlySubscribed.set(false);
        }
    })
    .share()                        //multiple subscribers should get same data
    .onBackpressureDrop();          //if there are slow consumers, data should not buffer

也是window api发射numBuckets(默认10)之后新开窗口并且skip了1,即默认窗口时间10s分10段,每次达到10段新开窗口并skip最后1s的窗口,从而达到了滚动的效果。对每秒的window Observable做reduceWindowToSummary操作,实现如下:

Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {  
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
        return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
};

使用rxjava 的scan api对每一项应用了reduceBucket操作,scan api定义:

https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Scan.html reduceBucket定义在HystrixCommandMetrics的bucketAggregator:

public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() {  
    @Override
    public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN:
                    for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
                        cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];
                    }
                    break;
                default:
                    cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];
                    break;
            }
        }
        return cumulativeEvents;
    }
};

实现了bucket里面的不同执行结果次数的相加,做到了滚动窗口内每个桶内各项执行结果的统计。

总体指标流程如下图: spring cloud hystrix实践

实现涉及到很多rxjava api 可看文档:

https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html 由于本人能力有限,如有不对之处,欢迎指正。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Software Design 中文版 01

Software Design 中文版 01

[日] 技术评论社 / 人民邮电出版社 / 2014-3 / 39.00

《Software Design》是日本主流的计算机技术读物,旨在帮助程序员更实时、深入地了解前沿技术,扩大视野,提升技能。内容涵盖多平台软件开发技巧、云技术应用、大数据分析、网络通信技术、深度互联时代下的移动开发、虚拟化、人工智能等最前沿实践性讲解。以人脑思维模式,激发计算机操控的无限可能;以软件开发技巧,挖掘系统与硬件的最大价值。 《Software Design 中文版 01》的主题为......一起来看看 《Software Design 中文版 01》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

html转js在线工具
html转js在线工具

html转js在线工具