参考leveldb的cache实现

栏目: 数据库 · 发布时间: 6年前

内容简介:对LevelDB比较了解应该知道其中一块cache实现:LRU Cache是采用双向链表 + HashTable,其中HashTable是为了解决双向链表的查找性能缺陷才引入的。接下来看一下采用golang如何实现LRU Cache。Output:输出:

前言

对LevelDB比较了解应该知道其中一块cache实现:LRU Cache是采用双向链表 + HashTable,其中HashTable是为了解决双向链表的查找性能缺陷才引入的。接下来看一下采用golang如何实现LRU Cache。

实现

1、定义cache接口【thread-safe的cache】

import (
        "io"
)
// 参考:https://sourcegraph.com/github.com/google/leveldb/-/blob/include/leveldb/cache.h
type Cache interface{
  // 在multiple clients下共享相同的cache来划分key空间
  // 当启动是会被分配一个新id 
  NewId()  uint64 
  
  // 插入: 从 key-value到cache的映射
  // 当insert的entry不在需要时对应的key-value传给deleter处理
  Insert(key string, value interface{}, size int, deleter func(key string, value interface{})) (handle io.Closer)

  // 查找: 不存在时返回 nil,nil, false
  Lookup(key string) (value interface{}, handle io.Closer, ok bool)

  // 删除: 注意必须所有关联的handle释放才能erase key
  Erase(key string)

  // 关闭: 所有的handle必须release 才能destroy所有存在的entries
  Close()  error
}

2、LRUCache

import (
  "container/list"
  "fmt"
  "io"
  "runtime"
  "sync"
  "sync/atomic"
  "time"
)

var _Cache = (*LRUCache)(nil)

// LRU cache实现
type   LRUCache struct{
    * _LRUCache
}

type _LRUCache struct{
   mu  sync.Mutex
   //  采用双向链表  和 HashTable
   //  双向循环链表,有大小限制,保证数据的新旧,当缓存不够时,保证先清除旧的数据
   list     *list.List

   // 二级指针数组,链表没有大小限制,动态扩展大小,保证数据快速查找,hash定位一级指针,得到存放在一级指针上的二级指针链表,遍历查找数据
   table  map[string]*list.Element

   // 当前size
   // cache容量
   size         int64
   capacity  int64
   last_id     uint64
}

// handle to an entry: 
// 哈希表中的元素
type LRUHandle struct{
    c                      *LRUCache
    key                   string
    value                interface{}
    size                   int64
    deleter              func(key string, vlaue interface{})
    time_created    time.Time
    time_accessed atomic.Value
    refs                    uint32
}
//  获取key
func   (h *LRUHandle)  Key()  string{
  return h.key
}

// 获取value
func (h *LRUHandle) Value() interface{} {
    return h.value
}
// 获取size
func (h *LRUHandle) Size() int {
    return int(h.size)
}

// 获取entry创建时间
func (h *LRUHandle) TimeCreated() time.Time {
    return h.time_created
}

// 获取entry访问时间
func (h *LRUHandle) TimeAccessed() time.Time {
    return h.time_accessed.Load().(time.Time)
}

// 
func (h *LRUHandle) Retain() (handle *LRUHandle) {
    h.c.mu.Lock()
    defer h.c.mu.Unlock()
    h.c.addref(h)
    return h
}

func (h *LRUHandle) Close() error {
    h.c.mu.Lock()
    defer h.c.mu.Unlock()
    h.c.unref(h)
    return nil
}

=================华丽分割线=================
// 根据指定capacity新建一个cache
func NewLRUCache(capacity int64) *LRUCache {
    assert(capacity > 0)
    p := &_LRUCache{
        list:     list.New(),
        table:    make(map[string]*list.Element),
        capacity: capacity,
    }
    runtime.SetFinalizer(p, (*_LRUCache).Close)
    return &LRUCache{p}
}

// 清除所有已存在的entry通过deleter函数(用户自定义)
// 前提:所有的handle必须release
func (p *LRUCache) Close() error {
    runtime.SetFinalizer(p._LRUCache, nil)
    p._LRUCache.Close()
    return nil
}

// 获取cache中key对应的内容 
func (p *LRUCache) Get(key string) (value interface{}, ok bool) {
    if v, h, ok := p.Lookup(key); ok {
        h.Close()
        return v, ok
    }
    return
}

