Etcd Raft源码分析之二:选举流程

栏目: 后端 · 发布时间: 6年前

内容简介:这一节在没有特殊说明时,都是在raft/raft.go这个文件中(可以查看方法前的r *raft来知道当前是在raft.go中)。node的tick()方法调用raft/raft.go中raft结构体的tick()。在1.2节中,raft.becomeFollower()中设置了raft结构体的step函数和tick函数=tickElection。

1.6 node tick与raft的tickElection

这一节在没有特殊说明时,都是在raft/raft.go这个文件中(可以查看方法前的r *raft来知道当前是在raft.go中)。

node的tick()方法调用raft/raft.go中raft结构体的tick()。在1.2节中,

raft.becomeFollower()中设置了raft结构体的step函数和tick函数=tickElection。

在上一小节的最后,node.run()从n.tickc通道中获取到消息,调用raft.tick()方法,实际上调用了raft.tickElection()。

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
    r.electionElapsed++

    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

当Follower或者Candidate超过选举时间后,会发送类型为MsgHup的一条消息给自己,接着调用r.campaign()方法。

如果消息类型不是MsgHup、MsgVote、MsgPreVote,则调用r.step(r, m)函数。

比如前面becomeFollower时设置了r.step=stepFollower,那么这里就会真正调用stepFollower()方法了。

在stepFollower()方法中,可以看到,它处理的消息类型并没有上面的MsgHup、MsgVote、MsgPreVote。

在最开始启动raft.Node时调用了becomeFollower,初始时,raft的Term为0,后来又被更新为1。

对于类型为MsgHup的pb.Message而言,它的Term初始时为0,所以会执行下面的m.Term==0分支,并接着执行pb.MsgHup分支。

注意:这里的第一个switch语句块的条件是先判断m.Term==0。除此之外,如果m.Term==r.Term,也会执行第二个switch语句块。

func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        ...
    case m.Term < r.Term:
        ...
        return nil
    }

    switch m.Type {
    case pb.MsgHup:
        if r.state != StateLeader {
            ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
            if r.preVote {
                r.campaign(campaignPreElection)
            } else {
                r.campaign(campaignElection)
            }
        } else {
            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        }

    case pb.MsgVote, pb.MsgPreVote:
        ...
    default:
        // 必须先调用r.step=xxx设置函数,然后才能调用下面的语句,真正执行函数。
        r.step(r, m)
    }
    return nil
}

复习下raft调用becomeFollower,raft的状态会更新为StateFollower(下面也列出了成为其他两种角色的代码):

  • 成为Follower,启动选举,定时器为tickElection,在electionTimeout超时后,成为Candidate
  • 成为Candidate,增加Term,投票给自己,定时器为tickElection
  • 成为Leader,定时器为tickHeartbeat,定时发送心跳给Follower
