Dubbo主要的异常类型及处理

栏目: Java · 发布时间: 5年前

内容简介:首先要说的就是超时异常,超时异常在RPC异常中是一个非常基本的异常类型,我原来认为超时异常一定是服务端抛出的,但是当真正的去研究Dubbo的Rpc调用的时候超时异常其实并不是被服务端处理,而是在消费端被处理的。当消费端向服务端发送请求,无论是同步调用,还是异步调用,最终都会通过关于超时的问题参考我原来的文章:远程调用异常,这个异常抛出在通讯层,例如Netty、Dubbo的

首先要说的就是超时异常,超时异常在RPC异常中是一个非常基本的异常类型,我原来认为超时异常一定是服务端抛出的,但是当真正的去研究Dubbo的Rpc调用的时候超时异常其实并不是被服务端处理,而是在消费端被处理的。当消费端向服务端发送请求,无论是同步调用,还是异步调用,最终都会通过 DefaultFeature.get() 方法获取调用结果,如果在超时时间内没有获取到结果,就会抛出超时异常。

关于超时的问题参考我原来的文章: juejin.im/post/5cf0c5…

二、RemotingException

远程调用异常,这个异常抛出在通讯层,例如Netty、Dubbo的 channleFeature 等被关闭或者一些其他的问题,导致消息无法正常发送,或者处理,就会抛出远程调用异常。

三、RpcException

在服务调用的时候所有的异常的异常都会被包装成无论是 TimeoutException 还是 RemotingException 都会被包装成 RpcException 。所以 RpcException 对于Dubbo来说是一个非常重要的异常。

  • RpcException的异常类型:
    public /**final**/ class RpcException extends RuntimeException {
    
      public static final int UNKNOWN_EXCEPTION = 0; // 未知类型
      public static final int NETWORK_EXCEPTION = 1; // 网络异常
      public static final int TIMEOUT_EXCEPTION = 2; // 超时异常
      public static final int BIZ_EXCEPTION = 3;     // 业务异常
      public static final int FORBIDDEN_EXCEPTION = 4; //服务不可用
      public static final int SERIALIZATION_EXCEPTION = 5; //序列化异常
      public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6; //没有可用的invoker
      
      //省略
    }
    复制代码

四、InvocationTargetException

业务异常,对应 RpcException 中的 BIZ_EXCEPTION

五、异常处理

消费方生成接口的代理类后,通过 ClusterLoadbalance 选择要调用 Invoker 。 最终通过 DubboInvoker 向服务提供者发送一个请求。

public class DubboInvoker<T> extends AbstractInvoker<T> {
    
    private final ExchangeClient[] clients;
    
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 设置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 从 clients 数组中获取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 获取异步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 为 true,表示“单向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 异步无返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 发送请求
                currentClient.send(inv, isSent);
                // 设置上下文中的 future 字段为 null
                RpcContext.getContext().setFuture(null);
                // 返回一个空的 RpcResult
                return new RpcResult();
            } 

            // 异步有返回值
            else if (isAsync) {
                // 发送请求,并得到一个 ResponseFuture 实例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 设置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                // 暂时返回一个空结果
                return new RpcResult();
            } 

            // 同步调用
            else {
                RpcContext.getContext().setFuture(null);
                // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
        }
    }
    
    // 省略其他方法
}
复制代码

DubboInover 中如果是 TimeoutExceptionRemotingException 则会被封装为相应类型的 RpcException 继续向上抛出。

接下来我们直接看服务提供方如果发生异常是如何处理的。服务提供方收到消费方的请求后,首先会对请求进行解码,把请求反序列化为一个 Request 对象。 Request 对象

ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      // 在这里会创建Response,服务的代理类会返回RpcResultd对象,并设置到Response中。
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
             // 取得Invoker后会先进行fiter的处理
          —> Filter#invoke(Invoker, Invocation)
               // 主要封装了服务调用逻辑
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)
复制代码

在这里主要看下 AbstractProxyInvoker#invoke(Invocation) 方法:

