内容简介:首先要说的就是超时异常,超时异常在RPC异常中是一个非常基本的异常类型,我原来认为超时异常一定是服务端抛出的,但是当真正的去研究Dubbo的Rpc调用的时候超时异常其实并不是被服务端处理,而是在消费端被处理的。当消费端向服务端发送请求,无论是同步调用,还是异步调用,最终都会通过关于超时的问题参考我原来的文章:远程调用异常,这个异常抛出在通讯层,例如Netty、Dubbo的
首先要说的就是超时异常,超时异常在RPC异常中是一个非常基本的异常类型,我原来认为超时异常一定是服务端抛出的,但是当真正的去研究Dubbo的Rpc调用的时候超时异常其实并不是被服务端处理,而是在消费端被处理的。当消费端向服务端发送请求,无论是同步调用,还是异步调用,最终都会通过 DefaultFeature.get()
方法获取调用结果,如果在超时时间内没有获取到结果,就会抛出超时异常。
关于超时的问题参考我原来的文章: juejin.im/post/5cf0c5…
二、RemotingException
远程调用异常,这个异常抛出在通讯层,例如Netty、Dubbo的 channle
、 Feature
等被关闭或者一些其他的问题,导致消息无法正常发送,或者处理,就会抛出远程调用异常。
三、RpcException
在服务调用的时候所有的异常的异常都会被包装成无论是 TimeoutException
还是 RemotingException
都会被包装成 RpcException
。所以 RpcException
对于Dubbo来说是一个非常重要的异常。
-
RpcException的异常类型:
public /**final**/ class RpcException extends RuntimeException { public static final int UNKNOWN_EXCEPTION = 0; // 未知类型 public static final int NETWORK_EXCEPTION = 1; // 网络异常 public static final int TIMEOUT_EXCEPTION = 2; // 超时异常 public static final int BIZ_EXCEPTION = 3; // 业务异常 public static final int FORBIDDEN_EXCEPTION = 4; //服务不可用 public static final int SERIALIZATION_EXCEPTION = 5; //序列化异常 public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6; //没有可用的invoker //省略 } 复制代码
四、InvocationTargetException
业务异常,对应 RpcException
中的 BIZ_EXCEPTION
。
五、异常处理
消费方生成接口的代理类后,通过 Cluster
和 Loadbalance
选择要调用 Invoker
。
最终通过 DubboInvoker
向服务提供者发送一个请求。
public class DubboInvoker<T> extends AbstractInvoker<T> { private final ExchangeClient[] clients; protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); // 设置 path 和 version 到 attachment 中 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { // 从 clients 数组中获取 ExchangeClient currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 获取异步配置 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // isOneway 为 true,表示“单向”通信 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 异步无返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 发送请求 currentClient.send(inv, isSent); // 设置上下文中的 future 字段为 null RpcContext.getContext().setFuture(null); // 返回一个空的 RpcResult return new RpcResult(); } // 异步有返回值 else if (isAsync) { // 发送请求,并得到一个 ResponseFuture 实例 ResponseFuture future = currentClient.request(inv, timeout); // 设置 future 到上下文中 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); // 暂时返回一个空结果 return new RpcResult(); } // 同步调用 else { RpcContext.getContext().setFuture(null); // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待 return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(..., "Invoke remote method timeout...."); } catch (RemotingException e) { throw new RpcException(..., "Failed to invoke remote method: ..."); } } // 省略其他方法 } 复制代码
在 DubboInover
中如果是 TimeoutException
和 RemotingException
则会被封装为相应类型的 RpcException
继续向上抛出。
接下来我们直接看服务提供方如果发生异常是如何处理的。服务提供方收到消费方的请求后,首先会对请求进行解码,把请求反序列化为一个 Request
对象。 Request
对象
ChannelEventRunnable#run() —> DecodeHandler#received(Channel, Object) —> HeaderExchangeHandler#received(Channel, Object) // 在这里会创建Response,服务的代理类会返回RpcResultd对象,并设置到Response中。 —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request) —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object) // 取得Invoker后会先进行fiter的处理 —> Filter#invoke(Invoker, Invocation) // 主要封装了服务调用逻辑 —> AbstractProxyInvoker#invoke(Invocation) —> Wrapper0#invokeMethod(Object, String, Class[], Object[]) —> DemoServiceImpl#sayHello(String) 复制代码
在这里主要看下 AbstractProxyInvoker#invoke(Invocation)
方法:
public Result invoke(Invocation invocation) throws RpcException { RpcContext rpcContext = RpcContext.getContext(); try { // 模版方法,由avassistProxyFactory创建,真正的服务调用逻辑 Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); // 如果是异步操作 if (RpcUtils.isReturnTypeFuture(invocation)) { return new AsyncRpcResult((CompletableFuture<Object>) obj); } else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back. return new AsyncRpcResult(((AsyncContextImpl)(rpcContext.getAsyncContext())).getInternalFuture()); } else { return new RpcResult(obj); } } catch (InvocationTargetException e) { // 服务调用发生了异常 // TODO async throw exception before async thread write back, should stop asyncContext if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } // 设置真正的异常类型 return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } } 复制代码
代理类
/** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */ public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; // 省略其他方法 public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { // 类型转换 demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { // 根据方法名调用指定的方法 if ("sayHello".equals(string) && arrclass.length == 1) { return demoService.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { // 创建一个InvocationTargetException throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString()); } } 复制代码
doInvoke
是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory
创建的,Dubbo 会在运行时通过 Javassist 框架为服务生成代理类,并实现 invokeMethod
方法,该方法最终会根据调用信息调用具体的服务。在调用具体服务的时候并不会抛出服务真正发生的异常(targetException),而是将发生的异常包装为 InvocationTargetException
向上抛出。在 AbstractProxyInvoker#invoke(Invocation)
方法又会从 InvocationTargetException
获取到targetExceptions设置到 RpcResult
返回给消费端。
到这里我们可能会产生一个疑问,那真正的服务调用逻(服务端)的异常是如何被抛出来的那?答案是 ExceptionFilter
。
org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#buildInvokerChain
中会将过滤器链构建起来,在 AbstractProxyInvoker#invoke(Invocation)
执行之前会先执行过滤器。
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { logger.info("--filter " + filter.getClass().getSimpleName() + " invoke execute"); Result result = filter.invoke(next, invocation); if (result instanceof AsyncRpcResult) { AsyncRpcResult asyncResult = (AsyncRpcResult) result; asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation)); return asyncResult; } else { logger.info("--filter " + filter.getClass().getSimpleName() + " response execute"); result = filter.onResponse(result, invoker, invocation); return result; } } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } 复制代码
加了两行日志可以清楚的看到过滤器执行的流程。我们主要观察当服务调用逻辑抛出异常时 ExceptionFilter#onResponse
是如何作用的。
public Result onResponse(Result result, Invoker<?> invoker, Invocation invocation) { if (result.hasException() && GenericService.class != invoker.getInterface()) { try { Throwable exception = result.getException(); // directly throw if its checked exception // 如果是检查异常则直接抛出 if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) { return result; } // directly throw if the exception appears in the signature // 接口方法签名声明了异常则直接抛出 try { Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()); Class<?>[] exceptionClassses = method.getExceptionTypes(); for (Class<?> exceptionClass : exceptionClassses) { if (exception.getClass().equals(exceptionClass)) { return result; } } } catch (NoSuchMethodException e) { return result; } // for the exception not found in methods signature, print ERROR message in servers log. // 方法签名没有声明的异常,在server端打印错误日志 logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception); // directly throw if exception class and interface class are in the same jar file. // 如果异常类和接口类位于同一个jar文件中,则直接抛出。 String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface()); String exceptionFile = ReflectUtils.getCodeBase(exception.getClass()); if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) { return result; } // directly throw if its JDK exception // 如果是JDK异常,则直接抛出 String className = exception.getClass().getName(); if (className.startsWith("java.") || className.startsWith("javax.")) { return result; } // directly throw if its dubbo exception // 如果是dubbo异常,直接抛出 if (exception instanceof RpcException) { return result; } // otherwise, wrap with RuntimeException and throw back to the client // 否则,使用RuntimeException包装并返回给客户端 return new RpcResult(new RuntimeException(StringUtils.toString(exception))); } catch (Throwable e) { logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e); return result; } } return result; } 复制代码
然后再回到客户端, InvokerInvocationHandler
中获得 RpcResult
,调用 org.apache.dubbo.rpc.RpcResult#recreate()
方法将异常在客户端打印出来。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 如何使用策略模式处理多种类型请求
- WebApplicationContext 中特殊的 bean 类型(一)--- 请求/异常处理
- 默认值+TS类型约束提高数据处理成功率
- Spark 2.4 让你飞一般的处理复杂数据类型
- 必须会的SQL语句(五) NULL数据处理和类型转换
- vue开发黑科技--利用引用类型的值处理复杂数据的编辑
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
数据资本时代
Viktor Mayer-Schnberger / 李晓霞、周涛 / 中信出版集团股份有限公司 / 2018-11-1 / CNY 58.00
【编辑推荐】 大数据除了能对我们的生活、工作、思维产生重大变革外,还能够做什么?畅销书《大数据时代》作者舍恩伯格在新书《数据资本时代》中,展示了大数据将如何从根本上改变经济——这并不是因为数据是一种新型石油,而是因为数据是一种新型润滑脂,它将给市场带来巨大能量,给公司带来巨大压力,使金融资本的作用大大削弱。赢家是市场,而并非资本。 这本书在当下国内出版,可以说恰逢其时。时下,中国经济正......一起来看看 《数据资本时代》 这本书的介绍吧!