spring cloud hystrix实践

栏目: Java · 发布时间: 7年前

内容简介:快递业务涉及很多外部对接,为了各个对接接口的隔离和失败的降级防止雪崩,所以引入了hystrix作为降级组件。 使用非常方便1.引入依赖:2.启动了增加annotation @EnableCircuitBreaker

介绍

hystrix是spring cloud的熔断降级组件,由netflix公司开源,通过命令模式结合rxjava框架实现,命令模式封装了用户具体业务,使用rxjava对命令的执行结果进行统计,根据统计结果按一定策略执行熔断降级,避免造成应用失败雪崩。

执行流程如下图: spring cloud hystrix实践

流程说明:

1.每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中.

2:执行execute()/queue做同步或异步调用.

3:判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入后续步骤.

4:判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤.

5:调用HystrixCommand的run方法.运行依赖逻辑

5a:依赖逻辑调用超时,进入步骤8.

6:判断逻辑是否调用成功

6a:返回成功调用结果

6b:调用出错,进入步骤8.

7:计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态.

8:getFallback()降级逻辑.

以下四种情况将触发getFallback调用:

(1):run()方法抛出非HystrixBadRequestException异常。

(2):run()方法调用超时

(3):熔断器开启拦截调用

(4):线程池/队列/信号量是否跑满

8a:没有实现getFallback的Command将直接抛出异常

8b:fallback降级逻辑调用成功直接返回

8c:降级逻辑调用失败抛出异常

9:返回执行成功结果

官方文档有详细的使用示例:

https://github.com/Netflix/Hystrix/wiki/How-To-Use

使用

快递业务涉及很多外部对接,为了各个对接接口的隔离和失败的降级防止雪崩,所以引入了hystrix作为降级组件。 使用非常方便

1.引入依赖:

<dependency>  
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-hystrix</artifactId>
    <version>1.4.5.RELEASE</version>
</dependency>

2.启动了增加annotation @EnableCircuitBreaker

3.需要隔离降级的方法增加注解:

@HystrixCommand(groupKey= "yunda",commandKey="scanningOrder",threadPoolKey="scanningOrder-thread",
            threadPoolProperties = {@HystrixProperty(name="maximumSize",value = "20")},
            commandProperties = {@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value = "3000")},
            fallbackMethod="scanningOrderFallback",
            ignoreExceptions={BusinessException.class}
    )

spring cloud使用aop方式将包含@HystrixCommand注解的方法进行代理,包装了一层HystrixCommond并做了扩展。 该注解上可以配置hystrix各项参数

配置说明:

groupKey指定命令分组,同一个分组使用一个线程池。

commandKey指定命令名称。

threadPoolKey指定代码执行的具体线程名称。

commandProperties 可配置命令执行及指标统计相关参数:

execution.isolation.thread.timeoutInMilliseconds 该参数配置业务代码超时时间(默认1000ms),执行超过该时间会触发降级方法,没有降级方法将抛出HystrixRuntimeException

metrics.rollingStats.timeInMilliseconds参数可配置执行结果统计的滑动窗口时间 默认10000ms

metrics.rollingStats.numBuckets参数可配置滑动窗口包含多少段 默认10

circuitBreaker.requestVolumeThreshold参数可配置触发熔断的请求量阈值 默认20

circuitBreaker.errorThresholdPercentage可配置触发熔断的失败比率,默认50%

circuitBreaker.sleepWindowInMilliseconds可配置触发熔断到恢复的时间窗,默认5s

threadPoolProperties 可配置线程池相关参数,hystrix默认使用线程池进行业务隔离,核心线程数和最大线程数默认都是10个线程,并且使用SynchronizedQueue,即默认限制了最大并发数为10

fallbackMethod指定了降级的方法。

ignoreExceptions指定哪些异常是不需要降级的,比如我们需要给前端返回一个BusinessException,就不需要降级。

核心的配置上面都介绍了,更具体的可以看代码里的HystrixCommandProperties和HystrixThreadPoolProperties两个类。

使用过程中遇到一个问题: 当代码执行异常触发降级之后,降级的方法也是返回一个具体Exception,最终抛出的是对应的Exception 而当方法超时触发降级并且降级的方法也是返回一个具体Exception,最终抛出的却是HystrixRuntimeException 查看代码之后发现commond都是返回了HystrixRuntimeException,而实现aop的HystrixCommandAspect中对HystrixRuntimeException 做了处理,如果是命令执行失败类型(HystrixRuntimeException.FailureType.COMMAND_EXCEPTION),将抛出具体异常 即降级中的Exception,失败类型不是命令执行失败类型(比如超时),将直接抛出HystrixRuntimeException,这样就不能返回我们需要返回的降级方法的异常了,只能通过修改具体实现来解决了。

