原 荐 Dubbo + Zipkin + Brave实现全链路追踪

栏目: 后端 · 发布时间: 6年前

内容简介:最近写了一个链路追踪Demo分享下,实现了链路追踪过程中数据的记录,还有能扩展的地方,后期再继续补充。原理参考上面文章

Dubbo + Zipkin + Brave实现全链路追踪

最近写了一个链路追踪Demo分享下,实现了链路追踪过程中数据的记录,还有能扩展的地方,后期再继续补充。

原理参考上面文章 《Dubbo链路追踪——生成全局ID(traceId)》

源码地址

实现链路追踪的目的

  • 服务调用的流程信息,定位服务调用链
  • 记录调用入参及返回值信息,方便问题重现
  • 记录调用时间线,代码重构及调优处理
  • 调用信息统计

分布式跟踪系统还有其他比较成熟的实现,例如:Naver的Pinpoint、Apache的HTrace、阿里的鹰眼Tracing、京东的Hydra、新浪的Watchman,美团点评的CAT,skywalking等。 本次主要利用Dubbo数据传播特性扩展Filter接口来实现链路追踪的目的

重点主要是zipkin及brave使用及特性,当前brave版本为 5.2.0 为 2018年8月份发布的release版本 , zipkin版本为2.2.1 所需JDK为1.8

快速启动zipkin

下载最新的zipkin并启动

wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
java -jar zipkin.jar

输入 http://localhost:9411/zipkin/ 进入WebUI界面如下 原 荐 Dubbo + Zipkin + Brave实现全链路追踪

核心源码

