内容简介:随着业务的越来越复杂,保证程序的健壮性对程序猿来说也变得更加的重要,毕竟不写Bug的程序猿不是一个好的程序猿。但怎样尽可能的保证咱们的程序能够稳定的运行,以及出错后能够进行相应的补偿,这里就需要咱们使用熔断机制了。PS:在进入正文之前,不妨思考一下两个问题:①熔断机制究竟为我们解决了什么问题?
前言
随着业务的越来越复杂,保证程序的健壮性对程序猿来说也变得更加的重要,毕竟不写Bug的程序猿不是一个好的程序猿。但怎样尽可能的保证咱们的程序能够稳定的运行,以及出错后能够进行相应的补偿,这里就需要咱们使用熔断机制了。
PS:在进入正文之前,不妨思考一下两个问题:
①熔断机制究竟为我们解决了什么问题?
②我们怎样去自己实现一个简单的熔断?
自定义熔断的实现
这里咱们简单的实现了一个超时后进行熔断的例子,这里有用到AspectJ的相关知识,对于熟悉Spring AOP知识的同学应该没什么问题。
主要分为两步:
- 使用
Future
控制是否超时,超时后将任务cancel
掉。 - 调用咱们自己定义好的
fallback
方法进行处理。在这里需要注意的是,fallback
方法参数应该要与原方法相同,这样咱们才能进行补偿措施。例如:咱们可以在fallback
方法借助消息中间件将这些参数进行存储,然后在适当的时候从消息中间件中读取出来进行补偿消费处理。
1@RestController 2public class HelloController { 3 private Random random = new Random(); 4 5 @MyHystrixCommand(fallback="errorMethod") 6 @RequestMapping("/hello") 7 public String hello(@RequestParam("name") String message) throws InterruptedException { 8 int time = random.nextInt(200); 9 System.out.println("spend time : " + time + "ms"); 10 Thread.sleep(time); 11 System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh"); 12 return "hello world:" + message; 13 } 14 15 public String errorMethod(String message) { 16 return "error message"; 17 } 18} 复制代码
1@Target(ElementType.METHOD) 2@Retention(RetentionPolicy.RUNTIME) 3@Documented 4public @interface MyHystrixCommand { 5 int value() default 100; 6 String fallback() default ""; 7} 复制代码
1@Aspect 2@Component 3public class MyHystrixCommandAspect { 4 5 ExecutorService executor = Executors.newFixedThreadPool(10); 6 7 @Pointcut(value = "@annotation(MyHystrixCommand)") 8 public void pointCut() { 9 10 } 11 12 @Around(value = "pointCut()&&@annotation(hystrixCommand)") 13 public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable { 14 int timeout = hystrixCommand.value(); 15 Future future = executor.submit(() -> { 16 try { 17 return joinPoint.proceed(); 18 } catch (Throwable throwable) { 19 } 20 return null; 21 }); 22 Object returnValue = null; 23 try { 24 returnValue = future.get(timeout, TimeUnit.MILLISECONDS); 25 } catch (InterruptedException | ExecutionException | TimeoutException e) { 26 future.cancel(true); 27 if (StringUtils.isBlank(hystrixCommand.fallback())){ 28 throw new Exception("fallback is null"); 29 } 30 returnValue = invokeFallbackMethod(joinPoint, hystrixCommand.fallback()); 31 } 32 return returnValue; 33 } 34 35 private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception { 36 Method method = findFallbackMethod(joinPoint, fallback); 37 if (method == null) { 38 throw new Exception("can not find fallback :" + fallback + " method"); 39 } else { 40 method.setAccessible(true); 41 try { 42 Object invoke = method.invoke(joinPoint.getTarget(), joinPoint.getArgs()); 43 return invoke; 44 } catch (IllegalAccessException | InvocationTargetException e) { 45 throw e; 46 } 47 } 48 } 49 50 51 private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) { 52 Signature signature = joinPoint.getSignature(); 53 MethodSignature methodSignature = (MethodSignature) signature; 54 Method method = methodSignature.getMethod(); 55 Class<?>[] parameterTypes = method.getParameterTypes(); 56 Method fallbackMethod = null; 57 try { 58 //这里通过判断必须取和原方法一样参数的fallback方法 59 fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes); 60 } catch (NoSuchMethodException e) { 61 } 62 return fallbackMethod; 63 } 64 65} 复制代码
当然,上述例子只是一个简单的超时后熔断处理的实现方式。咱们在实际应用中,还有可能并发超过指定阈值后咱们也需要进行降级处理,一个最普通的场景: 秒杀案例 。这些东西在 Hystrix
中都有相应的处理,它提供了线程池和信号量这两种方式去解决并发的问题。
什么是Hystrix?
咱们看一下 官方介绍
In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, and providing fallback options, all of which improve your system’s overall resiliency.
在分布式环境中,调用一些服务不可避免的会出现失败, Hystrix
帮助咱们添加了一些容忍策略,并且将服务进行隔离处理,防止一个服务的失败影响到了另一个服务的调用,这些都提高了咱们系统的弹性。
Hystrix的处理流程
这里咱们结合一下Spring Cloud Hystrix进行说明,从 HystrixCommandAspect
开始分析:
1@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()") 2 public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable { 3 Method method = getMethodFromTarget(joinPoint); 4 ... 5 MetaHolder metaHolder = metaHolderFactory.create(joinPoint);//第一步 6 HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);//第二步 7 ... 8 Object result; 9 try { 10 //第三步 11 if (!metaHolder.isObservable()) { 12 result = CommandExecutor.execute(invokable, executionType, metaHolder); 13 } else { 14 result = executeObservable(invokable, executionType, metaHolder); 15 } 16 } 17 .... 18 return result; 19 } 复制代码
这个切面主要针对 HystrixCommand
和 HystrixCollapser
这两个注解,前者用于进行熔断降级处理,后者用来根据配置进行合并请求(类比数据库操作,将多个insert语句合并成一个insert batch语句)。咱们侧重进行 HystrixCommand
这一块的分析。
第一步:获取元数据(MetaHolder)
这段代码对应上面的 MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
,里面封装了比如调用方法 method
,参数 args
,方法所属对象 target
,动态代理对象 proxy
,回调方法 fallbackMethod
等等一些元数据的封装。这些数据在创建命令对象时会被使用。
第二步:获取调用者(HystrixInvokable)
它持有一个命令对象,并且可以在合适的时候通过这个命令对象完成具体的业务逻辑,针对 HystrixCommand
上述的命令对象就是 GenericObservableCommand
和 GenericCommand
的一种,这里命令对象的选择和方法的返回值有关,如果返回值为 Observable
类型,则创建 GenericObservableCommand
命令,否则创建 GenericCommand
命令。
第三步:执行命令(execute)
1 public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException { 2 ... 3 switch (executionType) { 4 case SYNCHRONOUS: { 5 return castToExecutable(invokable, executionType).execute(); 6 } 7 case ASYNCHRONOUS: { 8 HystrixExecutable executable = castToExecutable(invokable, executionType); 9 if (metaHolder.hasFallbackMethodCommand() 10 && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) { 11 return new FutureDecorator(executable.queue()); 12 } 13 return executable.queue(); 14 } 15 case OBSERVABLE: { 16 HystrixObservable observable = castToObservable(invokable); 17 return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable(); 18 } 19 ... 20 } 21 } 复制代码
从上面的代码段中,可以很容易的看出共有三种策略,同步、异步、OBSERVABLE,而 Observable
又分为 Cold Observable(observable.toObservable())
和 Hot Observable(observable.observe())
。所以说总共有四种执行方式。但是底层都会调用到 AbstractCommand.toObservable()
方法。
- execute():同步执行,返回一个单一的对象结果,发生错误时抛出异常。
- queue():异步执行,返回一个
Future
对象,包含着执行结束后返回的单一结果。 - observe():这个方法返回一个
Observable
对象,它代表操作的多个结果,但是已经被订阅者消费掉了。 - toObservable():这个方法返回一个
Observable
对象,它代表操作的多个结果,需要咱们自己手动订阅并消费掉。
在执行逻辑中,大量用到了 RxJava
,各种回调处理,看的着实头晕,感兴趣的同学可以自行阅读源码,我这里只是介绍一些关键的流程点。
①首先会检查是否命中缓存( toObservable
方法中),命中缓存则直接返回:
1/* try from cache first */ 2 if (requestCacheEnabled) { 3 HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); 4 if (fromCache != null) { 5 isResponseFromCache = true; 6 return handleRequestCacheHitAndEmitValues(fromCache, _cmd); 7 } 8} 复制代码
②检查断路器是否打开,如果断路器打开,则通过 handleShortCircuitViaFallback
直接进行fallback处理:
1private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { 2 executionHook.onStart(_cmd); 3 4 /* determine if we're allowed to execute */ 5 if (circuitBreaker.allowRequest()) { 6 }else { 7 return handleShortCircuitViaFallback(); 8 } 9 ... 10} 复制代码
③检查是否用了信号量,如果用了,则判断是否被占满,占满后则抛出异常,通过 handleSemaphoreRejectionViaFallback
直接转到fallback中进行执行,不执行后面的逻辑。如果没用,则会返回一个默认的 TryableSemaphoreNoOp.DEFAULT
,在进行 executionSemaphore.tryAcquire()
时始终返回true。
1if (executionSemaphore.tryAcquire()) { 2 try { 3 /* used to track userThreadExecutionTime */ 4 executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); 5 return executeCommandAndObserve(_cmd) 6 .doOnError(markExceptionThrown) 7 .doOnTerminate(singleSemaphoreRelease) 8 .doOnUnsubscribe(singleSemaphoreRelease); 9 } catch (RuntimeException e) { 10 return Observable.error(e); 11 } 12} else { 13 return handleSemaphoreRejectionViaFallback(); 14} 复制代码
④执行命令中的逻辑
通过重写 AbstractCommand
中的 getExecutionObservable()
方法使得下面两个命令类中的相应逻辑被调用。
- GenericCommand中的run()方法
- GenericObservableCommand中的construct()方法
如果 run
或者 construct
中设置了超时时间,如果执行时间超过了阈值,则会抛出 TimeoutException
,或者在执行过程中抛出其他异常,都会进入fallback中进行处理逻辑。
⑤发生异常后执行fallback
1 private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, 2 final HystrixEventType eventType, 3 final FailureType failureType, 4 final String message, 5 final Exception originalException) { 6} 复制代码
最终都会调用到这个方法,咱们看看 FailureType
具体有哪几种类型。
- COMMAND_EXCEPTION:执行
run
方法或者construct
方法抛出异常时。 - TIMEOUT:超时情况下。
- SHORTCIRCUIT:断路器直接打开时,直接执行
handleShortCircuitViaFallback
方法。 - REJECTED_THREAD_EXECUTION:线程池、请求队列被占满的情况下。
- REJECTED_SEMAPHORE_EXECUTION:信号量占满情况下。
- BAD_REQUEST_EXCEPTION:
- REJECTED_SEMAPHORE_FALLBACK:
总结
Hystrix
中大量用了 RxJava
,阅读源码看起来不免会觉得头晕,可以考虑在关键点打几个断点看看,不然各种回调会让你绕圈圈。不过个人觉得 RxJava
代码看起来还是蛮优美的,只不过有些许不适应而已,后面有时间会研究一下 RxJava
。
END
以上所述就是小编给大家介绍的《如何实现一个简单的熔断以及Hystrix原理分析》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Hystrix系列之熔断器实现原理
- 稳定性五件套:熔断的原理和实现
- 五千字长文详解Istio实践之熔断和限流工作原理
- 微服务分布式系统熔断实战-为何我们需要API级别熔断?
- 接口请求熔断处理机制
- 聊聊微服务的隔离和熔断
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。