内容简介:lease是租约,类似于Redis中的TTL(Time To Live)。可以看一下怎么使用lease:可以看出来,我们就是拿一个lease的ID作为凭证。那么,lease是怎么实现的呢?可以看出来,Lease在创建的时候,就会分配一个ID和设定好TTL。
lease是租约,类似于 Redis 中的TTL(Time To Live)。可以看一下怎么使用lease:
cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, }) if err != nil { log.Fatal(err) } defer cli.Close() // minimum lease TTL is 5-second resp, err := cli.Grant(context.TODO(), 5) if err != nil { log.Fatal(err) } // after 5 seconds, the key 'foo' will be removed _, err = cli.Put(context.TODO(), "foo", "bar", clientv3.WithLease(resp.ID)) if err != nil { log.Fatal(err) }
可以看出来,我们就是拿一个lease的ID作为凭证。那么,lease是怎么实现的呢?
type Lease struct { ID LeaseID ttl int64 // time to live of the lease in seconds remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used // expiryMu protects concurrent accesses to expiry expiryMu sync.RWMutex // expiry is time when lease should expire. no expiration when expiry.IsZero() is true expiry time.Time // mu protects concurrent accesses to itemSet mu sync.RWMutex itemSet map[LeaseItem]struct{} revokec chan struct{} }
可以看出来,Lease在创建的时候,就会分配一个ID和设定好TTL。
接下来看看lessor这个接口的定义:
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. type Lessor interface { // SetRangeDeleter lets the lessor create TxnDeletes to the store. // Lessor deletes the items in the revoked or expired lease by creating // new TxnDeletes. SetRangeDeleter(rd RangeDeleter) SetCheckpointer(cp Checkpointer) // Grant grants a lease that expires at least after TTL seconds. Grant(id LeaseID, ttl int64) (*Lease, error) // Revoke revokes a lease with given ID. The item attached to the // given lease will be removed. If the ID does not exist, an error // will be returned. Revoke(id LeaseID) error // Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set // the expiry of leases to less than the full TTL when possible. Checkpoint(id LeaseID, remainingTTL int64) error // Attach attaches given leaseItem to the lease with given LeaseID. // If the lease does not exist, an error will be returned. Attach(id LeaseID, items []LeaseItem) error // GetLease returns LeaseID for given item. // If no lease found, NoLease value will be returned. GetLease(item LeaseItem) LeaseID // Detach detaches given leaseItem from the lease with given LeaseID. // If the lease does not exist, an error will be returned. Detach(id LeaseID, items []LeaseItem) error // Promote promotes the lessor to be the primary lessor. Primary lessor manages // the expiration and renew of leases. // Newly promoted lessor renew the TTL of all lease to extend + previous TTL. Promote(extend time.Duration) // Demote demotes the lessor from being the primary lessor. Demote() // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist, // an error will be returned. Renew(id LeaseID) (int64, error) // Lookup gives the lease at a given lease id, if any Lookup(id LeaseID) *Lease // Leases lists all leases. Leases() []*Lease // ExpiredLeasesC returns a chan that is used to receive expired leases. ExpiredLeasesC() <-chan []*Lease // Recover recovers the lessor state from the given backend and RangeDeleter. Recover(b backend.Backend, rd RangeDeleter) // Stop stops the lessor for managing leases. The behavior of calling Stop multiple // times is undefined. Stop() }
可以看出来作为一个lessor,也就是管理lease的东东,需要实现这些接口。我们具体关注怎么完成Grant,此外,expiration是怎么做的。 所以我们找到具体的实现来看看:
// lessor implements Lessor interface. // TODO: use clockwork for testability. type lessor struct { mu sync.RWMutex // demotec is set when the lessor is the primary. // demotec will be closed if the lessor is demoted. demotec chan struct{} leaseMap map[LeaseID]*Lease leaseHeap LeaseQueue leaseCheckpointHeap LeaseQueue itemMap map[LeaseItem]LeaseID // When a lease expires, the lessor will delete the // leased range (or key) by the RangeDeleter. rd RangeDeleter // When a lease's deadline should be persisted to preserve the remaining TTL across leader // elections and restarts, the lessor will checkpoint the lease by the Checkpointer. cp Checkpointer // backend to persist leases. We only persist lease ID and expiry for now. // The leased items can be recovered by iterating all the keys in kv. b backend.Backend // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any // requests for shorter TTLs are extended to the minimum TTL. minLeaseTTL int64 expiredC chan []*Lease // stopC is a channel whose closure indicates that the lessor should be stopped. stopC chan struct{} // doneC is a channel whose closure indicates that the lessor is stopped. doneC chan struct{} lg *zap.Logger // Wait duration between lease checkpoints. checkpointInterval time.Duration }
可以看到,里边有一个 leaseMap
, 有 leaseHeap
,为什么要有两个呢?堆的特性不知道大家还记得吗?此处的 leaseHeap
实现是
一个小堆,比较的关键是Lease失效的时间:
type LeaseQueue []*LeaseWithTime func (pq LeaseQueue) Len() int { return len(pq) } func (pq LeaseQueue) Less(i, j int) bool { return pq[i].time < pq[j].time } func (pq LeaseQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } func (pq *LeaseQueue) Push(x interface{}) { n := len(*pq) item := x.(*LeaseWithTime) item.index = n *pq = append(*pq, item) } func (pq *LeaseQueue) Pop() interface{} { old := *pq n := len(old) item := old[n-1] item.index = -1 // for safety *pq = old[0 : n-1] return item }
所以,怎么保证lease失效呢?我们每次从小堆里判断堆顶元素是否失效,失效就 Pop
就可以了。那为什么又要有 leaseMap
呢?因为
这样可以加速查找,毕竟,哈希表的时间复杂度是 O(1)
。
那么,什么时候会进行lease的失效管理呢?我们看新建lessort的地方:
func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor { return newLessor(lg, b, cfg) } func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor { checkpointInterval := cfg.CheckpointInterval if checkpointInterval == 0 { checkpointInterval = 5 * time.Minute } l := &lessor{ leaseMap: make(map[LeaseID]*Lease), itemMap: make(map[LeaseItem]LeaseID), leaseHeap: make(LeaseQueue, 0), leaseCheckpointHeap: make(LeaseQueue, 0), b: b, minLeaseTTL: cfg.MinLeaseTTL, checkpointInterval: checkpointInterval, // expiredC is a small buffered chan to avoid unnecessary blocking. expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), doneC: make(chan struct{}), lg: lg, } l.initAndRecover() go l.runLoop() return l }
倒数第二行, go l.runLoop()
,跟进去看:
func (le *lessor) runLoop() { defer close(le.doneC) for { le.revokeExpiredLeases() le.checkpointScheduledLeases() select { case <-time.After(500 * time.Millisecond): case <-le.stopC: return } } }
每500毫秒会进行一次循环,检查失效的lease然后传递到 expiredC
这个channel里。但是 type lessor struct
这个结构体并没有
对其进行处理,估计是调用者负责处理,所以我搜索了一下是不是有地方处理:
$ ack -Q 'ExpiredLeasesC()' lease/lessor.go 126: ExpiredLeasesC() <-chan []*Lease 559:func (le *lessor) ExpiredLeasesC() <-chan []*Lease { 906:func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil } lease/lessor_test.go 380: case el := <-le.ExpiredLeasesC(): 433: case el := <-le.ExpiredLeasesC(): etcdserver/server.go 1013: expiredLeaseC = s.lessor.ExpiredLeasesC()
果然, etcdserver/server.go
作为调用者对它进行处理。
最后,需要提到的是,lease也会进行持久化的,并且新建lessort的时候会优先看是否能从已有持久化的文件中恢复。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- 【源码阅读】AndPermission源码阅读
- 【源码阅读】Gson源码阅读
- 如何阅读Java源码 ,阅读java的真实体会
- 我的源码阅读之路:redux源码剖析
- JDK源码阅读(六):HashMap源码分析
- 如何阅读源码?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Professional JavaScript for Web Developers
Nicholas C. Zakas / Wrox / 2009-1-14 / USD 49.99
This eagerly anticipated update to the breakout book on JavaScript offers you an in-depth look at the numerous advances to the techniques and technology of the JavaScript language. You'll see why Java......一起来看看 《Professional JavaScript for Web Developers》 这本书的介绍吧!