代码的初步版本:方便描述

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.propagation.*;
import brave.sampler.Sampler;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.json.JSON;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.*;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.support.RpcUtils;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;
import zipkin2.reporter.okhttp3.OkHttpSender;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Created with IntelliJ IDEA.
 *
 * @author: bakerZhu
 * @description:
 * @modifytime:
 */
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class TracingFilter  implements Filter {

	private static final Logger log = LoggerFactory.getLogger(TracingFilter.class);

	private static Tracing tracing;
	private static Tracer tracer;
	private static TraceContext.Extractor<Map<String, String>> extractor;
	private static TraceContext.Injector<Map<String, String>> injector;

	static final Propagation.Getter<Map<String, String>, String> GETTER =
			new Propagation.Getter<Map<String, String>, String>() {
				@Override
				public String get(Map<String, String> carrier, String key) {
					return carrier.get(key);
				}

				@Override
				public String toString() {
					return "Map::get";
				}
			};

	static final Propagation.Setter<Map<String, String>, String> SETTER =
			new Propagation.Setter<Map<String, String>, String>() {
				@Override
				public void put(Map<String, String> carrier, String key, String value) {
					carrier.put(key, value);
				}

				@Override
				public String toString() {
					return "Map::set";
				}
			};

	static {
		// 1
		Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
		// 2
		AsyncReporter asyncReporter = AsyncReporter.builder(sender)
				.closeTimeout(500, TimeUnit.MILLISECONDS)
				.build(SpanBytesEncoder.JSON_V2);
		// 3
		tracing = Tracing.newBuilder()
				.localServiceName("tracer-client")
				.spanReporter(asyncReporter)
				.sampler(Sampler.ALWAYS_SAMPLE)
				.propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
				.build();
		tracer = tracing.tracer();
		// 4
		// 4.1
		extractor = tracing.propagation().extractor(GETTER);
		// 4.2
		injector = tracing.propagation().injector(SETTER);
	}



	public TracingFilter() {
	}

	@Override
	public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {


		RpcContext rpcContext = RpcContext.getContext();
		// 5
		Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT;
		final Span span;
		if (kind.equals(Span.Kind.CLIENT)) {
			//6
			span = tracer.nextSpan();
			//7
			injector.inject(span.context(), invocation.getAttachments());
		} else {
			//8
			TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments());
			//9
			span = extracted.context() != null ? tracer.joinSpan(extracted.context()) : tracer.nextSpan(extracted);
		}

		if (!span.isNoop()) {
			span.kind(kind).start();
			//10
			String service = invoker.getInterface().getSimpleName();
			String method = RpcUtils.getMethodName(invocation);
			span.kind(kind);
			span.name(service + "/" + method);
			InetSocketAddress remoteAddress = rpcContext.getRemoteAddress();
			span.remoteIpAndPort(
					remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName(),remoteAddress.getPort());
		}

		boolean isOneway = false, deferFinish = false;
		try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)){
			//11
			collectArguments(invocation, span, kind);
			Result result = invoker.invoke(invocation);

			if (result.hasException()) {
				onError(result.getException(), span);
			}
			// 12
			isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
			// 13
			Future<Object> future = rpcContext.getFuture();

			if (future instanceof FutureAdapter) {
				deferFinish = true;
				((FutureAdapter) future).getFuture().setCallback(new FinishSpanCallback(span));// 14
			}
			return result;
		} catch (Error | RuntimeException e) {
			onError(e, span);
			throw e;
		} finally {
			if (isOneway) { // 15
				span.flush();
			} else if (!deferFinish) { // 16
				span.finish();
			}
		}
	}

	static void onError(Throwable error, Span span) {
		span.error(error);
		if (error instanceof RpcException) {
			span.tag("dubbo.error_msg", RpcExceptionEnum.getMsgByCode(((RpcException) error).getCode()));
		}
	}

	static void collectArguments(Invocation invocation, Span span, Span.Kind kind) {
		if (kind == Span.Kind.CLIENT) {
			StringBuilder fqcn = new StringBuilder();
			Object[] args = invocation.getArguments();
			if (args != null && args.length > 0) {
				try {
					fqcn.append(JSON.json(args));
				} catch (IOException e) {
					log.warn(e.getMessage(), e);
				}
			}
			span.tag("args", fqcn.toString());
		}
	}



	static final class FinishSpanCallback implements ResponseCallback {
		final Span span;

		FinishSpanCallback(Span span) {
			this.span = span;
		}

		@Override
		public void done(Object response) {
			span.finish();
		}

		@Override
		public void caught(Throwable exception) {
			onError(exception, span);
			span.finish();
		}
	}
	// 17
	private enum RpcExceptionEnum {
		UNKNOWN_EXCEPTION(0, "unknown exception"),
		NETWORK_EXCEPTION(1, "network exception"),
		TIMEOUT_EXCEPTION(2, "timeout exception"),
		BIZ_EXCEPTION(3, "biz exception"),
		FORBIDDEN_EXCEPTION(4, "forbidden exception"),
		SERIALIZATION_EXCEPTION(5, "serialization exception"),;

		private int code;

		private String msg;

		RpcExceptionEnum(int code, String msg) {
			this.code = code;
			this.msg = msg;
		}

		public static String getMsgByCode(int code) {
			for (RpcExceptionEnum error : RpcExceptionEnum.values()) {
				if (code == error.code) {
					return error.msg;
				}
			}
			return null;
		}
	}
}
  1. 构建客户端发送工具
  2. 构建异步reporter
  3. 构建tracing上下文
  4. 初始化injector 和 Extractor [tab]4.1 extractor 指数据提取对象,用于在carrier中提取TraceContext相关信息或者采样标记信息到TraceContextOrSamplingFlags 中 -4.2 injector 用于将TraceContext中的各种数据注入到carrier中,其中carrier一半是指数据传输中的载体,类似于Dubbo中Invocation中的attachment(附件集合)
  5. 判断此次调用是作为服务端还是客户端
  6. rpc客户端调用会从ThreadLocal中获取parent的 TraceContext ,为新生成的Span指定traceId及 parentId如果没有parent traceContext 则生成的Span为 root span
  7. 将Span绑定的TraceContext中 属性信息 Copy 到 Invocation中达到远程参数传递的作用
  8. rpc服务提供端 , 从invocation中提取TraceContext相关信息及采样数据信息
  9. 生成span , 兼容初次服务端调用
  10. 记录接口信息及远程IP Port
  11. 将创建的Span 作为当前Span (可以通过Tracer.currentSpan 访问到它) 并设置查询范围
  12. oneway调用即只请求不接受结果
  13. 如果future不为空则为 async 调用 在回调中finish span
  14. 设置异步回调,回调代码执行span finish() .
  15. oneway调用 因为不需等待返回值 即没有 cr (Client Receive) 需手动flush()
  16. 同步调用 业务代码执行完毕后需手动finish()
  17. 设置枚举类 与 Dubbo中RpcException保持对应

测试项

  • Dubbo sync async oneway 测试
  • RPC异常测试
  • 普通业务异常测试
  • 并发测试

配置方式

POM依赖添加

<dependency>
    <groupId>com.github.baker</groupId>
    <artifactId>Tracing</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

资源目录根路径下添加tracing.properties文件 原 荐 Dubbo + Zipkin + Brave实现全链路追踪 一次调用信息 原 荐 Dubbo + Zipkin + Brave实现全链路追踪 调用链 原 荐 Dubbo + Zipkin + Brave实现全链路追踪 调用成功失败汇总 原 荐 Dubbo + Zipkin + Brave实现全链路追踪 zipkinHost 指定zipkin服务器IP:PORT 默认为localhost:9411 serviceName 指定应用名称 默认为trace-default

调用链: 原 荐 Dubbo + Zipkin + Brave实现全链路追踪

待扩展项

  • 抽象数据传输(扩展Kafka数据传输)
  • 调用返回值数据打印
  • 更灵活的配置方式

源码地址


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Scalability for Startup Engineers

Web Scalability for Startup Engineers

Artur Ejsmont / McGraw / 2015-6-23 / USD 34.81

Design and build scalable web applications quickly This is an invaluable roadmap for meeting the rapid demand to deliver scalable applications in a startup environment. With a focus on core concept......一起来看看 《Web Scalability for Startup Engineers》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具