EtcdRaft源码分析(线性一致读)

栏目: IT技术 · 发布时间: 4年前

内容简介:背景我们知道Raft是Leader+Follower的模型,所有的更新由Leader处理,然后再同步给Follower。想象一下,如果要所有的节点都参与进来支持读取的请求,会带来什么样的问题?

背景

我们知道Raft是Leader+Follower的模型,所有的更新由Leader处理,然后再同步给Follower。

想象一下,如果要所有的节点都参与进来支持读取的请求,会带来什么样的问题?

Leader跟Follower并不总是一致的,换句话说Follower会落后Leader的进度。如果没有特别的处理,那么不同的节点读取的结果很可能不一致。

如果Leader被集群孤立,而且其他人已经推举出了新的Leader。而老的Leader还没有察觉到这个变化,他任然觉得还是Leader,但是他的数据已经不可信。如果他还在对外提供服务,那么读取的结果很可能不一致。

EtcdRaft的线性一致读是通过ReadIndex的机制来实现,大致的实现其实很简单,也就是在处理请求之前,会去集群中确认自己权力是否稳固,这样对外提供的服务才够权威。下面我们一起来剖析下Raft是怎么处理的。

接口

type Node interface {

...

// ReadIndex request a read state. The read state will be set in the ready.

// Read state has a read index. Once the application advances further than the read

// index, any linearizable read requests issued before the read request can be

// processed safely. The read state will have the same rctx attached.

ReadIndex(ctx context.Context, rctx []byte) error

...

}

这个接口很怪,返回error,不是我们通常意义理解的查询接口。Golang的世界就是这么奇妙。注定这个接口没有那么简单。下面我们先搞清楚,这个接口是怎么用的。

EtcdServer

EtcdServer是再好不过的例子了。

func (s *EtcdServer) linearizableReadLoop() {

var rs raft.ReadState

for {

ctxToSend := make([]byte, 8)

id1 := s.reqIDGen.Next()

binary.BigEndian.PutUint64(ctxToSend, id1)

...

cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())

if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {

...

}

cancel()

var (
     timeout bool
     done    bool
  )
  for !timeout && !done {
     select {
     case rs = <-s.r.readStateC:
        done = bytes.Equal(rs.RequestCtx, ctxToSend)
       ...
  }
  ...

  if ai := s.getAppliedIndex(); ai < rs.Index {
     select {
     case <-s.applyWait.Wait(rs.Index):
     case <-s.stopping:
        return
     }
  }
  ...

}

}

当然这是精简过后的代码,只保留了跟ReadIndex相关的逻辑。从这里我们也能看出一些端倪。

首先,传入的参数ctxToSend,是一个单调自增的id,没有任何意义,只是用来客户端区分请求只用。

其次,会从readStateC里面监听发出的ReadIndex请求Raft是否有了回应。

第三,会看下本地已经写入状态机的日志有没有到ReadIndex请求回来的位置,如果没有继续等待,如果有这个方法立即结束。

注意没有这个方法整个是个for循环,而且请求id还单调自增,那么执行下来的结果就是会一直保持对Raft状态的监控,一有风吹草动,这边就会提前收到通知。

ReadIndex

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {

return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})

}

可以看到,最终是通过MsgReadIndex的方式对内进行广播。

Follower

case pb.MsgReadIndex:

if r.lead == None {

r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)

return nil

}

m.To = r.lead

r.send(m)

首先破除第一个问题,不管谁接到ReadIndex的请求,将转发给Leader。

Leader

case pb.MsgReadIndex:

if r.quorum() > 1 {

if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {

// Reject read only request when this leader has not committed any log entry at its term.

return nil

}

// thinking: use an interally defined context instead of the user given context.
  // We can express this in terms of the term and index instead of a user-supplied value.
  // This would allow multiple reads to piggyback on the same message.
  switch r.readOnly.option {
  case ReadOnlySafe:
     r.readOnly.addRequest(r.raftLog.committed, m)
     r.bcastHeartbeatWithCtx(m.Entries[0].Data)
  case ReadOnlyLeaseBased:
     ri := r.raftLog.committed
     if m.From == None || m.From == r.id { // from local member
        r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
     } else {
        r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
     }
  }

} else {

r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})

}

return nil

}

如果当前Leader截至到现在还没有提交任何entry,那么直接返回。

下面我们看下不同的ReadOnly选项都是怎么实现的。

ReadOnlySafe

func (ro *readOnly) addRequest(index uint64, m pb.Message) {

ctx := string(m.Entries[0].Data)

if _, ok := ro.pendingReadIndex[ctx]; ok {

return

}

ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}

ro.readIndexQueue = append(ro.readIndexQueue, ctx)

}

首先我们将请求的requestid放入pendingReadIndex中暂存,当然了,如果之前已经请求过了,那么返回,同时会初始化一个readIndexStatus,最后append到readIndexQueue

注意这里的readIndexQueue是FIFO的,是严格有序的。

其次,request里面保存的只是一个请求的id,用来客户端来区分请求用的。

另外一个需要注意的是,这里保存的index是当前Leader的committedindex,也就是最终一致的地方。

想象一下,客户端那边在不停的往Raft里面灌数据,那么其他客户端在读取Raft数据的时候,怎么知道哪些数据是形成一致的,可以安全拿出来用的。当然committedindex是重要的指标,代表,包括这个index及之前的数据都是安全的,有保障的。

那么怎么拿到这个committedindex,这就是ReadIndex的目的。

bcastHeartbeatWithCtx

