1. EtcdServer startup flow
Start with the definition of the EtcdServer struct. The field that matters for Raft here is r raftNode.
// etcdserver/server.go
type EtcdServer struct {
    // inflightSnapshots holds count the number of snapshots currently inflight.
    inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
    appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
    committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
    term              uint64 // must use atomic operations to access; keep 64-bit aligned.
    lead              uint64 // must use atomic operations to access; keep 64-bit aligned.
    // consistIndex used to hold the offset of current executing entry
    // It is initialized to 0 before executing any entry.
    consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
    r            raftNode        // uses 64-bit atomics; keep 64-bit aligned.

    readych chan struct{}
    Cfg     ServerConfig

    lgMu *sync.RWMutex
    lg   *zap.Logger

    w wait.Wait

    readMu sync.RWMutex
    // read routine notifies etcd server that it waits for reading by sending an empty struct to readwaitC
    readwaitc chan struct{}
    // readNotifier is used to notify the read routine that it can process the request when there is no error
    readNotifier *notifier

    // stop signals the run goroutine should shutdown.
    stop chan struct{}
    // stopping is closed by run goroutine on shutdown.
    stopping chan struct{}
    // done is closed when all goroutines from start() complete.
    done chan struct{}

    errorc     chan error
    id         types.ID
    attributes membership.Attributes

    cluster *membership.RaftCluster

    v2store     v2store.Store
    snapshotter *snap.Snapshotter

    applyV2     ApplierV2
    applyV3     applierV3 // applyV3 is the applier with auth and quotas
    applyV3Base applierV3 // applyV3Base is the core applier without auth or quotas
    applyWait   wait.WaitTime

    kv     mvcc.ConsistentWatchableKV
    lessor lease.Lessor
    bemu   sync.Mutex
    be     backend.Backend

    SyncTicker *time.Ticker
    // compactor is used to auto-compact the KV.
    compactor v3compactor.Compactor

    // peerRt used to send requests (version, lease) to peers.
    peerRt   http.RoundTripper
    reqIDGen *idutil.Generator

    // wgMu blocks concurrent waitgroup mutation while server stopping
    wgMu sync.RWMutex
    // wg is used to wait for the go routines that depends on the server state to exit when stopping the server.
    wg sync.WaitGroup

    // ctx is used for etcd-initiated requests that may need to be canceled on etcd server shutdown.
    ctx    context.Context
    cancel context.CancelFunc

    leadTimeMu      sync.RWMutex
    leadElectedTime time.Time
}
Creating the EtcdServer:
- Start the raft.Node n
- Create the raftNode r
- Create the EtcdServer, the kv store, and the Transport used for peer communication
// etcdserver/server.go (abridged: error handling and some setup are omitted)
func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
    var (
        w        *wal.WAL
        n        raft.Node
        s        *raft.MemoryStorage
        id       types.ID
        cl       *membership.RaftCluster
        remotes  []*membership.Member
        snapshot *raftpb.Snapshot
    )

    // Check whether a WAL already exists.
    haveWAL := wal.Exist(cfg.WALDir())
    // Create the snapshotter used to load snapshot files.
    ss := snap.New(cfg.Logger, cfg.SnapDir())

    // Start this node according to whether a WAL exists and whether this is a new cluster.
    switch {
    // No WAL and not a new cluster: this node joins an existing cluster.
    case !haveWAL && !cfg.NewCluster:
        cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
        existingCluster, gerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
        remotes = existingCluster.Members()
        id, n, s, w = startNode(cfg, cl, nil)
    // No WAL and a new cluster.
    case !haveWAL && cfg.NewCluster:
        cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
        id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
    // A WAL exists: restart, optionally forcing a new cluster.
    case haveWAL:
        snapshot, err = ss.Load()
        if !cfg.ForceNewCluster {
            id, cl, n, s, w = restartNode(cfg, snapshot)
        } else {
            id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
        }
        // Recover the cluster membership from storage.
        cl.Recover(api.UpdateCapability)
    }

    heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
    srv = &EtcdServer{
        readych:     make(chan struct{}),
        Cfg:         cfg,
        lgMu:        new(sync.RWMutex),
        v2store:     st,
        snapshotter: ss,
        // Create the raftNode; every EtcdServer owns one.
        r: *newRaftNode(
            raftNodeConfig{
                isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
                Node:        n,
                heartbeat:   heartbeat,
                raftStorage: s,
                storage:     NewStorage(w, ss),
            },
        ),
        id:         id,
        attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
        cluster:    cl,
        SyncTicker: time.NewTicker(500 * time.Millisecond),
    }
    srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex)

    // Create the Transport object.
    tr := &rafthttp.Transport{
        DialTimeout: cfg.peerDialTimeout(),
        ID:          id,
        URLs:        cfg.PeerURLs,
        ClusterID:   cl.ID(),
        Raft:        srv,
        Snapshotter: ss,
    }
    // Attach the transport to the raftNode's transport field.
    srv.r.transport = tr
    return srv, nil
}
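The three-way switch in NewServer can be read as a small decision table. The sketch below is only a model of the branch conditions: the type startupMode, its constants, and the function chooseStartup are hypothetical names introduced for illustration and do not exist in etcd.

```go
package main

import "fmt"

// startupMode is a hypothetical label for the branches of the switch in NewServer.
type startupMode string

const (
	joinExisting      startupMode = "join existing cluster"
	bootstrapNew      startupMode = "bootstrap new cluster"
	restartFromWAL    startupMode = "restart from WAL"
	restartStandalone startupMode = "restart as standalone node"
)

// chooseStartup mirrors the branch conditions only; it starts nothing.
func chooseStartup(haveWAL, newCluster, forceNewCluster bool) startupMode {
	switch {
	case !haveWAL && !newCluster:
		return joinExisting
	case !haveWAL && newCluster:
		return bootstrapNew
	case forceNewCluster: // haveWAL is true from here on
		return restartStandalone
	default:
		return restartFromWAL
	}
}

func main() {
	fmt.Println(chooseStartup(false, false, false))
	fmt.Println(chooseStartup(false, true, false))
	fmt.Println(chooseStartup(true, false, false))
	fmt.Println(chooseStartup(true, true, true))
}
```

Note that once a WAL exists, the NewCluster flag no longer influences the branch; only ForceNewCluster does.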
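raftNode anonymously embeds the node (through raftNodeConfig, which embeds raft.Node). Everything that drives the Raft interaction lives in raftNode, while entry points such as node state, I/O calls, and event triggers live in node.
After startup, each of the two runs its own goroutine with a for-select loop that handles the events it is responsible for.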
1.1 Starting the Raft node (raft.Node)
Call path: etcdserver/server.go NewServer() -> etcdserver/raft.go startNode() -> raft/node.go StartNode()
// etcdserver/raft.go (abridged: error handling omitted)
func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
    var err error
    member := cl.MemberByName(cfg.Name)
    metadata := pbutil.MustMarshal(
        &pb.Metadata{
            NodeID:    uint64(member.ID),
            ClusterID: uint64(cl.ID()),
        },
    )
    w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata)
    peers := make([]raft.Peer, len(ids))
    for i, id := range ids {
        var ctx []byte
        ctx, err = json.Marshal((*cl).Member(id))
        peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
    }
    id = member.ID
    // Create the in-memory storage.
    s = raft.NewMemoryStorage()
    c := &raft.Config{
        ID:              uint64(id),
        ElectionTick:    cfg.ElectionTicks,
        HeartbeatTick:   1,
        Storage:         s,
        MaxSizePerMsg:   maxSizePerMsg,
        MaxInflightMsgs: maxInflightMsgs,
        CheckQuorum:     true,
        PreVote:         cfg.PreVote,
    }
    // Call StartNode() in raft/node.go. The returned object implements the raft.Node
    // interface; the implementation (the node struct) is also defined in raft/node.go.
    n = raft.StartNode(c, peers)
    return id, n, s, w
}
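Starting raft.Node:
- newRaft(): initializes the raft object; everything that matters during the Raft protocol's lifecycle is wrapped in it
- becomeFollower(): initializes the node's role as Follower
- newNode(): constructs the node object (Node)
- n.run(raft): started in its own goroutine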
// raft/node.go
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
    // Call newRaft() in raft/raft.go to create the *raft object.
    r := newRaft(c)
    // become the follower at term 1 and apply initial configuration entries of term 1
    // newRaft() already calls becomeFollower() once, but there r.Term still has its
    // initial value 0; here the node becomes a follower at term 1.
    r.becomeFollower(1, None)
    // Append the configuration-change entries to raft's raftLog
    // (a ConfChange is carried in the log as an Entry, like normal data).
    for _, peer := range peers {
        cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
        d, err := cc.Marshal()
        e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
        r.raftLog.append(e)
    }
    // Mark these initial entries as committed.
    r.raftLog.committed = r.raftLog.lastIndex()
    for _, peer := range peers {
        r.addNode(peer.ID)
    }
    // Create the node{} object and initialize the channels the node struct needs.
    n := newNode()
    go n.run(r)
    return &n
}

// RestartNode is similar to StartNode but does not take a list of peers. The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
    r := newRaft(c)
    n := newNode()
    go n.run(r)
    return &n
}
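The bootstrap step of StartNode (one configuration-change entry per peer at term 1, then marking all of them committed) can be modeled with a toy log. This is a simplified sketch: entry, toyLog, and bootstrap are hypothetical stand-ins for pb.Entry, raftLog, and the loop in StartNode, and the assumption that lastIndex equals the number of entries only holds for this empty-at-start toy log.

```go
package main

import "fmt"

// entry is a simplified stand-in for pb.Entry.
type entry struct {
	term, index uint64
	nodeID      uint64 // peer added by this configuration change
}

// toyLog is a simplified stand-in for raftLog.
type toyLog struct {
	entries   []entry
	committed uint64
}

// lastIndex assumes the log starts empty, so the last index is the entry count.
func (l *toyLog) lastIndex() uint64 { return uint64(len(l.entries)) }

// bootstrap appends one add-node entry per peer at term 1 and commits them all.
func bootstrap(peers []uint64) *toyLog {
	l := &toyLog{}
	for _, id := range peers {
		l.entries = append(l.entries, entry{term: 1, index: l.lastIndex() + 1, nodeID: id})
	}
	l.committed = l.lastIndex()
	return l
}

func main() {
	l := bootstrap([]uint64{1, 2, 3})
	for _, e := range l.entries {
		fmt.Printf("term=%d index=%d addNode=%d\n", e.term, e.index, e.nodeID)
	}
	fmt.Println("committed:", l.committed)
}
```

Because the entries are marked committed before the node runs, a freshly bootstrapped member already knows the initial membership without any election having taken place.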
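As soon as raft.Node is created, n.run(raft) is started in a goroutine. Later, once raftNode has been started and its ticker fires, the tick is forwarded to the node, and the run loop of raft.Node receives it from its channel.
Before analyzing raft.Node's run() method, look at how the raft object is created: it is created before the Node object, and it must be passed to run().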
// raft/raft.go (abridged)
func newRaft(c *Config) *raft {
    raftlog := newLog(c.Storage, c.Logger)
    hs, cs, err := c.Storage.InitialState()
    peers := c.peers
    learners := c.learners
    if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
        peers = cs.Nodes
        learners = cs.Learners
    }
    r := &raft{
        id:                        c.ID,
        lead:                      None,
        isLearner:                 false,
        raftLog:                   raftlog,
        maxMsgSize:                c.MaxSizePerMsg,
        maxInflight:               c.MaxInflightMsgs,
        prs:                       make(map[uint64]*Progress),
        learnerPrs:                make(map[uint64]*Progress),
        electionTimeout:           c.ElectionTick,
        heartbeatTimeout:          c.HeartbeatTick,
        checkQuorum:               c.CheckQuorum,
        preVote:                   c.PreVote,
        readOnly:                  newReadOnly(c.ReadOnlyOption),
        disableProposalForwarding: c.DisableProposalForwarding,
    }
    for _, p := range peers {
        r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
    }
    for _, p := range learners {
        r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
        if r.id == p {
            r.isLearner = true
        }
    }
    if !isHardStateEqual(hs, emptyState) {
        r.loadState(hs)
    }
    if c.Applied > 0 {
        raftlog.appliedTo(c.Applied)
    }
    // The node starts as a follower; it becomes a candidate after the election timeout elapses.
    r.becomeFollower(r.Term, None) // r.Term is 0 initially
    return r
}
1.2 The raft struct
raft/raft.go defines the raft struct:
// raft/raft.go
type raft struct {
    id uint64

    Term uint64
    Vote uint64

    readStates []ReadState

    raftLog *raftLog // the log

    maxInflight int
    maxMsgSize  uint64
    prs         map[uint64]*Progress
    learnerPrs  map[uint64]*Progress
    matchBuf    uint64Slice

    state StateType

    isLearner bool // isLearner is true if the local raft node is a learner.

    votes map[uint64]bool

    msgs []pb.Message

    lead uint64 // the leader id
    // leadTransferee is id of the leader transfer target when its value is not zero.
    // Follow the procedure defined in raft thesis 3.10.
    leadTransferee uint64

    pendingConfIndex uint64

    readOnly *readOnly

    // number of ticks since it reached last electionTimeout when it is leader or candidate.
    // number of ticks since it reached last electionTimeout or received a valid message from current leader when it is a follower.
    electionElapsed int

    // number of ticks since it reached last heartbeatTimeout. only leader keeps heartbeatElapsed.
    heartbeatElapsed int

    checkQuorum bool
    preVote     bool

    heartbeatTimeout int
    electionTimeout  int
    // randomizedElectionTimeout is a random number between [electiontimeout, 2 * electiontimeout - 1]. It gets reset when raft changes its state to follower or candidate.
    randomizedElectionTimeout int
    disableProposalForwarding bool

    tick func()
    step stepFunc
}

type stepFunc func(r *raft, m pb.Message) error
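Note that the last two fields of the raft struct are functions. Assigning r.tick = ... or r.step = ... only installs a function; it does not call it.
So although newRaft() calls becomeFollower(), that call only sets fields on the raft struct (state, term, leader, and the step and tick functions); the installed follower logic does not run yet.
The step and tick functions run only when they are invoked as r.step(r, m) or r.tick(), and they must have been set before that.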
// raft/raft.go
func (r *raft) becomeFollower(term uint64, lead uint64) {
    r.step = stepFollower
    r.reset(term)
    r.tick = r.tickElection
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
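The startup (StartNode) of raft.Node was covered above; its run() method is covered later.
Note: although raft.becomeFollower is called during startup, it only installs functions on the raft struct; none of them has been executed yet!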
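The difference between installing a function and calling it can be shown with a minimal sketch. The types toyRaft and message below are hypothetical stand-ins for raft and pb.Message; only the pattern of function-valued fields is taken from the code above.

```go
package main

import "fmt"

// message is a simplified stand-in for pb.Message.
type message struct{ kind string }

// toyRaft keeps its behavior in function-valued fields, like raft.tick and raft.step.
type toyRaft struct {
	state   string
	ticks   int
	handled []string
	tick    func()
	step    func(r *toyRaft, m message)
}

func stepFollower(r *toyRaft, m message) {
	r.handled = append(r.handled, "follower:"+m.kind)
}

func (r *toyRaft) tickElection() { r.ticks++ }

// becomeFollower only assigns fields; neither tick nor step runs here.
func (r *toyRaft) becomeFollower() {
	r.step = stepFollower
	r.tick = r.tickElection
	r.state = "follower"
}

func main() {
	r := &toyRaft{}
	r.becomeFollower()
	fmt.Println(r.state, r.ticks, len(r.handled)) // follower 0 0

	r.tick()                                 // now tickElection actually runs
	r.step(r, message{kind: "MsgHeartbeat"}) // now stepFollower actually runs
	fmt.Println(r.ticks, r.handled)          // 1 [follower:MsgHeartbeat]
}
```

Swapping the two fields is how a role change alters behavior without any branching at the call sites: the callers keep invoking r.tick() and r.step(r, m) regardless of the current role.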
1.3 Creating the raftNode object
After starting the node (raft.Node), EtcdServer creates the raftNode object:
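- raftNodeConfig holds two storage objects: the storage for log entries (raftStorage) and the storage for the WAL and snapshots (storage)
- n refers to raft.Node and r refers to raftNode. A node represents one member of etcd and maps one-to-one to a raftNode
  - raftNode -> r -> etcdserver/raft.go (raftNode is a struct, not an interface)
  - raft.Node -> n -> raft/node.go (this file defines the Node interface and its implementation, the node struct)
- raftNode embeds raftNodeConfig, which in turn embeds raft.Node, so the methods of the Node interface can be called directly on a raftNode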
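When a struct embeds another struct or an interface anonymously (a type with no field name), the embedded type's methods can be called directly on the outer struct. For example, raftNode embeds raftNodeConfig anonymously, which in turn embeds raft.Node anonymously. The raftNode struct can therefore call the methods declared by the raft.Node interface directly, as if they belonged to raftNode itself.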
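A minimal sketch of this two-level embedding follows. The names Node, countingNode, nodeConfig, and wrapperNode are hypothetical; they stand in for raft.Node, the node struct, raftNodeConfig, and raftNode respectively, and only one method is modeled.

```go
package main

import "fmt"

// Node stands in for the raft.Node interface; only Tick is modeled.
type Node interface{ Tick() }

// countingNode stands in for the node struct that implements Node.
type countingNode struct{ ticks int }

func (n *countingNode) Tick() { n.ticks++ }

// nodeConfig stands in for raftNodeConfig: it embeds the interface anonymously.
type nodeConfig struct {
	Node
	heartbeatMs int
}

// wrapperNode stands in for raftNode: it embeds the config struct anonymously.
type wrapperNode struct {
	nodeConfig
	name string
}

func main() {
	inner := &countingNode{}
	w := &wrapperNode{nodeConfig: nodeConfig{Node: inner, heartbeatMs: 100}, name: "r"}

	w.Tick()                 // promoted through nodeConfig from Node
	w.nodeConfig.Node.Tick() // the same method, spelled out in full

	fmt.Println(inner.ticks, w.heartbeatMs) // 2 100
}
```

Fields are promoted the same way as methods, which is why the etcd code can write r.heartbeat and r.Tick() on a raftNode.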
// etcdserver/raft.go
type raftNode struct {
    tickMu *sync.Mutex
    raftNodeConfig // configuration of the raft node

    msgSnapC   chan raftpb.Message // a chan to send/receive snapshot
    applyc     chan apply          // a chan to send out apply
    readStateC chan raft.ReadState // a chan to send out readState

    ticker *time.Ticker // the node's clock; its ticks drive both election and heartbeat timeouts
    td     *contention.TimeoutDetector // contention detectors for raft heartbeat message

    stopped chan struct{}
    done    chan struct{}
}

type raftNodeConfig struct {
    // to check if msg receiver is removed from cluster
    isIDRemoved func(id uint64) bool
    raft.Node
    raftStorage *raft.MemoryStorage
    storage     Storage
    heartbeat   time.Duration // for logging
    // transport specifies the transport to send and receive msgs to members. Sending messages MUST NOT block.
    // It is okay to drop messages, since clients should timeout and reissue their messages.
    transport rafthttp.Transporter
}

func newRaftNode(cfg raftNodeConfig) *raftNode {
    // Prepare the channels used by raftNode.
    r := &raftNode{
        tickMu:         new(sync.Mutex),
        raftNodeConfig: cfg,
        td:             contention.NewTimeoutDetector(2 * cfg.heartbeat),
        readStateC:     make(chan raft.ReadState, 1),
        msgSnapC:       make(chan raftpb.Message, maxInFlightMsgSnap),
        applyc:         make(chan apply),
        stopped:        make(chan struct{}),
        done:           make(chan struct{}),
    }
    // Create the ticker.
    if r.heartbeat == 0 {
        r.ticker = &time.Ticker{}
    } else {
        r.ticker = time.NewTicker(r.heartbeat) // fires once per heartbeat interval
    }
    return r
}
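After raft.Node and raftNode have been created, the Etcd server is started (both creation and startup are invoked by the caller, for example etcd.go or cluster.go):
If EtcdServer.run() is never called, raftNode.start() is never called either, and then nothing receives from the ticker created above even when it fires.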
// etcdserver/server.go (abridged)
func (s *EtcdServer) run() {
    // Get the current snapshot from the raft storage.
    sn, err := s.r.raftStorage.Snapshot()
    // raftReadyHandler bundles Etcd-side callbacks; the handler is passed to
    // raftNode, which invokes these functions.
    rh := &raftReadyHandler{
        getLead:    func() (lead uint64) { return s.getLead() },
        updateLead: func(lead uint64) { s.setLead(lead) },
        updateLeadership: func(newLeader bool) {
            if !s.isLeader() {
                if s.lessor != nil {
                    s.lessor.Demote()
                }
                if s.compactor != nil {
                    s.compactor.Pause()
                }
                setSyncC(nil)
            } else {
                if newLeader {
                    t := time.Now()
                    s.leadElectedTime = t
                }
                setSyncC(s.SyncTicker.C)
                if s.compactor != nil {
                    s.compactor.Resume()
                }
            }
        },
        updateCommittedIndex: func(ci uint64) {
            cci := s.getCommittedIndex()
            if ci > cci {
                s.setCommittedIndex(ci)
            }
        },
    }
    // Call raftNode's start() method.
    s.r.start(rh)
    // ep tracks how far the state machine has applied.
    ep := etcdProgress{
        confState: sn.Metadata.ConfState,
        snapi:     sn.Metadata.Index,
        appliedt:  sn.Metadata.Term,
        appliedi:  sn.Metadata.Index,
    }
    for {
        select {
        // Receive from raftNode's applyc channel.
        case ap := <-s.r.apply():
            f := func(context.Context) { s.applyAll(&ep, &ap) }
            // Run asynchronously on the scheduler.
            sched.Schedule(f)
        case <-s.stop:
            return
        }
    }
}

// applyAll applies the snapshot and the entries, advancing the progress.
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
    s.applySnapshot(ep, apply)
    s.applyEntries(ep, apply)
    proposalsApplied.Set(float64(ep.appliedi))
    s.applyWait.Trigger(ep.appliedi)
    // wait for the raft routine to finish the disk writes before triggering a snapshot.
    // or applied index might be greater than the last index in raft storage,
    // since the raft routine might be slower than apply routine.
    <-apply.notifyc
    s.triggerSnapshot(ep)
    select {
    // snapshot requested via send()
    case m := <-s.r.msgSnapC:
        merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
        s.sendMergedSnap(merged)
    default:
    }
}
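Besides starting raftNode, EtcdServer's run() also has its own for loop that processes data from the apply channel, which carries what must be applied to the state machine. First, let's look at how raftNode is started.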
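The shape of that apply loop (receive a batch, apply it, then wait on the batch's notify channel before moving on) can be sketched with plain channels. This is a toy model: the apply struct here carries strings instead of raft entries, and runApplyLoop and runDemo are hypothetical names, not etcd functions.

```go
package main

import "fmt"

// apply is a simplified stand-in for etcd's apply struct.
type apply struct {
	entries []string
	notifyc chan struct{} // signaled by the producer once its disk write is done
}

// runApplyLoop models the for-select loop in EtcdServer.run().
func runApplyLoop(applyc <-chan apply, stop <-chan struct{}) []string {
	var applied []string
	for {
		select {
		case ap := <-applyc:
			applied = append(applied, ap.entries...)
			<-ap.notifyc // wait for the producer before continuing
		case <-stop:
			return applied
		}
	}
}

// runDemo plays the raftNode side: send each batch, signal completion, then stop.
func runDemo(batches [][]string) []string {
	applyc := make(chan apply) // unbuffered, like applyc in newRaftNode
	stop := make(chan struct{})
	go func() {
		for _, b := range batches {
			ap := apply{entries: b, notifyc: make(chan struct{}, 1)}
			applyc <- ap
			ap.notifyc <- struct{}{} // pretend the disk write has finished
		}
		close(stop)
	}()
	return runApplyLoop(applyc, stop)
}

func main() {
	fmt.Println(runDemo([][]string{{"put a"}, {"put b", "del a"}}))
}
```

The notify channel has a buffer of one so the producer never blocks on signaling, while the consumer cannot run ahead of the producer's persisted state.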
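1.4 Starting raftNode
As mentioned, a ticker is created when EtcdServer creates the raftNode; the start() method below receives from the ticker's channel.
Each time the clock event (ticker.C) fires, a message (an empty struct) is written to the n.tickc channel.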
// etcdserver/raft.go
func (r *raftNode) start(rh *raftReadyHandler) {
    // The loop below runs in a goroutine; the surrounding go func() {}() is omitted here.
    for {
        select {
        // The ticker was created together with raftNode; each time it fires,
        // a value can be received from r.ticker.C.
        case <-r.ticker.C:
            r.tick()
        case rd := <-r.Ready():
            // Handle the Ready event: check whether messages are ready and, if so,
            // send them to the other raft nodes. This logic is more involved and is analyzed later.
        }
    }
}

// raft.Node does not have locks in Raft package
func (r *raftNode) tick() {
    r.tickMu.Lock()
    r.Tick() // Tick() here is the method of the Node interface in raft/node.go
    r.tickMu.Unlock()
}

// raft/node.go
// Tick increments the internal logical clock for this Node. Election timeouts and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
    select {
    // Tick() sends an empty struct to the node's tickc channel.
    case n.tickc <- struct{}{}:
    case <-n.done:
    default:
        n.logger.Warningf("A tick missed to fire. Node blocks too long!")
    }
}
Since messages are put into the n.tickc channel, something else must take them out again: that is the run method in node.go.
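The select with a default branch in Tick() makes the send non-blocking: if nobody can take the tick right now, it is dropped instead of stalling the caller. The sketch below isolates that pattern; tryTick is a hypothetical helper that returns a label instead of logging, and the buffer size of 2 is an assumption chosen only to make the demo short.

```go
package main

import "fmt"

// tryTick models the select in node.Tick(): deliver, notice shutdown, or drop.
func tryTick(tickc chan<- struct{}, done <-chan struct{}) string {
	select {
	case tickc <- struct{}{}:
		return "delivered"
	case <-done:
		return "stopped"
	default:
		return "missed" // the real code logs a warning here
	}
}

func main() {
	tickc := make(chan struct{}, 2) // small buffer for the demo; an assumption
	done := make(chan struct{})

	fmt.Println(tryTick(tickc, done)) // delivered
	fmt.Println(tryTick(tickc, done)) // delivered
	fmt.Println(tryTick(tickc, done)) // missed: buffer full and nobody is reading

	<-tickc                           // a reader drains one tick
	fmt.Println(tryTick(tickc, done)) // delivered
}
```

Dropping a tick only delays the logical clock by one interval, which is far cheaper than blocking the raftNode loop that also has to handle Ready events.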
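1.5 Running node (raft.Node)
node.run() is already started in section 1.1, when raft.Node is started. We only analyze it now because
receiving from the tickc channel depends on the ticker firing (section 1.4), which in turn depends on raftNode being created and started.
If raftNode is not created, r.ticker is not created. If raftNode is not started, nothing receives from the r.ticker.C channel,
so nothing is put into the node.tickc channel, and the node.run() method below never receives anything from node.tickc.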
// raft/node.go
func (n *node) run(r *raft) {
    ...
    for {
        ...
        select {
        case pm := <-propc:
            ...
        case m := <-n.recvc:
            // filter out response message from unknown From.
            if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
                r.Step(m)
            }
        case <-n.tickc:
            r.tick()
        case readyc <- rd:
            ...
        case <-advancec:
            ...
        case c := <-n.status:
            c <- getStatus(r)
        case <-n.stop:
            close(n.done)
            return
        }
    }
}
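Reduced to the two cases that matter for ticking, the run loop is a goroutine that turns each value received on tickc into one call of the installed tick function, and closes done when asked to stop. The following sketch models only that part; toyRaft, toyNode, and driveTicks are hypothetical names, and tickc is unbuffered here to keep the demo deterministic.

```go
package main

import "fmt"

// toyRaft holds the installed tick function, like raft.tick.
type toyRaft struct {
	ticks int
	tick  func()
}

// toyNode holds the channels that the run loop selects on.
type toyNode struct {
	tickc chan struct{}
	stop  chan struct{}
	done  chan struct{}
}

func newToyNode() *toyNode {
	return &toyNode{
		tickc: make(chan struct{}),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// run models the tickc and stop cases of node.run().
func (n *toyNode) run(r *toyRaft) {
	for {
		select {
		case <-n.tickc:
			r.tick()
		case <-n.stop:
			close(n.done)
			return
		}
	}
}

// driveTicks starts the loop, sends count ticks, stops it, and reports the ticks seen.
func driveTicks(count int) int {
	r := &toyRaft{}
	r.tick = func() { r.ticks++ }
	n := newToyNode()
	go n.run(r)
	for i := 0; i < count; i++ {
		n.tickc <- struct{}{}
	}
	close(n.stop)
	<-n.done // after this, reading r.ticks is safe
	return r.ticks
}

func main() {
	fmt.Println(driveTicks(5)) // 5
}
```

Because only the run goroutine touches the raft state, the state itself needs no lock; all other goroutines communicate with it through channels.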
To summarize, here is how values flow into and out of the r.ticker channel (etcdserver/raft.go) and the n.tickc channel (raft/node.go):
etcdserver/server:NewServer
  |
  |① raft/node:StartNode ----② node.run() ----→ ⑧ tickc
  |                                               ↑⑦
  |③                                              |
etcdserver/raft:newRaftNode                       |
  |                                               |
  ↓④                                              |
ticker                                            |
  ↑⑥----------------------------------------------/
  |
etcdserver/raft:start
  |⑤
  |
etcdserver/server:run
Sequence diagram (https://www.websequencediagrams.com/#)
title EtcdServer startup & campaign
etcd -> e/server.go: NewServer()
e/server.go -> e/raft.go : startNode(): node[n]
e/raft.go -> r/node.go : StartNode(): Node
r/node.go -> r/raft.go : newRaft(): raft
r/node.go -> r/raft.go : becomeFollower()
note over r/raft.go : 1.r.step=stepFollower\n2.r.tick=tickElection
r/node.go -> r/node.go : newNode(): node
r/node.go -> r/node.go : node.run(raft)
e/server.go -> e/raft.go : newRaftNode(): raftNode[r]
etcd -> e/server.go : Start()
e/server.go -> e/server.go : s.start()
e/server.go -> e/server.go : s.run()
e/server.go -> e/raft.go : s.r.start(raftReadyHandler)
note over e/server.go : <-s.r.apply(): #raftNode.applyc
e/server.go -> e/server.go : s.applyAll(ep,ap)
note over e/raft.go : 1.<-r.ticker.C: r.tick()\n2.<-r.Ready()
e/raft.go -> r/node.go : r.Tick()
r/node.go -> r/raft.go : (tickc-) r.tick()
r/raft.go -> r/raft.go : tickElection()
r/raft.go -> r/raft.go : r.Step(m:MsgHup)
r/raft.go -> r/raft.go : r.campaign()
r/raft.go -> r/raft.go : r.becomeCandidate()
note over r/raft.go : if recv quorum voteResp -> becomeLeader()
r/raft.go -> r/raft.go : r.send(voteMsg)
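The last part of the diagram (tick, tickElection, MsgHup, campaign) rests on a logical clock: each tick increments electionElapsed, and once it reaches the randomized election timeout the follower starts campaigning. The sketch below models only that counter. It is an assumption-laden toy: toyRaft, newToyRaft, and the campaigns counter are hypothetical, the campaign itself is reduced to incrementing a number, and the timeout range [electionTimeout, 2*electionTimeout-1] is taken from the comment in the raft struct.

```go
package main

import (
	"fmt"
	"math/rand"
)

// toyRaft models only the election clock of a follower.
type toyRaft struct {
	electionTimeout           int
	randomizedElectionTimeout int
	electionElapsed           int
	campaigns                 int // stands in for Step(MsgHup) -> campaign()
}

// newToyRaft picks a timeout in [electionTimeout, 2*electionTimeout-1].
func newToyRaft(electionTimeout int, seed int64) *toyRaft {
	rng := rand.New(rand.NewSource(seed))
	return &toyRaft{
		electionTimeout:           electionTimeout,
		randomizedElectionTimeout: electionTimeout + rng.Intn(electionTimeout),
	}
}

// tickElection advances the clock and "campaigns" when the timeout is reached.
func (r *toyRaft) tickElection() {
	r.electionElapsed++
	if r.electionElapsed >= r.randomizedElectionTimeout {
		r.electionElapsed = 0
		r.campaigns++
	}
}

func main() {
	r := newToyRaft(10, 1)
	ticks := 0
	for r.campaigns == 0 {
		r.tickElection()
		ticks++
	}
	// The first campaign happens exactly at the randomized timeout.
	fmt.Println(ticks == r.randomizedElectionTimeout, ticks >= 10 && ticks <= 19) // true true
}
```

Randomizing the timeout per node makes it unlikely that several followers time out on the same tick and split the vote.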