Akka入门系列(五):akka cluster的基本使用

栏目: Scala · 发布时间: 6年前

内容简介:前面一个章节akka cluster管理介绍了我们知道,目前集群后台的使用方式主要有以下几种:

前面一个章节akka cluster管理介绍了 Akka Cluster 的底层原理,这一章就来看看如何使用。

集群后台接入

对外

我们知道,目前集群后台的使用方式主要有以下几种:

  • 后端直接监听指定协议的网络端口,接受外部请求,处理后按指定协议打包后返回响应。常规用法比如实现RESTful的SpringMVC,使用ProtoclBuffer、thrift等做压缩协议由Netty监听等等。
  • 由集群提供客户端API,通过客户端向集群提交请求,可同步/异步的获得结果。
  • 消息队列,通过异步的将消息发送到消息队列,集群监听消息队列获取消息后进行处理,最终将结果反馈到其他消息队列。
  • 监听流,比如对目录下文件的监听处理,或基于流式消息队列的监听处理等。

Akka 提供了以下几个组件以满足这几种不同的调用方式:

  • Akka Http 监听HTTP端口对外响应。 注意,这不是一个我们常见类似于SpringMVC的web服务框架,更多的是一个类似于HttpClient一样进行HTTP通信的 工具 集,但是是基于Actor和ActorStream的
  • Cluster ClientAkka Cluster 提供的远程客户端,用以向集群提交请求并获得结果。
  • Akka Stream 提供了完整的IO流及流式处理的工具集和API。
  • Alpakaa Akka提供的整合 Kafka 的流式处理API。

对内

集群内部调用,一般有以下几种方式:

  • 查找目标,直接调用。由前文可知, Akka Cluster 是完全的 P2P 结构,所以集群种任何一个Actor可以随意去请求任何的其他Actor,只需要简单的指定其ActorPath即可。
  • 发布/订阅模式。 akka.cluster.pubsub.DistributedPubSubMediator 可不使用外部MQ的情况下,直接在集群内部提供点对点或订阅功能。

由于官方的样例中已经提供了比较好的学习代码,本章就不再自己写代码去演示了。

官方在github例子地址: akka-samples

对于Cluster,在 akka-sample-cluster-java 提供了4个例子:

  • simple: 主要演示cluster启动过程中节点的交互过程,对应的是我上一篇文章
  • transformation: 最基本的cluster应用,典型的master-worker模式
  • stats: 主要演示cluster中路由的应用
  • factorial: 主要演示cluster中负载均衡的使用

最简单的例子

通常来讲,使用分布式集群的应用,大概率是并发请求量大,单请求处理较为耗时,可改为并行处理提高响应速度的,而常见的就是master-slave模式,即master接受外部请求、分配任务及返回响应,而真正的处理过程是交给slave去异步做的。所以,在这种集群应用中,不同的节点会分饰不同的角色。

第一个例子:

  • 集群分为前端和后端两部分
  • frontend维护了n个backend,并定期向backend发送hello[n]的消息,比如hello1,hello2
  • backend将字母转换为大写,返回给frontend

由于比较简单,完整的代码我就不贴了,只看几个关键点

前端

public class TransformationFrontend extends AbstractActor {

  List<ActorRef> backends = new ArrayList<>();
  int jobCounter = 0;

  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(TransformationJob.class, job -> backends.isEmpty(), job -> {
        sender().tell(new JobFailed("Service unavailable, try again later", job),
          sender());
      })
      .match(TransformationJob.class, job -> {
        jobCounter++;
        backends.get(jobCounter % backends.size())
          .forward(job, getContext());
      })
      .matchEquals(BACKEND_REGISTRATION, message -> {
        getContext().watch(sender());
        backends.add(sender());
      })
      .match(Terminated.class, terminated -> {
        backends.remove(terminated.getActor());
      })
      .build();
  }
}

前端维护了一个backend的ActorRef的列表,在收到自定义的 BACKEND_REGISTRATION 事件后,将消息的发送者,即Backend的actor所对应的 ActorRef 放到该列表去。

TransformationJob 是外部提交的任务,如果列表为空时,会返回 JobFailed ,否则用简单的负载均衡方法将Job转发给对应的后端(当前已收到的job数量对后端数取余)。

由于只有一个Frontend,并且Actor内部有 mailbox 队列,所以这里的jobCounter不会出现并发问题。

后端

public class TransformationBackend extends AbstractActor {
    