public Result invoke(Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        try {
            // 模版方法,由avassistProxyFactory创建,真正的服务调用逻辑
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            // 如果是异步操作
            if (RpcUtils.isReturnTypeFuture(invocation)) {
                return new AsyncRpcResult((CompletableFuture<Object>) obj);
            } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
                return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture());
            } else {
                return new RpcResult(obj);
            }
        } catch (InvocationTargetException e) { // 服务调用发生了异常
            // TODO async throw exception before async thread write back, should stop asyncContext
            if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
             // 设置真正的异常类型
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
复制代码

代理类

/** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 类型转换
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根据方法名调用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            // 创建一个InvocationTargetException
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}
复制代码

doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,Dubbo 会在运行时通过 Javassist 框架为服务生成代理类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。在调用具体服务的时候并不会抛出服务真正发生的异常(targetException),而是将发生的异常包装为 InvocationTargetException 向上抛出。在 AbstractProxyInvoker#invoke(Invocation) 方法又会从 InvocationTargetException 获取到targetExceptions设置到 RpcResult 返回给消费端。

到这里我们可能会产生一个疑问,那真正的服务调用逻(服务端)的异常是如何被抛出来的那?答案是 ExceptionFilter

org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#buildInvokerChain 中会将过滤器链构建起来,在 AbstractProxyInvoker#invoke(Invocation) 执行之前会先执行过滤器。

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        logger.info("--filter " + filter.getClass().getSimpleName() + " invoke execute");
                        Result result = filter.invoke(next, invocation);
                        if (result instanceof AsyncRpcResult) {
                            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                            return asyncResult;
                        } else {
                            logger.info("--filter " + filter.getClass().getSimpleName() + " response execute");
                            result = filter.onResponse(result, invoker, invocation);
                            return result;
                        }

                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
复制代码

加了两行日志可以清楚的看到过滤器执行的流程。我们主要观察当服务调用逻辑抛出异常时 ExceptionFilter#onResponse 是如何作用的。

public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
        if (result.hasException() && GenericService.class != invoker.getInterface()) {
            try {
                Throwable exception = result.getException();

                // directly throw if its checked exception
                // 如果是检查异常则直接抛出
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                    return result;
                }
                // directly throw if the exception appears in the signature
                // 接口方法签名声明了异常则直接抛出
                try {
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    Class<?>[] exceptionClassses = method.getExceptionTypes();
                    for (Class<?> exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return result;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return result;
                }

                // for the exception not found in methods signature, print ERROR message in servers log.
                // 方法签名没有声明的异常,在server端打印错误日志
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // directly throw if exception class and interface class are in the same jar file.
                // 如果异常类和接口类位于同一个jar文件中,则直接抛出。
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return result;
                }
                // directly throw if its JDK exception
                // 如果是JDK异常,则直接抛出
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {
                    return result;
                }
                // directly throw if its dubbo exception
                // 如果是dubbo异常,直接抛出
                if (exception instanceof RpcException) {
                    return result;
                }

                // otherwise, wrap with RuntimeException and throw back to the client
                // 否则,使用RuntimeException包装并返回给客户端
                return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                return result;
            }
        }
        return result;
    }
复制代码

然后再回到客户端, InvokerInvocationHandler 中获得 RpcResult ,调用 org.apache.dubbo.rpc.RpcResult#recreate() 方法将异常在客户端打印出来。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据资本时代

数据资本时代

Viktor Mayer-Schnberger / 李晓霞、周涛 / 中信出版集团股份有限公司 / 2018-11-1 / CNY 58.00

【编辑推荐】 大数据除了能对我们的生活、工作、思维产生重大变革外,还能够做什么?畅销书《大数据时代》作者舍恩伯格在新书《数据资本时代》中,展示了大数据将如何从根本上改变经济——这并不是因为数据是一种新型石油,而是因为数据是一种新型润滑脂,它将给市场带来巨大能量,给公司带来巨大压力,使金融资本的作用大大削弱。赢家是市场,而并非资本。 这本书在当下国内出版,可以说恰逢其时。时下,中国经济正......一起来看看 《数据资本时代》 这本书的介绍吧!

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具