// raft/raft.go
func (r *raft) becomeFollower(term uint64, lead uint64) {
    r.step = stepFollower
    r.reset(term)
    r.tick = r.tickElection
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

func (r *raft) becomeCandidate() {
    r.step = stepCandidate
    // 成为候选人时,Term加1
    r.reset(r.Term + 1)
    r.tick = r.tickElection
    // 投票给自己
    r.Vote = r.id
    r.state = StateCandidate
    r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

func (r *raft) becomeLeader() {
    r.step = stepLeader
    r.reset(r.Term)
    r.tick = r.tickHeartbeat
    r.lead = r.id
    r.state = StateLeader
    r.pendingConfIndex = r.raftLog.lastIndex()
    r.appendEntry(pb.Entry{Data: nil})
    r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

1.7 竞选Leader

Follower在electionTimeout超时后,会竞选成为Candidate。这里我们暂时不考虑两阶段:

  • 调用becomeCandidate,设置投票消息类型为MsgVote
  • 如果获取到大多数选票,则调用becomeLeader()
  • 向每个节点发送voteMsg

这里Follower第一次执行campaign时,步骤2获取到的选票肯定不满足大多数,所以会向其他节点发送MsgVote消息。

func (r *raft) campaign(t CampaignType) {
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection {
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        // PreVote RPCs are sent for the next term before we've incremented r.Term.
        term = r.Term + 1
    } else {
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
        // We won the election after voting for ourselves (which must mean that this is a single-node cluster). Advance to the next state.
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    for id := range r.prs {
        // 如果是自己,不需要发送
        if id == r.id continue
        var ctx []byte
        if t == campaignTransfer ctx = []byte(t)
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

候选人发送VoteMsg给其他节点(Follower)后,当收到大多数Follower节点的投票结果后,候选人会成为Leader。

下面我们分析Follower节点收到候选人发送的MsgVote请求是如何处理的,这涉及到RPC调用,在Etcd中是rafthttp。

1.8 Raft HTTP

每个EtcdServer都有一个HTTP服务端,用来接收其他节点发送的消息,以及返回响应结果给发送者:

//etcdserver/api/rafthttp/http.go
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    var m raftpb.Message
    if err := h.r.Process(context.TODO(), m); err != nil {
        switch v := err.(type) {
        case writerToResponse:
            v.WriteTo(w)
        }
        return
    }
}

EtcdServer 实现了etcdserver/api/rafthttp/transport.go的 Raft 接口

//etcdserver/api/rafthttp/transport.go
type Raft interface {
    Process(ctx context.Context, m raftpb.Message) error
    IsIDRemoved(id uint64) bool
    ReportUnreachable(id uint64)
    ReportSnapshot(id uint64, status raft.SnapshotStatus)
}

Q:s.r返回的是etcdserver/raft.go的raftNode,它是一个结构体。而Step方法定义在raft/Node.go接口中。

那么问题是:raftNode结构体的Step方法,是怎么调用到Node接口的Step方法?

A:先看下node.go下的Node接口和node结构体。node结构体实现了Node接口的所有方法,所以可以把node看做是是Node接口的实现类。

虽然raftNode结构体中没有定义raft.Node接口,但是它的raftNodeConfig属性定义了!这种语法叫做struct内嵌/嵌套(embedded)interface。

EtcdServer的Process方法调用raft/node.go中Node接口(其实现类是这个文件下的node结构体)的Step方法:

// etcdserver/server.go
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
    return s.r.Step(ctx, m)
}

// raft/node.go
func (n *node) Step(ctx context.Context, m pb.Message) error {
    // ignore unexpected local messages receiving over network
    if IsLocalMsg(m.Type) {
        return nil
    }
    return n.step(ctx, m)
}
func (n *node) step(ctx context.Context, m pb.Message) error {
    return n.stepWithWaitOption(ctx, m, false)
}
// Step advances the state machine using msgs. The ctx.Err() will be returned, if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    if m.Type != pb.MsgProp {
        select {
        case n.recvc <- m:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
    }
    ...
    return nil
}

候选人发送请求给Follower节点,当Follower节点收到请求时,会将消息发送到node的recvc通道中。

注意:Etcd分布式集群中的所有节点都会启动raftNode、raft.Node,也都会运行node.run()方法。

// raft/node.go
func (n *node) run(r *raft) {
    ...
    for {
        select {
            ...
        case <-n.tickc:
            r.tick()
        case m := <-n.recvc:
            // filter out response message from unknown From.
            if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
                r.Step(m)
            }
        }
    }
}

回顾前面Candidate从n.tickc中获取到定时器(ElectionTimeout)的超时消息,通过tickElection() 调用到raft.go的 Step() 方法,然后参与竞选(campaign)并发送VoteMsg给Follower节点。

这里Follower节点收到VoteMsg请求,从n.recvc中获取到消息,也会调用 r.Step(m) 方法。

两个节点的角色不一样,但是都会调用相同的raft.Step()方法,当然两者的处理逻辑不一样。

1.9 Follower收到MsgVote,投票

Follower收到消息m的Term=1,它自己raft.Term=0,步骤如下:

  • m.Term > r.Term,调用becomeFollower()
  • 返回MsgVoteResp给候选人。
// raft/raft.go
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        switch {
        default:
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From)
            } else {
                // Follower收到Candidate的MsgVote请求,自己成为Follower(更改状态)
                // 同样,这里只是设置Raft的step函数和tick函数,还没有真正执行成为Follower的逻辑
                r.becomeFollower(m.Term, None)  
            }
        }
    case m.Term < r.Term:
        ...
        return nil
    }

    switch m.Type {
    case pb.MsgHup:
        ...
    case pb.MsgVote, pb.MsgPreVote:
        canVote := r.Vote == m.From ||                              // We can vote if this is a repeat of a vote we've already cast...
            (r.Vote == None && r.lead == None) ||                   // ...we haven't voted and we don't think there's a leader yet in this term...
            (m.Type == pb.MsgPreVote && m.Term > r.Term)            // ...or this is a PreVote for a future term...
        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {    // ...and we believe the candidate is up to date.
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) // 返回的消息类型为VoteResp
            if m.Type == pb.MsgVote {
                r.electionElapsed = 0  // Only record real votes. 重置选举计数器
                r.Vote = m.From // 投票给发送这条消息的节点,即候选人
            }
        } else {
            r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) // 拒绝投票
        }
    default:
        err := r.step(r, m)
        if err != nil {
            return err
        }
    }
    return nil
}