// 类似get,差异在于cache中不存在时,可交由用户自行处理返回结果同时将返回的结果存放到cache中
func (p *LRUCache) GetFrom(key string, getter func(key string) (v interface{}, size int, err error)) (value interface{}, err error) {
    if v, h, ok := p.Lookup(key); ok {
        h.Close()
        return v, nil
    }
    if getter == nil {
        return nil, fmt.Errorf("cache: %q not found!", key)
    }
    value, size, err := getter(key) // 由使用方自行处理 比如从其他存储介质获取
    if err != nil {
        return
    }
    assert(size > 0)
    p.Set(key, value, size)   // 将内容添加到cache
    return
}

// 获取value并指定默认值 当cache中不存在时返回第一个default值作为结果
func (p *LRUCache) Value(key string, defaultValue ...interface{}) interface{} {
    if v, h, ok := p.Lookup(key); ok {
        h.Close()
        return v
    }
    if len(defaultValue) > 0 {
        return defaultValue[0]
    } else {
        return nil
    }
}

// 添加key-value并指定size同时也设定deleter用于当key-value不用时执行delete
func (p *LRUCache) Set(key string, value interface{}, size int, deleter ...func(key string, value interface{})) {
    if len(deleter) > 0 {  // 给定deleter函数
        h := p.Insert(key, value, size, deleter[0])
        h.Close()
    } else {
        h := p.Insert(key, value, size, nil)
        h.Close()
    }
}

// 能够用于多clients共享相同的cache来分割key空间
// 一般在启动时会分配一个新的id给client,可将id作为key的前缀
func (p *LRUCache) NewId() uint64 {
    p.mu.Lock()
    defer p.mu.Unlock()

    p.last_id++
    return p.last_id
}

// 建立key-value到cache的映射并分配指定size
//  返回操作该映射的handle 当映射不再被需要时handle.Close会被调用
// 另外当insert的entry不再需要会触发deleter
func (p *LRUCache) Insert(key string, value interface{}, size int, deleter func(key string, value interface{})) (handle io.Closer) {
    handle = p.Insert_(key, value, size, deleter)
    return
}

// 类似insert 不过返回值=LRUHandle
func (p *LRUCache) Insert_(key string, value interface{}, size int, deleter func(key string, value interface{})) (handle *LRUHandle) {
    p.mu.Lock()
    defer p.mu.Unlock()

    assert(key != "" && size > 0)
       // 当二级指针数组中已存在对应的key
       // 需要先执行remove在table和list
      // 当缓存不够时  保证cache数据最新
    if element := p.table[key]; element != nil {
        p.list.Remove(element)  // 删除双向链表中element
        delete(p.table, key) // 删除二级索引表

        h := element.Value.(*LRUHandle) 
        p.unref(h)                     // 解除引用
    } 
        
        // 新建LRUHandle
    h := &LRUHandle{
        c:            p,
        key:          key,
        value:        value,
        size:         int64(size),
        deleter:      deleter,
        time_created: time.Now(),
        refs:         2, // One from LRUCache, one for the returned handle
    }
    h.time_accessed.Store(time.Now())
        // 放置表头 最新的数据
    element := p.list.PushFront(h)
    p.table[key] = element
    p.size += h.size
    p.checkCapacity()  // 检查是否超过capacity
    return h
}

// 查找
// 若是cache中不存在对应key的内容返回nil,nil,false
// 否则返回handle(key-value与cache的映射)
// 当mapping不存需要时 需要调用handle.Close() 
func (p *LRUCache) Lookup(key string) (value interface{}, handle io.Closer, ok bool) {
    // warning: (*LRUHandle)(nil) != (io.Closer)(nil)
    if v, h, ok := p.Lookup_(key); ok {
        return v, h, ok
    }
    return
}

// 类似Lookup 但返回值 *LRUHandle.
func (p *LRUCache) Lookup_(key string) (value interface{}, handle *LRUHandle, ok bool) {
    p.mu.Lock()
    defer p.mu.Unlock()
        // 通过二级索引数组 获取key的内容
    element := p.table[key]
    if element == nil {
        return nil, nil, false
    }
        // 通过二级索引table查找到element 再将双向链表list中的element移到表头
    p.list.MoveToFront(element)
       // 获取到对应的元素
    h := element.Value.(*LRUHandle)
    h.time_accessed.Store(time.Now())
       // 记录当前handle的引用数
    p.addref(h)
    return h.Value(), h, true
}

// 获取key对应的内容 并删除cache中双向链表和二级索引的内容
func (p *LRUCache) Take(key string) (handle io.Closer, ok bool) {
    p.mu.Lock()
    defer p.mu.Unlock()

    element := p.table[key]
    if element == nil {
        return nil, false
    }
        // 删除双向链表对应的element
    p.list.Remove(element)
       // 删除二级索引中的记录
    delete(p.table, key)

    h := element.Value.(*LRUHandle)
    return h, true
}

