EtcdRaft源码分析(线性一致读)

栏目: IT技术 · 发布时间: 4年前

内容简介:背景我们知道Raft是Leader+Follower的模型,所有的更新由Leader处理,然后再同步给Follower。想象一下,如果要所有的节点都参与进来支持读取的请求,会带来什么样的问题?

背景

我们知道Raft是Leader+Follower的模型,所有的更新由Leader处理,然后再同步给Follower。

想象一下,如果要所有的节点都参与进来支持读取的请求,会带来什么样的问题?

Leader跟Follower并不总是一致的,换句话说Follower会落后Leader的进度。如果没有特别的处理,那么不同的节点读取的结果很可能不一致。

如果Leader被集群孤立,而且其他人已经推举出了新的Leader。而老的Leader还没有察觉到这个变化,他任然觉得还是Leader,但是他的数据已经不可信。如果他还在对外提供服务,那么读取的结果很可能不一致。

EtcdRaft的线性一致读是通过ReadIndex的机制来实现,大致的实现其实很简单,也就是在处理请求之前,会去集群中确认自己权力是否稳固,这样对外提供的服务才够权威。下面我们一起来剖析下Raft是怎么处理的。

接口

type Node interface {

...

// ReadIndex request a read state. The read state will be set in the ready.

// Read state has a read index. Once the application advances further than the read

// index, any linearizable read requests issued before the read request can be

// processed safely. The read state will have the same rctx attached.

ReadIndex(ctx context.Context, rctx []byte) error

...

}

这个接口很怪,返回error,不是我们通常意义理解的查询接口。Golang的世界就是这么奇妙。注定这个接口没有那么简单。下面我们先搞清楚,这个接口是怎么用的。

EtcdServer

EtcdServer是再好不过的例子了。

func (s *EtcdServer) linearizableReadLoop() {

var rs raft.ReadState

for {

ctxToSend := make([]byte, 8)

id1 := s.reqIDGen.Next()

binary.BigEndian.PutUint64(ctxToSend, id1)

...

cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())

if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {

...

}

cancel()

var (
     timeout bool
     done    bool
  )
  for !timeout && !done {
     select {
     case rs = <-s.r.readStateC:
        done = bytes.Equal(rs.RequestCtx, ctxToSend)
       ...
  }
  ...

  if ai := s.getAppliedIndex(); ai < rs.Index {
     select {
     case <-s.applyWait.Wait(rs.Index):
     case <-s.stopping:
        return
     }
  }
  ...

}

}

当然这是精简过后的代码,只保留了跟ReadIndex相关的逻辑。从这里我们也能看出一些端倪。

首先,传入的参数ctxToSend,是一个单调自增的id,没有任何意义,只是用来客户端区分请求只用。

其次,会从readStateC里面监听发出的ReadIndex请求Raft是否有了回应。

第三,会看下本地已经写入状态机的日志有没有到ReadIndex请求回来的位置,如果没有继续等待,如果有这个方法立即结束。

注意没有这个方法整个是个for循环,而且请求id还单调自增,那么执行下来的结果就是会一直保持对Raft状态的监控,一有风吹草动,这边就会提前收到通知。

ReadIndex

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {

return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})

}

可以看到,最终是通过MsgReadIndex的方式对内进行广播。

Follower

case pb.MsgReadIndex:

if r.lead == None {

r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)

return nil

}

m.To = r.lead

r.send(m)

首先破除第一个问题,不管谁接到ReadIndex的请求,将转发给Leader。

Leader

case pb.MsgReadIndex:

if r.quorum() > 1 {

if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {

// Reject read only request when this leader has not committed any log entry at its term.

return nil

}

// thinking: use an interally defined context instead of the user given context.
  // We can express this in terms of the term and index instead of a user-supplied value.
  // This would allow multiple reads to piggyback on the same message.
  switch r.readOnly.option {
  case ReadOnlySafe:
     r.readOnly.addRequest(r.raftLog.committed, m)
     r.bcastHeartbeatWithCtx(m.Entries[0].Data)
  case ReadOnlyLeaseBased:
     ri := r.raftLog.committed
     if m.From == None || m.From == r.id { // from local member
        r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
     } else {
        r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
     }
  }

} else {

r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})

}

return nil

}

如果当前Leader截至到现在还没有提交任何entry,那么直接返回。

下面我们看下不同的ReadOnly选项都是怎么实现的。

ReadOnlySafe

func (ro *readOnly) addRequest(index uint64, m pb.Message) {

ctx := string(m.Entries[0].Data)

if _, ok := ro.pendingReadIndex[ctx]; ok {

return

}

ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}

ro.readIndexQueue = append(ro.readIndexQueue, ctx)

}

首先我们将请求的requestid放入pendingReadIndex中暂存,当然了,如果之前已经请求过了,那么返回,同时会初始化一个readIndexStatus,最后append到readIndexQueue

注意这里的readIndexQueue是FIFO的,是严格有序的。

其次,request里面保存的只是一个请求的id,用来客户端来区分请求用的。

另外一个需要注意的是,这里保存的index是当前Leader的committedindex,也就是最终一致的地方。

想象一下,客户端那边在不停的往Raft里面灌数据,那么其他客户端在读取Raft数据的时候,怎么知道哪些数据是形成一致的,可以安全拿出来用的。当然committedindex是重要的指标,代表,包括这个index及之前的数据都是安全的,有保障的。

那么怎么拿到这个committedindex,这就是ReadIndex的目的。

bcastHeartbeatWithCtx