当Follower返回MsgVote的响应结果MsgVoteResp给Candidate,Candidate的处理流程与Follower收到消息的类似,也会调用r.Step()方法。

1.10 Candidate成为Leader

由于Follower返回消息的Term等于Candidate发送消息的Term,所以直接走第二个switch条件的default分支:

// raft/raft.go
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        ...
    case m.Term < r.Term:
        ...
        return nil
    }

    switch m.Type {
    case pb.MsgHup:
        ...
    case pb.MsgVote, pb.MsgPreVote:
        ...
    default:
        err := r.step(r, m) // 调用Raft的stepFunc,即1.7节通过becomeCandidate设置的stepCandidate
        if err != nil {
            return err
        }
    }
    return nil
}

前面我们看到在Step(m)方法中都没有走到default分支,这里开始调用r.step(r,m)函数,对应的是stepCandidate:

// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
    // Only handle vote responses corresponding to our candidacy (while in
    // StateCandidate, we may get stale MsgPreVoteResp messages in this term from our pre-candidate state).
    var myVoteRespType pb.MessageType
    if r.state == StatePreCandidate {
        myVoteRespType = pb.MsgPreVoteResp
    } else {
        myVoteRespType = pb.MsgVoteResp
    }
    switch m.Type {
    case pb.MsgProp:
        r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
        return ErrProposalDropped
    case pb.MsgApp:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    case pb.MsgHeartbeat:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)
    case pb.MsgSnap:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleSnapshot(m)
    case myVoteRespType:  // 候选人收到Follower的VoteMsg消息,判断投票个数
        gr := r.poll(m.From, m.Type, !m.Reject)
        r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
        switch r.quorum() {
        case gr:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                r.becomeLeader() // 满足投票个数,成为Leader
                r.bcastAppend() // 向其他Follower节点发送MsgApp请求
            }
        case len(r.votes) - gr:
            // pb.MsgPreVoteResp contains future term of pre-candidate m.Term > r.Term; reuse r.Term
            r.becomeFollower(r.Term, None)
        }
    case pb.MsgTimeoutNow:
        r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
    }
    return nil
}

总结下每个节点在收到HTTP请求时,都会调用raft.Step(m)方法,如果消息的类型不是MsgHup、MsgVote、MsgPreVote,则调用r.step(r,m)方法。

Raft的stepFunc在发送/收到HTTP请求之前一定会被设置,设置的地方在调用becomeFollower、becomeCandidate、becomeLeader这三个方法中。

A:以Follower1转为Candidate为例:

  1. 调用becomeFollower,设置r.step=stepFollower
  2. ticker的electionTimeout超时,调用becomeCandidate,设置r.step=stepCandidate
  3. 调用campaign,发送VoteMsg给其他所有节点

B:以Follower2为例:

  1. 调用becomeFollower,设置r.step=stepFollower
  2. 收到候选节点的VoteMsg请求,投票给候选节点,返回VoteMsgResp

C:以Candidate转为Leader为例:

  1. 收到Follower发送的VoteMsgResp,调用r.step函数,即A:2中的stepCandidate
  2. 判断投票个数,如果得到大多数的选票,调用becomeLeader,设置r.step=stepLeader
  3. 向其他Follower节点发送MsgApp请求

D:以Follower2为例:

  1. 收到Leader的MsgApp请求,调用r.step函数,即B:1的stepFollower
  2. 处理AppendEntries请求,返回MsgAppResp请求给Leader

下面是Candidate发送MsgVote给Follower,Follower返回MsgVoteResp给Candidate的流程:

Etcd Raft源码分析之二:选举流程


以上所述就是小编给大家介绍的《Etcd Raft源码分析之二:选举流程》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

复盘

复盘

陈中 / 机械工业出版社 / 2013-7-23 / 29

复盘是围棋中的一种学习方法,指的是在写完一盘棋之后,要重新摆一遍,看看哪里下得好,哪里下得不好,对下得好和不好的,都要进行分析和推演。 柳传志第一个将复盘引入到做事之中,成为联想三大方法论之一,在联想每一个重大决策的背后,都有复盘的身影。 本书完整系统讲述了复盘的内容,清晰了复盘的价值,给出了复盘的操作步骤,我们可以在自己的工作生活中,应用复盘的方法,向自己学习,随时随地的提高自己,把......一起来看看 《复盘》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试