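With microservices now the norm, a single request passes through many service nodes, and the logs it produces end up scattered across different servers. ELK can collect those scattered logs into Elasticsearch (ES), but tying together the log lines that belong to one request remains the key problem.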
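To view the full-chain log of a single request, the usual approach is to generate a traceId at the system boundary and pass it to the next service in the call chain. Each downstream service prints the traceId in its own logs and passes it on to the services it calls in turn. This process is called traceId pass-through (traceId透传).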
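As a minimal sketch of the "generate at the boundary, print in every log line" idea, the following assumes a hypothetical `TraceContext` holder (not part of the article or of Dubbo) that keeps the traceId of the current request in a `ThreadLocal`. The 32-character id format is only an illustration.

```java
import java.util.UUID;

/** Hypothetical holder: keeps the current request's traceId on the handling thread. */
final class TraceContext {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private TraceContext() {}

    /** At the system boundary: reuse an incoming traceId, or mint a new one if there is none. */
    static String begin(String incomingTraceId) {
        String traceId = (incomingTraceId == null || incomingTraceId.isEmpty())
                ? UUID.randomUUID().toString().replace("-", "")
                : incomingTraceId;
        TRACE_ID.set(traceId);
        return traceId;
    }

    static String current() {
        return TRACE_ID.get();
    }

    /** Clear when the request finishes so a pooled thread does not keep a stale id. */
    static void end() {
        TRACE_ID.remove();
    }

    /** Log helper: every line carries the traceId, so log lines can later be joined by it. */
    static String format(String message) {
        return "[traceId=" + current() + "] " + message;
    }
}
```

A boundary component (for example a gateway or web filter) would call `begin(...)` once per request, and every service would log through something like `format(...)`.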
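In systems that use HTTP as the service protocol, traceId pass-through can be handled uniformly by one shared, wrapped HTTP client. With Dubbo it is somewhat more involved. Following the previous installment, 《☆聊聊Dubbo(六):核心源码-Filter链原理》 (on how the Filter chain works), the usual solution is to write a custom Filter. There are, however, two more unusual approaches: (1) re-implementing some of Dubbo's internal classes, and (2) relying on RpcContext.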
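For the HTTP case mentioned above, a wrapped client might look roughly like the sketch below. It assumes Java 11's `java.net.http` API; the class name `TracedHttp` and the header name `X-Trace-Id` are hypothetical choices for illustration, not something the article prescribes.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical wrapper: every outgoing request built here carries the traceId header. */
final class TracedHttp {
    static final String TRACE_HEADER = "X-Trace-Id"; // assumed header name

    private TracedHttp() {}

    /** Builds a GET request and attaches the traceId, if there is one, as a header. */
    static HttpRequest get(String url, String traceId) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(url)).GET();
        if (traceId != null && !traceId.isEmpty()) {
            builder.header(TRACE_HEADER, traceId);
        }
        return builder.build();
    }
}
```

The receiving service would read the same header at its boundary and continue logging with that value.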
1 Implementation by rewriting Dubbo classes
1.1 Source code analysis
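Proxy is the dynamic proxy instance that Dubbo generates with Javassist for a service on the consumer side.
Implement is the service implementation instance on the provider side.
traceId pass-through means that Proxy and Implement must see the same traceId. Dubbo is cleanly layered, and the object carried across the transport layer is RpcInvocation.
The core of the rewrite is therefore: Proxy puts the traceId into the RpcInvocation and hands it to the Client for serialization and TCP transport; the Server deserializes the RpcInvocation, takes the traceId out, and hands it to Implement.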
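The flow just described can be sketched without Dubbo at all. In the snippet below, `FakeInvocation` is a hypothetical stand-in for RpcInvocation (only its attachments map matters), and a second thread plays the provider's worker thread. It is an illustration of the idea, not Dubbo's actual code path.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for RpcInvocation: only the attachments map matters for this sketch. */
final class FakeInvocation {
    final String methodName;
    final Map<String, String> attachments = new HashMap<>();

    FakeInvocation(String methodName) {
        this.methodName = methodName;
    }
}

final class PassThroughDemo {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private PassThroughDemo() {}

    /** Consumer side (the Proxy's role): copy the thread-local traceId into the invocation. */
    static FakeInvocation consumerSide(String methodName) {
        FakeInvocation inv = new FakeInvocation(methodName);
        String traceId = TRACE_ID.get();
        if (traceId != null) {
            inv.attachments.put("traceId", traceId);
        }
        return inv;
    }

    /** Provider side: a different thread restores the traceId before the implementation runs. */
    static String providerSide(FakeInvocation inv) {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> {
            TRACE_ID.set(inv.attachments.get("traceId"));
            try {
                seen[0] = TRACE_ID.get(); // what the service implementation would log
            } finally {
                TRACE_ID.remove();        // do not leak the id to the next request on this thread
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

The only thing that crosses the "wire" here is the invocation object, which is why the attachments map is the natural carrier for the traceId.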
Below is an annotated reading of JavassistProxyFactory on the consumer side:

    public class JavassistProxyFactory extends AbstractProxyFactory {

        /**
         * When the Spring container starts, this factory method generates the
         * Service proxy class for the consumer. Both invoker and interfaces are
         * read from the Spring configuration file.
         */
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
            // The bytecode of every method of the generated Service proxy calls
            // InvokerInvocationHandler.invoke(...), which does the actual work:
            // wrapping into an RpcInvocation, serialization, TCP transport and
            // deserialization of the result.
            return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
        }

        public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
            // TODO Wrapper cannot correctly handle class names containing '$'
            final Wrapper wrapper = Wrapper.getWrapper(
                    proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
            return new AbstractProxyInvoker<T>(proxy, type, url) {
                @Override
                protected Object doInvoke(T proxy, String methodName,
                                          Class<?>[] parameterTypes,
                                          Object[] arguments) throws Throwable {
                    return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
                }
            };
        }
    }
Below is an annotated reading of InvokerInvocationHandler on the consumer side:

    public class InvokerInvocationHandler implements InvocationHandler {

        private final Invoker<?> invoker;

        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }

        /**
         * When an RPC is actually made, the bytecode of every Service proxy calls
         * this generic invoke. proxy is the proxy object generated earlier, the
         * second parameter is the method and the third is the argument list.
         * Knowing (1) which interface, (2) which method and (3) which arguments
         * is enough to map the call onto the provider-side implementation and
         * obtain the return value.
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            // At this point we are still on the consumer's business thread, so this
            // is where the traceId can be read from the ThreadLocal and put into the
            // RpcInvocation's attachments. The provider can then take the
            // passed-through traceId out of the RpcInvocation it receives.
            return invoker.invoke(new RpcInvocation(method, args)).recreate();
        }
    }
Below is an annotated reading of DubboProtocol on the provider side (an excerpt of its request handler):

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // For a callback, handle the case of a newer version calling an older one
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:" + invoker.getUrl())
                                + " ,invocation is :" + inv);
                        return null;
                    }
                }
                // After receiving the message, the provider takes a thread from its
                // thread pool, deserializes the RpcInvocation and calls the matching
                // method of the implementation class. This is therefore the thread
                // the provider-side implementation runs on: the place to take the
                // traceId out and put it into a ThreadLocal.
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null
                    : (message.getClass().getName() + ": " + message)
                    + ", channel: consumer: " + channel.getRemoteAddress()
                    + " --> provider: " + channel.getLocalAddress());
        }
        // (the listing in the article stops here)
1.2 Implementation

    // ---- File: com/alibaba/dubbo/rpc/proxy/TraceIdUtil.java ----
    package com.alibaba.dubbo.rpc.proxy;

    /**
     * traceId utility class. This class is newly added.
     */
    public class TraceIdUtil {

        private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();

        public static String getTraceId() {
            return TRACE_ID.get();
        }

        public static void setTraceId(String traceId) {
            TRACE_ID.set(traceId);
        }
    }

    // ---- File: com/alibaba/dubbo/rpc/proxy/InvokerInvocationHandler.java ----
    package com.alibaba.dubbo.rpc.proxy;

    /**
     * InvokerInvocationHandler. This class is modified.
     */
    public class InvokerInvocationHandler implements InvocationHandler {

        private final Invoker<?> invoker;

        public InvokerInvocationHandler(Invoker<?> handler) {
            this.invoker = handler;
        }

        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            if ("toString".equals(methodName) && parameterTypes.length == 0) {
                return invoker.toString();
            }
            if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
                return invoker.hashCode();
            }
            if ("equals".equals(methodName) && parameterTypes.length == 1) {
                return invoker.equals(args[0]);
            }
            // Put the consumer-side traceId into the RpcInvocation
            RpcInvocation rpcInvocation = new RpcInvocation(method, args);
            rpcInvocation.setAttachment("traceId", TraceIdUtil.getTraceId());
            return invoker.invoke(rpcInvocation).recreate();
        }
    }

    // ---- File: com/alibaba/dubbo/rpc/protocol/dubbo/DubboProtocol.java ----
    package com.alibaba.dubbo.rpc.protocol.dubbo;

    // Needed because TraceIdUtil lives in a different package
    import com.alibaba.dubbo.rpc.proxy.TraceIdUtil;

    /**
     * dubbo protocol support. DubboProtocol re-implemented.
     */
    public class DubboProtocol extends AbstractProtocol {

        private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

            public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    Invocation inv = (Invocation) message;
                    Invoker<?> invoker = getInvoker(channel, inv);
                    // For a callback, handle the case of a newer version calling an older one
                    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                        String methodsStr = invoker.getUrl().getParameters().get("methods");
                        boolean hasMethod = false;
                        if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                            hasMethod = inv.getMethodName().equals(methodsStr);
                        } else {
                            String[] methods = methodsStr.split(",");
                            for (String method : methods) {
                                if (inv.getMethodName().equals(method)) {
                                    hasMethod = true;
                                    break;
                                }
                            }
                        }
                        if (!hasMethod) {
                            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                    + " not found in callback service interface ,invoke will be ignored."
                                    + " please update the api interface. url is:" + invoker.getUrl())
                                    + " ,invocation is :" + inv);
                            return null;
                        }
                    }
                    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                    // Put the traceId received from the consumer into the provider's thread local
                    TraceIdUtil.setTraceId(inv.getAttachment("traceId"));
                    return invoker.invoke(inv);
                }
                throw new RemotingException(channel, "Unsupported request: " + message == null ? null
                        : (message.getClass().getName() + ": " + message)
                        + ", channel: consumer: " + channel.getRemoteAddress()
                        + " --> provider: " + channel.getLocalAddress());
            }
        };

        // (the other members of DubboProtocol are not shown in the article's listing)
    }
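2 Implementation based on RpcContext
Before going into the custom Filter approach, let us first look at RpcContext. RpcContext is essentially a thread-local object: each thread holds its own instance, which keeps the context information of one RPC interaction.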
    /*
     * Copyright 1999-2011 Alibaba Group.
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package com.alibaba.dubbo.rpc;

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import com.alibaba.dubbo.common.Constants;
    import com.alibaba.dubbo.common.URL;
    import com.alibaba.dubbo.common.utils.NetUtils;

    /**
     * Thread local context. (API, ThreadLocal, ThreadSafe)
     *
     * Note: RpcContext is a temporary state recorder. Its state changes whenever
     * an RPC request is received or sent. For example, if A calls B and B then
     * calls C, then on machine B the RpcContext holds the information of the
     * A-to-B call before B calls C, and of the B-to-C call after B calls C.
     *
     * @see com.alibaba.dubbo.rpc.filter.ContextFilter
     * @author qian.lei
     * @author william.liangf
     * @export
     */
    public class RpcContext {

        private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
            @Override
            protected RpcContext initialValue() {
                return new RpcContext();
            }
        };

        /**
         * get context.
         *
         * @return context
         */
        public static RpcContext getContext() {
            return LOCAL.get();
        }

        /**
         * remove context.
         *
         * @see com.alibaba.dubbo.rpc.filter.ContextFilter
         */
        public static void removeContext() {
            LOCAL.remove();
        }

        private Future<?> future;
        private List<URL> urls;
        private URL url;
        private String methodName;
        private Class<?>[] parameterTypes;
        private Object[] arguments;
        private InetSocketAddress localAddress;
        private InetSocketAddress remoteAddress;
        private final Map<String, String> attachments = new HashMap<String, String>();
        private final Map<String, Object> values = new HashMap<String, Object>();

        // now we don't use the 'values' map to hold these objects
        // we want these objects to be as generic as possible
        private Object request;
        private Object response;

        @Deprecated
        private List<Invoker<?>> invokers;
        @Deprecated
        private Invoker<?> invoker;
        @Deprecated
        private Invocation invocation;

        protected RpcContext() {
        }

        /**
         * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
         *
         * @return null if the underlying protocol doesn't provide support for getting request
         */
        public Object getRequest() {
            return request;
        }

        /**
         * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
         *
         * @return null if the underlying protocol doesn't provide support for getting request
         *         or the request is not of the specified type
         */
        @SuppressWarnings("unchecked")
        public <T> T getRequest(Class<T> clazz) {
            return (request != null && clazz.isAssignableFrom(request.getClass())) ? (T) request : null;
        }

        public void setRequest(Object request) {
            this.request = request;
        }

        /**
         * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
         *
         * @return null if the underlying protocol doesn't provide support for getting response
         */
        public Object getResponse() {
            return response;
        }

        /**
         * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
         *
         * @return null if the underlying protocol doesn't provide support for getting response
         *         or the response is not of the specified type
         */
        @SuppressWarnings("unchecked")
        public <T> T getResponse(Class<T> clazz) {
            return (response != null && clazz.isAssignableFrom(response.getClass())) ? (T) response : null;
        }

        public void setResponse(Object response) {
            this.response = response;
        }

        /**
         * is provider side.
         *
         * @return provider side.
         */
        public boolean isProviderSide() {
            URL url = getUrl();
            if (url == null) {
                return false;
            }
            InetSocketAddress address = getRemoteAddress();
            if (address == null) {
                return false;
            }
            String host;
            if (address.getAddress() == null) {
                host = address.getHostName();
            } else {
                host = address.getAddress().getHostAddress();
            }
            return url.getPort() != address.getPort()
                    || !NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
        }

        /**
         * is consumer side.
         *
         * @return consumer side.
         */
        public boolean isConsumerSide() {
            URL url = getUrl();
            if (url == null) {
                return false;
            }
            InetSocketAddress address = getRemoteAddress();
            if (address == null) {
                return false;
            }
            String host;
            if (address.getAddress() == null) {
                host = address.getHostName();
            } else {
                host = address.getAddress().getHostAddress();
            }
            return url.getPort() == address.getPort()
                    && NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
        }

        /**
         * get future.
         *
         * @param <T>
         * @return future
         */
        @SuppressWarnings("unchecked")
        public <T> Future<T> getFuture() {
            return (Future<T>) future;
        }

        /**
         * set future.
         *
         * @param future
         */
        public void setFuture(Future<?> future) {
            this.future = future;
        }

        public List<URL> getUrls() {
            return urls == null && url != null ? (List<URL>) Arrays.asList(url) : urls;
        }

        public void setUrls(List<URL> urls) {
            this.urls = urls;
        }

        public URL getUrl() {
            return url;
        }

        public void setUrl(URL url) {
            this.url = url;
        }

        /**
         * get method name.
         *
         * @return method name.
         */
        public String getMethodName() {
            return methodName;
        }

        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }

        /**
         * get parameter types.
         *
         * @serial
         */
        public Class<?>[] getParameterTypes() {
            return parameterTypes;
        }

        public void setParameterTypes(Class<?>[] parameterTypes) {
            this.parameterTypes = parameterTypes;
        }

        /**
         * get arguments.
         *
         * @return arguments.
         */
        public Object[] getArguments() {
            return arguments;
        }

        public void setArguments(Object[] arguments) {
            this.arguments = arguments;
        }

        /**
         * set local address.
         *
         * @param address
         * @return context
         */
        public RpcContext setLocalAddress(InetSocketAddress address) {
            this.localAddress = address;
            return this;
        }

        /**
         * set local address.
         *
         * @param host
         * @param port
         * @return context
         */
        public RpcContext setLocalAddress(String host, int port) {
            if (port < 0) {
                port = 0;
            }
            this.localAddress = InetSocketAddress.createUnresolved(host, port);
            return this;
        }

        /**
         * get local address.
         *
         * @return local address
         */
        public InetSocketAddress getLocalAddress() {
            return localAddress;
        }

        public String getLocalAddressString() {
            return getLocalHost() + ":" + getLocalPort();
        }

        /**
         * get local host name.
         *
         * @return local host name
         */
        public String getLocalHostName() {
            String host = localAddress == null ? null : localAddress.getHostName();
            if (host == null || host.length() == 0) {
                return getLocalHost();
            }
            return host;
        }

        /**
         * set remote address.
         *
         * @param address
         * @return context
         */
        public RpcContext setRemoteAddress(InetSocketAddress address) {
            this.remoteAddress = address;
            return this;
        }

        /**
         * set remote address.
         *
         * @param host
         * @param port
         * @return context
         */
        public RpcContext setRemoteAddress(String host, int port) {
            if (port < 0) {
                port = 0;
            }
            this.remoteAddress = InetSocketAddress.createUnresolved(host, port);
            return this;
        }

        /**
         * get remote address.
         *
         * @return remote address
         */
        public InetSocketAddress getRemoteAddress() {
            return remoteAddress;
        }

        /**
         * get remote address string.
         *
         * @return remote address string.
         */
        public String getRemoteAddressString() {
            return getRemoteHost() + ":" + getRemotePort();
        }

        /**
         * get remote host name.
         *
         * @return remote host name
         */
        public String getRemoteHostName() {
            return remoteAddress == null ? null : remoteAddress.getHostName();
        }

        /**
         * get local host.
         *
         * @return local host
         */
        public String getLocalHost() {
            String host = localAddress == null ? null
                    : localAddress.getAddress() == null ? localAddress.getHostName()
                    : NetUtils.filterLocalHost(localAddress.getAddress().getHostAddress());
            if (host == null || host.length() == 0) {
                return NetUtils.getLocalHost();
            }
            return host;
        }

        /**
         * get local port.
         *
         * @return port
         */
        public int getLocalPort() {
            return localAddress == null ? 0 : localAddress.getPort();
        }

        /**
         * get remote host.
         *
         * @return remote host
         */
        public String getRemoteHost() {
            return remoteAddress == null ? null
                    : remoteAddress.getAddress() == null ? remoteAddress.getHostName()
                    : NetUtils.filterLocalHost(remoteAddress.getAddress().getHostAddress());
        }

        /**
         * get remote port.
         *
         * @return remote port
         */
        public int getRemotePort() {
            return remoteAddress == null ? 0 : remoteAddress.getPort();
        }

        /**
         * get attachment.
         *
         * @param key
         * @return attachment
         */
        public String getAttachment(String key) {
            return attachments.get(key);
        }

        /**
         * set attachment.
         *
         * @param key
         * @param value
         * @return context
         */
        public RpcContext setAttachment(String key, String value) {
            if (value == null) {
                attachments.remove(key);
            } else {
                attachments.put(key, value);
            }
            return this;
        }

        /**
         * remove attachment.
         *
         * @param key
         * @return context
         */
        public RpcContext removeAttachment(String key) {
            attachments.remove(key);
            return this;
        }

        /**
         * get attachments.
         *
         * @return attachments
         */
        public Map<String, String> getAttachments() {
            return attachments;
        }

        /**
         * set attachments
         *
         * @param attachment
         * @return context
         */
        public RpcContext setAttachments(Map<String, String> attachment) {
            this.attachments.clear();
            if (attachment != null && attachment.size() > 0) {
                this.attachments.putAll(attachment);
            }
            return this;
        }

        public void clearAttachments() {
            this.attachments.clear();
        }

        /**
         * get values.
         *
         * @return values
         */
        public Map<String, Object> get() {
            return values;
        }

        /**
         * set value.
         *
         * @param key
         * @param value
         * @return context
         */
        public RpcContext set(String key, Object value) {
            if (value == null) {
                values.remove(key);
            } else {
                values.put(key, value);
            }
            return this;
        }

        /**
         * remove value.
         *
         * @param key
         * @return value
         */
        public RpcContext remove(String key) {
            values.remove(key);
            return this;
        }

        /**
         * get value.
         *
         * @param key
         * @return value
         */
        public Object get(String key) {
            return values.get(key);
        }

        public RpcContext setInvokers(List<Invoker<?>> invokers) {
            this.invokers = invokers;
            if (invokers != null && invokers.size() > 0) {
                List<URL> urls = new ArrayList<URL>(invokers.size());
                for (Invoker<?> invoker : invokers) {
                    urls.add(invoker.getUrl());
                }
                setUrls(urls);
            }
            return this;
        }

        public RpcContext setInvoker(Invoker<?> invoker) {
            this.invoker = invoker;
            if (invoker != null) {
                setUrl(invoker.getUrl());
            }
            return this;
        }

        public RpcContext setInvocation(Invocation invocation) {
            this.invocation = invocation;
            if (invocation != null) {
                setMethodName(invocation.getMethodName());
                setParameterTypes(invocation.getParameterTypes());
                setArguments(invocation.getArguments());
            }
            return this;
        }

        /**
         * @deprecated Replace to isProviderSide()
         */
        @Deprecated
        public boolean isServerSide() {
            return isProviderSide();
        }

        /**
         * @deprecated Replace to isConsumerSide()
         */
        @Deprecated
        public boolean isClientSide() {
            return isConsumerSide();
        }

        /**
         * @deprecated Replace to getUrls()
         */
        @Deprecated
        @SuppressWarnings({ "unchecked", "rawtypes" })
        public List<Invoker<?>> getInvokers() {
            return invokers == null && invoker != null ? (List) Arrays.asList(invoker) : invokers;
        }

        /**
         * @deprecated Replace to getUrl()
         */
        @Deprecated
        public Invoker<?> getInvoker() {
            return invoker;
        }

        /**
         * @deprecated Replace to getMethodName(), getParameterTypes(), getArguments()
         */
        @Deprecated
        public Invocation getInvocation() {
            return invocation;
        }

        /**
         * Asynchronous call that needs a return value. Call timeouts are handled
         * even if Future.get is never called.
         *
         * @param callable
         * @return the result is obtained via future.get().
         */
        @SuppressWarnings("unchecked")
        public <T> Future<T> asyncCall(Callable<T> callable) {
            try {
                try {
                    setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
                    final T o = callable.call();
                    // a local call returns the result directly.
                    if (o != null) {
                        FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
                            public T call() throws Exception {
                                return o;
                            }
                        });
                        f.run();
                        return f;
                    } else {
                    }
                } catch (Exception e) {
                    throw new RpcException(e);
                } finally {
                    removeAttachment(Constants.ASYNC_KEY);
                }
            } catch (final RpcException e) {
                return new Future<T>() {
                    public boolean cancel(boolean mayInterruptIfRunning) {
                        return false;
                    }

                    public boolean isCancelled() {
                        return false;
                    }

                    public boolean isDone() {
                        return true;
                    }

                    public T get() throws InterruptedException, ExecutionException {
                        throw new ExecutionException(e.getCause());
                    }

                    public T get(long timeout, TimeUnit unit)
                            throws InterruptedException, ExecutionException, TimeoutException {
                        return get();
                    }
                };
            }
            return ((Future<T>) getContext().getFuture());
        }

        /**
         * One-way call: only sends the request and does not receive a result.
         *
         * @param callable
         */
        public void asyncCall(Runnable runable) {
            try {
                setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString());
                runable.run();
            } catch (Throwable e) {
                // FIXME should the exception be placed in the future?
                throw new RpcException("oneway call error ." + e.getMessage(), e);
            } finally {
                removeAttachment(Constants.RETURN_KEY);
            }
        }
    }
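Note: the attachments held in RpcContext are copied into the RpcInvocation object and transmitted along with it.
Some people therefore suggest simply putting the traceId into RpcContext, which would seem to give traceId pass-through for free. Let us try it out and see whether that is really the case.
Define the Dubbo interface:

    public interface IEchoService {
        String echo(String name);
    }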
Write the server-side code (Provider):

    @Service("echoService")
    public class EchoServiceImpl implements IEchoService {

        @Override
        public String echo(String name) {
            String traceId = RpcContext.getContext().getAttachment("traceId");
            System.out.println("name = " + name + ", traceId = " + traceId);
            return name;
        }

        public static void main(String[] args) {
            ClassPathXmlApplicationContext applicationContext =
                    new ClassPathXmlApplicationContext("spring-dubbo-test-producer.xml");
            System.out.println("server start");
            // Keep the JVM alive so the provider keeps serving requests
            while (true) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // ignored in this demo
                }
            }
        }
    }
Write the client-side code (Consumer):

    public class EchoServiceConsumer {

        public static void main(String[] args) {
            ClassPathXmlApplicationContext applicationContext =
                    new ClassPathXmlApplicationContext("spring-dubbo-test-consumer.xml");
            IEchoService service = (IEchoService) applicationContext.getBean("echoService");

            // *) set the traceId
            RpcContext.getContext().setAttachment("traceId", "100001");
            System.out.println(RpcContext.getContext().getAttachments());
            // *) first call
            service.echo("lilei");

            // *) second call
            System.out.println(RpcContext.getContext().getAttachments());
            service.echo("hanmeimei");
        }
    }
The result of running it is:

    Server output:
    name = lilei, traceId = 100001
    name = hanmeimei, traceId = null

    Client output:
    {traceId=100001}
    {}
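The server output shows that the traceId was indeed passed along, but only on the first call and not on the second. The client's printout of the RpcContext contents confirms this. The underlying cause is that the attachments of the RpcContext object are cleared after each RPC interaction.
Setting a breakpoint on RpcContext's clearAttachments method and reproducing the problem gives the following call stack: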
    java.lang.Thread.State: RUNNABLE
        at com.alibaba.dubbo.rpc.RpcContext.clearAttachments(RpcContext.java:438)
        at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50)
        at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:91)
        at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
        at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
        at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:227)
        at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
        at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
        at com.alibaba.dubbo.common.bytecode.proxy0.echo(proxy0.java:-1)
        at com.test.dubbo.EchoServiceConsumer.main(EchoServiceConsumer.java:20)
The most direct caller is Dubbo's built-in ConsumerContextFilter, so let us look at its code (as decompiled):

    @Activate(
        group = {"consumer"},
        order = -10000
    )
    public class ConsumerContextFilter implements Filter {

        public ConsumerContextFilter() {
        }

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation) invocation).setInvoker(invoker);
            }

            Result var3;
            try {
                var3 = invoker.invoke(invocation);
            } finally {
                // Attachments are wiped after every call, successful or not
                RpcContext.getContext().clearAttachments();
            }
            return var3;
        }
    }
Indeed, in the finally block we can see that RpcContext's attachments are cleared after every RPC call.
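The behaviour observed above can be reproduced without Dubbo. The sketch below is a simplified model, not Dubbo's code: `MiniContext` is a hypothetical stand-in for RpcContext, and `call()` mimics one consumer-side invocation in which the context's attachments are copied into the outgoing invocation and then cleared in a finally block.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for RpcContext: one attachments map per thread. */
final class MiniContext {
    private static final ThreadLocal<MiniContext> LOCAL = ThreadLocal.withInitial(MiniContext::new);
    final Map<String, String> attachments = new HashMap<>();

    static MiniContext get() {
        return LOCAL.get();
    }
}

final class ClearingCallDemo {
    private ClearingCallDemo() {}

    /** The "remote" provider: reports the traceId it found in the received attachments. */
    static String provider(Map<String, String> received) {
        return "traceId = " + received.get("traceId");
    }

    /** One consumer-side call: copy attachments into the invocation, then clear the context. */
    static String call() {
        try {
            Map<String, String> invocationAttachments = new HashMap<>(MiniContext.get().attachments);
            return provider(invocationAttachments);
        } finally {
            MiniContext.get().attachments.clear(); // mirrors the finally block shown above
        }
    }

    public static void main(String[] args) {
        MiniContext.get().attachments.put("traceId", "100001");
        System.out.println(call()); // prints "traceId = 100001"
        System.out.println(call()); // prints "traceId = null"
    }
}
```

The second call sees no traceId for the same reason as in the real test: nothing puts the value back after the first call has cleared it.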
Now that the root cause is known, one fix is to set the traceId again before every call, for example like this (which admittedly looks rather clumsy):

    // *) first call
    RpcContext.getContext().setAttachment("traceId", "100001");
    service.echo("lilei");

    // *) second call
    RpcContext.getContext().setAttachment("traceId", "100001");
    service.echo("hanmeimei");
3 Implementation based on a Filter
First we need a utility class, TraceIdUtils, which keeps the traceId of the current thread. Judging from how the Filter below calls it, it exposes getTraceId() and setTraceId(String), the same shape as the TraceIdUtil class in section 1.2.
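A minimal sketch of such a utility class is given below. It is reconstructed from the two calls the Filter makes (`getTraceId` and `setTraceId`) and modelled on TraceIdUtil from section 1.2; the `clear()` method is an extra assumption, and the package declaration (`com.test.dubbo` in the article) is left out so that the snippet stands on its own.

```java
/** Sketch of the utility class: a ThreadLocal holder for the current traceId. */
final class TraceIdUtils {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private TraceIdUtils() {}

    /** Returns the traceId bound to the current thread, or null if none was set. */
    static String getTraceId() {
        return TRACE_ID.get();
    }

    /** Binds the traceId to the current thread. */
    static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }

    /** Assumption, not used by the article's Filter: removes the binding for this thread. */
    static void clear() {
        TRACE_ID.remove();
    }
}
```

Because provider threads usually come from a pool, something like `clear()` would let a thread drop a traceId once a request is finished, although the article's Filter does not do this.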
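Then we define a Filter class:

    package com.test.dubbo;

    import com.alibaba.dubbo.rpc.Filter;
    import com.alibaba.dubbo.rpc.Invocation;
    import com.alibaba.dubbo.rpc.Invoker;
    import com.alibaba.dubbo.rpc.Result;
    import com.alibaba.dubbo.rpc.RpcContext;
    import com.alibaba.dubbo.rpc.RpcException;
    // The article does not show which StringUtils is imported; any variant with
    // a static isEmpty(String) method fits here.

    public class TraceIdFilter implements Filter {

        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            String traceId = RpcContext.getContext().getAttachment("traceId");
            if (!StringUtils.isEmpty(traceId)) {
                // *) take the traceId from RpcContext and remember it
                TraceIdUtils.setTraceId(traceId);
            } else {
                // *) re-inject the traceId before the interaction so it is not lost
                RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId());
            }
            // *) the actual rpc call
            return invoker.invoke(invocation);
        }
    }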
Under the resources directory, add a META-INF/dubbo directory, and in it create a file named com.alibaba.dubbo.rpc.Filter.
Edit that file (com.alibaba.dubbo.rpc.Filter) so that it contains:

    traceIdFilter=com.test.dubbo.TraceIdFilter
Then configure the filter on both the Dubbo provider and the consumer:

    Server side:
    <dubbo:service interface="com.test.dubbo.IEchoService" ref="echoService" version="1.0.0"
                   filter="traceIdFilter"/>

    Client side:
    <dubbo:reference interface="com.test.dubbo.IEchoService" id="echoService" version="1.0.0"
                     filter="traceIdFilter"/>
The server-side test code changes slightly, to:

    @Service("echoService")
    public class EchoServiceImpl implements IEchoService {

        @Override
        public String echo(String name) {
            // Read the traceId that TraceIdFilter stored for this thread
            String traceId = TraceIdUtils.getTraceId();
            System.out.println("name = " + name + ", traceId = " + traceId);
            return name;
        }
    }
The client-side test snippet is:

    // *) first call
    RpcContext.getContext().setAttachment("traceId", "100001");
    service.echo("lilei");

    // *) second call
    service.echo("hanmeimei");
Running the same test again gives:

    Server output:
    name = lilei, traceId = 100001
    name = hanmeimei, traceId = 100001

    Client output:
    {traceId=100001}
    {}
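This matches expectations, and the solution feels much more elegant. RpcContext's attachments are still cleared after each call (the finally block of ConsumerContextFilter runs after the custom Filter has returned), but the traceId is re-injected before every RPC interaction, so the trace is passed through successfully.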
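To make the ordering concrete, here is a small Dubbo-free model of the consumer side. It is a sketch under simplifying assumptions: `CONTEXT` stands in for RpcContext's attachments, `TRACE_ID` for TraceIdUtils, and the two static methods imitate the nesting of ConsumerContextFilter around the custom TraceIdFilter. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class FilterReinjectDemo {
    /** Stand-in for RpcContext's attachments on the consumer thread. */
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);
    /** Stand-in for TraceIdUtils. */
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private FilterReinjectDemo() {}

    /** Innermost step: the "remote" provider reports the traceId it received. */
    static String remoteEcho(Map<String, String> sent) {
        return "traceId = " + sent.get("traceId");
    }

    /** Mirrors the TraceIdFilter logic: remember the traceId, or put it back if it is missing. */
    static String traceIdFilter() {
        String traceId = CONTEXT.get().get("traceId");
        if (traceId != null && !traceId.isEmpty()) {
            TRACE_ID.set(traceId);
        } else if (TRACE_ID.get() != null) {
            CONTEXT.get().put("traceId", TRACE_ID.get());
        }
        return remoteEcho(new HashMap<>(CONTEXT.get()));
    }

    /** Mirrors ConsumerContextFilter: wraps the inner filter and clears attachments in finally. */
    static String consumerContextFilter() {
        try {
            return traceIdFilter();
        } finally {
            CONTEXT.get().clear();
        }
    }

    public static void main(String[] args) {
        CONTEXT.get().put("traceId", "100001");
        System.out.println(consumerContextFilter()); // prints "traceId = 100001"
        System.out.println(consumerContextFilter()); // prints "traceId = 100001"
    }
}
```

In this model the attachments map is empty after each call, yet both calls deliver the traceId, because the inner filter restores it from the thread-local copy before the request goes out.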