内容简介:Raft是一个分布式一致性算法,充分的利用了可复制状态机以及日志。其最核心的设计目标就是易于理解。在性能、错误容错等方面来看有点类似当构建一个分布式系统时,一个非常重要的设计目标就是关于Raft更多的细节,这里建议直接阅读论文: "In Search of an Understandable Consensus Algorithm"
什么是Raft
Raft是一个分布式一致性算法,充分的利用了可复制状态机以及日志。其最核心的设计目标就是易于理解。在性能、错误容错等方面来看有点类似 Paxos ,但不同之处在于,Raft论文较为清晰的描述了其主要流程以及其中一些细节问题,而Paxos我们知道非常难以理解。
当构建一个分布式系统时,一个非常重要的设计目标就是 fault tolerance 。如果系统基于Raft协议实现,那么当其中一个节点挂掉,或者发生了网络分区等异常情况时,只要大多数节点仍然能够正常通讯,整个集群就能够正常对外提供服务而不会挂掉。
关于Raft更多的细节,这里建议直接阅读论文: "In Search of an Understandable Consensus Algorithm"
介绍
Etcd的Raft库已经在生产环境得到了非常广泛的应用,有力的支撑了etcd、K8S、Docker Swarm、TiDB/TiKV等分布式系统的构建,当你能够熟练的使用一个成熟的Raft库、甚至如果能够自己实现一个,那会有种'有了锤子,干什么都是钉子'的感觉。
特性
Etcd raft基本上已经实现了Raft协议的完整特性,包括:
- Leader选举
- 日志复制
- 日志压缩
- 成员变更
- Leader和Follower都支持高效的线性只读查询请求
- 通过batch、pipeline等手段优化日志复制、网络IO的延迟
概览
etcd的raft实现都在 etcd/raft
目录下,但是大部分的实现都在下面几个比较核心的文件:
-
raft.go
: 从名字也可以看出来,这个是最核心的部分,比如leader选择的逻辑、raft消息的处理逻辑等 -
node.go
: 可以理解为raft集群的一个节点,客户端也主要是这个类打交道,比如心跳的逻辑、propose、状态机、成员变更等都是这个类负责处理。 -
log.go
: raft日志相关的代码,比如保存日志记录 -
raft.proto
: 定义了raft一些核心的RPC数据结构,由于protobuf是跨语言的,因此如果想用其他语言重写etcd raft
,那么至少这部分内容都是可以复用的
用法
客户端主要使用 Node
和raft集群交互,首先需要启动一个raft集群,有两种方式:
- 启动一个全新的raft集群
- 加入一个已经存在的raft集群(节点重启、扩容、缩容)
启动一个三节点的集群:
storage := raft.NewMemoryStorage() c := &Config{ //代表一个节点的ID,必须唯一,并且不能为0,不能重复利用,和zookeeper的id类似 ID: 0x01, ElectionTick: 10, HeartbeatTick: 1, Storage: storage, MaxSizePerMsg: 4096, MaxInflightMsgs: 256, } //设置节点列表 n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
这里需要强调一个点,etcd的raft实现并不包括网络部分,网络通讯部分需要使用者自己实现,因此这里节点列表传入的是ID,而ip:port到id的映射需要库使用者自己实现。
如果让一个新的节点加入集群,那么就不需要传入节点列表,首先通过 ProposeConfChange
RPC发起一个成员变更请求,在任意一个raft集群节点都可以,然后启动这个节点:
//配置参考上文中的代码段 n := raft.StartNode(c, nil)
如果是重启一个节点,那么这里需要注意,我们需要恢复这个节点之前的状态,比如当前term、根据快照和日志恢复状态机等:
storage := raft.NewMemoryStorage() // Recover the in-memory storage from persistent snapshot, state and entries. // 根据快照、entry日志等恢复当前raft节点到之前的状态 storage.ApplySnapshot(snapshot) storage.SetHardState(state) storage.Append(entries) c := &Config{ ID: 0x01, ElectionTick: 10, HeartbeatTick: 1, Storage: storage, MaxSizePerMsg: 4096, MaxInflightMsgs: 256, } // Restart raft without peer information. // Peer information is already included in the storage. // 重启该raft节点,此时不用传入任何节点相关信息,因为已经在刚刚的恢复过程中填充好了 n := raft.RestartNode(c)
当raft集群启动完成后,对于一个raft节点,用户需要做几件事情,伪码如下:
for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) send(rd.Messages) if !raft.IsEmptySnap(rd.Snapshot) { processSnapshot(rd.Snapshot) } for _, entry := range rd.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } s.Node.Advance() case <-s.done: return } }
case <-s.Ticker
库使用者需要定时调用 tick()
方法,根据节点当前的角色调用对应的逻辑:
- 心跳, leader需要定时发送心跳包给follower
- 选举,如果一定时间没有收到leader的心跳,则转换为候选者,竞选leader
case rd := <-s.Node.Ready():
处理Ready
Ready封装了可以准备开始读取的entries、messages,需要保存到持久化介质、同步给其他节点:
type Ready struct { // The current volatile state of a Node. // SoftState will be nil if there is no update. // It is not required to consume or store SoftState. *SoftState // The current state of a Node to be saved to stable storage BEFORE // Messages are sent. // HardState will be equal to empty state if there is no update. pb.HardState // ReadStates can be used for node to serve linearizable read requests locally // when its applied index is greater than the index in ReadState. // Note that the readState will be returned when raft receives msgReadIndex. // The returned is only valid for the request that requested to read. ReadStates []ReadState // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. Entries []pb.Entry // Snapshot specifies the snapshot to be saved to stable storage. Snapshot pb.Snapshot // CommittedEntries specifies entries to be committed to a // store/state-machine. These have previously been committed to stable // store. CommittedEntries []pb.Entry // Messages specifies outbound messages to be sent AFTER Entries are // committed to stable storage. // If it contains a MsgSnap message, the application MUST report back to raft // when the snapshot has been received or has failed by calling ReportSnapshot. Messages []pb.Message // MustSync indicates whether the HardState and Entries must be synchronously // written to disk or if an asynchronous write is permissible. MustSync bool }
-
调用
Node.Ready()
,处理当前raft节点的状态,其中有些步骤可以并行执行- 将entries、HardState、快照按照顺序写到持久化介质中,底层存储介质支持原子写入,那么也可以一次性将他们写入
-
将所有的消息发送给远程节点,但一定要先将最近的HardState、上一轮Ready中的entries都持久化之后(可以和同一轮的entries持久化并行执行)。如果有类型为
MsgSnap
的消息,在这个消息发送成功之后,需要调用Node.ReportSnapshot()
。 -
如果有快照的话需要和已提交的entries一起应用到状态机( 库使用者提供
),如果已经提交的entries中包含
EntryConfChange
,那么需要调用Node.ApplyConfChange()
将节点的变更信息同步到本节点
-
调用
Node.Advance()
通知节点,表明本轮Ready已经处理完毕,可以开始处理下一轮。
另外还需要注意,由于网络部分需要库使用者自己实现,因此当收到一条消息的时候,需要将该消息转发给raft节点:
func recvRaftRPC(ctx context.Context, m raftpb.Message) { n.Step(ctx, m) }
发起提议
如果需要向raft集群发起一个提议,那么需要用下面这种方式:
// 协议的数据持久化成字节数组 n.Propose(ctx, data)
如果找个提议处理完成(已经持久化到持久化介质并同步到其他节点),那么就可以通过 Ready
的comitedEntries获取到,类型是 raftpb.EntryNormal
, 然后用户就可以根据自己的业务逻辑,将其应用到状态机中。
raft集群不保证该协议一定能够处理成功,若一定超时时间内,还未收到响应,那么需要根据业务场景考虑是否需要重试。
节点变更
如果需要对raft集群扩容或缩容,那么需要构造 ConfChange
,并调用:
n.ProposeConfChange(ctx, cc)
如果该变更请求处理成功,那么在commitedEntries中会有一条类型为 raftpb.EntryConfChange
的记录,
var cc raftpb.ConfChange cc.Unmarshal(data) n.ApplyConfChange(cc)
需要自己实现的部分
etcd的raft已经实现了大部分的功能,但是还是有几个组件需要使用者自己根据业务场景实现:
- 网络通讯部分
- Write ahead log
- 快照
网络通讯部分
网络部分说白了就是消息的收发,你可以理解为raft只依赖了接口,这个接口实现了两个方法: send
、 receive
,但是具体的实现需要库使用者自己写,这部分相对比较简单,使用RPC、HTTP、自定义协议都可以,具体的实现逻辑可以参考etcd自己的代码
Write-Ahead-Log(WAL)
如上文中提到的,用户需要保存Ready中的一些状态,比如entries、hardstate等,WAL有很多分布式系统都实现了,基本上参考他们的实现,结合自己的业务实现一个难度不会很大,如果是直接使用etcd raft库,那么可以直接基于etcd中wal的实现,另外也可以基于RocksDB等嵌入式KV实现,但是对于key-value的结构设计要考虑好,wal的原理后面有时间再叙述。
快照
快照应该都知道,比如说 Redis 的持久化,有一种模式是保存用户发过来的命令,但时间长了之后,这个日志会变的越来越大,这个时候当你扩容、重启节点的时候,加载这个文件会耗费很长时间,导致服务不可用,因此需要将内存中的状态持久化到磁盘中。
比如:
incr index incr index
这个时候index的值为2,当然这个例子只有两条命令,但假如说有一千万条记录,那么重放日志需要耗费很长时间,因此我们可以直接将 index:2
这个kv对写到磁盘中,那么这个时候之前对这个key的一千万条操作日志就变成了这一条记录。
那么raft的快照其实也类似,应用需要将自己状态机的当前快照,持久化成一个快照文件,并写入磁盘中,我们知道这个过程会非常慢,因此可以考虑和其他过程并行执行,以及其他的一些性能优化,这个后面的博客再写。
简单来实现的话,我们直接将状态机用json序列化成一个字节数组,并写入到本地文件中,后续读取的时候。
如何基于raft实现一个简单的分布式KV存储
这里简单描述一下流程,只是为了更容易理解etcd raft的使用方法,后面会再写篇博客详细记录:
- 应用实现自己的状态机,处理快照、已提交日志、WAL等
- 当用户发起一个put请求时,将该请求序列化成字节数组,propose到raft集群
- 处理成功后,会出现在commitedEntries中,解析该entry,回放到状态机中,这个时候该请求的结果已经可以在所有的raft节点上查询到了
- 用户发起查询请求,直接在用户封装的状态机中查询,并返回给用户
总结
本文只是简单描述了下etcd raft的使用方法,总的来说etcd raft的实现已经非常完善,但还是需要用户自己处理非常多的细节,比如网络、write aheadlog等,如果对raft不熟悉,相信会很难上手,我的想法是能够在其之上再封装一层,提供一个状态机接口,用户只需要关心自己的业务逻辑,其他的全部都交给库来处理。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- TypeScript基础入门之模块解析(一)
- TypeScript基础入门之模块解析(二)
- Hadoop入门(二)之 HDFS 详细解析
- Apache Flink 零基础入门(一):基础概念解析
- Apache Flink 零基础入门(一):基础概念解析
- 快速入门开发实现订单类图片识别结果抽象解析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。