内容简介:在使用路由功能之前,我们需要先了解下常规概念:在Akka中,提供了两种做路由的方式:直接使用
在使用路由功能之前,我们需要先了解下常规概念:
Router Routee
在Akka中,提供了两种做路由的方式:
akka.routing.Router
直接使用Router类
直接使用 akka.routing.Router
类的原理其实与上一章的最简单的例子是一样的,只不过akka的Router类比我们实现的更复杂、更强大。创建Router类时需提供两个参数:
-
路由规则
akka为Router类提供了以下几种内置的路由算法类:
-
akka.routing.RoundRobinRoutingLogic
-
akka.routing.RandomRoutingLogic
-
akka.routing.SmallestMailboxRoutingLogic
-
akka.routing.BroadcastRoutingLogic
-
akka.routing.ScatterGatherFirstCompletedRoutingLogic
-
akka.routing.TailChoppingRoutingLogic
-
akka.routing.ConsistentHashingRoutingLogic
具体算法介绍请参见文章最后的表格
-
-
路由目标的序列
该序列支持通过调用
router.addRoutee
和router.removeRoutee
进行动态变化,但需要注意的是,akka.routing.Router
类时一个immutable的线程安全类,即不可改变,这里的改变其实是将原来的router内的的routee队列增加/去掉指定routee后copy一份生成一个新的Routerdef removeRoutee(routee: Routee): Router = copy(routees = routees.filterNot(_ == routee))
依赖
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-cluster-tools_2.12</artifactId> </dependency>
配置文件application.conf
akka { actor { provider = "cluster" } remote { netty.tcp { hostname = "127.0.0.1" port = 0 } artery { enabled = off canonical.hostname = "127.0.0.1" canonical.port = 0 } } cluster { seed-nodes = [ "akka.tcp://ClusterSystem@127.0.0.1:2551" ] } }
实际做事的SlaveActor
public class SlaveActor extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override public Receive createReceive() { return receiveBuilder() .match(String.class, word-> log.info("Node {} receives: {}", getSelf().path().toSerializationFormat(), word)) .build(); } public static void main(String[] args) { Config config = ConfigFactory.parseString("akka.cluster.roles = [slave]") .withFallback(ConfigFactory.load()); ActorSystem system = ActorSystem.create("ClusterSystem", config); system.actorOf(Props.create(SlaveActor.class), "slaveActor"); } }
包含路由的MasterActor
public class MasterActor extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); private Router router = new Router(new RoundRobinRoutingLogic(), new ArrayList<>()); private Cluster cluster = Cluster.get(getContext().system()); boolean isReady = false; private static final String SLAVE_PATH = "/user/slaveActor"; @Override public void preStart() throws Exception { cluster.subscribe(self(), ClusterEvent.MemberEvent.class, ClusterEvent.ReachabilityEvent.class); } @Override public Receive createReceive() { return receiveBuilder() .match(String.class, msg->{ log.info("Master got: {}", msg); if(!isReady) log.warning("Is not ready yet!"); else { log.info("Routee size: {}", router.routees().length()); router.route(msg, getSender()); } }) .match(ClusterEvent.MemberUp.class, mUp->{ if(mUp.member().hasRole("slave")) { Address address = mUp.member().address(); String path = address + SLAVE_PATH; ActorSelection selection = getContext().actorSelection(path); router = router.addRoutee(selection); isReady=true; log.info("New routee is added!"); } }) .match(ClusterEvent.MemberRemoved.class, mRemoved->{ router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address()+SLAVE_PATH)); log.info("Routee is removed"); }) .match(ClusterEvent.UnreachableMember.class, mRemoved-> { router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address() + SLAVE_PATH)); log.info("Routee is removed"); }) .build(); } public static void main(String[] args) { int port = 2551; // Override the configuration of the port Config config = ConfigFactory.parseString( "akka.remote.netty.tcp.port=" + port + "\n" + "akka.remote.artery.canonical.port=" + port) .withFallback( ConfigFactory.parseString("akka.cluster.roles = [master]")) .withFallback(ConfigFactory.load()); ActorSystem system = ActorSystem.create("ClusterSystem", config); ClusterHttpManagement.get(system); AkkaManagement.get(system).start(); system.actorOf(Props.create(MasterActor.class), "masterActor"); } }
这里将MasterActor监听了集群的MemberUp事件,通过判断事件中包含的role判断是否是SlaveActor加入集群。如果是,则将该SlaveActor加到 Router
中。同时,如果SlaveActor退出或变成Unreachable状态,则从 Router
中删除。
向MasterActor请求的客户端
public class Client { public static void main( String[] args ) throws InterruptedException { Config config = ConfigFactory.load(); ActorSystem system = ActorSystem.create("ClusterSystem", config); ActorSelection toFind = system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor"); int counter = 0; while(true){ toFind.tell("hello "+counter++, ActorRef.noSender()); System.out.println("Finish telling"); Thread.sleep(2000); } } }
分别启动四个窗口: 一个masterActor节点,两个slaveActor节点,一个Client,可以看到两个slaveActor轮流打印Client传递进去的消息。这时,把其中一个slaveActor关闭,可以看到Client发送的所有消息将被剩下那个slaveActor打印出来。
使用Router Actor
除了我们自己在Actor里调用 akka.routing.Router
类外,Akka还提供了根据配置直接生成一个内置的RouterActor。路由逻辑在remoting和cluster两个模块中都有,如果要启用remoting中的路由,则需要引入remoting的依赖,在cluster环境下并不推荐直接去用remoting中的路由,而是用cluster模块中的cluster aware router。
RouterActor有两种类型:
-
Pool
Router
自动创建Routee
作为自己的子Actor,然后部署到远程节点上。 当Routee
被终止时,会自动从Router
的路由表中删除, 除非使用动态路由(指定resizer),否则Router
不会重新创建新的Routee
,当所有的Routee
都停止时,Router
也自动停止。 -
Group
Routee actor是在Router actor以外单独创建好了,Router
用ActoSelection
向指定的Actor Path发送消息 ,但默认并不监控Routee
。
Router actor可以通过程序配置或文件配置。如果是通过文件配置时,必须要在代码中使用 FromConfig
或 RemoteRouterConfig
(将Routee部署到远程节点去)去显式的读取相关配置,否则即便在配置文件中定义了路由相关配置,akka也不会去使用。
Router actor在转发消息时不会更改消息的sender,而routee actor在回复消息时,消息直接返回到原始的发送者,不再经过router actor。
无论哪种类型,有一块是相同配置:
cluster { enabled = on allow-local-routees = off use-roles = [slave] }
enabled 是否启用cluster aware router
allow-local-routees 能否在本地,即router所在的节点创建和查找routee
use-roles 使用指定的角色来缩小routee的查找范围,如果routee的配置与这里的不同,则router是找不到该routee的。
Pool
我们在上面例子的基础上,把自己new的Router换成akka内置的RouterActor。改动主要有以下几个:
MasterActor
-
配置文件中actor部分增加:
actor { provider = "cluster" deployment { /masterActor/poolRouter { router = round-robin-pool nr-of-instance = 5 cluster { enabled = on allow-local-routees = on use-roles = [master] } } default { cluster { max-nr-of-instances-per-node = 5 } } } }
由于我们的 Router
是在masterActor下创建的RouterActor,取名为 poolRouter
,所以其路径显然是 akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor/poolRouter
,masterActor启动时读取的是这个配置文件,所以deployment部分对应的就是masterActor及其子Actor,所以这里只需要填入相对路径就好了。注意,由于Routee是由masterActor创建出来的,所以 use-role
必须是与masterActor保持一致,否则会找不到 Routee
!
- router 指定预设的路由器
- nr-of-instance routee的个数
注意,有两个参数非常关键:
-
actor.deployment.default.cluster.max-nr-of-instances-per-node
它是配置Router在每个节点上部署的最大Actor数,默认是1。虽然上面我们指定了routee数目为5,但是如果只起一个节点,你会发现永远是
1个routee在打印结果。 -
max-total-nr-of-instances
定义router所能创建的routee的总数,默认是10000。通常来说足够用了。
-
修改
MasterActor
。注释掉的部分是直接使用代码而不用配置文件手动创建Router
的,有兴趣的可以自己试下。public class MasterActor extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); private ActorRef router; @Override public void preStart() throws Exception { router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "poolRouter"); /*int totalInstances = 1000; int maxInstancePerNode = 5, routeeNumbers=5; boolean allowLocalRoutees = true; String role = "master"; ClusterRouterPoolSettings settings = new ClusterRouterPoolSettings(totalInstances, maxInstancePerNode, allowLocalRoutees, role); ClusterRouterPool routerPool = new ClusterRouterPool(new RoundRobinPool(routeeNumbers), settings); router = getContext().actorOf(routerPool.props(Props.create(SlaveActor.class)), "poolRouter");*/ } @Override public Receive createReceive() { return receiveBuilder() .match(String.class, msg->{ log.info("Master got: {}", msg); router.tell(msg, getSender()); }) .build(); } }
-
运行
其他不变,这次只需要启动
Client
和MasterActor
,SlaveActor
在MasterActor
中会自动创建出来。看到日志[INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor] Master got: hello [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor/poolRouter/c1] Node akka://ClusterSystem/user/masterActor/poolRouter/c1#-1154482163 receives: hello [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor] Master got: hello [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor/poolRouter/c2] Node akka://ClusterSystem/user/masterActor/poolRouter/c2#-50692619 receives: hello [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor] Master got: hello [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor/poolRouter/c3] Node akka://ClusterSystem/user/masterActor/poolRouter/c3#1415650532 receives: hello [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor] Master got: hello [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor/poolRouter/c4] Node akka://ClusterSystem/user/masterActor/poolRouter/c4#1345851811 receives: hello [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor] Master got: hello [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor/poolRouter/c5] Node akka://ClusterSystem/user/masterActor/poolRouter/c5#-1384624865 receives: hello
从c1到c5轮流打印,round-robin负载均衡起作用了。
Group
这种方式下, Routee
是在 Router
外被创建的,一般要求尽量在 Router
启动前启动好 Routee
,因为 Router
在启动过程中会尝试去联络 Routee
。使用时与 Pool
型的很像,区别是
-
需要指定
routees.path
(remote方式下支持完整协议路径,比如akka.tcp://ClusterSystem:2551/user/testActor
, 但是Cluster模式下不支持,只支持相对路径 ) -
不需要指定也没有
nr-of-instance
参数
GroupActor是根据 routees.path
所配置的相对路径,去当前cluster的每一个节点上用 ActorSelection
去查找指定role的Routee(所以use-roles中的配置一定要和slave启动时的role一致),然后直接tell消息过去。由于整个过程是异步的,就意味着GroupActor的消息发送其实根本不关心节点上对应的Routee是否包含Routee或者是否正常启动,只是简单的根据配置去转发而已。
不去检测是否包含Routee,是因为Akka是Peer-to-Peer的设计,天生就要求所有节点对等,在这个约定下,它会认为cluster中所有节点的代码相同,一定会包含Routee。
不去检测是否正常启动,这个则是由于整个通讯都是异步的。
但我个人认为这里还是使用熔断机制来加强的,使用起来会更加方便。
-
修改配置文件
actor { provider = "cluster" deployment { /masterActor/groupRouter { router = round-robin-group cluster { enabled = on allow-local-routees = on use-roles = [slave] } } } }
use-roles中role加不加引号都可以。
-
修改
MasterActor
中Router的名字,与配置文件中保持一致。注释掉的部分是直接使用代码而不用配置文件手动创建Router的,有兴趣的可以自己试下。@Override public void preStart() throws Exception { router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "groupRouter"); /*List<String> routeesPaths = Arrays.asList("akka/user/slaveActor"); router = getContext().actorOf(new RoundRobinGroup(routeesPaths).props(), "groupRouter");*/ }
-
运行
分别在几个不同窗口启动
MasterActor
、多个SlaveActor
后,检查集群是否稳定后,即所有节点均是UP,如果启用了akka-management-cluster-http
,向监控地址发送查询请求,如127.0.0.1:8558/cluster/members
{ "selfNode": "akka.tcp://ClusterSystem@127.0.0.1:2551", "oldestPerRole": { "master": "akka.tcp://ClusterSystem@127.0.0.1:2551", "dc-default": "akka.tcp://ClusterSystem@127.0.0.1:2551", "slave": "akka.tcp://ClusterSystem@127.0.0.1:4914" }, "leader": "akka.tcp://ClusterSystem@127.0.0.1:2551", "oldest": "akka.tcp://ClusterSystem@127.0.0.1:2551", "unreachable": [], "members": [ { "node": "akka.tcp://ClusterSystem@127.0.0.1:2551", "nodeUid": "-1141014070", "status": "Up", "roles": [ "master", "dc-default" ] }, { "node": "akka.tcp://ClusterSystem@127.0.0.1:4914", "nodeUid": "344021242", "status": "Up", "roles": [ "slave", "dc-default" ] }, { "node": "akka.tcp://ClusterSystem@127.0.0.1:4936", "nodeUid": "678163307", "status": "Up", "roles": [ "slave", "dc-default" ] }, { "node": "akka.tcp://ClusterSystem@127.0.0.1:4957", "nodeUid": "-573369962", "status": "Up", "roles": [ "slave", "dc-default" ] } ] }
然后,启动Client向masterActor发送消息,可以看到均匀的打印出接受的日志,round-robin负载均衡起作用了。再多起几个SlaveActor,会将消息转发到新的actor中去,这就是 Group 比 Pool 方式好的地方,可以动态变化。
此时,你可以尝试修改下配置,将slaveActor变成和masterActor一样的role,再运行后,你会发现有消息丢失,以及转发失败的日志出来。
这是因为在上面所有的例子中,为了方便理解,都是使用一个master+若干slave的方式来演示。
然而Akka的设计是Peer-to-Peer的,即所有节点对等,那么,RouterActor就会理所应当地认为在相同role的节点上都存在Routee,由于并没有去检查Routee是否能工作,直接进行了消息转发,而按照上面的写法masterAcotr所在的节点上压根就没起过slaveActor,所以就造成了消息丢失。
将配置中 allow-local-routees
改为 off
,这时它就不会把masterActor所在节点加到负载列表中去了。但同样的,你可以去起一个空的ActorSystem,看看有什么后果。
附录:
Akka提供的路由算法:
算法 | 说明 | 配置 | 算法类 |
---|---|---|---|
RoundRobin | 轮询的给路由列表中每个Routee发送消息 | round-robin-pool 或 round-robin-group | akka.routing.RoundRobin |
Random | 从路由列表中随机抽取一个Routee发送消息 | random-pool 或 random-group | akka.routing.Random |
SmallestMailbox | 优先选取路由表中mailbox内消息数最少的Routee发送消息 | smallest-mailbox-pool | akka.routing.SmallestMailbox |
Broadcast | 以广播的形式将消息同时转发给所有的Routee | broadcast-pool 或 broadcast-group | akka.routing.Broadcast |
ScatterGatherFirstCompleted | 将消息发送给所有的Routee,并等待第一个返回的结果,将该结果返回给发送者,其他结果被忽略掉 | scatter-gather-pool 或 scatter-gather-group | akka.routing.ScatterGatherFirstCompleted |
TailChopping | 先随机选一个Routee发送消息,等待一个短时间的延迟后,再随机选一个Routee发送消息,等待第一个返回的结果并将该结果发送回发送者,其他结果被忽略掉 | tail-chopping-pool 或 tail-chopping-group | akka.routing.TailChopping |
ConsistentHashing | 使用 一致性Hash算法 选取Routee转发消息 | consistent-hashing-pool 或 consistent-hashing-group | akka.routing.ConsistentHashing |
Balancing | 所有的Routee共享同一个mailbox,它会将繁忙的Routee中的任务重新分配给空闲的Routee,不支持group和广播 | balancing-pool | akka.routing.Balancing |
本章代码地址: https://github.com/EdisonXu/akka-start-demo/tree/master/cluster
以上所述就是小编给大家介绍的《Akka入门系列(六):akka cluster中的路由和负载均衡》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 浅析负载均衡与应用路由
- 弄懂服务路由与负载均衡,微服务搞起
- Centos 7基于DR(直接路由)模式的负载均衡配置详解
- vue路由篇(动态路由、路由嵌套)
- 小程序封装路由文件和路由方法,5种路由方法全解析
- 3分钟了解负载均衡,分清二层负载均衡和三层负载均衡
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。