Akka入门系列(六):akka cluster中的路由和负载均衡

栏目: Scala · 发布时间: 6年前

内容简介:在使用路由功能之前,我们需要先了解下常规概念:在Akka中,提供了两种做路由的方式:直接使用

在使用路由功能之前,我们需要先了解下常规概念:

Router
Routee

在Akka中,提供了两种做路由的方式:

akka.routing.Router

直接使用Router类

直接使用 akka.routing.Router 类的原理其实与上一章的最简单的例子是一样的,只不过akka的Router类比我们实现的更复杂、更强大。创建Router类时需提供两个参数:

  • 路由规则

    akka为Router类提供了以下几种内置的路由算法类:

    • akka.routing.RoundRobinRoutingLogic
    • akka.routing.RandomRoutingLogic
    • akka.routing.SmallestMailboxRoutingLogic
    • akka.routing.BroadcastRoutingLogic
    • akka.routing.ScatterGatherFirstCompletedRoutingLogic
    • akka.routing.TailChoppingRoutingLogic
    • akka.routing.ConsistentHashingRoutingLogic
      具体算法介绍请参见文章最后的表格
  • 路由目标的序列

    该序列支持通过调用 router.addRouteerouter.removeRoutee 进行动态变化,但需要注意的是, akka.routing.Router 类时一个immutable的线程安全类,即不可改变,这里的改变其实是将原来的router内的的routee队列增加/去掉指定routee后copy一份生成一个新的Router

    def removeRoutee(routee: Routee): Router = copy(routees = routees.filterNot(_ == routee))
    

依赖

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-tools_2.12</artifactId>
</dependency>

配置文件application.conf

akka {
  actor {
    provider = "cluster"
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    artery {
      enabled = off
      canonical.hostname = "127.0.0.1"
      canonical.port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"
    ]
  }
}

实际做事的SlaveActor

public class SlaveActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, word-> log.info("Node {} receives: {}", getSelf().path().toSerializationFormat(), word))
                .build();
    }

    public static void main(String[] args) {
        Config config =
                ConfigFactory.parseString("akka.cluster.roles = [slave]")
                        .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        system.actorOf(Props.create(SlaveActor.class), "slaveActor");
    }
}

包含路由的MasterActor

public class MasterActor extends AbstractActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    private Router router = new Router(new RoundRobinRoutingLogic(), new ArrayList<>());
    private Cluster cluster = Cluster.get(getContext().system());
    boolean isReady = false;
    private static final String SLAVE_PATH = "/user/slaveActor";

    @Override
    public void preStart() throws Exception {
        cluster.subscribe(self(), ClusterEvent.MemberEvent.class, ClusterEvent.ReachabilityEvent.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, msg->{
                    log.info("Master got: {}", msg);
                    if(!isReady)
                        log.warning("Is not ready yet!");
                    else {
                        log.info("Routee size: {}", router.routees().length());
                        router.route(msg, getSender());
                    }
                })
                .match(ClusterEvent.MemberUp.class, mUp->{
                    if(mUp.member().hasRole("slave")) {
                        Address address = mUp.member().address();
                        String path = address + SLAVE_PATH;
                        ActorSelection selection = getContext().actorSelection(path);
                        router = router.addRoutee(selection);
                        isReady=true;
                        log.info("New routee is added!");
                    }
                })
                .match(ClusterEvent.MemberRemoved.class, mRemoved->{
                    router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address()+SLAVE_PATH));
                    log.info("Routee is removed");
                })
                .match(ClusterEvent.UnreachableMember.class, mRemoved-> {
                    router = router.removeRoutee(getContext().actorSelection(mRemoved.member().address() + SLAVE_PATH));
                    log.info("Routee is removed");
                })
                .build();
    }

    public static void main(String[] args) {
        int port = 2551;

        // Override the configuration of the port
        Config config =
                ConfigFactory.parseString(
                        "akka.remote.netty.tcp.port=" + port + "\n" +
                                "akka.remote.artery.canonical.port=" + port)
                        .withFallback(
                                ConfigFactory.parseString("akka.cluster.roles = [master]"))
                        .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        ClusterHttpManagement.get(system);
        AkkaManagement.get(system).start();
        system.actorOf(Props.create(MasterActor.class), "masterActor");
    }
}

