复杂业务需求下,我们为什么选择 Akka 作为异步通信框架?

栏目: Scala · 发布时间: 5年前

内容简介:Akka 是 Scala 语言实现的一套基于 Actor 模型的异步通信框架,可用于构建高并发、分布式、可容错、事件驱动的基于 JVM 的应用,在 Spark 中曾被用于实现进程、节点间通信,在实际项目中协助我们成功搭建了满足业务需求的模型部署平台。某国内大型连锁餐饮企业旗下拥有大量门店。餐厅门店的每日生产、订货、排班都依赖于每日客单量预估的合理性,其内部数据团队实现了一套预估模型,需要 TalkingData 帮助构建一个工程化平台以支撑模型的训练和部署,从而将模型真正地应用到实际生产环节中。经过交流,我

Akka 是 Scala 语言实现的一套基于 Actor 模型的异步通信框架,可用于构建高并发、分布式、可容错、事件驱动的基于 JVM 的应用,在 Spark 中曾被用于实现进程、节点间通信,在实际项目中协助我们成功搭建了满足业务需求的模型部署平台。

项目背景

某国内大型连锁餐饮企业旗下拥有大量门店。餐厅门店的每日生产、订货、排班都依赖于每日客单量预估的合理性,其内部数据团队实现了一套预估模型,需要 TalkingData 帮助构建一个工程化平台以支撑模型的训练和部署,从而将模型真正地应用到实际生产环节中。

经过交流,我们发现在实际生产环境中,在各方面存在一些问题:

  • 异步:所有门店的前日销售、业务等数据均由各自门店的店长负责整合上传。上传的开始时间、结束时间、数据的完整性等均不确定。而模型训练和预测均依赖这部分数据,这就意味这无法为模型训练和预测设置统一的开始入口。

  • 高并发:除了一些特殊类型的门店,绝大多数门店的营业时间相对固定,从店长决定整理上传销售数据,到准备物料、排班准备次日营业,留给模型训练和模型预测回吐预测结果的时间大概为 3 小时。如果每个门店的预测指标有 2 至 3 项,那么需要有足够的调度能力在规定时间内完成大概 2 万次模型训练加预测流程。

  • 容错:由于门店数量众多且情况各不相同,仍然有很多潜在的因素可能导致流程出错或失败。原则上,某次流程的失败不应该对其他流程造成任何影响,每个流程在平台层面应该成为互相独立的任务。

因此,我们需要一套轻量化的分布式服务框架,来实现满足上述需求的模型训练预测平台,并在一定程度上保证平台的可拓展性。结合此前团队内的技术积累,最终选择了 Akka 框架用于实现平台的内部通信。

选型过程

消息驱动方式——流程异步化

一次完整的预测任务包括:训练数据准备→模型训练→模型结果导出→预测数据准备→预测结果导出,其中数据准备步骤在时间上不确定,模型相关步骤在执行结果上不确定,如果采用同步模型,将会产生大量的等待线程,占用浪费大量资源。在 Actor 模型中,每个 Actor 作为一个基本计算单元,回应接收到的消息,同时并行的:

  • 发送有限数量的消息给其他 Actor
  • 创建有限数量的新 Actor
  • 指定接受到下一个消息时的行为

上述操作没有顺序执行的假设,因此可以并行进行。发送者与已经发送的消息解耦,可以进行无需等待的异步通信。

复杂业务需求下,我们为什么选择 Akka 作为异步通信框架?

Actor 模型通信方式

Akka 中的 Actor 本质上就是接收消息并采取行动处理消息的对象,是封装状态和行为的对象,它们唯一的通信方式是交换消息——把消息存放在接收方的邮箱里。Actor 自然形成树形结构,这种结构的精髓在于任务被拆开、委托,直到任务小到可以被完整地处理。因此,我们将预测任务的各个步骤拆分抽象,并创建类型消息与步骤对应,将每个步骤交给线程级别的 Actor 执行处理,通过发送不同类型的消息来触发创建不同操作的 Actor,让整个预测流程无需等待。

结构——应对高并发

由于绝大多数门店的营业时间大致相同,平台在流量上会有明显的峰值和低谷,在低谷期间平台需要尽可能减少资源占有量,而在流量峰值来临时平台要能够及时响应,保证足够的可用性。

经过讨论,我们确定了采用 Master-Worker 模式的平台结构,Master 负责接收与分配任务,Worker 负责处理执行具体的模型任务。

Master 和 Worker 均为独立的 ActorSystem,管理内部不不同操作逻辑的 Actor,在空闲状态下占有资源很小。Actor 为线程级别,同样仅占用极少量资源,生命周期由 ActorSystem 统一管理。少量请求时,Actor 线程具有很高的复用率,请求并发高时,ActorSystem 会创建大量的 Actor 线程用来承接请求,保证可用性。

复杂业务需求下,我们为什么选择 Akka 作为异步通信框架?

Akka 中 Actor 的生命周期

子 Actor——模块化提高容错

每个预测任务的模型相关步骤均存在失败的可能性,此外,数据准备过程中的网络波动、内容校验出错等情况,都会导致当前预测任务的失败。对于失败的任务,我们希望能够尽可能记录错误信息,为重跑提供先决条件。