r.bcastHeartbeatWithCtx(m.Entries[0].Data)

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {

// Attach the commit as min(to.matched, r.committed).

// When the leader sends out heartbeat message,

// the receiver(follower) might not be matched with the leader

// or it might not have all the committed entries.

// The leader MUST NOT forward the follower's commit to

// an unmatched index.

commit := min(r.getProgress(to).Match, r.raftLog.committed)

m := pb.Message{

To: to,

Type: pb.MsgHeartbeat,

Commit: commit,

Context: ctx,

}

r.send(m)

}

将请求的requestid作为心跳的context,发给其他成员。下面我们看下Follower或Candidate接到请求是怎么处理的。

Follower&Candidate

case pb.MsgHeartbeat:

r.electionElapsed = 0

r.lead = m.From

r.handleHeartbeat(m)

case pb.MsgHeartbeat:

r.becomeFollower(m.Term, m.From) // always m.Term == r.Term

r.handleHeartbeat(m)

func (r *raft) handleHeartbeat(m pb.Message) {

r.raftLog.commitTo(m.Commit)

r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})

}

可以看到二位接到心跳后,并没有针对ReadOnly做特殊处理。做了他们自己平时该做的。

回忆下心跳篇,Leader怎么维持自己的权威,不就是发心跳么?其他成员接受到心跳就认为Leader还存在,还不用重新选举。

既然ReadOnly的实现是需要大多数人确认我还是不是真正的Leader,那么Leader在接受心跳响应的时候,看收到的有没有过半数不就可以了么?所以心跳是完美的实现这个的载体。

下面我们看下Leader在处理心跳响应的时候,在做什么

Leader

case pb.MsgHeartbeatResp:

pr.RecentActive = true

pr.resume()

// free one slot for the full inflights window to allow progress.

if pr.State == ProgressStateReplicate && pr.ins.full() {

pr.ins.freeFirstOne()

}

if pr.Match < r.raftLog.lastIndex() {

r.sendAppend(m.From)

}

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {

return nil

}

ackCount := r.readOnly.recvAck(m)

if ackCount < r.quorum() {

return nil

}

rss := r.readOnly.advance(m)

for _, rs := range rss {

req := rs.req

if req.From == None || req.From == r.id { // from local member

r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})

} else {

r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})

}

}

跟心跳篇重复的部分,在这里我们不再赘述

首先到了这里要不你开启了ReadOnlySafe,要不就是消息上下文中没有发现ReadIndex

recvAck

ackCount := r.readOnly.recvAck(m)

if ackCount < r.quorum() {

return nil

}

func (ro *readOnly) recvAck(m pb.Message) int {

rs, ok := ro.pendingReadIndex[string(m.Context)]

if !ok {

return 0

}

rs.acks[m.From] = struct{}{}

// add one to include an ack from local node

return len(rs.acks) + 1

}

努力回忆下在Leader发心跳前做的准备工作,其中之一就是保存一个客户端的请求id到pendingReadIndex。这里再次提取这个request,将这个心跳响应累加到acks里面。

那Leader怎么知道这是普通的心跳响应还是ReadIndex的心跳响应呢?关键就是Message的context是不是有请求id

换句话说,这里就是收集因为这次ReadIndex请求发起的心跳,最终有多少人给了回应。

如果超过一半的人答复了Leader,说明这个Leader是被人承认的,有公信力的。那么继续往下

advance

func (ro readOnly) advance(m pb.Message) [] readIndexStatus {

var (

i int

found bool

)

ctx := string(m.Context)

rss := []*readIndexStatus{}

for _, okctx := range ro.readIndexQueue {

i++

rs, ok := ro.pendingReadIndex[okctx]

if !ok {

panic("cannot find corresponding read state from pending map")

}

rss = append(rss, rs)

if okctx == ctx {

found = true

break

}

}

if found {

ro.readIndexQueue = ro.readIndexQueue[i:]

for _, rs := range rss {

delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))

}

return rss

}

return nil

}

之前在addRequest的时候往ReadOnly里面写了这次请求的信息,那么这里从里面找出来。

找到后,将之前的还没来得及处理得请求,一起摘出来,往下处理

for _, rs := range rss {

req := rs.req

if req.From == None || req.From == r.id { // from local member

r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})

} else {

r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})

}

}

遍历上面返回的请求列表

如果是发给当前节点的请求,那么将这个ReadState累加在本地

如果不是,给对方发MsgReadIndexResp

Follower

case pb.MsgReadIndexResp:

if len(m.Entries) != 1 {

r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))

return nil

}

r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})

}

可以看到,Follower是通过Leader拿到最新的commit,尽管他自己都还没有跟上对方的进度。但是对于外部请求方来说,他并不区分当前是Follower或Leader,他只想知道当前Raft的最新状态。所以将最新得commit累加到本地得ReadState,等待发送出去。

总结

事已至此,最新的readstate都已经保存在本地了,那么这些状态怎么发送出去,让应用层处理,那就是Ready在做的事情了。

作者:Pillar_Zhong

链接: https://www.jianshu.com/p/076989a2ede2

来源:简书

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

超级连接者:破解新互联时代的成功密码

超级连接者:破解新互联时代的成功密码

伊桑•祖克曼(ETHAN ZUCKERMAN) / 林玮、张晨 / 浙江人民出版社 / 2018-8-1 / CNY 72.90

● 我们生活在一个互联互通的世界,我们需要辩证地看待某些事件,发现隐藏在背后的真相。着眼当下,看清彼此之间的联系,而非凭空幻想未来世界联系之紧密。数字世界主义要求我们承担起责任,让隐藏的联系变成现实。 ● 我们对世界的看法是局限的、不完整的、带有偏见的。如果我们想要改变从这个广阔的世界所获取的信息,我们需要做出结构性的改变。 ● 建立联系是一种新的力量。无论是在国家层面、企业层面还是个......一起来看看 《超级连接者:破解新互联时代的成功密码》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具