内容简介:本文主要研究一下dubbo的ExecuteLimitFilterdubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.javadubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
序
本文主要研究一下dubbo的ExecuteLimitFilter
ExecuteLimitFilter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java
public class ExecuteLimitFilter extends ListenableFilter { private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time"; public ExecuteLimitFilter() { super.listener = new ExecuteLimitListener(); } @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0); if (!RpcStatus.beginCount(url, methodName, max)) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); try { return invoker.invoke(invocation); } catch (Throwable t) { if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } } static class ExecuteLimitListener implements Listener { @Override public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) { RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true); } @Override public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) { RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false); } private long getElapsed(Invocation invocation) { String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME); return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0; } } }
- ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
- invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
- ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时
RpcStatus
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
public class RpcStatus { private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>(); private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>(); private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>(); private final AtomicInteger active = new AtomicInteger(); private final AtomicLong total = new AtomicLong(); private final AtomicInteger failed = new AtomicInteger(); private final AtomicLong totalElapsed = new AtomicLong(); private final AtomicLong failedElapsed = new AtomicLong(); private final AtomicLong maxElapsed = new AtomicLong(); private final AtomicLong failedMaxElapsed = new AtomicLong(); private final AtomicLong succeededMaxElapsed = new AtomicLong(); //...... public static void beginCount(URL url, String methodName) { beginCount(url, methodName, Integer.MAX_VALUE); } /** * @param url */ public static boolean beginCount(URL url, String methodName, int max) { max = (max <= 0) ? Integer.MAX_VALUE : max; RpcStatus appStatus = getStatus(url); RpcStatus methodStatus = getStatus(url, methodName); if (methodStatus.active.incrementAndGet() > max) { methodStatus.active.decrementAndGet(); return false; } else { appStatus.active.incrementAndGet(); return true; } } /** * @param url * @param elapsed * @param succeeded */ public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) { endCount(getStatus(url), elapsed, succeeded); endCount(getStatus(url, methodName), elapsed, succeeded); } private static void endCount(RpcStatus status, long elapsed, boolean succeeded) { status.active.decrementAndGet(); status.total.incrementAndGet(); status.totalElapsed.addAndGet(elapsed); if (status.maxElapsed.get() < elapsed) { status.maxElapsed.set(elapsed); } if (succeeded) { if (status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); } } else { status.failed.incrementAndGet(); status.failedElapsed.addAndGet(elapsed); if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); } } } //...... }
- RpcStatus的beginCount方法会递增methodStatus.active,然后判断是否大于max值,超出则返回false并递减methodStatus.active;小于等于则递增appStatus.active;endCount方法会递减status.active,递增status.total,然后根据成功与否更新status.succeededMaxElapsed或status.failed、status.failedElapsed、status.failedMaxElapsed
小结
- ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
- ExecuteLimitFilter的invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
- ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时
doc
以上所述就是小编给大家介绍的《聊聊dubbo的ExecuteLimitFilter》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
C++ Primer 中文版(第 4 版)
Stanley B.Lippman、Josée LaJoie、Barbara E.Moo / 李师贤、蒋爱军、梅晓勇、林瑛 / 人民邮电出版社 / 2006 / 99.00元
本书是久负盛名的C++经典教程,其内容是C++大师Stanley B. Lippman丰富的实践经验和C++标准委员会原负责人Josée Lajoie对C++标准深入理解的完美结合,已经帮助全球无数程序员学会了C++。本版对前一版进行了彻底的修订,内容经过了重新组织,更加入了C++ 先驱Barbara E. Moo在C++教学方面的真知灼见。既显著改善了可读性,又充分体现了C++语言的最新进展和当......一起来看看 《C++ Primer 中文版(第 4 版)》 这本书的介绍吧!