内容简介:本文主要研究一下dubbo的MonitorFilterdubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.javadubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
序
本文主要研究一下dubbo的MonitorFilter
MonitorFilter
dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter extends ListenableFilter {
private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";
public MonitorFilter() {
super.listener = new MonitorListener();
}
/**
* The Concurrent counter
*/
private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();
/**
* The MonitorFactory
*/
private MonitorFactory monitorFactory;
public void setMonitorFactory(MonitorFactory monitorFactory) {
this.monitorFactory = monitorFactory;
}
/**
* The invocation interceptor,it will collect the invoke data about this invocation and send it to monitor center
*
* @param invoker service
* @param invocation invocation.
* @return {@link Result} the invoke result
* @throws RpcException
*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
getConcurrent(invoker, invocation).incrementAndGet(); // count up
}
return invoker.invoke(invocation); // proceed invocation chain
}
// concurrent counter
private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
AtomicInteger concurrent = concurrents.get(key);
if (concurrent == null) {
concurrents.putIfAbsent(key, new AtomicInteger());
concurrent = concurrents.get(key);
}
return concurrent;
}
//......
}
- MonitorFilter继承了ListenableFilter,其invoke方法在invoker的URL中包含有monitor参数时会给invocation设置monitor_filter_start_time的attachment,然后递增当前并发的次数;其创建的listener为MonitorListener
MonitorListener
dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java
class MonitorListener implements Listener {
@Override
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
/**
* The collector logic, it will be handled by the default monitor
*
* @param invoker
* @param invocation
* @param result the invoke result
* @param remoteHost the remote host address
* @param start the timestamp the invoke begin
* @param error if there is an error on the invoke
*/
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
Monitor monitor = monitorFactory.getMonitor(monitorUrl);
if (monitor == null) {
return;
}
URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
monitor.collect(statisticsURL);
} catch (Throwable t) {
logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}
/**
* Create statistics url
*
* @param invoker
* @param invocation
* @param result
* @param remoteHost
* @param start
* @param error
* @return
*/
private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
// ---- service statistics ----
long elapsed = System.currentTimeMillis() - start; // invocation cost
int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
String application = invoker.getUrl().getParameter(APPLICATION_KEY);
String service = invoker.getInterface().getName(); // service name
String method = RpcUtils.getMethodName(invocation); // method name
String group = invoker.getUrl().getParameter(GROUP_KEY);
String version = invoker.getUrl().getParameter(VERSION_KEY);
int localPort;
String remoteKey, remoteValue;
if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
// ---- for service consumer ----
localPort = 0;
remoteKey = MonitorService.PROVIDER;
remoteValue = invoker.getUrl().getAddress();
} else {
// ---- for service provider ----
localPort = invoker.getUrl().getPort();
remoteKey = MonitorService.CONSUMER;
remoteValue = remoteHost;
}
String input = "", output = "";
if (invocation.getAttachment(INPUT_KEY) != null) {
input = invocation.getAttachment(INPUT_KEY);
}
if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
output = result.getAttachment(OUTPUT_KEY);
}
return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
}
}
- MonitorListener实现了Listener接口,其onResponse及onError方法在invoker的URL中包含有monitor参数时会上报指标,然后递减并发次数
实例
dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java
public class MonitorFilterTest {
private volatile URL lastStatistics;
private volatile Invocation lastInvocation;
private final Invoker<MonitorService> serviceInvoker = new Invoker<MonitorService>() {
@Override
public Class<MonitorService> getInterface() {
return MonitorService.class;
}
public URL getUrl() {
try {
return URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE + "&" + MONITOR_KEY + "=" + URLEncoder.encode("dubbo://" + NetUtils.getLocalHost() + ":7070", "UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public boolean isAvailable() {
return false;
}
public Result invoke(Invocation invocation) throws RpcException {
lastInvocation = invocation;
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}
@Override
public void destroy() {
}
};
private MonitorFactory monitorFactory = new MonitorFactory() {
@Override
public Monitor getMonitor(final URL url) {
return new Monitor() {
public URL getUrl() {
return url;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public void destroy() {
}
public void collect(URL statistics) {
MonitorFilterTest.this.lastStatistics = statistics;
}
public List<URL> lookup(URL query) {
return Arrays.asList(MonitorFilterTest.this.lastStatistics);
}
};
}
};
@Test
public void testFilter() throws Exception {
MonitorFilter monitorFilter = new MonitorFilter();
monitorFilter.setMonitorFactory(monitorFactory);
Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
Result result = monitorFilter.invoke(serviceInvoker, invocation);
result.thenApplyWithContext((r) -> {
monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
return r;
});
while (lastStatistics == null) {
Thread.sleep(10);
}
Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
Assertions.assertEquals("aaa", lastStatistics.getParameter(MonitorService.METHOD));
Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
Assertions.assertEquals(invocation, lastInvocation);
}
@Test
public void testSkipMonitorIfNotHasKey() {
MonitorFilter monitorFilter = new MonitorFilter();
MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
monitorFilter.setMonitorFactory(mockMonitorFactory);
Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
Invoker invoker = mock(Invoker.class);
given(invoker.getUrl()).willReturn(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE));
monitorFilter.invoke(invoker, invocation);
verify(mockMonitorFactory, never()).getMonitor(any(URL.class));
}
@Test
public void testGenericFilter() throws Exception {
MonitorFilter monitorFilter = new MonitorFilter();
monitorFilter.setMonitorFactory(monitorFactory);
Invocation invocation = new RpcInvocation("$invoke", new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}});
RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
Result result = monitorFilter.invoke(serviceInvoker, invocation);
result.thenApplyWithContext((r) -> {
monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
return r;
});
while (lastStatistics == null) {
Thread.sleep(10);
}
Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
Assertions.assertEquals("xxx", lastStatistics.getParameter(MonitorService.METHOD));
Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
Assertions.assertEquals(invocation, lastInvocation);
}
@Test
public void testSafeFailForMonitorCollectFail() {
MonitorFilter monitorFilter = new MonitorFilter();
MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
Monitor mockMonitor = mock(Monitor.class);
Mockito.doThrow(new RuntimeException()).when(mockMonitor).collect(any(URL.class));
monitorFilter.setMonitorFactory(mockMonitorFactory);
given(mockMonitorFactory.getMonitor(any(URL.class))).willReturn(mockMonitor);
Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
monitorFilter.invoke(serviceInvoker, invocation);
}
}
- MonitorFilterTest验证了testFilter、testSkipMonitorIfNotHasKey、testGenericFilter、testSafeFailForMonitorCollectFail这几个场景
小结
MonitorFilter继承了ListenableFilter,其invoke方法在invoker的URL中包含有monitor参数时会给invocation设置monitor_filter_start_time的attachment,然后递增当前并发的次数;其创建的listener为MonitorListener;MonitorListener实现了Listener接口,其onResponse及onError方法在invoker的URL中包含有monitor参数时会上报指标,然后递减并发次数
doc
以上所述就是小编给大家介绍的《聊聊dubbo的MonitorFilter》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Programming Python
Mark Lutz / O'Reilly Media / 2006-8-30 / USD 59.99
Already the industry standard for Python users, "Programming Python" from O'Reilly just got even better. This third edition has been updated to reflect current best practices and the abundance of chan......一起来看看 《Programming Python》 这本书的介绍吧!