Etcd Raft源码分析之二:选举流程

栏目: 后端 · 发布时间: 6年前

内容简介:这一节在没有特殊说明时,都是在raft/raft.go这个文件中(可以查看方法前的r *raft来知道当前是在raft.go中)。node的tick()方法调用raft/raft.go中raft结构体的tick()。在1.2节中,raft.becomeFollower()中设置了raft结构体的step函数和tick函数=tickElection。

1.6 node tick与raft的tickElection

这一节在没有特殊说明时,都是在raft/raft.go这个文件中(可以查看方法前的r *raft来知道当前是在raft.go中)。

node的tick()方法调用raft/raft.go中raft结构体的tick()。在1.2节中,

raft.becomeFollower()中设置了raft结构体的step函数和tick函数=tickElection。

在上一小节的最后,node.run()从n.tickc通道中获取到消息,调用raft.tick()方法,实际上调用了raft.tickElection()。

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
    r.electionElapsed++

    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

当Follower或者Candidate超过选举时间后,会发送类型为MsgHup的一条消息给自己,接着调用r.campaign()方法。

如果消息类型不是MsgHup、MsgVote、MsgPreVote,则调用r.step(r, m)函数。

比如前面becomeFollower时设置了r.step=stepFollower,那么这里就会真正调用stepFollower()方法了。

在stepFollower()方法中,可以看到,它处理的消息类型并没有上面的MsgHup、MsgVote、MsgPreVote。

在最开始启动raft.Node时调用了becomeFollower,初始时,raft的Term为0,后来又被更新为1。

对于类型为MsgHup的pb.Message而言,它的Term初始时为0,所以会执行下面的m.Term==0分支,并接着执行pb.MsgHup分支。

注意:这里的第一个switch语句块的条件是先判断m.Term==0。除此之外,如果m.Term==r.Term,也会执行第二个switch语句块。

func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        ...
    case m.Term < r.Term:
        ...
        return nil
    }

    switch m.Type {
    case pb.MsgHup:
        if r.state != StateLeader {
            ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
            if r.preVote {
                r.campaign(campaignPreElection)
            } else {
                r.campaign(campaignElection)
            }
        } else {
            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        }

    case pb.MsgVote, pb.MsgPreVote:
        ...
    default:
        // 必须先调用r.step=xxx设置函数,然后才能调用下面的语句,真正执行函数。
        r.step(r, m)
    }
    return nil
}

复习下raft调用becomeFollower,raft的状态会更新为StateFollower(下面也列出了成为其他两种角色的代码):

  • 成为Follower,启动选举,定时器为tickElection,在electionTimeout超时后,成为Candidate
  • 成为Candidate,增加Term,投票给自己,定时器为tickElection
  • 成为Leader,定时器为tickHeartbeat,定时发送心跳给Follower
