内容简介:对于其他服务器,它看起来像一个网络分区-服务器暂时断开连接,而对于崩溃的服务器本身,情况则大不相同,因为重新启动后,其所有内存状态都会丢失。
女主宣言
今天小编为大家分享一篇关于Golang实现Raft的文章,本篇文章为系列中的第四篇,对Raft中通过添加持久性和一些优化来完成Raft的基本实现。希望能对大家有所帮助。
PS:丰富的一线技术、多元化的表现形式,尽在“ 3 60云计算 ”,点关注哦!
本 篇 文章为Raft系列文章中的第四篇, 在该部分中,我们将通过添加持久性和一些优化来完成Raft的基本实现。
1
持久化
像Raft这样的共识算法的目标是通过在隔离的服务器之间复制任务来创建一个比其各个部分具有更高可用性的系统。 到目前为止,我们一直专注于网络分区的故障情况,其中群集中的某些服务器与其他服务器(或与客户端)断开连接。 失败的另一种模式是崩溃,其中服务器停止工作并重新启动。
对于其他服务器,它看起来像一个网络分区-服务器暂时断开连接,而对于崩溃的服务器本身,情况则大不相同,因为重新启动后,其所有内存状态都会丢失。
正是由于这个原因,Raft论文中的图2清楚地标记了哪个状态应该保持不变;持久状态将在每次更新时写入并刷新到持久化存储中。在服务器发出下一个RPC或答复正在进行的RPC之前,服务器必须保留的任何状态都将保留。
Raft只能通过保留其状态的子集来实现,即:
-
currentTerm - 此服务器观察到的最新任期
-
votedFor - 此服务器在最新任期为其投票的节点ID
-
log - Raft日志条目
2
命令传递语义
在Raft中,视不同情况,一个命令可以多次传递给客户端。 有几种可能发生这种情况的场景,包括崩溃导致重新启动(再次重播日志时)。
就消息传递语义而言,Raft选择的是”至少一次”。提交命令后,它将最终复制到所有客户端,但是某些客户端可能多次看到同一命令。因此,建议命令带有唯一的ID,并且客户端应忽略已交付的命令。这在Raft论文中的第8节有更详细的描述。
3
存储接口
为了实现持久性,我们在代码中添加了以下接口:
type Storage interface { Set(key string, value []byte) Get(key string) ([]byte, bool) // HasData returns true iff any Sets were made on this Storage. HasData() bool }
可以将它看作是一个映射,从字符串映射到一个由持久存储支持的通用字节切片。
4
恢复和保存状态
CM构造函数现在将接受一个 Storage 作为参数并调用:
if cm.storage.HasData() { cm.restoreFromStorage(cm.storage) }
restoreFromStorage 方法也是新增。它从存储中加载持久状态变量,使用标准的 encoding/gob 包对它们进行反序列化:
func (cm *ConsensusModule) restoreFromStorage(storage Storage) { if termData, found := cm.storage.Get("currentTerm"); found { d := gob.NewDecoder(bytes.NewBuffer(termData)) if err := d.Decode(&cm.currentTerm); err != nil { log.Fatal(err) } } else { log.Fatal("currentTerm not found in storage") } if votedData, found := cm.storage.Get("votedFor"); found { d := gob.NewDecoder(bytes.NewBuffer(votedData)) if err := d.Decode(&cm.votedFor); err != nil { log.Fatal(err) } } else { log.Fatal("votedFor not found in storage") } if logData, found := cm.storage.Get("log"); found { d := gob.NewDecoder(bytes.NewBuffer(logData)) if err := d.Decode(&cm.log); err != nil { log.Fatal(err) } } else { log.Fatal("log not found in storage") } }
镜像方法为 persistToStorage - 将所有这些状态变量编码并保存到提供的 Storage 中:
func (cm *ConsensusModule) persistToStorage() { var termData bytes.Buffer if err := gob.NewEncoder(&termData).Encode(cm.currentTerm); err != nil { log.Fatal(err) } cm.storage.Set("currentTerm", termData.Bytes()) var votedData bytes.Buffer if err := gob.NewEncoder(&votedData).Encode(cm.votedFor); err != nil { log.Fatal(err) } cm.storage.Set("votedFor", votedData.Bytes()) var logData bytes.Buffer if err := gob.NewEncoder(&logData).Encode(cm.log); err != nil { log.Fatal(err) } cm.storage.Set("log", logData.Bytes()) }
我们只需在这些状态变量发生变化的每个点调用 pesistToStorage 来实现持久化。如果看一下第2部分中CM的代码与本部分之间的区别,会发现它们散布在少数地方。
当然,这不是实现持久性的最有效的方法,但是简单有效,所以足以满足我们的需要。效率最低的是保存整个日志,这在实际应用中可能很大。为了真正解决这个问题,Raft有一个日志压缩机制,该机制在本文的第7节中进行了描述。我们不打算实现压缩,但是可以将其作为练习添加到我们的实现中。
5
崩溃伸缩
实施持久性后,我们的Raft集群在一定程度上可以应对崩溃。 只要集群中的少数节点崩溃并在以后的某个时间点重新启动,集群就将对客户端保持可用。 具有2N + 1个服务器的Raft群集将容忍N台故障服务器,并且只要其他N + 1台服务器仍保持相互连接,便会保持可用。
如果查看此部分的测试,会注意到添加了许多新测试。崩溃伸缩可以测试更大范围的人为情况组合,本文中也对此进行了一定程度的描述。
6
不可靠的RPC交付
需要注意的另一个方面是不可靠的RPC交付。 到目前为止,我们已经假设在连接的服务器之间发送的RPC可能到达目的地的时间很短。 如果查看server.go,会注意到它使用了一种称为 RPCProxy 的类型来实现这些延迟。 每个RPC都会延迟1-5毫秒,以模拟位于同一数据中心的节点的真实性。
RPCProxy让我们实现的另一件事是可选的不可靠交付。启用 RAFT_UNRELIABLE_RPC 环境变量后,RPC有时会明显延迟(延迟75毫秒)或完全中断。模拟了实际的网络故障。
我们可以在 RAFT_UNRELIABLE_RPC 开启的情况下重新运行所有测试,并观察Raft群集在出现这些故障时的行为。如果有兴趣,可以尝试调整 RPCProxy,不仅让RPC请求延迟,还可以让RPC答复延迟。
7
优化发送AppendEntries
正如在第2部分中简要提到的,当前的领导者执行效率很低。领导者在 LeaderSendHeartbeats 中发送AE,定时器每隔50毫秒调用一次。 假设提交了一条新命令; 领导者将等到下一个50毫秒的边界,而不是立即通知跟随者。 更糟的是,因为需要两次AE往返来通知跟随者命令已提交。 如图:
在时间(1),领导者将心跳AE发送给跟随者,并在几毫秒内获得响应。例如,在35毫秒后提交了新命令。领导者一直等到下一个50毫秒边界(2)才将更新的日志发送给跟随者。跟随者答复该命令已成功添加到日志(3)。此时,领导者已经提高了提交索引(假设它获得了多数),可以立即通知跟随者,但是它一直等到下一个50毫秒边界(4)为止。最后,当跟随者收到更新的 leaderCommit时,它可以将新提交的命令通知其自己的客户端。
在领导者的 Submit(X) 和跟随者的 commitChan <-X 之间经过的大部分时间对于实现来讲都是不必要的。
真正想要的是使序列看起来像这样:
看一下实现的新部分,从startLeader开始。
func (cm *ConsensusModule) startLeader() { cm.state = Leader for _, peerId := range cm.peerIds { cm.nextIndex[peerId] = len(cm.log) cm.matchIndex[peerId] = -1 } cm.dlog("becomes Leader; term=%d, nextIndex=%v, matchIndex=%v; log=%v", cm.currentTerm, cm.nextIndex, cm.matchIndex, cm.log) // This goroutine runs in the background and sends AEs to peers: // * Whenever something is sent on triggerAEChan // * ... Or every 50 ms, if no events occur on triggerAEChan go func(heartbeatTimeout time.Duration) { // Immediately send AEs to peers. cm.leaderSendAEs() t := time.NewTimer(heartbeatTimeout) defer t.Stop() for { doSend := false select { case <-t.C: doSend = true // Reset timer to fire again after heartbeatTimeout. t.Stop() t.Reset(heartbeatTimeout) case _, ok := <-cm.triggerAEChan: if ok { doSend = true } else { return } // Reset timer for heartbeatTimeout. if !t.Stop() { <-t.C } t.Reset(heartbeatTimeout) } if doSend { cm.mu.Lock() if cm.state != Leader { cm.mu.Unlock() return } cm.mu.Unlock() cm.leaderSendAEs() } } }(50 * time.Millisecond) }
不仅要等待50 ms的计时,startLeader中的循环还要等待两个可能的事件之一:
-
在cm.triggerAEChan上发送
-
计时器计数50毫秒
我们将很快看到触发 cm.triggerAEChan 的原因。这是现在应该发送AE的信号。每当触发通道时,计时器都会重置,并执行心跳逻辑-如果领导者没有新的要报告的内容,则最多等待50毫秒。
还要注意,实际发送AE的方法已从 leaderSendHeartbeats 重命名为 leaderSendAE,可以更好地在新代码中反映其目的。
我们所期望的,触发cm.triggerAEChan的方法之一是Submit:
func (cm *ConsensusModule) Submit(command interface{}) bool { cm.mu.Lock() cm.dlog("Submit received by %v: %v", cm.state, command) if cm.state == Leader { cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm}) cm.persistToStorage() cm.dlog("... log=%v", cm.log) cm.mu.Unlock() cm.triggerAEChan <- struct{}{} return true } cm.mu.Unlock() return false }
修改成:
-
每当提交新命令时,都会调用cm.persistToStorage来保留新的日志条目。
-
一个空结构在 cm.triggerAEChan 上发送。将通知领导者goroutine中的循环。
-
锁定处理将重新排序;在发送cm.triggerAEChan时不想保持锁定,因为在某些情况下可能导致死锁。
在领导者中处理AE答复并推进提交索引的代码中 cm.triggerAEChan 将被通知。
if cm.commitIndex != savedCommitIndex { cm.dlog("leader sets commitIndex := %d", cm.commitIndex) // Commit index changed: the leader considers new entries to be // committed. Send new entries on the commit channel to this // leader's clients, and notify followers by sending them AEs. cm.newCommitReadyChan <- struct{}{} cm.triggerAEChan <- struct{}{} }
这个优化很重要,它使实现比以前对新命令的响应速度更快。
8
批量处理命令提交
现在,每次调用 Submit 都会触发很多活动 - 领导者立即向所有跟随者广播RPC。如果想一次提交多个命令,连接Raft群集的网络可能会被RPC淹没。
尽管它看起来效率低,但是安全。Raft的RPC都是幂等的,也就是说多次获得具有基本相同信息的RPC不会造成任何危害。
如果担心一次要频繁提交许多命令时的网络流量,那么批处理应该很容易实现。最简单的方法是提供一种将整个命令片段传递到Submit的方法。这样Raft实现中的代码改动会很小,并且客户端将能够提交整个命令组,而不会产生太多的RPC通信。有兴趣的可以尝试一下!
9
总结
到此,我们结束了有关Raft分布式共识算法的系列文章就结束了。
如果对文章或代码有任何疑问或意见,可以留言。
如果有兴趣学习成熟的 Go 实现的Raft项目代码,可以参考:
https://github.com/etcd-io/etcd/tree/master/raft 是etcd的Raft部分,它是一个分布式键值数据库。
https://github.com/hashicorp/raft 是一个独立的Raft共识模块,可以绑定到不同的客户端。
360云计算
由360云平台团队打造的技术分享公众号,内容涉及 数据库、大数据、微服务、容器、AIOps、IoT 等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享
以上所述就是小编给大家介绍的《Go实现Raft第四篇:持久化和调优》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- (实战)indexedDB + webSocket实现数据持久化保存数据
- RabbitMQ 的消息持久化与 Spring AMQP 的实现剖析
- 攻击者如何借助授权插件,实现macOS持久化凭据窃取
- go实现区块链[3]-遍历区块链与数据库持久化
- RocketMQ 持久化
- docker数据持久化
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。