Dubbo主要的异常类型及处理

栏目: Java · 发布时间: 6年前

内容简介:首先要说的就是超时异常,超时异常在RPC异常中是一个非常基本的异常类型,我原来认为超时异常一定是服务端抛出的,但是当真正的去研究Dubbo的Rpc调用的时候超时异常其实并不是被服务端处理,而是在消费端被处理的。当消费端向服务端发送请求,无论是同步调用,还是异步调用,最终都会通过关于超时的问题参考我原来的文章:远程调用异常,这个异常抛出在通讯层,例如Netty、Dubbo的

首先要说的就是超时异常,超时异常在RPC异常中是一个非常基本的异常类型,我原来认为超时异常一定是服务端抛出的,但是当真正的去研究Dubbo的Rpc调用的时候超时异常其实并不是被服务端处理,而是在消费端被处理的。当消费端向服务端发送请求,无论是同步调用,还是异步调用,最终都会通过 DefaultFeature.get() 方法获取调用结果,如果在超时时间内没有获取到结果,就会抛出超时异常。

关于超时的问题参考我原来的文章: juejin.im/post/5cf0c5…

二、RemotingException

远程调用异常,这个异常抛出在通讯层,例如Netty、Dubbo的 channleFeature 等被关闭或者一些其他的问题,导致消息无法正常发送,或者处理,就会抛出远程调用异常。

三、RpcException

在服务调用的时候所有的异常的异常都会被包装成无论是 TimeoutException 还是 RemotingException 都会被包装成 RpcException 。所以 RpcException 对于Dubbo来说是一个非常重要的异常。

  • RpcException的异常类型:
    public /**final**/ class RpcException extends RuntimeException {
    
      public static final int UNKNOWN_EXCEPTION = 0; // 未知类型
      public static final int NETWORK_EXCEPTION = 1; // 网络异常
      public static final int TIMEOUT_EXCEPTION = 2; // 超时异常
      public static final int BIZ_EXCEPTION = 3;     // 业务异常
      public static final int FORBIDDEN_EXCEPTION = 4; //服务不可用
      public static final int SERIALIZATION_EXCEPTION = 5; //序列化异常
      public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6; //没有可用的invoker
      
      //省略
    }
    复制代码

四、InvocationTargetException

业务异常,对应 RpcException 中的 BIZ_EXCEPTION

五、异常处理

消费方生成接口的代理类后,通过 ClusterLoadbalance 选择要调用 Invoker 。 最终通过 DubboInvoker 向服务提供者发送一个请求。

public class DubboInvoker<T> extends AbstractInvoker<T> {
    
    private final ExchangeClient[] clients;
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 设置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 从 clients 数组中获取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 获取异步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 为 true,表示“单向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 异步无返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 发送请求
                currentClient.send(inv, isSent);
                // 设置上下文中的 future 字段为 null
                RpcContext.getContext().setFuture(null);
                // 返回一个空的 RpcResult
                return new RpcResult();
            } 

            // 异步有返回值
            else if (isAsync) {
                // 发送请求,并得到一个 ResponseFuture 实例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 设置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                // 暂时返回一个空结果
                return new RpcResult();
            } 

            // 同步调用
            else {
                RpcContext.getContext().setFuture(null);
                // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }
    
    // 省略其他方法
}
复制代码

DubboInover 中如果是 TimeoutExceptionRemotingException 则会被封装为相应类型的 RpcException 继续向上抛出。

接下来我们直接看服务提供方如果发生异常是如何处理的。服务提供方收到消费方的请求后,首先会对请求进行解码,把请求反序列化为一个 Request 对象。 Request 对象

ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      // 在这里会创建Response,服务的代理类会返回RpcResultd对象,并设置到Response中。
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
             // 取得Invoker后会先进行fiter的处理
          —> Filter#invoke(Invoker, Invocation)
               // 主要封装了服务调用逻辑
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)
复制代码

在这里主要看下 AbstractProxyInvoker#invoke(Invocation) 方法:

public Result invoke(Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        try {
            // 模版方法,由avassistProxyFactory创建,真正的服务调用逻辑
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            // 如果是异步操作
            if (RpcUtils.isReturnTypeFuture(invocation)) {
                return new AsyncRpcResult((CompletableFuture<Object>) obj);
            } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
                return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
            } else {
                return new RpcResult(obj);
            }
        } catch (InvocationTargetException e) { // 服务调用发生了异常
            // TODO async throw exception before async thread write back, should stop asyncContext
            if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
             // 设置真正的异常类型
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
复制代码

代理类

/** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 类型转换
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根据方法名调用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            // 创建一个InvocationTargetException
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}
复制代码

doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,Dubbo 会在运行时通过 Javassist 框架为服务生成代理类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。在调用具体服务的时候并不会抛出服务真正发生的异常(targetException),而是将发生的异常包装为 InvocationTargetException 向上抛出。在 AbstractProxyInvoker#invoke(Invocation) 方法又会从 InvocationTargetException 获取到targetExceptions设置到 RpcResult 返回给消费端。

到这里我们可能会产生一个疑问,那真正的服务调用逻(服务端)的异常是如何被抛出来的那?答案是 ExceptionFilter

org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#buildInvokerChain 中会将过滤器链构建起来,在 AbstractProxyInvoker#invoke(Invocation) 执行之前会先执行过滤器。

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        logger.info("--filter " + filter.getClass().getSimpleName() + " invoke execute");
                        Result result = filter.invoke(next, invocation);
                        if (result instanceof AsyncRpcResult) {
                            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                            return asyncResult;
                        } else {
                            logger.info("--filter " + filter.getClass().getSimpleName() + " response execute");
                            result = filter.onResponse(result, invoker, invocation);
                            return result;
                        }

                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
复制代码

加了两行日志可以清楚的看到过滤器执行的流程。我们主要观察当服务调用逻辑抛出异常时 ExceptionFilter#onResponse 是如何作用的。

public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
        if (result.hasException() && GenericService.class != invoker.getInterface()) {
            try {
                Throwable exception = result.getException();

                // directly throw if its checked exception
                // 如果是检查异常则直接抛出
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                    return result;
                }
                // directly throw if the exception appears in the signature
                // 接口方法签名声明了异常则直接抛出
                try {
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    Class<?>[] exceptionClassses = method.getExceptionTypes();
                    for (Class<?> exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return result;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return result;
                }

                // for the exception not found in methods signature, print ERROR message in servers log.
                // 方法签名没有声明的异常,在server端打印错误日志
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // directly throw if exception class and interface class are in the same jar file.
                // 如果异常类和接口类位于同一个jar文件中,则直接抛出。
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return result;
                }
                // directly throw if its JDK exception
                // 如果是JDK异常,则直接抛出
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {
                    return result;
                }
                // directly throw if its dubbo exception
                // 如果是dubbo异常,直接抛出
                if (exception instanceof RpcException) {
                    return result;
                }

                // otherwise, wrap with RuntimeException and throw back to the client
                // 否则,使用RuntimeException包装并返回给客户端
                return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                return result;
            }
        }
        return result;
    }
复制代码

然后再回到客户端, InvokerInvocationHandler 中获得 RpcResult ,调用 org.apache.dubbo.rpc.RpcResult#recreate() 方法将异常在客户端打印出来。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

从零开始学微信公众号运营推广

从零开始学微信公众号运营推广

叶龙 / 清华大学出版社 / 2017-6-1 / 39.80

本书是丛书的第2本,具体内容如下。 第1章 运营者入门——选择、注册和认证 第2章 变现和赚钱——如何从0到100万 第3章 决定打开率——标题的取名和优化 第4章 决定美观度——图片的选取和优化 第5章 决定停留率——正文的编辑和优化 第6章 决定欣赏率——版式的编辑和优化 第7章 数据的分析——用户内容的精准营销 书中从微信运营入门开始,以商业变......一起来看看 《从零开始学微信公众号运营推广》 这本书的介绍吧!

SHA 加密
SHA 加密

SHA 加密工具

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

html转js在线工具
html转js在线工具

html转js在线工具