// raft/raft.go
func (r *raft) becomeFollower(term uint64, lead uint64) {
    r.step = stepFollower
    r.reset(term)
    r.tick = r.tickElection
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

func (r *raft) becomeCandidate() {
    r.step = stepCandidate
    // 成为候选人时,Term加1
    r.reset(r.Term + 1)
    r.tick = r.tickElection
    // 投票给自己
    r.Vote = r.id
    r.state = StateCandidate
    r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}

func (r *raft) becomeLeader() {
    r.step = stepLeader
    r.reset(r.Term)
    r.tick = r.tickHeartbeat
    r.lead = r.id
    r.state = StateLeader
    r.pendingConfIndex = r.raftLog.lastIndex()
    r.appendEntry(pb.Entry{Data: nil})
    r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

1.7 竞选Leader

Follower在electionTimeout超时后,会竞选成为Candidate。这里我们暂时不考虑两阶段:

  • 调用becomeCandidate,设置投票消息类型为MsgVote
  • 如果获取到大多数选票,则调用becomeLeader()
  • 向每个节点发送voteMsg

这里Follower第一次执行campaign时,步骤2获取到的选票肯定不满足大多数,所以会向其他节点发送MsgVote消息。

func (r *raft) campaign(t CampaignType) {
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection {
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        // PreVote RPCs are sent for the next term before we've incremented r.Term.
        term = r.Term + 1
    } else {
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
        // We won the election after voting for ourselves (which must mean that this is a single-node cluster). Advance to the next state.
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    for id := range r.prs {
        // 如果是自己,不需要发送
        if id == r.id continue
        var ctx []byte
        if t == campaignTransfer ctx = []byte(t)
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

候选人发送VoteMsg给其他节点(Follower)后,当收到大多数Follower节点的投票结果后,候选人会成为Leader。

下面我们分析Follower节点收到候选人发送的MsgVote请求是如何处理的,这涉及到RPC调用,在Etcd中是rafthttp。

1.8 Raft HTTP

每个EtcdServer都有一个HTTP服务端,用来接收其他节点发送的消息,以及返回响应结果给发送者:

//etcdserver/api/rafthttp/http.go
func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    var m raftpb.Message
    if err := h.r.Process(context.TODO(), m); err != nil {
        switch v := err.(type) {
        case writerToResponse:
            v.WriteTo(w)
        }
        return
    }
}

EtcdServer 实现了etcdserver/api/rafthttp/transport.go的 Raft 接口

//etcdserver/api/rafthttp/transport.go
type Raft interface {
    Process(ctx context.Context, m raftpb.Message) error
    IsIDRemoved(id uint64) bool
    ReportUnreachable(id uint64)
    ReportSnapshot(id uint64, status raft.SnapshotStatus)
}

Q:s.r返回的是etcdserver/raft.go的raftNode,它是一个结构体。而Step方法定义在raft/Node.go接口中。

那么问题是:raftNode结构体的Step方法,是怎么调用到Node接口的Step方法?

A:先看下node.go下的Node接口和node结构体。node结构体实现了Node接口的所有方法,所以可以把node看做是是Node接口的实现类。

虽然raftNode结构体中没有定义raft.Node接口,但是它的raftNodeConfig属性定义了!这种语法叫做struct内嵌/嵌套(embedded)interface。

EtcdServer的Process方法调用raft/node.go中Node接口(其实现类是这个文件下的node结构体)的Step方法:

// etcdserver/server.go
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
    return s.r.Step(ctx, m)
}

// raft/node.go
func (n *node) Step(ctx context.Context, m pb.Message) error {
    // ignore unexpected local messages receiving over network
    if IsLocalMsg(m.Type) {
        return nil
    }
    return n.step(ctx, m)
}
func (n *node) step(ctx context.Context, m pb.Message) error {
    return n.stepWithWaitOption(ctx, m, false)
}
// Step advances the state machine using msgs. The ctx.Err() will be returned, if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
    if m.Type != pb.MsgProp {
        select {
        case n.recvc <- m:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
    }
    ...
    return nil
}

候选人发送请求给Follower节点,当Follower节点收到请求时,会将消息发送到node的recvc通道中。

注意:Etcd分布式集群中的所有节点都会启动raftNode、raft.Node,也都会运行node.run()方法。

// raft/node.go
func (n *node) run(r *raft) {
    ...
    for {
        select {
            ...
        case <-n.tickc:
            r.tick()
        case m := <-n.recvc:
            // filter out response message from unknown From.
            if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
                r.Step(m)
            }
        }
    }
}

回顾前面Candidate从n.tickc中获取到定时器(ElectionTimeout)的超时消息,通过tickElection() 调用到raft.go的 Step() 方法,然后参与竞选(campaign)并发送VoteMsg给Follower节点。

这里Follower节点收到VoteMsg请求,从n.recvc中获取到消息,也会调用 r.Step(m) 方法。

两个节点的角色不一样,但是都会调用相同的raft.Step()方法,当然两者的处理逻辑不一样。

1.9 Follower收到MsgVote,投票

Follower收到消息m的Term=1,它自己raft.Term=0,步骤如下:

  • m.Term > r.Term,调用becomeFollower()
  • 返回MsgVoteResp给候选人。
// raft/raft.go
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        switch {
        default:
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From)
            } else {
                // Follower收到Candidate的MsgVote请求,自己成为Follower(更改状态)
                // 同样,这里只是设置Raft的step函数和tick函数,还没有真正执行成为Follower的逻辑
                r.becomeFollower(m.Term, None)  
            }
        }
    case m.Term < r.Term:
        ...
        return nil
    }

    switch m.Type {
    case pb.MsgHup:
        ...
    case pb.MsgVote, pb.MsgPreVote:
        canVote := r.Vote == m.From ||                              // We can vote if this is a repeat of a vote we've already cast...
            (r.Vote == None && r.lead == None) ||                   // ...we haven't voted and we don't think there's a leader yet in this term...
            (m.Type == pb.MsgPreVote && m.Term > r.Term)            // ...or this is a PreVote for a future term...
        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {    // ...and we believe the candidate is up to date.
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) // 返回的消息类型为VoteResp
            if m.Type == pb.MsgVote {
                r.electionElapsed = 0  // Only record real votes. 重置选举计数器
                r.Vote = m.From // 投票给发送这条消息的节点,即候选人
            }
        } else {
            r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) // 拒绝投票
        }
    default:
        err := r.step(r, m)
        if err != nil {
            return err
        }
    }
    return nil
}

