内容简介:总结下 MIT6.824 Lab2A Raft 选主的实验笔记。本文代码:Raft 将一致性问题分解成三个子问题:Leader 选举、日志复制、安全性保证,分别对应 Lab 的 2A, 2B, 2C,均可参考原论文图 2 中对 Raft 实现的简要总结。本小节实验目标:Lab 限制 leader 每秒最多发送 10 次心跳请求,实现时取心跳间隔为 100ms。相应的,选举超时时间应比心跳大一个量级左右,我实现时取
总结下 MIT6.824 Lab2A Raft 选主的实验笔记。本文代码: MIT6.824/raft
Lab2A
Raft 将一致性问题分解成三个子问题:Leader 选举、日志复制、安全性保证,分别对应 Lab 的 2A, 2B, 2C,均可参考原论文图 2 中对 Raft 实现的简要总结。本小节实验目标:
- 实现 Leader 选举:选出单个 leader 并保持领导地位,直到自己 crash
- 实现心跳通信:实现 leader 与其他节点的无日志 AppendEntries RPC 调用
Leader 选举
Lab 限制 leader 每秒最多发送 10 次心跳请求,实现时取心跳间隔为 100ms。相应的,选举超时时间应比心跳大一个量级左右,我实现时取 400 + rand.Intn(4) * 100 ,即 400~800ms 内的随机值,尽可能避免选举 split vote 情况。
选举流程
参考上一篇文章: Leader 选举
发起投票
定义 Raft 节点:
type Raft struct {
mu sync.Mutex // 共享锁
peers []*labrpc.ClientEnd // 集群中的全部节点
persister *Persister // 持久化工具
me int // 本节点在 peers 中的索引
curTerm int // 节点目前的任期号
votedFor int // 节点目前的投票对象
entries []LogEntry // 本地日志
state PeerState // 节点状态
timer *RaftTimer // 选举超时定时器
entryCh chan LogEntry // 日志处理 channel
}
每个节点在 Make 初始化时都选择时长随机的 RaftTimer,之后启动新的 goroutine 监听 timer 超时和 entryCh 心跳请求,当 RaftTimer 超时后,变身为候选人发起投票。
代码实现:
// 投票参数
type RequestVoteArgs struct {
Term int // 候选人的任期号
CandidateId int // 候选人 id
}
// 响应投票
type RequestVoteReply struct {
Term int // 选民节点的任期号
VoteGranted bool // 是否赢得该选票
}
// 候选人发起投票
func (rf *Raft) vote() {
rf.curTerm++
rf.state = Candidate
rf.votedFor = rf.me
args := RequestVoteArgs{
Term: rf.curTerm,
CandidateId: rf.me,
}
replyCh := make(chan RequestVoteReply, len(rf.peers))
var wg sync.WaitGroup
for i := range rf.peers {
if i == rf.me {
continue
}
wg.Add(1)
go func(server int) {
defer wg.Done()
var reply RequestVoteReply
if succ := rf.sendRequestVote(server, &args, &reply); !succ {
return
}
replyCh <- reply
}(i)
}
go func() {
wg.Wait()
close(replyCh) // 避免资源泄漏
}()
votes := 1
targetVotes := len(rf.peers)/2 + 1
for reply := range replyCh {
// 已有更新 leader,回退到 follower
if reply.Term > rf.curTerm {
rf.back2Follower(reply.Term)
return
}
if reply.VoteGranted {
votes++
}
// 如果选票已过半,不再等待已 crash 的节点调用超时
if votes >= targetVotes {
break
}
}
// 因 split vote 等原因未达到多数票
if votes < len(rf.peers)/2+1 {
rf.resetElectTimer()
return
}
// 成功当选,立刻发送心跳
rf.state = Leader
go rf.heartbeat()
}
注意减少选举耗时:候选人收集选票过程中,实时计票过半后即可结束选举,而非等待所有请求都返回了才去计票。假设有的节点已 crash,那 RPC 调用将超时返回 false,超时时间为 100ms,若不立即结束选举,候选人将白白浪费 100ms 时间,也就无法及时选出 leader
响应投票
Raft 对投票节点提出了三点要求:
- 每轮能投几张:一个任期内,一个节点只能投一张票
- 是否要投:候选人的日志至少要和自己的一样新,才投票
- 投给谁:first-come-first-served,投给第一个符合条件的候选人
代码实现(干净整洁的代码):
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
reply.Voter = rf.me
reply.Term = rf.curTerm
switch {
case args.Term < rf.curTerm: // 拒绝处理
reply.VoteGranted = false
return
case args.Term == rf.curTerm: // 每个任期只能投一票
if rf.votedFor == VOTE_NIL || rf.votedFor == args.CandidateId {
reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.back2Follower(args.Term)
}
case args.Term > rf.curTerm: // 直接投票
reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.back2Follower(args.Term)
}
return
}
比较候选人与自己的日志将在 2B 中实现。
心跳通信
Raft 将客户端的命令封装为 log entry:
type LogEntry struct {
Index int // 日志索引号
Term int // 写入日志时节点的任期号
Command interface{} // 客户端命令
}
心跳请求
当候选人成功竞选为 leader 后要 立刻 给集群中其他节点发送心跳,避免有的节点也超时发起新一轮选举。
代码实现:
// 心跳请求
type AppendEntriesArgs struct {
Term int // leader 任期号
LeaderId int // leader id
PrevLogIndex int // 暂时不用
PrevLogTerm int //
Entries []LogEntry // 批量日志,心跳时为空
}
// 心跳响应
type AppendEntriesReply struct {
Term int // 节点任期号
Succ bool // 心跳是否成功响应
}
// leader 发送心跳
func (rf *Raft) heartbeat() {
t := time.NewTicker(HEARTBEAT_INTERVAL) // 100ms
for {
if !rf.isLeader() {
return
}
args := AppendEntriesArgs{
Term: rf.curTerm,
LeaderId: rf.me,
PrevLogIndex: 0,
PrevLogTerm: 0,
Entries: nil, // 心跳时为空日志
}
replyCh := make(chan AppendEntriesReply, len(rf.peers))
var wg sync.WaitGroup
for i := range rf.peers {
if i == rf.me {
continue
}
wg.Add(1)
go func(server int) {
defer wg.Done()
var reply AppendEntriesReply
if succ := rf.sendAppendEntries(server, &args, &reply); !succ {
return
}
replyCh <- reply
}(i)
}
wg.Wait()
close(replyCh)
var lived int
for reply := range replyCh {
if reply.Term > rf.curTerm {
// 发现新 leader,如网络分区恢复
rf.back2Follower(reply.Term)
return
}
lived++
}
// 未收到来自大多数节点的心跳,重新开始选举
if lived < len(rf.peers)/2+1 {
rf.vote() // 重新开始投票
return
}
<-t.C
}
}
响应心跳
对于心跳请求,节点需对比任期号,并进行日志的一致性检查:
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
if len(args.Entries) > 0 {
log.Fatal("invalid entry in 2A")
}
reply.Term = rf.curTerm
if rf.curTerm > args.Term {
reply.Succ = false
return
}
// 检查双方日志的一致性
if i := len(rf.entries) - 1; i >= 0 {
switch {
case i < args.PrevLogIndex: // 本地少日志,让 leader nextIndex[i]-- 后再同步
reply.Succ = false
return
case i == args.PrevLogIndex:
if rf.entries[i].Term != args.PrevLogTerm { // term 不匹配
reply.Succ = false
return
}
case i > args.PrevLogIndex: // 强制删除
rf.entries = rf.entries[args.PrevLogIndex:]
}
}
rf.entries = append(rf.entries, args.Entries...)
rf.entryCh <- LogEntry{Term: args.Term}
reply.Succ = true
return
}
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- php如何实现session,自己实现session,laravel如何实现session
- AOP如何实现及实现原理
- webpack 实现 HMR 及其实现原理
- Docker实现原理之 - OverlayFS实现原理
- 为什么实现 .NET 的 ICollection 集合时需要实现 SyncRoot 属性?如何正确实现这个属性?
- 自己实现集合框架(十):顺序栈的实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
大数据时代小数据分析
屈泽中 / 电子工业出版社 / 2015-7-1 / 69.00元
《大数据时代小数据分析》是一本大数据时代下进行小数据分析的入门级教材,通过数据分析的知识点,将各类分析工具进行串联和对比,例如:在进行线性规划的时候可以选择使用Excel或LINGO或Crystal Ball。工具的应用难易结合,让读者循序渐进地学习相关工具。JMP和Mintab用来分析数据,分析的结果使用Excel、LINGO、Crystal Ball来建立数据模型,最后使用Xcelsius来动......一起来看看 《大数据时代小数据分析》 这本书的介绍吧!