内容简介:go-raft实现
goraft是Raft协议的Golang版本的实现,项目地址为: https://github.com/goraft/raft 。整个代码质量较高,值得仔细品味。因此,整理了该博文探究下其内部实现。
数据结构
goraft主要抽象了server、peer和log三个结构,分别代表服务节点、Follower节点和日志。
server
Raft作为一种多节点状态一致性维护协议,运行过程中必然涉及到多个物理节点,server就是用来抽象其中的每个节点,维护节点的状态信息。其结构如下:
type server struct {
*eventDispatcher
name string
path string
state string
transporter Transporter
context interface{}
currentTerm uint64
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.RWMutex
syncedPeer map[string]bool
stopped chan bool
c chan *ev
electionTimeout time.Duration
heartbeatInterval time.Duration
snapshot *Snapshot
// PendingSnapshot is an unfinished snapshot.
// After the pendingSnapshot is saved to disk,
// it will be set to snapshot and also will be
// set to nil.
pendingSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
connectionString string
routineGroup sync.WaitGroup
}
- state:每个节点总是处于以下状态的一种:follower、candidate、leader
- currentTerm:Raft协议关键概念,每个term内都会产生一个新的leader
- peers:raft中每个节点需要了解其他节点信息,尤其是leader节点
- syncedPeer:对于leader来说,该成员记录了日志已经被sync到了哪些follower
- c:当前节点的命令通道,所有的命令都通过该channel来传递
- pendingSnapshot:暂时未知
peer
peer描述的是集群中其他节点的信息,结构如下:
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *server
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
prevLogIndex uint64
stopChan chan bool
heartbeatInterval time.Duration
lastActivity time.Time
sync.RWMutex
}
- server:peer中的某些方法会依赖server的状态,如peer内的appendEntries方法需要获取server的currentTerm
- Name:peer的名称
- ConnectionString:peer的ip地址,形式为”ip:port”
- prevLogIndex:这个很关键,记录了该peer的当前日志index,接下来leader将该index之后的日志继续发往该peer
- lastActivity:记录peer的上次活跃时间
log
log是Raft协议的核心,Raft使用日志来存储客户发起的命令,并通过日志内容的同步来维护多节点上状态的一致性。
// A log is a collection of log entries that are persisted to durable storage.
type Log struct {
ApplyFunc func(*LogEntry, Command) (interface{}, error)
file *os.File
path string
entries []*LogEntry
commitIndex uint64
mutex sync.RWMutex
startIndex uint64 // the index before the first entry in the Log entries
startTerm uint64
initialized bool
}
- ApplyFunc:日志被应用至状态机的方法,这个应该由使用raft的客户决定
- file:日志文件句柄
- path:日志文件路径
- entries:内存日志项缓存
- commitIndex:日志提交点,小于该提交点的日志均已经被应用至状态机
- startIndex/startTerm:日志中起始日志项的index和term
log entry
log entry是客户发起的command存储在日志文件中的内容
type LogEntry struct {
Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
// A log entry stores a single item in the log.
type LogEntry struct {
pb *protobuf.LogEntry
Position int64 // position in the log file
log *Log
event *ev
}
- LogEntry是日志项在内存中的描述结构,其最终存储在日志文件是经过protocol buffer编码以后的信息
- Position代表日志项存储在日志文件内的偏移
- 编码后的日志项包含Index、Term,原始Command的名称以及Command具体内容
关键流程
客户端请求
客户端使用go-raft的时候,先初始化环境,这里不仔细描述,接下来看客户如何发起一个请求:
command := &raft.DefaultJoinCommand{}
if _, err := s.raftServer.Do(command); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
客户命令执行的入口是Do:
func (s *server) Do(command Command) (interface{}, error) {
return s.send(command)
}
// Sends an event to the event loop to be processed. The function will wait until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
if !s.Running() {
return nil, StopError
}
event := &ev{target: value, c: make(chan error, 1)}
select {
case s.c <- event:
case <-s.stopped:
return nil, StopError
}
select {
case <-s.stopped:
return nil, StopError
case err := <-event.c:
return event.returnValue, err
}
}
send的处理流程很简单,首先将命令写入到server的命令channel,然后等待命令处理完成。
而server作为leader启动完成时会进入一个leaderLoop来处理所有用户的命令:
func (s *server) leaderLoop() {
logIndex, _ := s.log.lastInfo()
......
// Begin to collect response from followers
for s.State() == Leader {
select {
case <-s.stopped:
......
case e := <-s.c:
switch req := e.target.(type) {
// 代表客户端命令
case Command:
s.processCommand(req, e)
continue
......
}
}
}
processCommand处理如下:
// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
s.debugln("server.command.process")
// Create an entry for the command in the log.
entry, err := s.log.createEntry(s.currentTerm, command, e)
if err != nil {
s.debugln("server.command.log.entry.error:", err)
e.c <- err
return
}
if err := s.log.appendEntry(entry); err != nil {
s.debugln("server.command.log.error:", err)
e.c <- err
return
}
s.syncedPeer[s.Name()] = true
if len(s.peers) == 0 {
commitIndex := s.log.currentIndex()
s.log.setCommitIndex(commitIndex)
s.debugln("commit index ", commitIndex)
}
}
这里的逻辑比较简单,创建日志项并将日志项append至日志文件,如果过程中由任何错误,就将这个错误写入e.c:e.c <- err,这样等待在该channel的客户端就会收到通知,立即返回。
如果没有错误,这时候客户端还是处于等待状态的,这是因为虽然该Command被leader节点成功处理了,但是该Command的日志还没有被同步至大多数Follow节点,因此该Command也就无法被提交,所以发起该Command的客户端依然等在那,Command被提交,这在后面的日志同步过程中会有所体现。
日志同步
go-raft的leader向Follower同步日志是在heartbeat中完成的:
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat(c chan bool) {
stopChan := p.stopChan
c <- true
ticker := time.Tick(p.heartbeatInterval)
for {
select {
case flush := <-stopChan:
if flush {
// before we can safely remove a node
// we must flush the remove command to the node first
p.flush()
return
} else {
return
}
case <-ticker:
start := time.Now()
p.flush()
duration := time.Now().Sub(start)
p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
}
}
}
func (p *Peer) flush() {
debugln("peer.heartbeat.flush: ", p.Name)
prevLogIndex := p.getPrevLogIndex()
term := p.server.currentTerm
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
}
}
核心的逻辑是将leader上的日志通过构造一个AppendEntriesRequest发送给从节点,当然只同步那些Follower上还没有的日志,即prevLogIndex以后的log entry。
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil))
return
}
p.setLastActivity(time.Now())
// If successful then update the previous log index.
p.Lock()
if resp.Success() {
......
}
......
resp.peer = p.Name
// Send response to server for processing.
p.server.sendAsync(resp)
}
这里会将Follower的心跳的响应继续发送给server。server会在leaderLoop中处理该类消息:
func (s *server) leaderLoop() {
logIndex, _ := s.log.lastInfo()
......
// Begin to collect response from followers
for s.State() == Leader {
select {
case e := <-s.c:
switch req := e.target.(type) {
case Command:
s.processCommand(req, e)
continue
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *AppendEntriesResponse:
s.processAppendEntriesResponse(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
// Callback to event.
e.c <- err
}
}
s.syncedPeer = nil
}
处理Follower的响应在函数processAppendEntriesResponse中:
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
// If we find a higher term then change to a follower and exit.
if resp.Term() > s.Term() {
s.updateCurrentTerm(resp.Term(), "")
return
}
// panic response if it's not successful.
if !resp.Success() {
return
}
// if one peer successfully append a log from the leader term,
// we add it to the synced list
if resp.append == true {
s.syncedPeer[resp.peer] = true
}
if len(s.syncedPeer) < s.QuorumSize() {
return
}
// Determine the committed index that a majority has.
var indices []uint64
indices = append(indices, s.log.currentIndex())
for _, peer := range s.peers {
indices = append(indices, peer.getPrevLogIndex())
}
sort.Sort(sort.Reverse(uint64Slice(indices)))
commitIndex := indices[s.QuorumSize()-1]
committedIndex := s.log.commitIndex
if commitIndex > committedIndex {
s.log.sync()
s.log.setCommitIndex(commitIndex)
}
}
这里会判断如果多数的Follower都已经同步日志了,那么就可以检查所有的Follower此时的日志点,并根据log index排序,leader会算出这些Follower的提交点,然后提交,调用setCommitIndex。
// Updates the commit index and writes entries after that index to the stable storage.
func (l *Log) setCommitIndex(index uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()
// this is not error any more after limited the number of sending entries
// commit up to what we already have
if index > l.startIndex+uint64(len(l.entries)) {
index = l.startIndex + uint64(len(l.entries))
}
if index < l.commitIndex {
return nil
}
for i := l.commitIndex + 1; i <= index; i++ {
entryIndex := i - 1 - l.startIndex
entry := l.entries[entryIndex]
l.commitIndex = entry.Index()
// Decode the command.
command, err := newCommand(entry.CommandName(), entry.Command())
if err != nil {
return err
}
returnValue, err := l.ApplyFunc(entry, command)
if entry.event != nil {
entry.event.returnValue = returnValue
entry.event.c <- err
}
_, isJoinCommand := command.(JoinCommand)
if isJoinCommand {
return nil
}
}
return nil
}
这里的提交主要是设置好commitIndex,并且将日志项中的Command应用到状态机。最后,判断这个LogEntry是不是由客户直接发起的,如果是,那么还需要将状态机的处理结果通过event.c返回给客户端,这样,客户端就可以返回了,请回顾上面的客户端请求。
选主
在Raft协议运行过程中,Leader节点会周期性的给Follower发送心跳,心跳的作用有二:一方面,Follower通过心跳确认Leader此时还是活着的;第二,Leader通过心跳将自身的日志同步发送给Follower。
但是,如果Follower在超过一定时间后没有收到Leader的心跳信息,就认定Leader可能离线,于是,该Follower就会变成Candidate,发起一次选主,通知其他节点开始为我投票。
func (s *server) followerLoop() {
since := time.Now()
electionTimeout := s.ElectionTimeout()
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
for s.State() == Follower {
var err error
update := false
select {
......
// 超过一定时间未收到请求
case <-timeoutChan:
if s.promotable() {
// 状态变为Candidate
s.setState(Candidate)
} else {
update = true
}
}
}
......
}
// The main event loop for the server
func (s *server) loop() {
defer s.debugln("server.loop.end")
state := s.State()
for state != Stopped {
switch state {
case Follower:
s.followerLoop()
// 状态变为Candidate后,进入candidateLoop
case Candidate:
s.candidateLoop()
case Leader:
s.leaderLoop()
case Snapshotting:
s.snapshotLoop()
}
state = s.State()
}
}
当节点状态由Follower变为Candidate后,就会进入candidateLoop来触发一次选主过程。
func (s *server) candidateLoop() {
for s.State() == Candidate {
if doVote {
s.currentTerm++
s.votedFor = s.name
// 向所有其他节点发起Vote请求
respChan = make(chan *RequestVoteResponse, len(s.peers))
for _, peer := range s.peers {
s.routineGroup.Add(1)
go func(peer *Peer) {
defer s.routineGroup.Done()
peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
}(peer)
}
// 自己给自己投一票
votesGranted = 1
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
doVote = false
}
// 如果多数节点同意我作为Leader,设置新状态
if votesGranted == s.QuorumSize() {
s.setState(Leader)
return
}
// 等待其他节点的选主请求的响应
select {
case <-s.stopped:
s.setState(Stopped)
return
case resp := <-respChan:
if success := s.processVoteResponse(resp); success {
votesGranted++
}
......
case <-timeoutChan:
// 如果再一次超时了,重新发起选主请求
doVote = true
}
}
}
别看上面的代码很多,但是其中逻辑非常清楚。就不作过多说明了。
上面描述了一个Follower节点变为Candidate后,如何发起一次选主,接下来看看一个节点在收到其他节点发起的选主请求后的处理,在函数processRequestVoteRequest():
// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
if req.Term < s.Term() {
return newRequestVoteResponse(s.currentTerm, false), false
}
if req.Term > s.Term() {
s.updateCurrentTerm(req.Term, "")
} else if s.votedFor != "" && s.votedFor != req.CandidateName {
return newRequestVoteResponse(s.currentTerm, false), false
}
lastIndex, lastTerm := s.log.lastInfo()
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
return newRequestVoteResponse(s.currentTerm, false), false
}
s.votedFor = req.CandidateName
return newRequestVoteResponse(s.currentTerm, true), true
}
接受一个远程节点的选主请求需要满足以下条件:
- 远程节点的term必须要大于等于当前节点的term;
- 远程节点的log必须比当前节点的更新;
- 当前节点的term和远程节点的选主请求的term如果一样且当前节点未给任何其他节点投出自己的选票。
整个流程其实也是蛮简单的。
节点变更
在Raft协议中,节点的变更也是作为一个客户的命令通过一致性协议统一管理:也就是说,节点变更命令被写入Leader的日志,然后再由Leader同步到Follower,最后如果多数Follower成功写入该日志,主节点提交该日志。
在Go-Raft中,存在两种节点变更命令:DefaultJoinCommand和DefaultLeaveCommand,对于这两种命令的处理关键在于这两个命令的Apply方法,如下:
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {
err := server.RemovePeer(c.Name)
return []byte("leave"), err
}
增加节点最终的提交方法是AddPeer:
func (s *server) AddPeer(name string, connectiongString string) error {
if s.peers[name] != nil {
return nil
}
if s.name != name {
peer := newPeer(s, name, connectiongString, s.heartbeatInterval)
// 如果是主上新增一个peer,那还需要启动后台协程发送
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.Name] = peer
s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
}
// Write the configuration to file.
s.writeConf()
return nil
}
// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
// Skip the Peer if it has the same name as the Server
if name != s.Name() {
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
return fmt.Errorf("raft: Peer not found: %s", name)
}
// 如果是Leader,停止给移除节点的心跳协程
if s.State() == Leader {
s.routineGroup.Add(1)
go func() {
defer s.routineGroup.Done()
peer.stopHeartbeat(true)
}()
}
delete(s.peers, name)
s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
}
// Write the configuration to file.
s.writeConf()
return nil
}
Snapshot
根据Raft论文描述,随着系统运行,存储命令的日志文件会一直增长,为了避免这种情况,论文中引入了Snapshot。Snapshot的出发点很简单:淘汰掉那些无用的日志项,那么问题就来了:
- 哪些日志项是无用的,可以丢弃?
- 如何丢弃无用日志项?
接下来我们各个击破:
- 如果某个日志项中存储的用户命令(Command)已经被提交到状态机中,那么它就被视为无用的,可以被清理;
- 因为日志的提交是按照index顺序执行的,因此,只要知道当前副本的提交点(commit index),那么在此之前的所有日志项必然也已经被提交了,因此,这个提交点之前(包括该提交点)的日志都可以被删除。实现上,只要将提交点之后的日志写入新的日志文件,再删除老的日志文件,就大功告成了;
- 最后需要注意的一点是:在回收日志文件之前,必须要对当前的系统状态机进行保存,否则,状态机数据丢失以后,又删了日志,状态真的就无法恢复了。
goraft的Snapshot是由应用主动触发的,调用其内部函数TakeSnapshot:
func (s *server) TakeSnapshot() error {
......
lastIndex, lastTerm := s.log.commitInfo()
......
path := s.SnapshotPath(lastIndex, lastTerm)
s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}
// 首先应用保存状态机当前状态
state, err := s.stateMachine.Save()
if err != nil {
return err
}
// 准备Snapshot状态:包括当前日志的index,当前peer等
peers := make([]*Peer, 0, len(s.peers)+1)
for _, peer := range s.peers {
peers = append(peers, peer.clone())
}
s.pendingSnapshot.Peers = peers
s.pendingSnapshot.State = state
s.saveSnapshot()
// 最后,回收日志项:s.log.compact()
if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
compactTerm := s.log.getEntry(compactIndex).Term()
s.log.compact(compactIndex, compactTerm)
}
return nil
}
关于compact()函数就不作仔细描述了,有兴趣的朋友可以自行阅读,非常简单的。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- php如何实现session,自己实现session,laravel如何实现session
- AOP如何实现及实现原理
- webpack 实现 HMR 及其实现原理
- Docker实现原理之 - OverlayFS实现原理
- 为什么实现 .NET 的 ICollection 集合时需要实现 SyncRoot 属性?如何正确实现这个属性?
- 自己实现集合框架(十):顺序栈的实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Black Box Society
Frank Pasquale / Harvard University Press / 2015-1-5 / USD 35.00
Every day, corporations are connecting the dots about our personal behavior—silently scrutinizing clues left behind by our work habits and Internet use. The data compiled and portraits created are inc......一起来看看 《The Black Box Society》 这本书的介绍吧!