聊聊dubbo的Filter

栏目: Java · 发布时间: 5年前

内容简介:本文主要研究一下dubbo的Filterdubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.javadubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java

本文主要研究一下dubbo的Filter

Filter

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java

@SPI
public interface Filter {
    /**
     * Does not need to override/implement this method.
     */
    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

    /**
     * Filter itself should only be response for passing invocation, all callbacks has been placed into {@link Listener}
     *
     * @param appResponse
     * @param invoker
     * @param invocation
     * @return
     */
    @Deprecated
    default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        return appResponse;
    }

    interface Listener {

        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
    }

}
  • Filter定义了invoke、onResponse方法,另外还定义了Listener接口,该接口定义了onResponse、onError方法

ProtocolFilterWrapper

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }



    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            // onError callback
                            if (filter instanceof ListenableFilter) {
                                Filter.Listener listener = ((ListenableFilter) filter).listener();
                                if (listener != null) {
                                    listener.onError(e, invoker, invocation);
                                }
                            }
                            throw e;
                        }
                        return asyncResult;
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }

        return new CallbackRegistrationInvoker<>(last, filters);
    }

    @Override
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
    }

    @Override
    public void destroy() {
        protocol.destroy();
    }

    static class CallbackRegistrationInvoker<T> implements Invoker<T> {

        private final Invoker<T> filterInvoker;
        private final List<Filter> filters;

        public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
            this.filterInvoker = filterInvoker;
            this.filters = filters;
        }

        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult = filterInvoker.invoke(invocation);

            asyncResult.thenApplyWithContext(r -> {
                for (int i = filters.size() - 1; i >= 0; i--) {
                    Filter filter = filters.get(i);
                    // onResponse callback
                    if (filter instanceof ListenableFilter) {
                        Filter.Listener listener = ((ListenableFilter) filter).listener();
                        if (listener != null) {
                            listener.onResponse(r, filterInvoker, invocation);
                        }
                    } else {
                        filter.onResponse(r, filterInvoker, invocation);
                    }
                }
                return r;
            });

            return asyncResult;
        }

        @Override
        public Class<T> getInterface() {
            return filterInvoker.getInterface();
        }

        @Override
        public URL getUrl() {
            return filterInvoker.getUrl();
        }

        @Override
        public boolean isAvailable() {
            return filterInvoker.isAvailable();
        }

        @Override
        public void destroy() {
            filterInvoker.destroy();
        }
    }
}
  • ProtocolFilterWrapper实现了Protocol接口,它定义了一个静态类CallbackRegistrationInvoker,该类实现了Invoker接口,其invoke方法首先会调用filterInvoker的invoke方法获取asyncResult,之后通过thenApplyWithContext注册rpc调用完成时的回调,这里会挨个遍历filters,回调每个filter的onResponse方法

小结

替代原来的RpcResult

doc


以上所述就是小编给大家介绍的《聊聊dubbo的Filter》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

疯狂的独角兽

疯狂的独角兽

丹·莱昂斯 / 王天任 / 海南出版社 / 2017-10 / 42

★商业与文学的有机结合,真实与虚幻间嬉笑怒骂,幽默风趣、引人入胜、发人深省的商业小说。 ★《纽约时报》《华尔街日报》《旧金山纪事报》Amazon畅销书,《财富》《纽约邮报》《新闻周刊》《华盛顿邮报》、畅销书《硅谷钢铁侠》作者阿什利·万斯、畅销书《一网打尽》作者布拉德·斯通联袂推荐。 ★作者丹·莱昂斯集小说家、记者、编剧为一身——HBO经典热门剧、豆瓣高分美剧《硅谷》作者;畅销书《乔布斯......一起来看看 《疯狂的独角兽》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具