// 删除key : 注意需要release所有关联的handle
func (p *LRUCache) Erase(key string) {
    p.mu.Lock()
    defer p.mu.Unlock()

    element := p.table[key]
    if element == nil {
        return
    }
        // 删除list中的element和table中记录
    p.list.Remove(element)
    delete(p.table, key)
       
    h := element.Value.(*LRUHandle)
    p.unref(h)       // 释放与handle的关联
    return
}

// 设置cache的capacity
// 当capacity过小且当前cache size远超过capacity时,cache将进行收缩
func (p *LRUCache) SetCapacity(capacity int64) {
    p.mu.Lock()
    defer p.mu.Unlock()

    assert(capacity > 0)
    p.capacity = capacity
    p.checkCapacity()
}

// 返回cache的少量统计信息:
// 当前entry的数量、cache size、cache capacity、最久访问entry的时间
func (p *LRUCache) Stats() (length, size, capacity int64, oldest time.Time) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if lastElem := p.list.Back(); lastElem != nil {
        oldest = lastElem.Value.(*LRUHandle).time_accessed.Load().(time.Time)
    }
    return int64(p.list.Len()), p.size, p.capacity, oldest
}

// 统计信息以json串显示
func (p *LRUCache) StatsJSON() string {
    if p == nil {
        return "{}"
    }
    l, s, c, o := p.Stats()
    return fmt.Sprintf(`{
    "Length": %v,
    "Size": %v,
    "Capacity": %v,
    "OldestAccess": "%v"
}`, l, s, c, o)
}

// cache中element的数量:
func (p *LRUCache) Length() int64 {
    p.mu.Lock()
    defer p.mu.Unlock()
    return int64(p.list.Len())
}

// 返回cache的size
func (p *LRUCache) Size() int64 {
    p.mu.Lock()
    defer p.mu.Unlock()
    return p.size
}

// 返回cache的capacity
func (p *LRUCache) Capacity() int64 {
    p.mu.Lock()
    defer p.mu.Unlock()
    return p.capacity
}

// 返回cache最新访问的element的time
// 若是cache为空 则返回IsZero()的时间
func (p *LRUCache) Newest() (newest time.Time) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if frontElem := p.list.Front(); frontElem != nil {
        newest = frontElem.Value.(*LRUHandle).time_accessed.Load().(time.Time)
    }
    return
}

// 返回cache最久访问的element的time
// 若是cache为空 则返回IsZero()的时间
func (p *LRUCache) Oldest() (oldest time.Time) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if lastElem := p.list.Back(); lastElem != nil {
        oldest = lastElem.Value.(*LRUHandle).time_accessed.Load().(time.Time)
    }
    return
}

// 返回cache中最新使用key集合
func (p *LRUCache) Keys() []string {
    p.mu.Lock()
    defer p.mu.Unlock()

    keys := make([]string, 0, p.list.Len())
    for e := p.list.Front(); e != nil; e = e.Next() {
        keys = append(keys, e.Value.(*LRUHandle).key)
    }
    return keys
}
// LRUHandle引用计数(增加)
func (p *_LRUCache) addref(h *LRUHandle) {
    h.refs++
}
// LRUHandle引用计数(减少)
func (p *_LRUCache) unref(h *LRUHandle) {
    assert(h.refs > 0)
    h.refs--
    if h.refs <= 0 {
        p.size -= h.size
        if h.deleter != nil {
            h.deleter(h.key, h.value)
        }
    }
}

// 检查capacity:压缩相对时间比较久的entry(剔除)
func (p *LRUCache) checkCapacity() {
        // 当cache的size大于capacity同时二级索引数组不止1个 进行shrank
       // 从双向链表的尾部开始remove对应的element
       // 同时二级索引数组对应的内容也需要delete 并减少对应的引用计数 
      // 直至size <= capacity 二级索引数组个数 = 1
    for p.size > p.capacity && len(p.table) > 1 {
        delElem := p.list.Back()
        h := delElem.Value.(*LRUHandle)
        p.list.Remove(delElem)
        delete(p.table, h.key)
        p.unref(h)
    }
}

// 清空
func (p *LRUCache) Clear() {
    p.mu.Lock()
    defer p.mu.Unlock()
        // release 二级索引数组关联的handle
    for _, element := range p.table {
        h := element.Value.(*LRUHandle)
        p.unref(h)
    }
        // cache的双向链表和二级索引数组置空
    p.list = list.New()
    p.table = make(map[string]*list.Element)
    p.size = 0
    return
}

