内容简介:本文主要研究一下dubbo的Filter,涉及的源码文件为 dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java 和 dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
序
本文主要研究一下dubbo的Filter
Filter
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Filter.java
/**
 * Extension point for intercepting RPC invocations.
 *
 * <p>An implementation supplies {@link #invoke(Invoker, Invocation)}; completion and failure
 * callbacks are modelled by the nested {@link Listener} interface.
 */
@SPI
public interface Filter {

    /**
     * Handles a single invocation. This method is abstract, so every implementation must provide it.
     *
     * @param invoker    the invoker handed to this filter for the call
     * @param invocation the invocation being processed
     * @return the result of the invocation
     * @throws RpcException if the invocation fails
     */
    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

    /**
     * Legacy response hook. A filter itself should only be responsible for passing the invocation on;
     * all callbacks have been moved into {@link Listener}. Implementations do not need to override this
     * method: the default returns {@code appResponse} unchanged.
     *
     * @param appResponse the result produced by the invocation
     * @param invoker     the invoker the call went through
     * @param invocation  the invocation that produced the result
     * @return the result to hand back; by default the same {@code appResponse} instance
     */
    @Deprecated
    default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        return appResponse;
    }

    /**
     * Callback contract for the outcome of an invocation.
     */
    interface Listener {

        /**
         * Called with the result of an invocation.
         *
         * @param appResponse the result produced by the invocation
         * @param invoker     the invoker the call went through
         * @param invocation  the invocation that produced the result
         */
        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

        /**
         * Called with the error raised by an invocation.
         *
         * @param t          the error that was raised
         * @param invoker    the invoker the call went through
         * @param invocation the invocation that failed
         */
        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
    }
}
- Filter定义了invoke、onResponse方法,其中invoke是需要实现的抽象方法,onResponse是标记为@Deprecated的default方法,默认直接返回appResponse;另外还定义了Listener接口,该接口定义了onResponse、onError方法
ProtocolFilterWrapper
dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/ProtocolFilterWrapper.java
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // onError callback if (filter instanceof ListenableFilter) { Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { listener.onError(e, invoker, invocation); } } throw e; } return asyncResult; } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return new CallbackRegistrationInvoker<>(last, filters); } @Override public int getDefaultPort() { return protocol.getDefaultPort(); } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER)); } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } 
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER); } @Override public void destroy() { protocol.destroy(); } static class CallbackRegistrationInvoker<T> implements Invoker<T> { private final Invoker<T> filterInvoker; private final List<Filter> filters; public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) { this.filterInvoker = filterInvoker; this.filters = filters; } @Override public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = filterInvoker.invoke(invocation); asyncResult.thenApplyWithContext(r -> { for (int i = filters.size() - 1; i >= 0; i--) { Filter filter = filters.get(i); // onResponse callback if (filter instanceof ListenableFilter) { Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { listener.onResponse(r, filterInvoker, invocation); } } else { filter.onResponse(r, filterInvoker, invocation); } } return r; }); return asyncResult; } @Override public Class<T> getInterface() { return filterInvoker.getInterface(); } @Override public URL getUrl() { return filterInvoker.getUrl(); } @Override public boolean isAvailable() { return filterInvoker.isAvailable(); } @Override public void destroy() { filterInvoker.destroy(); } } }
- ProtocolFilterWrapper实现了Protocol接口,它定义了一个静态类CallbackRegistrationInvoker,该类实现了Invoker接口,其invoke方法首先会调用filterInvoker的invoke方法获取asyncResult,之后通过thenApplyWithContext注册rpc调用完成时的回调,这里会挨个遍历filters,回调每个filter的onResponse方法
小结
替代原来的RpcResult
doc
以上所述就是小编给大家介绍的《聊聊dubbo的Filter》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。