内容简介:开始读etcd的源代码,今天首先来看的是 raftexample,这是一个基于 raft 的简单内存KV,希望通过 raftexample 能对 etcd 有一个大概的认识。首先看一下目录结构:我的如何阅读源代码 这篇文章里介绍过几种阅读 源代码的方式,今天我们就要用上。
开始读etcd的源代码,今天首先来看的是 raftexample,这是一个基于 raft 的简单内存KV,希望通过 raftexample 能对 etcd 有一个大概的认识。
首先看一下目录结构:
$ tree -d -L 1 . . ├── Documentation # 文档 ├── auth # 认证?还没细看 ├── bin # 编译出来的二进制文件 ├── client # 应该是v2版本的客户端代码 ├── clientv3 # 应该是v3版本的客户端代码 ├── contrib # 今天我们要看的raftexample就在这里面 ├── default.etcd # 运行编译好的etcd产生的,忽略之 ├── docs # 文档 ├── embed # 封装了etcd的函数,以便别的程序封装 ├── etcdctl # etcdctl命令,也就是客户端 ├── etcdmain # main.go 调用了这里 ├── etcdserver # 服务端代码 ├── functional # 不知道是干啥的,看起来是用来验证功能的测试套件 ├── hack # 开发者用的,不知道干啥的 ├── integration # 不知道干啥的,忽略 ├── lease # 实现etcd的租约 ├── logos ├── mvcc # MVCC存储的实现 ├── pkg # 通用库 ├── proxy # 代理 ├── raft # raft一致性协议的实现 ├── scripts # 各种脚本 ├── tests # 不晓得干啥的,忽略 ├── tools # 一些工具,不知道干啥的,忽略 ├── vendor # go的vendor,忽略 ├── version # 版本信息 └── wal # Write-Ahead-Log的实现 27 directories
我的如何阅读源代码 这篇文章里介绍过几种阅读 源代码的方式,今天我们就要用上。
首先,看 main.go
文件:
package main import ( "flag" "strings" "go.etcd.io/etcd/raft/raftpb" ) func main() { cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers") id := flag.Int("id", 1, "node ID") kvport := flag.Int("port", 9121, "key-value server port") join := flag.Bool("join", false, "join an existing cluster") flag.Parse() proposeC := make(chan string) defer close(proposeC) confChangeC := make(chan raftpb.ConfChange) defer close(confChangeC) // raft provides a commit stream for the proposals from the http api var kvs *kvstore getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() } commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // the key-value http handler will propose updates to raft serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) }
可以看出来,大概就是弄了两个channel,然后呢,新建了一个Raft的Node,新建了一个KV存储,然后就开始提供HTTP服务。
然后跟进去,读 newRaftNode
:
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) { commitC := make(chan *string) errorC := make(chan error) rc := &raftNode{ proposeC: proposeC, confChangeC: confChangeC, commitC: commitC, errorC: errorC, id: id, peers: peers, join: join, waldir: fmt.Sprintf("raftexample-%d", id), snapdir: fmt.Sprintf("raftexample-%d-snap", id), getSnapshot: getSnapshot, snapCount: defaultSnapshotCount, stopc: make(chan struct{}), httpstopc: make(chan struct{}), httpdonec: make(chan struct{}), snapshotterReady: make(chan *snap.Snapshotter, 1), // rest of structure populated after WAL replay } go rc.startRaft() // 启动raft return commitC, errorC, rc.snapshotterReady }
就是实力化了一个 raftNode,然后呢,调用了 raftNode.startRaft 这个方法,那就继续跟进去:
func (rc *raftNode) startRaft() { if !fileutil.Exist(rc.snapdir) { if err := os.Mkdir(rc.snapdir, 0750); err != nil { log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err) } } rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir) rc.snapshotterReady <- rc.snapshotter oldwal := wal.Exist(rc.waldir) rc.wal = rc.replayWAL() rpeers := make([]raft.Peer, len(rc.peers)) for i := range rpeers { rpeers[i] = raft.Peer{ID: uint64(i + 1)} } c := &raft.Config{ ID: uint64(rc.id), ElectionTick: 10, HeartbeatTick: 1, Storage: rc.raftStorage, MaxSizePerMsg: 1024 * 1024, MaxInflightMsgs: 256, MaxUncommittedEntriesSize: 1 << 30, } // 设置 rc.node if oldwal { rc.node = raft.RestartNode(c) } else { startPeers := rpeers if rc.join { startPeers = nil } rc.node = raft.StartNode(c, startPeers) // 配置节点 } rc.transport = &rafthttp.Transport{ Logger: zap.NewExample(), ID: types.ID(rc.id), ClusterID: 0x1000, Raft: rc, ServerStats: stats.NewServerStats("", ""), LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)), ErrorC: make(chan error), } rc.transport.Start() for i := range rc.peers { if i+1 != rc.id { rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]}) } } go rc.serveRaft() // 启动HTTP服务 go rc.serveChannels() // 开始监听各个channel然后消费 }
可以看出来,这个方法呢,就是先检查是不是有快照,是不是有WAL日志,如果有的话,就恢复到上一个状态,如果没有的话,就新建。 然后调用 raft.RestartNode
,这里就是真正启用raft一致性协议的地方了,这里的raft就是最开始我们看的目录里的raft。这里 接下来做的事情就是启动一个transport,这嘎达呢,就跟指定的集群里其他节点通信。然后起一个循环去消费之前建立的的channel里的数据。 可以看到 rc.serveChannels
的代码:
func (rc *raftNode) serveChannels() { snap, err := rc.raftStorage.Snapshot() if err != nil { panic(err) } rc.confState = snap.Metadata.ConfState rc.snapshotIndex = snap.Metadata.Index rc.appliedIndex = snap.Metadata.Index defer rc.wal.Close() ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() // send proposals over raft go func() { confChangeCount := uint64(0) for rc.proposeC != nil && rc.confChangeC != nil { select { case prop, ok := <-rc.proposeC: log.Printf("received from rc.proposeC: prop: %+v, ok: %t", prop, ok) if !ok { rc.proposeC = nil } else { // blocks until accepted by raft state machine // 在此处,是kvstore.go里的kvstore rc.node.Propose(context.TODO(), []byte(prop)) } case cc, ok := <-rc.confChangeC: if !ok { rc.confChangeC = nil } else { confChangeCount++ cc.ID = confChangeCount rc.node.ProposeConfChange(context.TODO(), cc) } } } // client closed channel; shutdown raft if not already close(rc.stopc) }() // event loop on raft state machine updates for { select { case <-ticker.C: rc.node.Tick() // store raft entries to wal, then publish over commit channel case rd := <-rc.node.Ready(): rc.wal.Save(rd.HardState, rd.Entries) if !raft.IsEmptySnap(rd.Snapshot) { rc.saveSnap(rd.Snapshot) rc.raftStorage.ApplySnapshot(rd.Snapshot) rc.publishSnapshot(rd.Snapshot) } rc.raftStorage.Append(rd.Entries) rc.transport.Send(rd.Messages) if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { rc.stop() return } rc.maybeTriggerSnapshot() rc.node.Advance() case err := <-rc.transport.ErrorC: rc.writeError(err) return case <-rc.stopc: rc.stop() return } } }
这段代码就比较长了,先看第一个 go func()
里的循环,就是监听最开始建立的两个channel,然后分别调用对应的接口,要注意, rc.node
的类型是 raft.Node
,这是一个接口。上面说了,实例化的时候,是调用 raft.StartNode
或者 raft.RestartNode
, 其返回结果是一个 raft.Node
,实际上代码返回的是 raft.node
,而 raft.node
实现了 raft.Node
这个接口,会不会有点晕?
所以呢,接下来我们要看看 rc.proposeC
这个channel, rc.confChangeC
我们就不看了,虽然不知道是干啥的,但是呢,从名字我们先 猜测它是用来做配置变更的(实际上就是)。看 rc.proposeC
,我们就要看这个channel在哪些地方用到了,也就是说,哪里有生产者, 哪里有消费者。搜索一下:
$ ack proposeC raftexample_test.go 29: proposeC []chan string 44: proposeC: make([]chan string, len(peers)), 51: clus.proposeC[i] = make(chan string, 1) 53: clus.commitC[i], clus.errorC[i], _ = newRaftNode(i+1, clus.peers, false, nil, clus.proposeC[i], clus.confChangeC[i]) 73: close(clus.proposeC[i]) 123: }(clus.proposeC[i], clus.commitC[i], clus.errorC[i]) 126: go func(i int) { clus.proposeC[i] <- "foo" }(i) 151: clus.proposeC[0] <- "foo" 152: clus.proposeC[0] <- "bar" kvstore.go 29: proposeC chan<- string // channel for proposing updates 40:func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { 41: s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter} 61: log.Printf("s.proposeC <- %s", buf.String()) 62: s.proposeC <- buf.String() raft.go 42: proposeC <-chan string // proposed messages (k,v) 80:// current), then new log entries. To shutdown, close proposeC and read errorC. 81:func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, 88: proposeC: proposeC, 401: for rc.proposeC != nil && rc.confChangeC != nil { 403: case prop, ok := <-rc.proposeC: 404: log.Printf("received from rc.proposeC: prop: %+v, ok: %t", prop, ok) 406: rc.proposeC = nil main.go 31: proposeC := make(chan string) 32: defer close(proposeC) 39: commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) 41: kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
可以看出来, kvstore.go
的62行应该是生产者,而 raft.go
的403行应该是消费者。基于这种假设,我们要去验证一下,所以我加了几行日志, 那就运行一下:
$ curl -v -L http://127.0.0.1:9121/my-key -XPUT -d hello
下图是服务端的日志输出:
可以看出来,顺序是先调用了 s.proposeC <-
然后 received from rc.proposeC
,然后raft对消息进行处理,把它还原成函数调用,就是:
-
func (s *kvstore) Propose(k string, v string)
-
func (rc *raftNode) serveChannels()
-
func (n *node) Propose(ctx context.Context, data []byte) error
,这里的node是etcd/raft/node.go
里的结构体type node struct
为啥呢,我们要和raftexample里的代码对应上。继续看 raftexample/main.go
:
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // the key-value http handler will propose updates to raft serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
可以看到, main.go
最后其实是执行了 serveHttpKVAPI
,然后跳进去一看,会发现,调用了 httpKVAPI
,我们知道,go的HTTP服务 代码满足 ServeHTTP
这个接口就可以了,如果你不知道的话,说明没有读过 net/http
的代码,那么你现在知道了。所以直接跳过去看 httpKVAPI
的 ServeHTTP
这个方法,可以看到:
type httpKVAPI struct { store *kvstore confChangeC chan<- raftpb.ConfChange } func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { key := r.RequestURI switch { case r.Method == "PUT": v, err := ioutil.ReadAll(r.Body) if err != nil { log.Printf("Failed to read on PUT (%v)\n", err) http.Error(w, "Failed on PUT", http.StatusBadRequest) return } h.store.Propose(key, string(v)) // Optimistic-- no waiting for ack from raft. Value is not yet // committed so a subsequent GET on the key may return old value w.WriteHeader(http.StatusNoContent)
h.store.Propose(key, string(v))
这里就是入口了。按照我们之前说的顺序, h.store.Propose(key, string(v))
会调用 s.proposeC <- buf.String()
, 然后这个时候就会唤醒 serveChannels
里的 case prop, ok := <-rc.proposeC
,然后调用 rc.node.Propose(context.TODO(), []byte(prop))
, 接下来就会调用 etcd/raft/node.go
里的 func (n *node) Propose(ctx context.Context, data []byte) error
这个函数。
好了,到此为止我们就知道etcd大概是怎么一个工作法,这篇博客到此结束。
接下来我们会继续探索真正的etcd里的各个细节。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- 【源码阅读】Gson源码阅读
- 如何阅读Java源码 ,阅读java的真实体会
- 我的源码阅读之路:redux源码剖析
- JDK源码阅读(六):HashMap源码分析
- 如何阅读源码?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Out of their Minds
Dennis Shasha、Cathy Lazere / Springer / 1998-07-02 / USD 16.00
This best-selling book is now available in an inexpensive softcover format. Imagine living during the Renaissance and being able to interview that eras greatest scientists about their inspirations, di......一起来看看 《Out of their Minds》 这本书的介绍吧!