Lab2A. Raft 选主实现

栏目: 后端 · 发布时间: 5年前

内容简介:总结下 MIT6.824 Lab2A Raft 选主的实验笔记。本文代码:Raft 将一致性问题分解成三个子问题:Leader 选举、日志复制、安全性保证,分别对应 Lab 的 2A, 2B, 2C,均可参考原论文图 2 中对 Raft 实现的简要总结。本小节实验目标:Lab 限制 leader 每秒最多发送 10 次心跳请求,实现时取心跳间隔为 100ms。相应的,选举超时时间应比心跳大一个量级左右,我实现时取

总结下 MIT6.824 Lab2A Raft 选主的实验笔记。本文代码: MIT6.824/raft

Lab2A

Raft 将一致性问题分解成三个子问题:Leader 选举、日志复制、安全性保证,分别对应 Lab 的 2A, 2B, 2C,均可参考原论文图 2 中对 Raft 实现的简要总结。本小节实验目标:

  • 实现 Leader 选举:选出单个 leader 并保持领导地位,直到自己 crash
  • 实现心跳通信:实现 leader 与其他节点的无日志 AppendEntries RPC 调用

Leader 选举

Lab 限制 leader 每秒最多发送 10 次心跳请求,实现时取心跳间隔为 100ms。相应的,选举超时时间应比心跳大一个量级左右,我实现时取 400 + rand.Intn(4) * 100 ,即 400~800ms 内的随机值,尽可能避免选举 split vote 情况。

选举流程

参考上一篇文章: Leader 选举

Lab2A. Raft 选主实现

发起投票

定义 Raft 节点:

type Raft struct {
	mu        sync.Mutex          // 共享锁
	peers     []*labrpc.ClientEnd // 集群中的全部节点
	persister *Persister          // 持久化工具
	me        int                 // 本节点在 peers 中的索引

	curTerm  int           // 节点目前的任期号
	votedFor int           // 节点目前的投票对象
	entries  []LogEntry    // 本地日志
	state    PeerState     // 节点状态
	timer    *RaftTimer    // 选举超时定时器
	entryCh  chan LogEntry // 日志处理 channel
}

每个节点在 Make 初始化时都选择时长随机的 RaftTimer,之后启动新的 goroutine 监听 timer 超时和 entryCh 心跳请求,当 RaftTimer 超时后,变身为候选人发起投票。

代码实现:

// 投票参数
type RequestVoteArgs struct {
	Term        int // 候选人的任期号
	CandidateId int // 候选人 id
}

// 响应投票
type RequestVoteReply struct {
	Term        int  // 选民节点的任期号
	VoteGranted bool // 是否赢得该选票
}

// 候选人发起投票
func (rf *Raft) vote() {
	rf.curTerm++
	rf.state = Candidate
	rf.votedFor = rf.me

	args := RequestVoteArgs{
		Term:        rf.curTerm,
		CandidateId: rf.me,
	}
	replyCh := make(chan RequestVoteReply, len(rf.peers))
	var wg sync.WaitGroup
	for i := range rf.peers {
		if i == rf.me {
			continue
		}

		wg.Add(1)
		go func(server int) {
			defer wg.Done()
			var reply RequestVoteReply
			if succ := rf.sendRequestVote(server, &args, &reply); !succ {
				return
			}
			replyCh <- reply
		}(i)
	}
	go func() {
		wg.Wait()
		close(replyCh) // 避免资源泄漏
	}()

	votes := 1
	targetVotes := len(rf.peers)/2 + 1
	for reply := range replyCh {
		// 已有更新 leader,回退到 follower
		if reply.Term > rf.curTerm {
			rf.back2Follower(reply.Term)
			return
		}
		if reply.VoteGranted {
			votes++
		}

		// 如果选票已过半,不再等待已 crash 的节点调用超时
		if votes >= targetVotes {
			break
		}
	}

	// 因 split vote 等原因未达到多数票
	if votes < len(rf.peers)/2+1 {
		rf.resetElectTimer()
		return
	}

	// 成功当选,立刻发送心跳
	rf.state = Leader
	go rf.heartbeat()
}

注意减少选举耗时:候选人收集选票过程中,实时计票过半后即可结束选举,而非等待所有请求都返回了才去计票。假设有的节点已 crash,那 RPC 调用将超时返回 false,超时时间为 100ms,若不立即结束选举,候选人将白白浪费 100ms 时间,也就无法及时选出 leader

响应投票

Raft 对投票节点提出了三点要求:

  • 每轮能投几张:一个任期内,一个节点只能投一张票
  • 是否要投:候选人的日志至少要和自己的一样新,才投票
  • 投给谁:first-come-first-served,投给第一个符合条件的候选人

