内容简介:在前面remote actor一章提到过,我们知道,分布式集群是由若干节点组成的,那么节点的发现及状态管理是分布式系统一个比较重要的任务。
在前面remote actor一章提到过, akka remoting
是 Peer-to-Peer
的,所以基于 remote
功能的 cluster
是一个去中心化的分布式集群。
Akka Cluster
将多个JVM连接整合在一起,实现消息地址的透明化和统一化使用管理,集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序,部署到很多JVM上去实现程序的分布式并行运算(单机也可以起很多节点构成集群)。更重要的是, Akka Cluster
集群构建与Actor编程没有直接的联系,集群构建是在ActorSystem层面上,实现了Actor消息地址的透明化,无需考虑目标运行环节是否分布式,可以按照正常的Actor编程模式进行开发。
我们知道,分布式集群是由若干节点组成的,那么节点的发现及状态管理是分布式系统一个比较重要的任务。 Akka Cluster
中将节点的生命周期划分为:
- joining - 当尝试加入集群时的初始状态
- up - 加入集群后的正常状态
- leaving / exiting - 节点退出集群时的中间状态
- down - 集群无法感知某节点后,将其标记为down
- removed - 从集群中被删除,以后也无法再加入集群
其实当参数 akka.cluster.allow-weakly-up-members
启用时(默认是启用的),还有个 weakly up
,它是用于集群出现分裂时,集群无法收敛,则leader无法将状态置为up的临时状态。这个后面再解释。
图中还有两个特殊的名词:
- fd* - 这个表示akka的错误检测机制
Faiulre Detector
被触发后,将节点标记为unreachable - unreachable* -
unreachable
不是一个真正的节点状态,更多的像是一个flag,用来描述集群无法与该节点进行通讯。当错误检测机制侦测到这个节点又能正常通讯时,会移除这个flag。
市面上大多数产品的分布式管理一般用的是注册中心机制,例如zk、consul或etcd。其实是节点把自己的信息注册到所使用的注册中心里,而master通过接受注册中心的通知得知新节点信息。显然本质上是一种master/slave的架构。这种架构有两个问题:
-
master节点一般是单一的,一旦挂了影响就比较大(所以很多master都采用了HA机制),也就是所谓的系统单点故障;
-
通常节点的地址发现是要走master去获取的,当系统并发大时,master节点就可能成为性能瓶颈,即单点性能瓶颈。
Akka
可能就是考虑这两点,采用了P2P的模式,这样任何一个节点都可以作为”master”,任何的节点都可以用来寻找其他节点地址。那它是怎么做到的呢?答案是 Gossip 协议和 CRDT
。
Akka Gossip
基本介绍
Gossip协议
Gossip
协议简单来说,就是病毒式的将信息扩散到整个集群,无法确定何时完成完全扩散,但最终是会到达完全扩散状态的(最终一致性),即收敛。具体介绍可以参考我转载的一片文章—— P2P 网络核心技术:Gossip 协议 ,这里就不再重复叙述,着重介绍下 Akka
是怎么使用 Gossip
的。
CRDT
P2P的分布式系统中,理论上每个节点都能处理外部的请求,以及向其他节点发送请求。而系统中存在的共享变量,可能在同一时间会被两个不同节点的请求用到,即并发安全问题。一般解决方案是队列或自旋,后者本质上还是一种变相的队列。排队就牵扯到两个问题:
- “谁先来的”
很多人下意识会觉得用时间戳就可以了嘛,但在分布式集群中,每个节点如果是一台单独的服务器,那么每个节点的时间戳未必相同(比如未开启Ntp)。
- “同时来的怎么办”
就像git,能merge就merge,不能merge就解决冲突。
CRDT
就是用于解决解决分布式事件的先后顺序及merge问题的数据结构的简称,即 Conflict-Free Replicated Data Types
的缩写,它的作用是保证最终一致性,出处参阅 这份论文 。白话文 谈谈CRDT 和 CRDT介绍 这两篇文章讲的通俗易懂,多的就不再重复了。
Akka
中节点的状态就是一个特殊的 CRDT
,使用向量时钟 Vector Clock
实现方案,关于向量时钟 Vector Clock
可以参见我转发的这篇文章 Vector Clock/Version Clock 。
Akka
的gossip协议发送的具体内容如下:
final case class Gossip( members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address overview: GossipOverview = GossipOverview(), version: VectorClock = VectorClock(), // vector clock version tombstones: Map[UniqueAddress, Gossip.Timestamp] = Map.empty ) final case class GossipOverview( seen: Set[UniqueAddress] = Set.empty, reachability: Reachability = Reachability.empty ) class Reachability private ( val records: immutable.IndexedSeq[Reachability.Record], val versions: Map[UniqueAddress, Long] )
- members 存放该节点知道的其他节点
- seen 已经收到本次gossip的节点们,每个节点当接受到一个新的gossip消息时,会把自己放到seen里面,作为响应返回给发送者
- reachability 这个由错误检测机制
Faiulre Detector
的心跳模块来维护,用来判断节点是否存活。正常情况下records应该是空的,当有节点处于Unreachable时,才会有记录加到records里。 - version 向量时钟,用于冲突检测和处理
种子节点 SeedNode
SeendNode
一般是提前配置好的一组节点。它用于接受其他节点(可以是种子节点)的加入集群的请求。不同节点,在 Akka Cluster
中启动时会有不同的逻辑:
- 如果是种子节点,并且是 排序 后的种子节点数组中 排第一 的,它会在一个规定的时间内(默认5秒)去尝试加入已存在的集群,即发送
InitJoin
消息到其他种子节点。如果未能成功加入,则自己将 创建一个新的Cluster 。 - 如果是种子节点,但并不是数组中排第一的,则会向其他种子节点发送
InitJoin
消息,如果失败将不断重试,直到能成功加入 第一个返回响应的已加入集群的种子节点 对应的Cluster。 - 如果是普通节点,则会向其他种子节点发送
InitJoin
消息,如果失败将不断重试,直到能成功加入 第一个返回响应的已加入集群的种子节点 对应的Cluster。
这里有一点值得注意,为什么是加入第一个返回响应的种子节点所在的集群?这个问题后面再解释。
过程详解
下面用一个简单的场景来解释整个交互过程,假定我们有两个节点n1和n2,其中n1是种子节点。我们让n2先启动。
上图中的T0、T1表示时间轴,但只是为了方便将步骤拆解,便于理解。其中T4和T5并没有必然的时间前后关系,这里只是假定T4在前,步骤基本是类似的,T5在前也只是稍有不同。
#T0、T1时刻只是为了表明n2在启动时,如果没有种子节点响应,则会一直等待重试
#T2时刻种子节点自己新建一个集群,由于新集群只有它自己,members和seen是一样的,所以把自己作为集群的 leader
。
leader
- Gossip协议中没有
leader
选举过程 -
leader
只是一个角色,任何节点均可以是leader
-
leader
的确定非常简单: 集群收敛后,当前members队列按IP进行排序,排第一位置的节点就是整个集群的leader
-
leader
并非一直不变,如果集群有新节点加入或某节点退出,导致发生Gossip过程,收敛后都会重新确定leader
-
leader
的职责是更新节点在集群中的状态以及将集群的成员移入或移出集群
注意,这里有个地方容易被误解:“n1和n2构成一个集群,不是在T5才收敛吗?怎么在T2就确定 leader
了?”
其实当第一个种子节点新建cluster时,由于只有它一个,即seen和members里内容一样,它判断当前集群已收敛,就把自己当作leader了。所以才有了T2_2和T4。
#T3时刻是n1响应n2的InitJoin请求,具体交互过程如下:
#T3_0种子节点收到n2的 Join
消息后,会做两件事:
- 更新当前Gossip的向量时钟;
- 清空当前Gossip的seen队列,然后把自己加进去。(后续发起Gossip交互时,会优先选择那些没在seen队列中的成员)
#T4时刻因为作为fd能正常与n2进行心跳,n1作为leader就被通知将n2提升为Up状态
#T5时刻是一个 CRDT
的对比过程,对比两个Goissp的 version
,即 VectorClock
,比较的结果有三种:
- Same : 相同,则进行seen队列合并就可以了
- Before : 本地新,则向对端发送本地的Gossip,本地不变
- After : 对端新,则更新本地的Gossip。如果对端的Gossip的seen里没有包含本地,则将自己添加到seen里发送给对端,以减少一次两者间的Gossip交互。
#T5时刻最后集群达到了收敛
Gossip 收敛
从上面的图里可以看到节点初始化时会把自己加入到members里,回传回去,同时,节点在收到新的Gossip时,会把自己加入到seen里面。那么,在一开始,members和seen中的节点数是不同的。
当Gossip传递的消息被整个集群都消化掉的时候,可以称作当前集群的Gossip收敛。靠以下条件判断Gossip收敛:
- 集群中不存在
unreachable
的节点,或者unreachable
的节点应该均处于down
或exiting
状态 - 正常节点均处于
up
或leaving
状态,且members里的节点都在seen里,即集群中所有的节点都收到过该Gossip
代码演示
说了那么多文字, Akka Cluster
提供了监控ClusterEvent的方法,我们可以用代码来校验下上面的知识。
添加依赖
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-cluster_2.12</artifactId> <version>2.5.17</version> </dependency>
首先编写 application.conf
配置文件
akka { actor { provider = "cluster" } remote { netty.tcp { hostname = "127.0.0.1" port = 0 } artery { enabled = on canonical.hostname = "127.0.0.1" canonical.port = 0 } } cluster { seed-nodes = [ "akka://ClusterSystem@127.0.0.1:2552", "akka://ClusterSystem@127.0.0.1:2551" ] } }
然后,编写Actor
public class SimpleClusterListener extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); //subscribe to cluster changes @Override public void preStart() throws Exception { cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class); log.info("I'm about to start! Code: {} ", getSelf().hashCode()); } @Override public void postStop() throws Exception { cluster.unsubscribe(self()); } @Override public Receive createReceive() { return receiveBuilder() .match(ClusterEvent.MemberUp.class, mUp->log.info("Member is Up: {}", mUp.member())) .match(ClusterEvent.UnreachableMember.class, mUnreachable->log.info("Member detected as unreachable: {}", mUnreachable.member())) .match(ClusterEvent.MemberRemoved.class, mRemoved->log.info("Member is Removed: {}", mRemoved.member())) .match(ClusterEvent.LeaderChanged.class, msg->log.info("Leader is changed: {}", msg.getLeader())) .match(ClusterEvent.RoleLeaderChanged.class, msg->log.info("RoleLeader is changed: {}", msg.getLeader())) .match(ClusterEvent.MemberEvent.class, event->{}) //ignore .build(); } }
最后是启动类
public class App { public static void main( String[] args ) { if(args.length==0) startup(new String[] {"2551", "2552", "0"}); else startup(args); } public static void startup(String[] ports){ ExecutorService pool = Executors.newFixedThreadPool(ports.length); for(String port : ports){ pool.submit(()->{ // Using input port to start multiple instances Config config = ConfigFactory.parseString( "akka.remote.netty.tcp.port=" + port + "\n" + "akka.remote.artery.canonical.port=" + port) .withFallback(ConfigFactory.load()); // Create an Akka system ActorSystem system = ActorSystem.create("ClusterSystem", config); // Create an system.actorOf(Props.create(SimpleClusterListener.class), "ClusterListener"); }); } } }
这里设置了2552和2551两个种子节点,及一个随机端口启动的普通节点。 故意在配置中把2552放到2551前面去。
带参数2551作为端口启动程序,命名为Node1,启动后,可以看到它会不断尝试连接提供的种子节点中排第一的2552
[WARN] [11/07/2018 17:15:13.823] [ClusterSystem-akka.actor.default-dispatcher-5] [akka://ClusterSystem@127.0.0.1:2551/system/cluster/core/daemon/joinSeedNodeProcess-1] Couldn't join seed nodes after [2] attempts, will try again. seed-nodes=[akka://ClusterSystem@127.0.0.1:2552] [WARN] [11/07/2018 17:15:18.835] [ClusterSystem-akka.actor.default-dispatcher-10] [akka://ClusterSystem@127.0.0.1:2551/system/cluster/core/daemon/joinSeedNodeProcess-1] Couldn't join seed nodes after [3] attempts, will try again. seed-nodes=[akka://ClusterSystem@127.0.0.1:2552]
这时带参数2552启动程序,命名为Node2,命令行会打印
[INFO] [11/07/2018 17:18:21.285] [ClusterSystem-akka.actor.default-dispatcher-10] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Node [akka://ClusterSystem@127.0.0.1:2552] is JOINING itself (with roles [dc-default]) and forming new cluster [INFO] [11/07/2018 17:18:21.288] [ClusterSystem-akka.actor.default-dispatcher-10] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Cluster Node [akka://ClusterSystem@127.0.0.1:2552] dc [default] is the new leader [INFO] [11/07/2018 17:18:21.300] [ClusterSystem-akka.actor.default-dispatcher-10] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2552] to [Up] [INFO] [11/07/2018 17:18:21.308] [ClusterSystem-akka.actor.default-dispatcher-11] [akka://ClusterSystem/user/ClusterListener] Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2552, status = Up) [INFO] [11/07/2018 17:18:23.323] [ClusterSystem-akka.actor.default-dispatcher-10] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://ClusterSystem@127.0.0.1:2551/system/cluster/core/daemon/joinSeedNodeProcess-1#1739420295]] to [akka://ClusterSystem@127.0.0.1:2552] [INFO] [11/07/2018 17:18:23.323] [ClusterSystem-akka.actor.default-dispatcher-10] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Sending InitJoinAck message from node [akka://ClusterSystem@127.0.0.1:2552] to [Actor[akka://ClusterSystem@127.0.0.1:2551/system/cluster/core/daemon/joinSeedNodeProcess-1#1739420295]] (version [2.5.17]) [INFO] [11/07/2018 17:18:23.549] [ClusterSystem-akka.actor.default-dispatcher-10] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING, roles [dc-default] [INFO] [11/07/2018 17:18:24.236] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up] [INFO] [11/07/2018 17:18:24.237] [ClusterSystem-akka.actor.default-dispatcher-12] [akka://ClusterSystem/user/ClusterListener] Member is Up: Member(address = akka://ClusterSystem@127.0.0.1:2551, status = Up) [INFO] [11/07/2018 17:18:25.235] [ClusterSystem-akka.actor.default-dispatcher-11] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2552] - Cluster Node [akka://ClusterSystem@127.0.0.1:2552] dc [default] is no longer the leader
Node [akka://ClusterSystem@127.0.0.1:2552] is JOINING itself (with roles [dc-default]) and forming new cluster
说明作为排第一的种子节点,它创建了集群并把自己加了进去。
Cluster Node [akka://ClusterSystem@127.0.0.1:2552] dc [default] is the new leader
说明2552变成了 leader
。
Node [akka://ClusterSystem@127.0.0.1:2551] is JOINING, roles [dc-default]
2551在尝试加入集群
Leader is moving node [akka://ClusterSystem@127.0.0.1:2551] to [Up]
2551成功加入了集群,状态变为Up
Cluster Node [akka://ClusterSystem@127.0.0.1:2552] dc [default] is no longer the leader
集群变化导致新一轮Goissp收敛后, leader
重新选取,2551的IP比2552小,被选为新的 leader
。
可以从Node1的命令行看到证据:
[INFO] [11/07/2018 17:18:25.755] [ClusterSystem-akka.actor.default-dispatcher-9] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka://ClusterSystem@127.0.0.1:2551] - Cluster Node [akka://ClusterSystem@127.0.0.1:2551] dc [default] is the new leader
进阶
其实本来这部分应该放在上面,但是一上来讲理论非常不好消化,至少我个人是如此。所以,我宁愿把好理解的交互步骤放前面,把一些知识点穿插在里面,最后再把无法放进去的干巴巴的理论放最后。
Akka对于Gossip的优化
biased gossip
Failure Detector机制
- 职责是定期检查集群中节点是否可用
- 是 The Phi Accrual Failure Detector 的实现,是一种解耦了观察与行为的增量式错误检测器。它不会简单的判断节点是否可用,而是通过收集各种数据计算出
phi
值,通过与设定好的threshold
进行对比,判断是否出现错误。 - 每个节点会根据集群节点的hash有序环确定临近的几个节点进行监控(默认是5个),方便跨机房进行监控,保证集群节点的全覆盖。目标节点每1秒向这些节点发送心跳。
- 只要有一个monitor认为某节点是
unreachable
状态,那么该节点就会被集群认为是unreachable
- 被标记为
unreachable
的节点,只有在所有的monitor都认为它是reachable
时,它才会被重新认为是reachable
,leader会重新改变它的状态
网络分区与集群分区
当网络出现异常,比如一个跨两地机房的集群,机房间的网络断了。这时:
- 原创建集群的种子节点所在的集群,会重新发起
biased gossip
,直至收敛,确认新的leader
,被隔开的那部分节点会被认为是unreachable
而最终被踢掉 - 被隔开的那部分节点,会重新发起
biased gossip
,其中排序在最前面的种子节点会创建一个新的集群,并产生新的leader
。原集群中的那部分失联节点会被认为是unreachable
而最终被从新集群踢掉 - 两个集群最终都恢复正常能对外提供服务,即原来的一个集群在无人干涉的情况下,分裂成了两个集群
- 当网络恢复后,两个集群会重新发起
biased gossip
,尝试融合,恢复成一个大集群。
总结
由此可见,从设计上来说, Akka Cluster
是完全去中心化,无单点故障和单点性能瓶颈的,具有天然的分布式容错性和可扩容性。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 数据管理流程,基础入门简介
- Golang入门-Golang包管理
- Rust包管理器Cargo入门
- 全栈开发入门实战:后台管理系统
- kustomize 入门:Kubernetes 原生配置管理工具
- Helm 入门介绍:Kubernetes 上的包管理软件
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。