A look at dubbo's MonitorFilter

Category: Java · Published: 6 years ago


This article takes a look at dubbo's MonitorFilter.

MonitorFilter

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java

@Activate(group = {PROVIDER, CONSUMER})
public class MonitorFilter extends ListenableFilter {

    private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
    private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";

    public MonitorFilter() {
        super.listener = new MonitorListener();
    }
    /**
     * The Concurrent counter
     */
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();

    /**
     * The MonitorFactory
     */
    private MonitorFactory monitorFactory;

    public void setMonitorFactory(MonitorFactory monitorFactory) {
        this.monitorFactory = monitorFactory;
    }


    /**
     * The invocation interceptor,it will collect the invoke data about this invocation and send it to monitor center
     *
     * @param invoker    service
     * @param invocation invocation.
     * @return {@link Result} the invoke result
     * @throws RpcException
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
            getConcurrent(invoker, invocation).incrementAndGet(); // count up
        }
        return invoker.invoke(invocation); // proceed invocation chain
    }

    // concurrent counter
    private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
        String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
        AtomicInteger concurrent = concurrents.get(key);
        if (concurrent == null) {
            concurrents.putIfAbsent(key, new AtomicInteger());
            concurrent = concurrents.get(key);
        }
        return concurrent;
    }

    //......
}
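  • MonitorFilter extends ListenableFilter. When the invoker's URL carries the monitor parameter, its invoke method sets a monitor_filter_start_time attachment on the invocation and then increments the current concurrency count; the listener it creates is a MonitorListener.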
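The per-method concurrency counter described above can be sketched in isolation. This is a minimal stand-alone illustration, not dubbo code: the class name ConcurrentCounterSketch and the begin/end helpers are hypothetical, and it uses computeIfAbsent where the quoted getConcurrent uses a get / putIfAbsent / get sequence. Both approaches hand every caller the same AtomicInteger for a given key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch; not part of dubbo.
class ConcurrentCounterSketch {

    // One counter per "interface.method" key, as in the filter's concurrents map.
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<>();

    // Returns the shared counter for the key, creating it atomically on first use.
    AtomicInteger getConcurrent(String interfaceName, String methodName) {
        String key = interfaceName + "." + methodName;
        return concurrents.computeIfAbsent(key, k -> new AtomicInteger());
    }

    // Corresponds to the "count up" step before the call proceeds down the chain.
    int begin(String interfaceName, String methodName) {
        return getConcurrent(interfaceName, methodName).incrementAndGet();
    }

    // Corresponds to the "count down" step once a response or error arrives.
    int end(String interfaceName, String methodName) {
        return getConcurrent(interfaceName, methodName).decrementAndGet();
    }

    public static void main(String[] args) {
        ConcurrentCounterSketch counter = new ConcurrentCounterSketch();
        counter.begin("DemoService", "sayHello");
        counter.begin("DemoService", "sayHello");
        System.out.println(counter.getConcurrent("DemoService", "sayHello").get());
        counter.end("DemoService", "sayHello");
        counter.end("DemoService", "sayHello");
    }
}
```

Counters for different methods are independent, so a slow method does not inflate the concurrency reported for a fast one.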

MonitorListener

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/main/java/org/apache/dubbo/monitor/support/MonitorFilter.java

class MonitorListener implements Listener {

        @Override
        public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
                collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
                getConcurrent(invoker, invocation).decrementAndGet(); // count down
            }
        }

        /**
         * The collector logic, it will be handled by the default monitor
         *
         * @param invoker
         * @param invocation
         * @param result     the invoke result
         * @param remoteHost the remote host address
         * @param start      the timestamp the invoke begin
         * @param error      if there is an error on the invoke
         */
        private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            try {
                URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
                Monitor monitor = monitorFactory.getMonitor(monitorUrl);
                if (monitor == null) {
                    return;
                }
                URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
                monitor.collect(statisticsURL);
            } catch (Throwable t) {
                logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
            }
        }

        /**
         * Create statistics url
         *
         * @param invoker
         * @param invocation
         * @param result
         * @param remoteHost
         * @param start
         * @param error
         * @return
         */
        private URL createStatisticsUrl(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
            // ---- service statistics ----
            long elapsed = System.currentTimeMillis() - start; // invocation cost
            int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
            String application = invoker.getUrl().getParameter(APPLICATION_KEY);
            String service = invoker.getInterface().getName(); // service name
            String method = RpcUtils.getMethodName(invocation); // method name
            String group = invoker.getUrl().getParameter(GROUP_KEY);
            String version = invoker.getUrl().getParameter(VERSION_KEY);

            int localPort;
            String remoteKey, remoteValue;
            if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
                // ---- for service consumer ----
                localPort = 0;
                remoteKey = MonitorService.PROVIDER;
                remoteValue = invoker.getUrl().getAddress();
            } else {
                // ---- for service provider ----
                localPort = invoker.getUrl().getPort();
                remoteKey = MonitorService.CONSUMER;
                remoteValue = remoteHost;
            }
            String input = "", output = "";
            if (invocation.getAttachment(INPUT_KEY) != null) {
                input = invocation.getAttachment(INPUT_KEY);
            }
            if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
                output = result.getAttachment(OUTPUT_KEY);
            }

            return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
        }

    }
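  • MonitorListener implements the Listener interface. When the invoker's URL carries the monitor parameter, its onResponse and onError methods report metrics and then decrement the concurrency count.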
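The branching inside createStatisticsUrl may be easier to follow with the dubbo types stripped away. The sketch below is a simplified illustration under stated assumptions: the class StatisticsSketch and its build method are hypothetical, a plain Map stands in for the statistics URL, the literal key strings ("provider", "consumer", "success", "failure", "elapsed", "concurrent") are assumed to match the corresponding MonitorService constants, and the current time is passed in as a parameter so the result is deterministic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; a Map stands in for the statistics URL.
class StatisticsSketch {

    static Map<String, String> build(String side, String invokerAddress, int invokerPort,
                                     String remoteHost, long startMillis, long nowMillis,
                                     int concurrent, boolean error) {
        Map<String, String> stats = new LinkedHashMap<>();
        if ("consumer".equals(side)) {
            // Consumer side: no local port, and the remote peer is the provider address.
            stats.put("localPort", "0");
            stats.put("provider", invokerAddress);
        } else {
            // Provider side: local port comes from the invoker URL, the remote peer is the caller.
            stats.put("localPort", String.valueOf(invokerPort));
            stats.put("consumer", remoteHost);
        }
        // Exactly one of success/failure is recorded, with a count of 1.
        stats.put(error ? "failure" : "success", "1");
        // Elapsed time is measured from the start timestamp stored by the filter.
        stats.put("elapsed", String.valueOf(nowMillis - startMillis));
        stats.put("concurrent", String.valueOf(concurrent));
        return stats;
    }

    public static void main(String[] args) {
        System.out.println(build("consumer", "10.0.0.1:20880", 20880, "10.0.0.2", 1000L, 1025L, 1, false));
    }
}
```

Note that on the consumer side the remote host argument is ignored, which fits the quoted test expecting the consumer parameter to be null.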

Example

dubbo-2.7.2/dubbo-monitor/dubbo-monitor-api/src/test/java/org/apache/dubbo/monitor/support/MonitorFilterTest.java

public class MonitorFilterTest {

    private volatile URL lastStatistics;

    private volatile Invocation lastInvocation;

    private final Invoker<MonitorService> serviceInvoker = new Invoker<MonitorService>() {
        @Override
        public Class<MonitorService> getInterface() {
            return MonitorService.class;
        }

        public URL getUrl() {
            try {
                return URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE + "&" + MONITOR_KEY + "=" + URLEncoder.encode("dubbo://" + NetUtils.getLocalHost() + ":7070", "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }

        @Override
        public boolean isAvailable() {
            return false;
        }

        public Result invoke(Invocation invocation) throws RpcException {
            lastInvocation = invocation;
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        }

        @Override
        public void destroy() {
        }
    };

    private MonitorFactory monitorFactory = new MonitorFactory() {
        @Override
        public Monitor getMonitor(final URL url) {
            return new Monitor() {
                public URL getUrl() {
                    return url;
                }

                @Override
                public boolean isAvailable() {
                    return true;
                }

                @Override
                public void destroy() {
                }

                public void collect(URL statistics) {
                    MonitorFilterTest.this.lastStatistics = statistics;
                }

                public List<URL> lookup(URL query) {
                    return Arrays.asList(MonitorFilterTest.this.lastStatistics);
                }
            };
        }
    };

    @Test
    public void testFilter() throws Exception {
        MonitorFilter monitorFilter = new MonitorFilter();
        monitorFilter.setMonitorFactory(monitorFactory);
        Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
        RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
        Result result = monitorFilter.invoke(serviceInvoker, invocation);
        result.thenApplyWithContext((r) -> {
            monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
            return r;
        });
        while (lastStatistics == null) {
            Thread.sleep(10);
        }
        Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
        Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
        Assertions.assertEquals("aaa", lastStatistics.getParameter(MonitorService.METHOD));
        Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
        Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
        Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
        Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
        Assertions.assertEquals(invocation, lastInvocation);
    }

    @Test
    public void testSkipMonitorIfNotHasKey() {
        MonitorFilter monitorFilter = new MonitorFilter();
        MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
        monitorFilter.setMonitorFactory(mockMonitorFactory);
        Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);
        Invoker invoker = mock(Invoker.class);
        given(invoker.getUrl()).willReturn(URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":20880?" + APPLICATION_KEY + "=abc&" + SIDE_KEY + "=" + CONSUMER_SIDE));

        monitorFilter.invoke(invoker, invocation);

        verify(mockMonitorFactory, never()).getMonitor(any(URL.class));
    }

    @Test
    public void testGenericFilter() throws Exception {
        MonitorFilter monitorFilter = new MonitorFilter();
        monitorFilter.setMonitorFactory(monitorFactory);
        Invocation invocation = new RpcInvocation("$invoke", new Class<?>[]{String.class, String[].class, Object[].class}, new Object[]{"xxx", new String[]{}, new Object[]{}});
        RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345);
        Result result = monitorFilter.invoke(serviceInvoker, invocation);
        result.thenApplyWithContext((r) -> {
            monitorFilter.listener().onResponse(r, serviceInvoker, invocation);
            return r;
        });
        while (lastStatistics == null) {
            Thread.sleep(10);
        }
        Assertions.assertEquals("abc", lastStatistics.getParameter(MonitorService.APPLICATION));
        Assertions.assertEquals(MonitorService.class.getName(), lastStatistics.getParameter(MonitorService.INTERFACE));
        Assertions.assertEquals("xxx", lastStatistics.getParameter(MonitorService.METHOD));
        Assertions.assertEquals(NetUtils.getLocalHost() + ":20880", lastStatistics.getParameter(MonitorService.PROVIDER));
        Assertions.assertEquals(NetUtils.getLocalHost(), lastStatistics.getAddress());
        Assertions.assertEquals(null, lastStatistics.getParameter(MonitorService.CONSUMER));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.SUCCESS, 0));
        Assertions.assertEquals(0, lastStatistics.getParameter(MonitorService.FAILURE, 0));
        Assertions.assertEquals(1, lastStatistics.getParameter(MonitorService.CONCURRENT, 0));
        Assertions.assertEquals(invocation, lastInvocation);
    }

    @Test
    public void testSafeFailForMonitorCollectFail() {
        MonitorFilter monitorFilter = new MonitorFilter();
        MonitorFactory mockMonitorFactory = mock(MonitorFactory.class);
        Monitor mockMonitor = mock(Monitor.class);
        Mockito.doThrow(new RuntimeException()).when(mockMonitor).collect(any(URL.class));

        monitorFilter.setMonitorFactory(mockMonitorFactory);
        given(mockMonitorFactory.getMonitor(any(URL.class))).willReturn(mockMonitor);
        Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]);

        monitorFilter.invoke(serviceInvoker, invocation);
    }
}
  • MonitorFilterTest covers four scenarios: testFilter, testSkipMonitorIfNotHasKey, testGenericFilter and testSafeFailForMonitorCollectFail.
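The testSafeFailForMonitorCollectFail scenario relies on the try/catch inside collect: a failing monitor is logged and does not break the call. A minimal sketch of that fail-safe pattern follows, with loud assumptions: SafeCollectSketch and collectSafely are hypothetical names, a java.util.function.Consumer stands in for the Monitor, and System.err stands in for the logger.

```java
import java.util.function.Consumer;

// Hypothetical stand-alone sketch of the fail-safe reporting pattern; not dubbo code.
class SafeCollectSketch {

    // Returns true if the report went through, false if the failure was swallowed.
    static <T> boolean collectSafely(Consumer<T> monitor, T statistics) {
        try {
            monitor.accept(statistics);
            return true;
        } catch (Throwable t) {
            // Reporting is best effort: log and carry on, never fail the business call.
            System.err.println("Failed to monitor count service, cause: " + t.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        boolean ok = collectSafely(s -> { throw new RuntimeException("monitor down"); }, "stats");
        System.out.println(ok);
    }
}
```

The boolean return value exists only so the sketch can be checked; the quoted collect method returns void.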

Summary

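MonitorFilter extends ListenableFilter. When the invoker's URL carries the monitor parameter, its invoke method sets a monitor_filter_start_time attachment on the invocation and then increments the current concurrency count; the listener it creates is a MonitorListener. MonitorListener implements the Listener interface; when the invoker's URL carries the monitor parameter, its onResponse and onError methods report metrics and then decrement the concurrency count.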
