内容简介:但是,我还是很想吐槽etcd的mvcc实现,有点乱,在我看来,是过度抽象了。为了理解mvcc,我们需要预先了解这些东西(下文,mvcc如无特别说明,都是指代mvcc文件夹下,etcd的mvcc实现):此处,我们预先了解bolt的一些东西,但是暂时不去探究bolt的实现,bolt的实现粗略的瞄了一眼,如果要写的话,有的写了:
MVCC 是数据库中常见的一种并发控制的方式,即保存数据的多个版本,在同一个事务里, 应用所见的版本是一致的。
但是,我还是很想吐槽etcd的mvcc实现,有点乱,在我看来,是过度抽象了。为了理解mvcc,我们需要预先了解这些东西(下文,mvcc如无特别说明,都是指代mvcc文件夹下,etcd的mvcc实现):
- mvcc底层使用 bolt 实现,bolt是一个基于B+树的KV存储。
-
kv.go
这个文件定义了大量的接口,然后接口之间又各种组合,但是其实最后 etcdserver 使用的就是ConsistentWatchableKV
这个接口。
此处,我们预先了解bolt的一些东西,但是暂时不去探究bolt的实现,bolt的实现粗略的瞄了一眼,如果要写的话,有的写了:
https://github.com/etcd-io/bbolt
- bolt的顶级是一个DB,DB里有多个bucket。在物理上,bolt使用单个文件存储。
- bolt在某一刻只允许一个 read-write 事务,但是可以同时允许多个 read-only 事务。其实就是读写锁,写只能顺序来,读可以并发读。
-
DB.Update()
是用来开启 read-write 事务的,DB.View()
则是用来开启 read-only 事务的。由于每次执行DB.Update()
都会写入一次磁盘,可以使用DB.Batch()
来进行批量操作。
store是上面所说的 ConsistentWatchableKV
的底层实现:
type store struct { ReadView WriteView // consistentIndex caches the "consistent_index" key's value. Accessed // through atomics so must be 64-bit aligned. consistentIndex uint64 // mu read locks for txns and write locks for non-txn store changes. mu sync.RWMutex ig ConsistentIndexGetter b backend.Backend kvindex index le lease.Lessor // revMuLock protects currentRev and compactMainRev. // Locked at end of write txn and released after write txn unlock lock. // Locked before locking read txn and released after locking. revMu sync.RWMutex // currentRev is the revision of the last completed transaction. currentRev int64 // compactMainRev is the main revision of the last compaction. compactMainRev int64 // bytesBuf8 is a byte slice of length 8 // to avoid a repetitive allocation in saveIndex. bytesBuf8 []byte fifoSched schedule.Scheduler stopc chan struct{} lg *zap.Logger }
可以看到其中有几个很重要的东西:
mu sync.RWMutex b backend.Backend kvindex index
接下来探究一下 Put
是怎么工作的,这样我们就可以粗略的了解 mvcc 是怎么工作的。
store 本身并没有实现 Put
方法, 但是却可以调用 Put
方法,因为在最上边,它嵌套了一个匿名的 WriteView
,从而获得了
这个方法:
type store struct { ReadView WriteView
而具体的实现则在 NewStore
这个函数里可以找到:
s.ReadView = &readView{s} s.WriteView = &writeView{s}
那我们就去看 writeView
怎么实现的:
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) { tw := wv.kv.Write() defer tw.End() return tw.Put(key, value, lease) }
看看 wv.kv.Write
返回的是个啥嘎达(注意,wv.kv是一个符合KV这个interface的东东):
type KV interface { ReadView WriteView // Read creates a read transaction. Read() TxnRead // Write creates a write transaction. Write() TxnWrite
那我们就要去看 TxnWrite.Put
是怎么实现的:
// TxnWrite represents a transaction that can modify the store. type TxnWrite interface { TxnRead WriteView // Changes gets the changes made since opening the write txn. Changes() []mvccpb.KeyValue }
原来又是接口, Put
是在 WriteView
里定义的。所以呢,绕了一圈,我们又绕回来了,所以,etcd搞得这么复杂干啥呢。。。为了找出具体实现,我们得去 NewStore
里翻翻,具体传进去的是什么。原来传进去的就是自己啊,那就说明, type store struct
这玩意儿,肯定实现了 Write
这个方法。但是呢,
我找来找去,就是没有发现。最后我只能开启搜索大法,然后在 kvstore_txn.go
这个文件里找到了:
// 哇好绕啊,又绕到这里来了,我是佩服的 func (s *store) Write() TxnWrite { s.mu.RLock() tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ storeTxnRead: storeTxnRead{s, tx, 0, 0}, tx: tx, beginRev: s.currentRev, changes: make([]mvccpb.KeyValue, 0, 4), } return newMetricsTxnWrite(tw) }
我是服气的。
接下来就要去翻 TxnWrite
的实现,也就是 storeTxnWrite
的 Put
的实现了,然后你会发现, Put
调用了 put
:
// etcdctl put foo bar最后到了这里 func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { rev := tw.beginRev + 1 c := rev oldLease := lease.NoLease // if the key exists before, use its previous created and // get its previous leaseID _, created, ver, err := tw.s.kvindex.Get(key, rev) if err == nil { c = created.main oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) } ibytes := newRevBytes() // revision的bytes idxRev := revision{main: rev, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) ver = ver + 1 kv := mvccpb.KeyValue{ Key: key, Value: value, CreateRevision: c, ModRevision: rev, Version: ver, Lease: int64(leaseID), } d, err := kv.Marshal() // kv的bytes if err != nil { if tw.storeTxnRead.s.lg != nil { tw.storeTxnRead.s.lg.Fatal( "failed to marshal mvccpb.KeyValue", zap.Error(err), ) } else { plog.Fatalf("cannot marshal event: %v", err) } } tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) // 所以最后存储的,以revision为key,kv为value存储下来了 tw.s.kvindex.Put(key, idxRev) tw.changes = append(tw.changes, kv) if oldLease != lease.NoLease { if tw.s.le == nil { panic("no lessor to detach lease") } err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) if err != nil { if tw.storeTxnRead.s.lg != nil { tw.storeTxnRead.s.lg.Fatal( "failed to detach old lease from a key", zap.Error(err), ) } else { plog.Errorf("unexpected error from lease detach: %v", err) } } } if leaseID != lease.NoLease { if tw.s.le == nil { panic("no lessor to attach lease") } err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}}) if err != nil { panic("unexpected error from lease Attach") } } }
这里可以看出来,bolt里存储的KV,实际上并不是用户给出的KV。Key是revision,而Value是用户给出的KV。所以,才需要那个b树做索引, 把用户的key换成revision,然后再到bolt里,把revision换成真正的KV。
之后我们再看mvcc实现里的其他特性,例如watch是怎么实现的,实际上这玩意儿存储的时候,是有序存储的。
好了。这一节就看到这里了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- 【源码阅读】Gson源码阅读
- 如何阅读Java源码 ,阅读java的真实体会
- 我的源码阅读之路:redux源码剖析
- JDK源码阅读(六):HashMap源码分析
- 如何阅读源码?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
JS 压缩/解压工具
在线压缩/解压 JS 代码
正则表达式在线测试
正则表达式在线测试