当Follower返回MsgVote的响应结果MsgVoteResp给Candidate,Candidate的处理流程与Follower收到消息的类似,也会调用r.Step()方法。

1.10 Candidate成为Leader

由于Follower返回消息的Term等于Candidate发送消息的Term,所以直接走第二个switch条件的default分支:

// raft/raft.go
func (r *raft) Step(m pb.Message) error {
    // Handle the message term, which may result in our stepping down to a follower.
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        ...
    case m.Term < r.Term:
        ...
        return nil
    }

    switch m.Type {
    case pb.MsgHup:
        ...
    case pb.MsgVote, pb.MsgPreVote:
        ...
    default:
        err := r.step(r, m) // 调用Raft的stepFunc,即1.7节通过becomeCandidate设置的stepCandidate
        if err != nil {
            return err
        }
    }
    return nil
}

前面我们看到在Step(m)方法中都没有走到default分支,这里开始调用r.step(r,m)函数,对应的是stepCandidate:

// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate(r *raft, m pb.Message) error {
    // Only handle vote responses corresponding to our candidacy (while in
    // StateCandidate, we may get stale MsgPreVoteResp messages in this term from our pre-candidate state).
    var myVoteRespType pb.MessageType
    if r.state == StatePreCandidate {
        myVoteRespType = pb.MsgPreVoteResp
    } else {
        myVoteRespType = pb.MsgVoteResp
    }
    switch m.Type {
    case pb.MsgProp:
        r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
        return ErrProposalDropped
    case pb.MsgApp:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    case pb.MsgHeartbeat:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)
    case pb.MsgSnap:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleSnapshot(m)
    case myVoteRespType:  // 候选人收到Follower的VoteMsg消息,判断投票个数
        gr := r.poll(m.From, m.Type, !m.Reject)
        r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
        switch r.quorum() {
        case gr:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                r.becomeLeader() // 满足投票个数,成为Leader
                r.bcastAppend() // 向其他Follower节点发送MsgApp请求
            }
        case len(r.votes) - gr:
            // pb.MsgPreVoteResp contains future term of pre-candidate m.Term > r.Term; reuse r.Term
            r.becomeFollower(r.Term, None)
        }
    case pb.MsgTimeoutNow:
        r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
    }
    return nil
}

总结下每个节点在收到HTTP请求时,都会调用raft.Step(m)方法,如果消息的类型不是MsgHup、MsgVote、MsgPreVote,则调用r.step(r,m)方法。

Raft的stepFunc在发送/收到HTTP请求之前一定会被设置,设置的地方在调用becomeFollower、becomeCandidate、becomeLeader这三个方法中。

A:以Follower1转为Candidate为例:

  1. 调用becomeFollower,设置r.step=stepFollower
  2. ticker的electionTimeout超时,调用becomeCandidate,设置r.step=stepCandidate
  3. 调用campaign,发送VoteMsg给其他所有节点

B:以Follower2为例:

  1. 调用becomeFollower,设置r.step=stepFollower
  2. 收到候选节点的VoteMsg请求,投票给候选节点,返回VoteMsgResp

C:以Candidate转为Leader为例:

  1. 收到Follower发送的VoteMsgResp,调用r.step函数,即A:2中的stepCandidate
  2. 判断投票个数,如果得到大多数的选票,调用becomeLeader,设置r.step=stepLeader
  3. 向其他Follower节点发送MsgApp请求

D:以Follower2为例:

  1. 收到Leader的MsgApp请求,调用r.step函数,即B:1的stepFollower
  2. 处理AppendEntries请求,返回MsgAppResp请求给Leader

下面是Candidate发送MsgVote给Follower,Follower返回MsgVoteResp给Candidate的流程:

Etcd Raft源码分析之二:选举流程


以上所述就是小编给大家介绍的《Etcd Raft源码分析之二:选举流程》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

PHP Cookbook

PHP Cookbook

Adam Trachtenberg、David Sklar / O'Reilly Media / 2006-08-01 / USD 44.99

When it comes to creating dynamic web sites, the open source PHP language is red-hot property: used on more than 20 million web sites today, PHP is now more popular than Microsoft's ASP.NET technology......一起来看看 《PHP Cookbook》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

在线进制转换器
在线进制转换器

各进制数互转换器

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换