  Cluster cluster = Cluster.get(getContext().system());
  LoggingAdapter log = Logging.getLogger(getContext().system(), this);

  //subscribe to cluster changes, MemberUp
  @Override
  public void preStart() {
    cluster.subscribe(self(), MemberUp.class);
  }

  //re-subscribe when restart
  @Override
  public void postStop() {
    cluster.unsubscribe(self());
  }
    
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(TransformationJob.class, job -> {
        sender().tell(new TransformationResult(self().path().toSerializationFormat(), job.getText().toUpperCase()),
          self());
      })
      .match(CurrentClusterState.class, state -> {
        for (Member member : state.getMembers()) {
          if (member.status().equals(MemberStatus.up())) {
            register(member);
          }
        }
      })
      .match(MemberUp.class, mUp -> {
        register(mUp.member());
      })
      .build();
  }

  void register(Member member) {
    if (member.hasRole("frontend")) {
      log.info("Trying to register myself: {}", self().path().toSerializationFormat());
      getContext().actorSelection(member.address() + "/user/frontend").tell(
              BACKEND_REGISTRATION, self());
    }
  }
}

后端在 prestart() 时去监听了 MemberUp 的事件,当收到 MemberUp 时,通过简单的判断当前Member的角色是 frontend 就尝试给frontend发送注册消息,把自己的ActorRef加到frontend所维护的列表中。

这里为了体现是哪个后端所做的job,我加上了相关日志,在运行时可以仔细观察以下。

这里有个小问题不妨思考下:如果前端此时并未启动,这个 BACKEND_REGISTRATION 会怎么样呢?

启动

前端的启动

public static void main(String[] args) {
    // Override the configuration of the port when specified as program argument
    final String port = args.length > 0 ? args[0] : "0";
    final Config config = 
      ConfigFactory.parseString(
          "akka.remote.netty.tcp.port=" + port + "\n" +
          "akka.remote.artery.canonical.port=" + port)
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]"))
      .withFallback(ConfigFactory.load());

    ActorSystem system = ActorSystem.create("ClusterSystem", config);

    final ActorRef frontend = system.actorOf(
        Props.create(TransformationFrontend.class), "frontend");
    final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
    final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
    final ExecutionContext ec = system.dispatcher();
    final AtomicInteger counter = new AtomicInteger();
    system.scheduler().schedule(interval, interval, new Runnable() {
      public void run() {
        ask(frontend,
            new TransformationJob("hello-" + counter.incrementAndGet()),
            timeout).onSuccess(new OnSuccess<Object>() {
          public void onSuccess(Object result) {
            System.out.println(result);
          }
        }, ec);
      }

    }, ec);

  }
Member

后端的启动我就不贴了,简单的读取application.conf,覆盖端口配置和角色,然后启动 ActorSystem 去创建actor。

前面的那个问题:“如果前端此时并未启动,这个 BACKEND_REGISTRATION 会怎么样呢?”

在官方提供的代码中, TransformationApp 这个类整合了前后端节点的启动,但是它把后端分配为2551和2552端口,即把两个后端作为了种子节点,而前端作为了普通节点。如果是像它这样放在一起启动倒也没什么,但是如果单独一个个去运行时,就可能会出现前端在等待集群创建,而后端在memberup时,并没有找到前端Actor,导致注册失败。因为除非Cluster发生变化导致重新Gossip改变节点状态,否则MemberUp事件不会再发,这时哪怕前端启动了,其维护的后端列表依然为空。所以,一般情况下,像这种master-slave的用法,最好master就作为种子节点。

可以看到这样去实现master-slave虽然可以,但是依然存在些问题。好在 Akka 已经提供好了解决方案,就是Router和Routee,我们下章继续。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

机器学习实战:基于Scikit-Learn和TensorFlow

机器学习实战:基于Scikit-Learn和TensorFlow

Aurélien Géron / 王静源、贾玮、边蕤、邱俊涛 / 机械工业出版社 / 2018-8 / 119.00

本书主要分为两个部分。第一部分为第1章到第8章,涵盖机器学习的基础理论知识和基本算法——从线性回归到随机森林等,帮助读者掌握Scikit-Learn的常用方法;第二部分为第9章到第16章,探讨深度学习和常用框架TensorFlow,一步一个脚印地带领读者使用TensorFlow搭建和训练深度神经网络,以及卷积神经网络。一起来看看 《机器学习实战:基于Scikit-Learn和TensorFlow》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具