代码实现(干净整洁的代码):

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	reply.Voter = rf.me
	reply.Term = rf.curTerm

	switch {
	case args.Term < rf.curTerm: // 拒绝处理
		reply.VoteGranted = false
		return
	case args.Term == rf.curTerm: // 每个任期只能投一票
		if rf.votedFor == VOTE_NIL || rf.votedFor == args.CandidateId {
			reply.VoteGranted = true
			rf.votedFor = args.CandidateId
			rf.back2Follower(args.Term)
		}
	case args.Term > rf.curTerm: // 直接投票
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		rf.back2Follower(args.Term)
	}

	return
}

比较候选人与自己的日志将在 2B 中实现。

心跳通信

Raft 将客户端的命令封装为 log entry:

type LogEntry struct {
	Index   int         // 日志索引号
	Term    int         // 写入日志时节点的任期号
	Command interface{} // 客户端命令
}

心跳请求

当候选人成功竞选为 leader 后要 立刻 给集群中其他节点发送心跳,避免有的节点也超时发起新一轮选举。

代码实现:

// 心跳请求
type AppendEntriesArgs struct {
	Term         int        // leader 任期号
	LeaderId     int        // leader id
	PrevLogIndex int        // 暂时不用
	PrevLogTerm  int        //
	Entries      []LogEntry // 批量日志,心跳时为空
}

// 心跳响应
type AppendEntriesReply struct {
	Term int  // 节点任期号
	Succ bool // 心跳是否成功响应
}

// leader 发送心跳
func (rf *Raft) heartbeat() {
	t := time.NewTicker(HEARTBEAT_INTERVAL) // 100ms
	for {
		if !rf.isLeader() {
			return
		}

		args := AppendEntriesArgs{
			Term:         rf.curTerm,
			LeaderId:     rf.me,
			PrevLogIndex: 0,
			PrevLogTerm:  0,
			Entries:      nil, // 心跳时为空日志
		}
		replyCh := make(chan AppendEntriesReply, len(rf.peers))
		var wg sync.WaitGroup
		for i := range rf.peers {
			if i == rf.me {
				continue
			}
			wg.Add(1)

			go func(server int) {
				defer wg.Done()
				var reply AppendEntriesReply
				if succ := rf.sendAppendEntries(server, &args, &reply); !succ {
					return
				}
				replyCh <- reply
			}(i)
		}
		wg.Wait()
		close(replyCh)

		var lived int
		for reply := range replyCh {
			if reply.Term > rf.curTerm {
				// 发现新 leader,如网络分区恢复
				rf.back2Follower(reply.Term)
				return
			}
			lived++
		}

		// 未收到来自大多数节点的心跳,重新开始选举
		if lived < len(rf.peers)/2+1 {
			rf.vote() // 重新开始投票
			return
		}

		<-t.C
	}
}

响应心跳

对于心跳请求,节点需对比任期号,并进行日志的一致性检查:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	if len(args.Entries) > 0 {
		log.Fatal("invalid entry in 2A")
	}

	reply.Term = rf.curTerm
	if rf.curTerm > args.Term {
		reply.Succ = false
		return
	}

	// 检查双方日志的一致性
	if i := len(rf.entries) - 1; i >= 0 {
		switch {
		case i < args.PrevLogIndex: // 本地少日志,让 leader nextIndex[i]-- 后再同步
			reply.Succ = false
			return
		case i == args.PrevLogIndex:
			if rf.entries[i].Term != args.PrevLogTerm { // term 不匹配
				reply.Succ = false
				return
			}
		case i > args.PrevLogIndex: // 强制删除
			rf.entries = rf.entries[args.PrevLogIndex:]
		}
	}
	rf.entries = append(rf.entries, args.Entries...)
	rf.entryCh <- LogEntry{Term: args.Term}

	reply.Succ = true
	return
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

性能之巅

性能之巅

Brendan Gregg / 徐章宁、吴寒思、陈磊 / 电子工业出版社 / 2015-8-15 / 128

《性能之巅:洞悉系统、企业与云计算》基于Linux 和Solaris 系统阐述了适用于所有系统的性能理论和方法,Brendan Gregg 将业界普遍承认的性能方法、工具和指标收集于本书之中。阅读本书,你能洞悉系统运作的方式,学习到分析和提高系统与应用程序性能的方法,这些性能方法同样适用于大型企业与云计算这类最为复杂的环境的性能分析与调优。一起来看看 《性能之巅》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试