这里将MasterActor监听了集群的MemberUp事件,通过判断事件中包含的role判断是否是SlaveActor加入集群。如果是,则将该SlaveActor加到 Router 中。同时,如果SlaveActor退出或变成Unreachable状态,则从 Router 中删除。

向MasterActor请求的客户端

public class Client
{
    public static void main( String[] args ) throws InterruptedException {
        Config config = ConfigFactory.load();
        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        ActorSelection toFind = system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor");
        int counter = 0;
        while(true){
            toFind.tell("hello "+counter++, ActorRef.noSender());
            System.out.println("Finish telling");
            Thread.sleep(2000);
        }
    }
}

分别启动四个窗口: 一个masterActor节点,两个slaveActor节点,一个Client,可以看到两个slaveActor轮流打印Client传递进去的消息。这时,把其中一个slaveActor关闭,可以看到Client发送的所有消息将被剩下那个slaveActor打印出来。

使用Router Actor

除了我们自己在Actor里调用 akka.routing.Router 类外,Akka还提供了根据配置直接生成一个内置的RouterActor。路由逻辑在remoting和cluster两个模块中都有,如果要启用remoting中的路由,则需要引入remoting的依赖,在cluster环境下并不推荐直接去用remoting中的路由,而是用cluster模块中的cluster aware router。

RouterActor有两种类型:

  • Pool
    Router 自动创建 Routee 作为自己的子Actor,然后部署到远程节点上。 Routee 被终止时,会自动从 Router 的路由表中删除, 除非使用动态路由(指定resizer),否则 Router 不会重新创建新的 Routee ,当所有的 Routee 都停止时, Router 也自动停止。
  • Group
    Routee actor是在Router actor以外单独创建好了, RouterActoSelection 向指定的Actor Path发送消息 ,但默认并不监控 Routee

Router actor可以通过程序配置或文件配置。如果是通过文件配置时,必须要在代码中使用 FromConfigRemoteRouterConfig (将Routee部署到远程节点去)去显式的读取相关配置,否则即便在配置文件中定义了路由相关配置,akka也不会去使用。

Router actor在转发消息时不会更改消息的sender,而routee actor在回复消息时,消息直接返回到原始的发送者,不再经过router actor。

无论哪种类型,有一块是相同配置:

cluster {
  enabled = on
  allow-local-routees = off
  use-roles = [slave]
}

enabled 是否启用cluster aware router
allow-local-routees 能否在本地,即router所在的节点创建和查找routee
use-roles 使用指定的角色来缩小routee的查找范围,如果routee的配置与这里的不同,则router是找不到该routee的。

Pool

我们在上面例子的基础上,把自己new的Router换成akka内置的RouterActor。改动主要有以下几个:

MasterActor
  1. 配置文件中actor部分增加:
    actor {
        provider = "cluster"
        deployment {
          /masterActor/poolRouter {
            router = round-robin-pool
            nr-of-instance = 5
            cluster {
              enabled = on
              allow-local-routees = on
              use-roles = [master]
            }
          }
          default {
            cluster {
              max-nr-of-instances-per-node = 5
            }
          }
        }
      }
    

由于我们的 Router 是在masterActor下创建的RouterActor,取名为 poolRouter ,所以其路径显然是 akka.tcp://ClusterSystem@127.0.0.1:2551/user/masterActor/poolRouter ,masterActor启动时读取的是这个配置文件,所以deployment部分对应的就是masterActor及其子Actor,所以这里只需要填入相对路径就好了。注意,由于Routee是由masterActor创建出来的,所以 use-role 必须是与masterActor保持一致,否则会找不到 Routee !

- router 指定预设的路由器

- nr-of-instance routee的个数