在 Akka 中,构建了父子 Actor 的树形监督结构,提供 Actor 的监督机制以保证容错性,把处理响应错误的责任交给出错对象以外的实体。父 Actor 创建子 Actor 来委托处理子任务,同时便会自动地监管它们。子 Actor 列表维护在父 Actor 的上下文中,父 Actor 可以访问它。

复杂业务需求下,我们为什么选择 Akka 作为异步通信框架?

Akka 中的 Actor 结构

通过更进一步的拆分细化,我们将 Worker 端的 Actor 分为 Prepare 和 Executor 两种,Prepare 为主要负责数据准备步骤,Executor 负责模型相关步骤,统一由 Worker 端的父 Actor 管理,错误和异常均向上层抛出,由 Worker 端的父 Actor 记录并发送给的错误收集模块统一处理。

实践应用

ActorSystem

创建 ActorSystem 时,默认将在 classpath 中寻找 application.conf、 application.json 和 application.properties,并自动加载:

复制代码

valsystem=ActorSystem("RsModelActorSystem")
valsystem=ActorSystem("RsModelActorSystem", ConfigFactory.load())// 同上

如果想要使用自己的配置文件,可以通过 ConfigFactory 来配置加载:

复制代码

valsystem =ActorSystem("UniversityMessageSystem",
ConfigFactory.load("own-application.conf"))


valconfig =ConfigFactory.parseString(
s"""
|akka.remote.netty.tcp.hostname = $host
|akka.actor.provider = akka.remote.RemoteActorRefProvider
|akka.remote.enabled-transport = akka.remote.netty.tcp
|akka.remote.netty.tcp.port = 2445
""".stripMargin)
valsystem =ActorSystem("RsModelActorSystem",
config.withFallback(ConfigFactory.load()))// 同上

ActorSystem 的配置参数中有大量参数可以自定义,需要根据实际需要修改,例如在该项目中,后期单个算法任务对象大小超过了 Akka remote 默认包大小 128000 bytes,需要修改参数 akka.remote.netty.tcp.maximum-frame-size

Actor

一个 Actor 包含了状态、行为、一个邮箱、子 Actor 和一个监管策略,所有这些封装在一个 Actor 引用里。Actor 对象通常包含一些变量来反映其所处的可能状态,Akka-actor 自身的轻量线程与系统的其他部分完全隔离,因此无须担心并发问题。每当一个消息被处理,它会与 Actor 的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采取的动作,需要结合实际需求编写具体逻辑。Actor 的邮箱是连接发送者与接收者的纽带,每个 Actor 有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。可以有不同策略的邮箱实现供选择,缺省时为 FIFO。

编写逻辑

在 Actor 类中,主要逻辑均在 receive 方法中实现,通过偏函数方法,执行并返回对应的逻辑:

复制代码

//ActorLogging 提供 Actor 内部的日志输出
classRsActorextendsActorwithActorLogging{
overridedefreceive:Receive= {
caseMapMessage(parameters) =>
println(parameters.get("code"))

caseMapKeyMessage(parameters, key) =>
println(parameters.get(key))

caseStringMessage(msg) =>
println(msg.getBytes().length)

caseo:Object=>
println(o.getClass)

case_:AnyRef=>
println("233")
}
}

生成引用

生成一个可以接收消息的 Actor 实例主要有两个方法:

复制代码

// 生成一个基于本地类的 Actor 实例
valrsActor = system.actorOf(Props[RsActor],"rsActor")
// 生成一个基于远程地址的 Actor 实例
valrmActor =
system.actorSelection("akka.tcp://RsModelAkkaSystem@192.168.1.9:2445/user/rsActor")

// 使用! 向对应的 Actor 实例发送消息
rsActor !StringMessage("test")
rmActor !MapMessage(Map("code"->"233"))

Message

Akka 中对传递的消息内容并没有太严格要求,可以是基本数据类型,也可以是支持序列化的对象:

复制代码

//scala 的 case class 便于简洁地创建消息类
caseclassStringMessage(msg:String)extendsSerializable
caseclassMapMessage(parameters:Map[String,String])extendsSerializable
caseclassMapKeyMessage(parameters:Map[String,String], key:String)extendsSerializable

其他

Akka 作为一款被广泛使用的开源工具,在实际项目中体现出了很多的优势,异步的消息驱动方式也给我们提供了一套新的思路和实现方法。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Python科学计算(第2版)

Python科学计算(第2版)

张若愚 / 清华大学出版社 / 2016-4-29 / 118

本书介绍如何用 Python 开发科学计算的应用程序,除了介绍数值计算之外,还着重介绍了如何制作交互式二维、三维图像,如何设计精巧的程序界面,如何与 C 语言编写的高速计算程序结合,如何编写声音、图像处理算法等内容。本书采用 IPython notebook 编写,所有的程序均能在本书提供的运行环境中正常运行,书中所印刷的图表以及程序输出为均为自动运行的结果,保证了书中所有程序的正确性以及可读性。......一起来看看 《Python科学计算(第2版)》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

在线进制转换器
在线进制转换器

各进制数互转换器

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具