r.bcastHeartbeatWithCtx(m.Entries[0].Data)

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {

// Attach the commit as min(to.matched, r.committed).

// When the leader sends out heartbeat message,

// the receiver(follower) might not be matched with the leader

// or it might not have all the committed entries.

// The leader MUST NOT forward the follower's commit to

// an unmatched index.

commit := min(r.getProgress(to).Match, r.raftLog.committed)

m := pb.Message{

To: to,

Type: pb.MsgHeartbeat,

Commit: commit,

Context: ctx,

}

r.send(m)

}

将请求的requestid作为心跳的context,发给其他成员。下面我们看下Follower或Candidate接到请求是怎么处理的。

Follower&Candidate

case pb.MsgHeartbeat:

r.electionElapsed = 0

r.lead = m.From

r.handleHeartbeat(m)

case pb.MsgHeartbeat:

r.becomeFollower(m.Term, m.From) // always m.Term == r.Term

r.handleHeartbeat(m)

func (r *raft) handleHeartbeat(m pb.Message) {

r.raftLog.commitTo(m.Commit)

r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})

}

可以看到二位接到心跳后,并没有针对ReadOnly做特殊处理。做了他们自己平时该做的。

回忆下心跳篇,Leader怎么维持自己的权威,不就是发心跳么?其他成员接受到心跳就认为Leader还存在,还不用重新选举。

既然ReadOnly的实现是需要大多数人确认我还是不是真正的Leader,那么Leader在接受心跳响应的时候,看收到的有没有过半数不就可以了么?所以心跳是完美的实现这个的载体。

下面我们看下Leader在处理心跳响应的时候,在做什么

Leader

case pb.MsgHeartbeatResp:

pr.RecentActive = true

pr.resume()

// free one slot for the full inflights window to allow progress.

if pr.State == ProgressStateReplicate && pr.ins.full() {

pr.ins.freeFirstOne()

}

if pr.Match < r.raftLog.lastIndex() {

r.sendAppend(m.From)

}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {

return nil

}

ackCount := r.readOnly.recvAck(m)

if ackCount < r.quorum() {

return nil

}

rss := r.readOnly.advance(m)

for _, rs := range rss {

req := rs.req

if req.From == None || req.From == r.id { // from local member

r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})

} else {

r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})

}

}

跟心跳篇重复的部分,在这里我们不再赘述

首先到了这里要不你开启了ReadOnlySafe,要不就是消息上下文中没有发现ReadIndex

recvAck

ackCount := r.readOnly.recvAck(m)

if ackCount < r.quorum() {

return nil

}

func (ro *readOnly) recvAck(m pb.Message) int {

rs, ok := ro.pendingReadIndex[string(m.Context)]

if !ok {

return 0

}

rs.acks[m.From] = struct{}{}

// add one to include an ack from local node

return len(rs.acks) + 1

}

努力回忆下在Leader发心跳前做的准备工作,其中之一就是保存一个客户端的请求id到pendingReadIndex。这里再次提取这个request,将这个心跳响应累加到acks里面。

那Leader怎么知道这是普通的心跳响应还是ReadIndex的心跳响应呢?关键就是Message的context是不是有请求id

换句话说,这里就是收集因为这次ReadIndex请求发起的心跳,最终有多少人给了回应。

如果超过一半的人答复了Leader,说明这个Leader是被人承认的,有公信力的。那么继续往下

advance

func (ro readOnly) advance(m pb.Message) [] readIndexStatus {

var (

i int

found bool

)

ctx := string(m.Context)

rss := []*readIndexStatus{}

for _, okctx := range ro.readIndexQueue {

i++

rs, ok := ro.pendingReadIndex[okctx]

if !ok {

panic("cannot find corresponding read state from pending map")

}

rss = append(rss, rs)

if okctx == ctx {

found = true

break

}

}

if found {

ro.readIndexQueue = ro.readIndexQueue[i:]

for _, rs := range rss {

delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))

}

return rss

}

return nil

}

之前在addRequest的时候往ReadOnly里面写了这次请求的信息,那么这里从里面找出来。

找到后,将之前的还没来得及处理得请求,一起摘出来,往下处理

for _, rs := range rss {

req := rs.req

if req.From == None || req.From == r.id { // from local member

r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})

} else {

r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})

}

}

遍历上面返回的请求列表

如果是发给当前节点的请求,那么将这个ReadState累加在本地

如果不是,给对方发MsgReadIndexResp

Follower

case pb.MsgReadIndexResp:

if len(m.Entries) != 1 {

r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))

return nil

}

r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})

}

可以看到,Follower是通过Leader拿到最新的commit,尽管他自己都还没有跟上对方的进度。但是对于外部请求方来说,他并不区分当前是Follower或Leader,他只想知道当前Raft的最新状态。所以将最新得commit累加到本地得ReadState,等待发送出去。

总结

事已至此,最新的readstate都已经保存在本地了,那么这些状态怎么发送出去,让应用层处理,那就是Ready在做的事情了。

作者:Pillar_Zhong

链接: https://www.jianshu.com/p/076989a2ede2

来源:简书

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro CSS and HTML Design Patterns

Pro CSS and HTML Design Patterns

Michael Bowers / Apress / April 23, 2007 / $44.99

Design patterns have been used with great success in software programming. They improve productivity, creativity, and efficiency in web design and development, and they reduce code bloat and complexit......一起来看看 《Pro CSS and HTML Design Patterns》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

在线进制转换器
在线进制转换器

各进制数互转换器

URL 编码/解码
URL 编码/解码

URL 编码/解码