阅读 11
复杂业务下,我们为何选择 Akka 作为异步通信框架?
Akka 是 Scala 语言实现的一套基于 Actor 模型的异步通信框架,可用于构建高并发、分布式、可容错、事件驱动的基于 JVM 的应用,在 Spark 中曾被用于实现进程、节点间通信,在实际项目中协助我们成功搭建了满足业务需求的模型部署平台。
项目背景
某国内大型连锁餐饮企业旗下拥有大量门店。餐厅门店的每日生产、订货、排班都依赖于每日客单量预估的合理性,其内部数据团队实现了一套预估模型,需要 TalkingData 帮助构建一个工程化平台以支撑模型的训练和部署,从而将模型真正地应用到实际生产环节中。
经过交流,我们发现在实际生产环境中,在各方面存在一些问题:
-
异步:所有门店的前日销售、业务等数据均由各自门店的店长负责整合上传。上传的开始时间、结束时间、数据的完整性等均不确定。而模型训练和预测均依赖这部分数据,这就意味这无法为模型训练和预测设置统一的开始入口。
-
高并发:除了一些特殊类型的门店,绝大多数门店的营业时间相对固定,从店长决定整理上传销售数据,到准备物料、排班准备次日营业,留给模型训练和模型预测回吐预测结果的时间大概为 3 小时。如果每个门店的预测指标有 2 至 3 项,那么需要有足够的调度能力在规定时间内完成大概 2 万次模型训练加预测流程。
-
容错:由于门店数量众多且情况各不相同,仍然有很多潜在的因素可能导致流程出错或失败。原则上,某次流程的失败不应该对其他流程造成任何影响,每个流程在平台层面应该成为互相独立的任务。
因此,我们需要一套轻量化的分布式服务框架,来实现满足上述需求的模型训练预测平台,并在一定程度上保证平台的可拓展性。结合此前团队内的技术积累,最终选择了 Akka 框架用于实现平台的内部通信。
选型过程
消息驱动方式——流程异步化
一次完整的预测任务包括:训练数据准备→模型训练→模型结果导出→预测数据准备→预测结果导出,其中数据准备步骤在时间上不确定,模型相关步骤在执行结果上不确定,如果采用同步模型,将会产生大量的等待线程,占用浪费大量资源。在 Actor 模型中,每个 Actor 作为一个基本计算单元,回应接收到的消息,同时并行的:
-
发送有限数量的消息给其他 Actor
-
创建有限数量的新 Actor
-
指定接受到下一个消息时的行为
上述操作没有顺序执行的假设,因此可以并行进行。发送者与已经发送的消息解耦,可以进行无需等待的异步通信。
Actor 模型通信方式
Akka 中的 Actor 本质上就是接收消息并采取行动处理消息的对象,是封装状态和行为的对象,它们唯一的通信方式是交换消息——把消息存放在接收方的邮箱里。Actor 自然形成树形结构,这种结构的精髓在于任务被拆开、委托,直到任务小到可以被完整地处理。因此,我们将预测任务的各个步骤拆分抽象,并创建类型消息与步骤对应,将每个步骤交给线程级别的 Actor 执行处理,通过发送不同类型的消息来触发创建不同操作的 Actor,让整个预测流程无需等待。
结构——应对高并发
由于绝大多数门店的营业时间大致相同,平台在流量上会有明显的峰值和低谷,在低谷期间平台需要尽可能减少资源占有量,而在流量峰值来临时平台要能够及时响应,保证足够的可用性。
经过讨论,我们确定了采用 Master-Worker 模式的平台结构,Master 负责接收与分配任务,Worker 负责处理执行具体的模型任务。
Master 和 Worker 均为独立的 ActorSystem,管理内部不不同操作逻辑的 Actor,在空闲状态下占有资源很小。Actor 为线程级别,同样仅占用极少量资源,生命周期由 ActorSystem 统一管理。少量请求时,Actor 线程具有很高的复用率,请求并发高时,ActorSystem 会创建大量的 Actor 线程用来承接请求,保证可用性。
Akka 中 Actor 的生命周期
子 Actor——模块化提高容错
每个预测任务的模型相关步骤均存在失败的可能性,此外,数据准备过程中的网络波动、内容校验出错等情况,都会导致当前预测任务的失败。对于失败的任务,我们希望能够尽可能记录错误信息,为重跑提供先决条件。
在 Akka 中,构建了父子 Actor 的树形监督结构,提供 Actor 的监督机制以保证容错性,把处理响应错误的责任交给出错对象以外的实体。父 Actor 创建子 Actor 来委托处理子任务,同时便会自动地监管它们。子 Actor 列表维护在父 Actor 的上下文中,父 Actor 可以访问它。
Akka 中的 Actor 结构
通过更进一步的拆分细化,我们将 Worker 端的 Actor 分为 Prepare 和 Executor 两种,Prepare 为主要负责数据准备步骤,Executor 负责模型相关步骤,统一由 Worker 端的父 Actor 管理,错误和异常均向上层抛出,由 Worker 端的父 Actor 记录并发送给的错误收集模块统一处理。
实践应用
ActorSystem
创建 ActorSystem 时,默认将在 classpath 中寻找 application.conf、 application.json 和 application.properties,并自动加载:
val system=ActorSystem("RsModelActorSystem") val system=ActorSystem("RsModelActorSystem", ConfigFactory.load()) // 同上复制代码
如果想要使用自己的配置文件,可以通过 ConfigFactory 来配置加载:
val system = ActorSystem("UniversityMessageSystem", ConfigFactory.load("own-application.conf")) val config = ConfigFactory.parseString( s""" |akka.remote.netty.tcp.hostname = $host |akka.actor.provider = akka.remote.RemoteActorRefProvider |akka.remote.enabled-transport = akka.remote.netty.tcp |akka.remote.netty.tcp.port = 2445 """.stripMargin) val system = ActorSystem("RsModelActorSystem", config.withFallback(ConfigFactory.load())) // 同上 复制代码
ActorSystem 的配置参数中有大量参数可以自定义,需要根据实际需要修改,例如在该项目中,后期单个算法任务对象大小超过了 Akka remote 默认包大小 128000 bytes,需要修改参数 akka.remote.netty.tcp.maximum-frame-size
Actor
一个 Actor 包含了状态、行为、一个邮箱、子 Actor 和一个监管策略,所有这些封装在一个 Actor 引用里。Actor 对象通常包含一些变量来反映其所处的可能状态,Akka-actor 自身的轻量线程与系统的其他部分完全隔离,因此无须担心并发问题。每当一个消息被处理,它会与 Actor 的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采取的动作,需要结合实际需求编写具体逻辑。Actor 的邮箱是连接发送者与接收者的纽带,每个 Actor 有且仅有一个邮箱,所有的发来的消息都在邮箱里排队。可以有不同策略的邮箱实现供选择,缺省时为 FIFO。
编写逻辑
在 Actor 类中,主要逻辑均在 receive 方法中实现,通过偏函数方法,执行并返回对应的逻辑:
//ActorLogging 提供 Actor 内部的日志输出 class RsActor extends Actor with ActorLogging { override def receive: Receive = { case MapMessage(parameters) => println(parameters.get("code")) case MapKeyMessage(parameters, key) => println(parameters.get(key)) case StringMessage(msg) => println(msg.getBytes().length) case o: Object => println(o.getClass) case _: AnyRef => println("233") } }复制代码
生成引用
生成一个可以接收消息的 Actor 实例主要有两个方法:
// 生成一个基于本地类的 Actor 实例 val rsActor = system.actorOf(Props[RsActor], "rsActor") // 生成一个基于远程地址的 Actor 实例 val rmActor = system.actorSelection("akka.tcp://RsModelAkkaSystem@192.168.1.9:2445/user/rsActor") // 使用! 向对应的 Actor 实例发送消息 rsActor ! StringMessage("test") rmActor ! MapMessage(Map("code"->"233"))复制代码
Message
Akka 中对传递的消息内容并没有太严格要求,可以是基本数据类型,也可以是支持序列化的对象:
//scala 的 case class 便于简洁地创建消息类 case class StringMessage(msg: String) extends Serializable case class MapMessage(parameters: Map[String, String]) extends Serializable case class MapKeyMessage(parameters: Map[String, String], key: String) extends Serializable复制代码
Akka 作为一款被广泛使用的开源工具,在实际项目中体现出了很多的优势,异步的消息驱动方式也给我们提供了一套新的思路和实现方法。
作者介绍:李天烨,TalkingData 数据科学家。毕业于东北大学,任职于 TalkingData 数据科学团队,从事数据科学自动化相关工作。
本文转自:InfoQ
以上所述就是小编给大家介绍的《复杂业务下,我们为何选择 Akka 作为异步通信框架?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Python微型异步爬虫框架
- 5个顶级异步Python框架
- Spring Boot 异步框架的使用
- netty的Future异步回调难理解?手写个带回调异步框架就懂了
- 使用 Go 语言实现一个异步任务框架
- uvloop —— 超级快的 Python 异步网络框架
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
解放战争(上)(1945年8月—1948年9月)
王树增 / 人民文学出版社 / 2009-8 / 60.00
《解放战争》为王树增非虚构文学著述中规模最大的作品。武器简陋、兵力不足的军队对抗拥有现代武器装备的兵力庞大的军队,数量不多、面积有限的解放区最终扩展成为九百六十万平方公里的共和国,解放战争在短短四年时间里演绎的是人类历史上的战争传奇。国际风云,政治智慧,时事洞察,军事谋略,军队意志,作战才能,作品具有宏阔的视角和入微的体察,包含着惊心动魄的人生沉浮和变幻莫测的战场胜负,尽展中国历史上规模最大的一场......一起来看看 《解放战争(上)(1945年8月—1948年9月)》 这本书的介绍吧!