内容简介:作者 | 小牛Java 工程师,关注服务端技术
作者 | 小牛
Java 工程师,关注服务端技术
前提
随着业务的发展,现代分布式系统对于垂直扩展、水平扩展、容错性的要求越来越高。常见的一些编程模式已经不能很好的解决这些问题。
解决并发问题核心是并发线程中的数据通讯问题,一般有两种策略:
-
共享数据
-
消息传递
共享数据
基于 JVM 内存模型的设计,需要通过加锁等同步机制保证共享数据的一致性。但其实使用锁对于高并发系统并不是一个很好的解决方案:
-
运行低效,代价昂贵,非常限制并发。
-
调用线程会被阻塞,以致于它不能去做其他有意义的任务。
-
很难实现,比较容易出现死锁等各种问题。
消息传递
与共享数据方式相比,消息传递机制的最大优点就是不会产生竞争。实现消息传递的两种常见形式:
-
基于 Channel 的消息传递
-
基于 Actor 模型的消息传递
常见的 RabbitMQ 等消息队列,都可以认为是基于 Channel 的消息传递模式,而本文主要会介绍 Actor 模型相关内容。
Actor 模型
Actor 的基础就是消息传递,一个 Actor 可以认为是一个基本的计算单元,它能接收消息并基于其执行运算,它也可以发送消息给其他 Actor。Actors 之间相互隔离,它们之间并不共享内存。
Actor 本身封装了状态和行为,在进行并发编程时,Actor 只需要关注消息和它本身。而消息是一个不可变对象,所以 Actor 不需要去关注锁和内存原子性等一系列多线程常见的问题。
所以 Actor 是由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成:
-
状态:Actor 中的状态指 Actor 对象的变量信息,状态由 Actor 自己管理,避免了并发环境下的锁和内存原子性等问题。
-
行为:Actor 中的计算逻辑,通过 Actor 接收到的消息来改变 Actor 的状态。
-
邮箱:邮箱是 Actor 和 Actor 之间的通信桥梁,邮箱内部通过 FIFO(先入先出)消息队列来存储发送方 Actor 消息,接受方 Actor 从邮箱队列中获取消息。
模型概念
可以看出按消息的流向,可以将 Actor 分为发送方和接收方,一个 Actor 既可以是发送方也可以是接受方。
另外我们可以了解到 Actor 是串行处理消息的,另外 Actor 中消息不可变。
Actor 模型特点
-
对并发模型进行了更高的抽象。
-
使用了异步、非阻塞、高性能的事件驱动编程模型。
-
轻量级事件处理(1 GB 内存可容纳百万级别 Actor)。
简单了解了 Actor 模型,我们来看一个基于其实现的框架。
Akka Actor
Akka 是一个构建在 JVM 上,基于 Actor 模型的的并发框架,为构建伸缩性强,有弹性的响应式并发应用提高更好的平台。
ActorSystem
ActorSystem 可以看做是 Actor 的系统工厂或管理者。主要有以下功能:
-
管理调度服务
-
配置相关参数
-
日志功能
Actor 层次结构
Akka 官网展示的 Actor 层次结构示意图
Akka 有在系统中初始化三个 Actor:
-
/
所谓的根监护人。这是系统中所有 Actor 的父亲,当系统被终止时,它也是最后一个被停止的。 -
/user
这是所有用户创建的 Actor 的父亲。不要被 user 这个名字所迷惑,他与最终用户没有关系,也和用户处理无关。你使用 Akka 库所创建的所有 Actor 的路径都将以/user/
开头 -
/system
系统监护人
我们可以使用 system.actorOf()
来创建一个在 /user
路径下的 Actor。尽管它只是在用户创建的层次的最高级 Actor,但是我们把它称作顶级 Actor。
Akka 里的 Actor 总是属于其父母。可以通过调用 context.actorOf()
创建一个 Actor。这种方式向现有的 Actor 树内加入了一个新的 Actor,这个 Actor 的创建者就成为了这个 Actor 的父 Actor。
Actor 生命周期
Akka Actor 生命周期示意图
Actor 在被创建后存在,并且在用户请求关闭时消失。当 Actor 被关闭后,其所有的子Actor 都将被依次地关闭.
AKKA 为 Actor 生命周期的每个阶段都提供了钩子(hook)方法,我们可以通过重写这些方法来管理 Actor 的生命周期。
Actor 被定义为 trait,可以认为就是一个接口,其中一个典型的方法对是 preStart()
与 postStop()
,顾名思义,两个方法分别在启动和停止时被调用。
ActorRef
在使用 system.actorOf()
创建 Actor 时,其实返回的是一个 ActorRef 对象。
ActorRef 可以看做是 Actor 的引用,是一个 Actor 的不可变,可序列化的句柄(handle),它可能不在本地或同一个 ActorSystem 中,它是实现网络空间位置透明性的关键设计。
ActorRef 最重要功能是支持向它所代表的 Actor 发送消息:
ref ! message
Dispatcher 和 MailBox
ActorRef 将消息处理能力委派给 Dispatcher,实际上,当我们创建 ActorSystem 和 ActorRef 时,Dispatcher 和 MailBox 就已经被创建了。
Dispatcher 从 ActorRef 中获取消息并传递给 MailBox,Dispatcher 封装了一个线程池,之后在线程池中执行 MailBox。
因为 MailBox 实现了 Runnable 接口,所以可以通过 Java 的线程池调用。
流程
通过了解上面的一些概念,我们可以 Akka Actor 的处理流程归纳如下:
-
创建 ActorSystem
-
通过 ActorSystem 创建 ActorRef,并将消息发送到 ActorRef
-
ActorRef 将消息传递到 Dispatcher中
-
Dispatcher 依次的将消息发送到 Actor 邮箱中
-
Dispatcher 将邮箱推送至一个线程中
-
邮箱取出一条消息并委派给 Actor 的 receive 方法
简略流程图如下:
EventBus
接下来我们看一个 Actor 的应用:EventBus。在异步处理场景下,运用最为广泛的消息处理模式即是 Pub-Sub 模式。基于 Pub-Sub 模式,还可以根据不同的场景衍生出特殊的模式,例如针对一个 Publisher 和多个 Subscriber,演化为 Broadcast 模式和 Message Router 模式。
EventBus 则通过引入总线来彻底解除 Publisher 与 Subscriber 之间的耦合,类似 设计模式 中的 Mediator 模式。总线就是 Mediator,用以协调 Publisher 与 Subscriber 之间的关系。对于 Publisher 而言,只需要把消息发布给 EventBus 即可;对于 Subscriber 而言,只需要在 EventBus 注册需要处理的事件并实现处理流程即可。
在没有使用 EventBus 的时候,Publisher 必须显式的调用 Subscriber 的方法。例如订单支付成功后,必须在订单处理模块调用积分模块处理积分,调用服务号模块进行通知。而且这样的显示调用会越来越多,每次都要去修改订单模块加一个调用。这样订单处理模块和那些模块就都紧密耦合在一起了。我们看看 EventBus 怎么解决这个问题。
EventBus 定义
要使用 Akka EventBus, 首先要实现一个 EventBus 接口。
trait EventBus { type Event type Classifier type Subscriber //#event-bus-api def subscribe(subscriber: Subscriber, to: Classifier): Boolean def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean def unsubscribe(subscriber: Subscriber): Unit def publish(event: Event): Unit //#event-bus-api }
如上所示:
-
Event 就是需要发布到总线上的事件
-
Classifier 分类器用于对订阅者进行绑定和筛选
-
Subscriber 注册到总线上的订阅者。
所幸的是,我们不需要要从头实现 EventBus 接口,Akka 提供了一个 LookupClassification 帮助我们实现 Pub-Sub 模式,我们要做的最主要就是实现 publish 方法。
class XrEventBus extends EventBus with LookupClassification { type Event = XrEvent type Classifier = XrEventType type Subscriber = ActorRef override protected def publish(event: Event, subscriber: Subscriber): Unit = { subscriber ! event } // 其他方法... }
可以看到:
-
Event 的类型是我们自己定义的 XrEvent。
-
分类起是基于 XrEventType,也就是事件类型的。我们系统中定义了很多时间类型,例如 XrEventType.ORDER_PAID 是订单支付事件,XrEventType.DOC_REGISTERED 是用户注册事件。
-
Subscriber 其实就是一个 Actor。
-
Publisher 只是简单的将 Event 作为一个消息发布给所有 Subscriber。
事件发布和订阅
Subscriber 这边则需要实现对事件的处理。
class ScoreEventHandler extends Actor with Logging { override def receive = { // 订单支付成功 case XrEvent(XrEventType.ORDER_PAID, order: OrderResponse) => // 处理订单支付成功事件 // 处理其他事件 } }
然后我们通过调用 EventBus.subscribe 进行事件订阅。
val eventBus = new XrEventBus // 积分事件处理模块 val scoreEventHandler = XingrenSingletons.akkaSystem.actorOf( Props[ScoreEventHandler], name = "scoreEventHandler")) eventBus.subscribe(scoreEventHandler, XrEventType.ORDER_PAID) // 订阅其他事件.. // 微信服务号事件处理模块 val weixinXrEventHandler = XingrenSingletons.akkaSystem.actorOf( Props[WeixinXrMessageActor], name = "weixinXrEventHandler")) eventBus.subscribe(weixinXrEventHandler, XrEventType.ORDER_PAID) // 订阅其他事件..
最后,我们的订单处理模块只需要调用 EventBus.publish 发布订单支付事件就好了。至于那些需要处理该事件的模块,自然会去订阅这个事件。上面 XrEventBus 的实现里可以看到,发布其实就是用 Actor 的消息发送机制,将消息发布给了所有的 Subscriber。
XrEventBus.publish(XrEventType.ORDER_PAID, new OrderResponse(order, product))
至此,我们的订单处理模块和积分处理模块、微信服务号模块就安全解耦了,很漂亮不是吗?
总结
当然 Actor 还有其他很多应用场景。例如并发流式处理,甚至我们系统中的定时任务,也是通过 Actor 实现的。
总之,Actor 为我们提供了更高层次的并发抽象模型,让我们不必关心底层的实现,只需着重实现业务逻辑。对于一些并发的场景,是很值得尝试的一种方案。
全文完
以下文章您可能也会感兴趣:
我们正在招聘 Java 工程师,欢迎有兴趣的同学投递简历到 rd-hr@xingren.com 。
杏仁技术站
长按左侧二维码关注我们,这里有一群热血青年期待着与您相会。
以上所述就是小编给大家介绍的《Actor 模型及 Akka 简介》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 神经网络模型量化方法简介
- 函数式 UI 简介:一种基于模型的方法
- f-GAN简介:GAN模型的生产车间
- f-GAN简介:GAN模型的生产车间
- 金融模型中的三种错误:使用TensorFlow Probability进行分析的简介
- Transformers 简介(上)
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。