//   cache的close
func (p *_LRUCache) Close() {
    p.mu.Lock()
    defer p.mu.Unlock()

    for _, element := range p.table {
        h := element.Value.(*LRUHandle)
        assert(h.refs == 1, "h.refs = ", h.refs)
        p.unref(h)
    }

    p.list = nil
    p.table = nil
    p.size = 0
}
至此lru cache实现完成

3、应用

package main

import (
    "fmt"
    "cache"
)

func main() {
        // 新建cache
    c := cache.NewLRUCache(100)
    defer c.Close()
        // set entry到缓存
    c.Set("key1", "value1", 1)
    value1 := c.Value("key1").(string)
    fmt.Println("key1:", value1)

    c.Set("key2", "value2", 1)
    value2 := c.Value("key2", "null").(string) // 指定默认值
    fmt.Println("key2:", value2)

    value3 := c.Value("key3", "null").(string)
    fmt.Println("key3:", value3)

    value4 := c.Value("key4") // value4 is nil
    fmt.Println("key4:", value4)

    fmt.Println("Done")
}

Output:

key1: value1
key2: value2
key3: null
key4: <nil>
Done

Non GC Object

package main

import (
    "fmt"
    "cache"
)

func main() {
    c := cache.NewLRUCache(10)
    defer c.Close()

    id0 := c.NewId()
    id1 := c.NewId()
    id2 := c.NewId()
    fmt.Println("id0:", id0)
    fmt.Println("id1:", id1)
    fmt.Println("id2:", id2)

    // add new
    v1 := "data:123"
    h1 := c.Insert("123", "data:123", len("data:123"), func(key string, value interface{}) {
        fmt.Printf("deleter(%q:%q)\n", key, value)
    })

    // fetch ok
    v2, h2, ok := c.Lookup("123")
    assert(ok)
    assert(h2 != nil)

    // remove
    c.Erase("123")

    // fetch failed
    _, h3, ok := c.Lookup("123")
    assert(!ok)
    assert(h3 == nil)

    // h1&h2 still valid!
    fmt.Printf("user1(%s)\n", v1)
    fmt.Printf("user2(%s)\n", v2.(string))

    // release h1
        // 由于 h2  handle对应值 故而deleter不能被触发
    h1.Close()

    // invoke deleter
    fmt.Println("invoke deleter(123) begin")
    h2.Close()
    fmt.Println("invoke deleter(123) end")

    // 添加
    h4 := c.Insert("abc", "data:abc", len("data:abc"), func(key string, value interface{}) {
        fmt.Printf("deleter(%q:%q)\n", key, value)
    })
    // release h4
    // 由于cache在handle对应值 故而deleter不能被触发
    h4.Close()

    // cache length
    length := c.Length()
    assert(length == 1)

    // cache size
    size := c.Size()
    assert(size == 8, "size:", size)

    // add h5
        //  超过容量10 会触发h4的deleter
    fmt.Println("invoke deleter(h4) begin")
    h5 := c.Insert("456", "data:456", len("data:456"), func(key string, value interface{}) {
        fmt.Printf("deleter(%q:%q)\n", key, value)
    })
    fmt.Println("invoke deleter(h4) end")

    // 必须释放所有的handle
    h5.Close()

    // stats
    fmt.Println("StatsJSON:", c.StatsJSON())

    // done
    fmt.Println("Done")
}

func assert(v bool, a ...interface{}) {
    if !v {
        panic(fmt.Sprint(a...))
    }
}

输出:

id0: 1
id1: 2
id2: 3
user1(data:123)
user2(data:123)
invoke deleter(123) begin
deleter("123":"data:123")
invoke deleter(123) end
invoke deleter(h4) begin
deleter("abc":"data:abc")
invoke deleter(h4) end
StatsJSON: {
        "Length": 1,
        "Size": 8,
        "Capacity": 10,
        "OldestAccess": "2015-08-21 18:00:24.0119469 +0800 CST"
}
Done
deleter("456":"data:456")

以上所述就是小编给大家介绍的《参考leveldb的cache实现》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

代码之美

代码之美

Grey Wilson / 聂雪军 / 机械工业出版社 / 2008年09月 / 99.00元

《代码之美》介绍了人类在一个奋斗领域中的创造性和灵活性:计算机系统的开发领域。在每章中的漂亮代码都是来自独特解决方案的发现,而这种发现是来源于作者超越既定边界的远见卓识,并且识别出被多数人忽视的需求以及找出令人叹为观止的问题解决方案。 《代码之美》33章,有38位作者,每位作者贡献一章。每位作者都将自己心目中对于“美丽的代码”的认识浓缩在一章当中,张力十足。38位大牛,每个人对代码之美都有自......一起来看看 《代码之美》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

SHA 加密
SHA 加密

SHA 加密工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换