指标统计

hystrix熔断决策及dashboard展示都是依赖于指标的统计,基于不同的HystrixEventStream实现发射不同的事件流。

类图: spring cloud hystrix实践

HystrixCommandMetrics中的指标流如下:

//统计滚动窗口总请求数 失败请求数及失败比率用于熔断判断
private HealthCountsStream healthCountsStream;  
//统计滚动窗口时间内各分段时间的命令执行结果
private final RollingCommandEventCounterStream rollingCommandEventCounterStream;  
//汇总命令所有时间执行结果
private final CumulativeCommandEventCounterStream cumulativeCommandEventCounterStream;  
//滚动窗口不同占比请求的命令耗时统计
private final RollingCommandLatencyDistributionStream rollingCommandLatencyDistributionStream;  
//滚动窗口不同占比请求的用户代码耗时统计
private final RollingCommandUserLatencyDistributionStream rollingCommandUserLatencyDistributionStream;  
//滚动窗口最大并发统计
private final RollingCommandMaxConcurrencyStream rollingCommandMaxConcurrencyStream;

以上的统计除了rollingCommandMaxConcurrencyStream是基于HystrixCommandStartStream,其他都是基于HystrixCommandCompletionStream,选择一个统计流看看具体实现,比如RollingCommandEventCounterStream。 类图:

spring cloud hystrix实践

看下基类的BucketedCounterStream的Observable实现

this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {  
    @Override
    public Observable<Bucket> call() {
        return inputEventStream
                .observe()
                .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
    }
});

inputEventStream是上面的HystrixCommandCompletionStream命令完成事件流,通过rxjava的window api按每秒打开一个窗口做处理,

rxjava window api:

https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Window.html 处理方法是:

this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {  
    @Override
    public Observable<Bucket> call(Observable<Event> eventBucket) {
        return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
    }
};

appendRawEventToBucket在HystrixCommandMetrics定义:

public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {  
    @Override
    public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
        ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
                default:
                    initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
                    break;
            }
        }
        return initialCountArray;
    }
};

做的处理就是对命令的执行结果转换为了Long数组,不同执行结果次数存于不同下标。 而BucketedRollingCounterStream继承于BucketedCounterStream,实现的Observable如下:

this.sourceStream = bucketedStream      //stream broken up into buckets  
    .window(numBuckets, 1)          //emit overlapping windows of buckets
    .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            isSourceCurrentlySubscribed.set(true);
        }
    })
    .doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            isSourceCurrentlySubscribed.set(false);
        }
    })
    .share()                        //multiple subscribers should get same data
    .onBackpressureDrop();          //if there are slow consumers, data should not buffer

也是window api发射numBuckets(默认10)之后新开窗口并且skip了1,即默认窗口时间10s分10段,每次达到10段新开窗口并skip最后1s的窗口,从而达到了滚动的效果。对每秒的window Observable做reduceWindowToSummary操作,实现如下:

Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {  
    @Override
    public Observable<Output> call(Observable<Bucket> window) {
        return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
    }
};

使用rxjava 的scan api对每一项应用了reduceBucket操作,scan api定义:

https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Scan.html reduceBucket定义在HystrixCommandMetrics的bucketAggregator:

public static final Func2<long[], long[], long[]> bucketAggregator = new Func2<long[], long[], long[]>() {  
    @Override
    public long[] call(long[] cumulativeEvents, long[] bucketEventCounts) {
        for (HystrixEventType eventType: ALL_EVENT_TYPES) {
            switch (eventType) {
                case EXCEPTION_THROWN:
                    for (HystrixEventType exceptionEventType: HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES) {
                        cumulativeEvents[eventType.ordinal()] += bucketEventCounts[exceptionEventType.ordinal()];
                    }
                    break;
                default:
                    cumulativeEvents[eventType.ordinal()] += bucketEventCounts[eventType.ordinal()];
                    break;
            }
        }
        return cumulativeEvents;
    }
};

实现了bucket里面的不同执行结果次数的相加,做到了滚动窗口内每个桶内各项执行结果的统计。

总体指标流程如下图: spring cloud hystrix实践

实现涉及到很多rxjava api 可看文档:

https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html 由于本人能力有限,如有不对之处,欢迎指正。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Sovereign Individual

The Sovereign Individual

James Dale Davidson、William Rees-Mogg / Free Press / 1999-08-26 / USD 16.00

Two renowned investment advisors and authors of the bestseller The Great Reckoning bring to light both currents of disaster and the potential for prosperity and renewal in the face of radical changes ......一起来看看 《The Sovereign Individual》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

MD5 加密
MD5 加密

MD5 加密工具