内容简介:本文主要研究一下dubbo的ExecuteLimitFilterdubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.javadubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
序
本文主要研究一下dubbo的ExecuteLimitFilter
ExecuteLimitFilter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java
public class ExecuteLimitFilter extends ListenableFilter {
private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";
public ExecuteLimitFilter() {
super.listener = new ExecuteLimitListener();
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}
invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}
static class ExecuteLimitListener implements Listener {
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
}
private long getElapsed(Invocation invocation) {
String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);
return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0;
}
}
}
- ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
- invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
- ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时
RpcStatus
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java
public class RpcStatus {
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
private final AtomicInteger active = new AtomicInteger();
private final AtomicLong total = new AtomicLong();
private final AtomicInteger failed = new AtomicInteger();
private final AtomicLong totalElapsed = new AtomicLong();
private final AtomicLong failedElapsed = new AtomicLong();
private final AtomicLong maxElapsed = new AtomicLong();
private final AtomicLong failedMaxElapsed = new AtomicLong();
private final AtomicLong succeededMaxElapsed = new AtomicLong();
//......
public static void beginCount(URL url, String methodName) {
beginCount(url, methodName, Integer.MAX_VALUE);
}
/**
* @param url
*/
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.incrementAndGet() > max) {
methodStatus.active.decrementAndGet();
return false;
} else {
appStatus.active.incrementAndGet();
return true;
}
}
/**
* @param url
* @param elapsed
* @param succeeded
*/
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
endCount(getStatus(url), elapsed, succeeded);
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
status.active.decrementAndGet();
status.total.incrementAndGet();
status.totalElapsed.addAndGet(elapsed);
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
if (succeeded) {
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
status.failed.incrementAndGet();
status.failedElapsed.addAndGet(elapsed);
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}
//......
}
- RpcStatus的beginCount方法会递增methodStatus.active,然后判断是否大于max值,超出则返回false并递减methodStatus.active;小于等于则递增appStatus.active;endCount方法会递减status.active,递增status.total,然后根据成功与否更新status.succeededMaxElapsed或status.failed、status.failedElapsed、status.failedMaxElapsed
小结
- ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
- ExecuteLimitFilter的invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
- ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时
doc
以上所述就是小编给大家介绍的《聊聊dubbo的ExecuteLimitFilter》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Unix/Linux编程实践教程
Bruce Molay、杨宗源、黄海涛 / 杨宗源、黄海涛 / 清华大学出版社 / 2004-10-1 / 56.00元
操作系统是计算机最重要的系统软件。Unix操作系统历经了几十年,至今仍是主流的操作系统。本书通过解释Unix的工作原理,循序渐进地讲解实现Unix中系统命令的方法,让读者理解并逐步精通Unix系统编程,进而具有编制Unix应用程序的能力。书中采用启发式、举一反三、图示讲解等多种方法讲授,语言生动、结构合理、易于理解。每一章后均附有大量的习题和编程练习,以供参考。 本书适合作为高等院校计算机及......一起来看看 《Unix/Linux编程实践教程》 这本书的介绍吧!