注意,有两个参数非常关键:

  • actor.deployment.default.cluster.max-nr-of-instances-per-node 它是配置Router在每个节点上部署的最大Actor数,默认是1。虽然上面我们指定了routee数目为5,但是如果只起一个节点,你会发现永远是
    1个routee在打印结果。
  • max-total-nr-of-instances 定义router所能创建的routee的总数,默认是10000。通常来说足够用了。
  1. 修改 MasterActor 。注释掉的部分是直接使用代码而不用配置文件手动创建 Router 的,有兴趣的可以自己试下。

    public class MasterActor extends AbstractActor {
    
        LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    
        private ActorRef router;
    
        @Override
        public void preStart() throws Exception {
            router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "poolRouter");
            /*int totalInstances = 1000;
            int maxInstancePerNode = 5, routeeNumbers=5;
            boolean allowLocalRoutees = true;
            String role = "master";
            ClusterRouterPoolSettings settings = new ClusterRouterPoolSettings(totalInstances, maxInstancePerNode, allowLocalRoutees, role);
            ClusterRouterPool routerPool = new ClusterRouterPool(new RoundRobinPool(routeeNumbers), settings);
            router = getContext().actorOf(routerPool.props(Props.create(SlaveActor.class)), "poolRouter");*/
        }
    
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, msg->{
                        log.info("Master got: {}", msg);
                        router.tell(msg, getSender());
                    })
                    .build();
        }
    }
    
  2. 运行

    其他不变,这次只需要启动 ClientMasterActorSlaveActorMasterActor 中会自动创建出来。看到日志

    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:19:58.361] [ClusterSystem-akka.actor.default-dispatcher-2] [akka://ClusterSystem/user/masterActor/poolRouter/c1] Node akka://ClusterSystem/user/masterActor/poolRouter/c1#-1154482163 receives: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:00.362] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/masterActor/poolRouter/c2] Node akka://ClusterSystem/user/masterActor/poolRouter/c2#-50692619 receives: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:02.365] [ClusterSystem-akka.actor.default-dispatcher-18] [akka://ClusterSystem/user/masterActor/poolRouter/c3] Node akka://ClusterSystem/user/masterActor/poolRouter/c3#1415650532 receives: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:04.366] [ClusterSystem-akka.actor.default-dispatcher-3] [akka://ClusterSystem/user/masterActor/poolRouter/c4] Node akka://ClusterSystem/user/masterActor/poolRouter/c4#1345851811 receives: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor] Master got: hello
    [INFO] [11/16/2018 14:20:06.368] [ClusterSystem-akka.actor.default-dispatcher-20] [akka://ClusterSystem/user/masterActor/poolRouter/c5] Node akka://ClusterSystem/user/masterActor/poolRouter/c5#-1384624865 receives: hello
    

从c1到c5轮流打印,round-robin负载均衡起作用了。

Group

这种方式下, Routee 是在 Router 外被创建的,一般要求尽量在 Router 启动前启动好 Routee ,因为 Router 在启动过程中会尝试去联络 Routee 。使用时与 Pool 型的很像,区别是

  • 需要指定 routees.path (remote方式下支持完整协议路径,比如 akka.tcp://ClusterSystem:2551/user/testActor但是Cluster模式下不支持,只支持相对路径 )
  • 不需要指定也没有 nr-of-instance 参数

GroupActor是根据 routees.path 所配置的相对路径,去当前cluster的每一个节点上用 ActorSelection 去查找指定role的Routee(所以use-roles中的配置一定要和slave启动时的role一致),然后直接tell消息过去。由于整个过程是异步的,就意味着GroupActor的消息发送其实根本不关心节点上对应的Routee是否包含Routee或者是否正常启动,只是简单的根据配置去转发而已。

不去检测是否包含Routee,是因为Akka是Peer-to-Peer的设计,天生就要求所有节点对等,在这个约定下,它会认为cluster中所有节点的代码相同,一定会包含Routee。

不去检测是否正常启动,这个则是由于整个通讯都是异步的。

但我个人认为这里还是使用熔断机制来加强的,使用起来会更加方便。

  1. 修改配置文件
    actor {
        provider = "cluster"
        deployment {
          /masterActor/groupRouter {
            router = round-robin-group
            cluster {
              enabled = on
              allow-local-routees = on
              use-roles = [slave]
            }
          }
        }
    }
    

use-roles中role加不加引号都可以。

  1. 修改 MasterActor 中Router的名字,与配置文件中保持一致。注释掉的部分是直接使用代码而不用配置文件手动创建Router的,有兴趣的可以自己试下。

    @Override
    public void preStart() throws Exception {
        router = getContext().actorOf(FromConfig.getInstance().props(Props.create(SlaveActor.class)), "groupRouter");
        /*List<String> routeesPaths = Arrays.asList("akka/user/slaveActor");
        router = getContext().actorOf(new RoundRobinGroup(routeesPaths).props(), "groupRouter");*/
    }
    
  2. 运行

    分别在几个不同窗口启动 MasterActor 、多个 SlaveActor 后,检查集群是否稳定后,即所有节点均是UP,如果启用了 akka-management-cluster-http ,向监控地址发送查询请求,如

    127.0.0.1:8558/cluster/members

    {
        "selfNode": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "oldestPerRole": {
            "master": "akka.tcp://ClusterSystem@127.0.0.1:2551",
            "dc-default": "akka.tcp://ClusterSystem@127.0.0.1:2551",
            "slave": "akka.tcp://ClusterSystem@127.0.0.1:4914"
        },
        "leader": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "oldest": "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "unreachable": [],
        "members": [
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:2551",
                "nodeUid": "-1141014070",
                "status": "Up",
                "roles": [
                    "master",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4914",
                "nodeUid": "344021242",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4936",
                "nodeUid": "678163307",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            },
            {
                "node": "akka.tcp://ClusterSystem@127.0.0.1:4957",
                "nodeUid": "-573369962",
                "status": "Up",
                "roles": [
                    "slave",
                    "dc-default"
                ]
            }
        ]
    }
    

然后,启动Client向masterActor发送消息,可以看到均匀的打印出接受的日志,round-robin负载均衡起作用了。再多起几个SlaveActor,会将消息转发到新的actor中去,这就是 GroupPool 方式好的地方,可以动态变化。

此时,你可以尝试修改下配置,将slaveActor变成和masterActor一样的role,再运行后,你会发现有消息丢失,以及转发失败的日志出来。

这是因为在上面所有的例子中,为了方便理解,都是使用一个master+若干slave的方式来演示。

然而Akka的设计是Peer-to-Peer的,即所有节点对等,那么,RouterActor就会理所应当地认为在相同role的节点上都存在Routee,由于并没有去检查Routee是否能工作,直接进行了消息转发,而按照上面的写法masterAcotr所在的节点上压根就没起过slaveActor,所以就造成了消息丢失。

将配置中 allow-local-routees 改为 off ,这时它就不会把masterActor所在节点加到负载列表中去了。但同样的,你可以去起一个空的ActorSystem,看看有什么后果。

附录:

Akka提供的路由算法:

算法 说明 配置 算法类
RoundRobin 轮询的给路由列表中每个Routee发送消息 round-robin-pool 或 round-robin-group akka.routing.RoundRobin
Random 从路由列表中随机抽取一个Routee发送消息 random-pool 或 random-group akka.routing.Random
SmallestMailbox 优先选取路由表中mailbox内消息数最少的Routee发送消息 smallest-mailbox-pool akka.routing.SmallestMailbox
Broadcast 以广播的形式将消息同时转发给所有的Routee broadcast-pool 或 broadcast-group akka.routing.Broadcast
ScatterGatherFirstCompleted 将消息发送给所有的Routee,并等待第一个返回的结果,将该结果返回给发送者,其他结果被忽略掉 scatter-gather-pool 或 scatter-gather-group akka.routing.ScatterGatherFirstCompleted
TailChopping 先随机选一个Routee发送消息,等待一个短时间的延迟后,再随机选一个Routee发送消息,等待第一个返回的结果并将该结果发送回发送者,其他结果被忽略掉 tail-chopping-pool 或 tail-chopping-group akka.routing.TailChopping
ConsistentHashing 使用 一致性Hash算法 选取Routee转发消息 consistent-hashing-pool 或 consistent-hashing-group akka.routing.ConsistentHashing
Balancing 所有的Routee共享同一个mailbox,它会将繁忙的Routee中的任务重新分配给空闲的Routee,不支持group和广播 balancing-pool akka.routing.Balancing

本章代码地址: https://github.com/EdisonXu/akka-start-demo/tree/master/cluster


以上所述就是小编给大家介绍的《Akka入门系列(六):akka cluster中的路由和负载均衡》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Software Paradigms

Software Paradigms

Stephen H. Kaisler / Wiley-Interscience / 2005-03-17 / USD 93.95

Software Paradigms provides the first complete compilation of software paradigms commonly used to develop large software applications, with coverage ranging from discrete problems to full-scale applic......一起来看看 《Software Paradigms》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具