Dubbo压测插件的实现——基于Gatling

栏目: 后端 · 发布时间: 6年前

内容简介:Dubbo 压测插件已开源,本文涉及代码详见Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩

Dubbo 压测插件已开源,本文涉及代码详见 gatling-dubbo

Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益。基于 Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。

插件主要结构

实现 Dubbo 压测插件,需实现以下四部分内容:

  • Protocol 和 ProtocolBuild 协议部分,这里主要定义 Dubbo 客户端相关内容,如协议、泛化调用、服务 URL、注册中心等内容,ProtocolBuild 则为 DSL 使用 Protocol 的辅助类
  • Action 和 ActionBuild 执行部分,这里的作用是发起 Dubbo 请求,校验请求结果并记录日志以便后续生成压测报告。ActionBuild 则为 DSL 使用 Action 的辅助类
  • Check 和 CheckBuild 检查部分,全链路压测中我们都使用 Json Path 检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类
  • DSL Dubbo 插件的领域特定语言,我们提供了一套简单易用的 API 方便编写 Duboo 压测脚本,风格上与原生 HTTP DSL 保持一致

Dubbo压测插件的实现——基于Gatling

Protocol

协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:

  • protocol 协议,设置为 dubbo
  • generic 泛化调用设置,Dubbo 压测插件使用泛化调用发起请求,所以这里设置为 true ,有赞优化了泛化调用的性能,为了使用该特性,引入了一个新值 result_no_change (去掉优化前泛化调用的序列化开销以提升性能)
  • url Dubbo 服务的地址: dubbo://IP地址:端口
  • registryProtocol Dubbo 注册中心的协议,设置为 ETCD3
  • registryAddress Dubbo 注册中心的地址

如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。

object DubboProtocol {  
  val DubboProtocolKey = new ProtocolKey {
    type Protocol = DubboProtocol
    type Components = DubboComponents

    def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]

    def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can't provide a default value for DubboProtocol")

    def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = {
      dubboProtocol => DubboComponents(dubboProtocol)
    }
  }
}

case class DubboProtocol(  
    protocol: String, //dubbo
    generic:  String, //泛化调用?
    url:      String, //use url or
    registryProtocol: String,  //use registry
    registryAddress:  String   //use registry
) extends Protocol {
  type Components = DubboComponents
}

为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:

case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents {  
  def onStart: Option[Session => Session] = None
  def onExit: Option[Session => Unit] = None
}

以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。

object DubboProtocolBuilderBase {  
  def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol)
}

case class DubboProtocolBuilderGenericStep(protocol: String) {  
  def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic)
}

case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) {  
  def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url)
}

case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) {  
  def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol)
}

case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) {  
  def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress)
}

case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) {  
  def build = DubboProtocol(
    protocol = protocol,
    generic = generic,
    url = url,
    registryProtocol = registryProtocol,
    registryAddress = registryAddress
  )
}

Action

DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。

DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似 ${args_types}${args_values} 这样的表达式从数据 Feeder 中解析对应字段的值。

execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。

异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。

为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近,请参考 dubbo 泛化调用性能优化

class DubboAction(  
    interface:        String,
    method:           String,
    argTypes:         Expression[Array[String]],
    argValues:        Expression[Array[Object]],
    genericService:   GenericService,
    checks:           List[DubboCheck],
    coreComponents:   CoreComponents,
    throttled:        Boolean,
    val objectMapper: ObjectMapper,
    val next:         Action
) extends ExitableAction with NameGen {

  override def statsEngine: StatsEngine = coreComponents.statsEngine

  override def name: String = genName("dubboRequest")

  override def execute(session: Session): Unit = recover(session) {
    argTypes(session) flatMap { argTypesArray =>
      argValues(session) map { argValuesArray =>
        val startTime = System.currentTimeMillis()
        val f = Future {
          try {
            genericService.$invoke(method, argTypes(session).get, argValues(session).get)
          } finally {
          }
        }

        f.onComplete {
          case Success(result) =>
            val endTime = System.currentTimeMillis()
            val resultMap = result.asInstanceOf[JMap[String, Any]]
            val resultJson = objectMapper.writeValueAsString(resultMap)
            val (newSession, error) = Check.check(resultJson, session, checks)
            error match {
              case None =>
                statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None)
                throttle(newSession(session))
              case Some(Failure(errorMessage)) =>
                statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))
                throttle(newSession(session).markAsFailed)
            }
          case FuFailure(e) =>
            val endTime = System.currentTimeMillis()
            statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))
            throttle(session.markAsFailed)
        }
      }
    }
  }

  private def throttle(s: Session): Unit = {
    if (throttled) {
      coreComponents.throttler.throttle(s.scenario, () => next ! s)
    } else {
      next ! s
    }
  }
}

DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:

case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder {  
  private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents =
    protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey)

  override def build(ctx: ScenarioContext, next: Action): Action = {
    import ctx._
    val protocol = components(protocolComponentsRegistry).dubboProtocol
    //Dubbo客户端配置
    val reference = new ReferenceConfig[GenericService]
    val application = new ApplicationConfig
    application.setName("gatling-dubbo")
    reference.setApplication(application)
    reference.setProtocol(protocol.protocol)
    reference.setGeneric(protocol.generic)
    if (protocol.url == "") {
      val registry = new RegistryConfig
      registry.setProtocol(protocol.registryProtocol)
      registry.setAddress(protocol.registryAddress)
      reference.setRegistry(registry)
    } else {
      reference.setUrl(protocol.url)
    }
    reference.setInterface(interface)
    val cache = ReferenceConfigCache.getCache
    val genericService = cache.get(reference)
    val objectMapper: ObjectMapper = new ObjectMapper()
    new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next)
  }
}

LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL

case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport {

  def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes)

  def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues)

  def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList)

  def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks)
}

Check

全链路压测中,我们都使用 Json Path 校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于 Json Path 的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:

package object dubbo {  
  type DubboCheck = Check[String]

  val DubboStringExtender: Extender[DubboCheck, String] =
    (check: DubboCheck) => check

  val DubboStringPreparer: Preparer[String, String] =
    (result: String) => Success(result)
}

基于 Json Path 的校验逻辑:

trait DubboJsonPathOfType {  
  self: DubboJsonPathCheckBuilder[String] =>

  def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers)
}

object DubboJsonPathCheckBuilder {  
  val CharsParsingThreshold = 200 * 1000

  def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =
    response => {
      if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)
        jsonParsers.safeParseJackson(response)
      else
        jsonParsers.safeParseBoon(response)
    }

  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
    new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType
}

class DubboJsonPathCheckBuilder[X: JsonFilter](  
    private[check] val path:        Expression[String],
    private[check] val jsonParsers: JsonParsers
)(implicit extractorFactory: JsonPathExtractorFactory)
  extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X](
    DubboStringExtender,
    DubboJsonPathCheckBuilder.preparer(jsonParsers)
  ) {
  import extractorFactory._

  def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence))
  def findAllExtractor = path.map(newMultipleExtractor[X])
  def countExtractor = path.map(newCountExtractor)
}

DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL

trait DubboCheckSupport {  
  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
    DubboJsonPathCheckBuilder.jsonPath(path)
}
  • Dubbo 压测脚本中可以设置一个或多个 check 校验请求结果,使用 DSL check 方法*

DSL

trait AwsDsl 提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。

此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法

trait DubboDsl extends DubboCheckSupport {  
  val Dubbo = DubboProtocolBuilderBase

  def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method)

  implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build

  implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build()

  def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = {
    session.set(argTypeName, toArray(session(argTypeName).as[JList[String]]))
      .set(argValueName, toArray(session(argValueName).as[JList[Any]]))
  }

  private def toArray[T:ClassTag](value: JList[T]): Array[T] = {
    value.asScala.toArray
  }
}
object Predef extends DubboDsl

Dubbo 压测脚本和数据 Feeder 示例

压测脚本示例:

import io.gatling.core.Predef._  
import io.gatling.dubbo.Predef._

import scala.concurrent.duration._

class DubboTest extends Simulation {  
  val dubboConfig = Dubbo
    .protocol("dubbo")
    .generic("true")
    //直连某台Dubbo机器,只单独压测一台机器的水位
    .url("dubbo://IP地址:端口")
    //或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心
    .registryProtocol("")
    .registryAddress("")

  val jsonFileFeeder = jsonFile("data.json").circular  //数据Feeder
  val dubboScenario = scenario("load test dubbo")
    .forever("repeated") {
      feed(jsonFileFeeder)
        .exec(session => transformJsonDubboData("args_types1", "args_values1", session))
        .exec(dubbo("com.xxx.xxxService", "methodName")
          .argTypes("${args_types1}")
          .argValues("${args_values1}")
          .check(jsonPath("$.code").is("200"))
        )
    }

  setUp(
    dubboScenario.inject(atOnceUsers(10))
      .throttle(
        reachRps(10) in (1 seconds),
        holdFor(30 seconds))
  ).protocols(dubboConfig)
}

data.json 示例:

[
  {
  "args_types1": ["com.xxx.xxxDTO"],
  "args_values1": [{
    "field1": "111",
    "field2": "222",
    "field3": "333"
  }]
  }
]

Dubbo 压测报告示例

Dubbo压测插件的实现——基于Gatling

我的系列博客 混沌工程 - 软件系统高可用、弹性化的必由之路 异步系统的两种测试方法

我的其他测试相关开源项目 捉虫记:方便产品、开发、测试三方协同自测的管理工具

招聘有赞测试组在持续招人中,大量岗位空缺,只要你来,就能帮你点亮全栈开发技能树,有意向换工作的同学可以发简历到 sunjun【@】youzan.com

欢迎关注我们的公众号

Dubbo压测插件的实现——基于Gatling

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Programming Concurrency on the JVM

Programming Concurrency on the JVM

Venkat Subramaniam / The Pragmatic Bookshelf / 2011-6-1 / USD 35.00

Concurrency on the Java platform has evolved, from the synchronization model of JDK to software transactional memory (STM) and actor-based concurrency. This book is the first to show you all these con......一起来看看 《Programming Concurrency on the JVM》 这本书的介绍吧!

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具