内容简介:本文是使用 golang 实现 redis 系列的第五篇, 将介绍如何使用跳表实现有序集合(SortedSet)的相关功能。跳表(skiplist) 是 Redis 中 SortedSet 数据结构的底层实现, 跳表优秀的范围查找能力为本文完整源代码在Github
本文是使用 golang 实现 redis 系列的第五篇, 将介绍如何使用跳表实现有序集合(SortedSet)的相关功能。
跳表(skiplist) 是 Redis 中 SortedSet 数据结构的底层实现, 跳表优秀的范围查找能力为 ZRange
和 ZRangeByScore
等命令提供了支持。
本文完整源代码在Github HDT3213/godis
结构定义
实现 ZRange
命令最简单的数据结构是有序链表:
在有序链表上实现 ZRange key start end
命令需要进行 end
次查询, 即时间复杂度为 O(n)
跳表的优化思路是添加上层链表,上层链表中会跳过一些节点。如图所示:
在有两层的跳表中,搜索的时间复杂度降低为了 O(n / 2) 。以此类推在有 log2(n) 层的跳表中,搜索元素的时间复杂度为 O(log n) 。
了解数据结构之后,可以定义相关的类型了:
// 对外的元素抽象 type Element struct { Member string Score float64 } type Node struct { Element // 元素的名称和 score backward *Node // 后向指针 level []*Level // 前向指针, level[0] 为最下层 } // 节点中每一层的抽象 type Level struct { forward *Node // 指向同层中的下一个节点 span int64 // 到 forward 跳过的节点数 } // 跳表的定义 type skiplist struct { header *Node tail *Node length int64 level int16 }
用一张图来表示一下:
查找节点
有了上文的描述查找节点的逻辑不难实现, 以 RangeByRank 的核心逻辑为例:
// 寻找排名为 rank 的节点, rank 从1开始 func (skiplist *skiplist) getByRank(rank int64)*Node { var i int64 = 0 n := skiplist.header // 从顶层向下查询 for level := skiplist.level - 1; level >= 0; level-- { // 从当前层向前搜索 // 若当前层的下一个节点已经超过目标 (i+n.level[level].span > rank),则结束当前层搜索进入下一层 for n.level[level].forward != nil && (i+n.level[level].span) <= rank { i += n.level[level].span n = n.level[level].forward } if i == rank { return n } } return nil }
ZRangeByScore
命令需要 getFirstInScoreRange
函数找到分数范围内第一个节点:
func (skiplist *skiplist) getFirstInScoreRange(min *ScoreBorder, max *ScoreBorder) *Node { // 判断跳表和范围是否有交集,若无交集提早返回 if !skiplist.hasInRange(min, max) { return nil } n := skiplist.header // 从顶层向下查询 for level := skiplist.level - 1; level >= 0; level-- { // 若 forward 节点仍未进入范围则继续向前(forward) // 若 forward 节点已进入范围,当 level > 0 时 forward 节点不能保证是 *第一个* 在 min 范围内的节点, 因此需进入下一层查找 for n.level[level].forward != nil && !min.less(n.level[level].forward.Score) { n = n.level[level].forward } } // 当从外层循环退出时 level=0 (最下层), n.level[0].forward 一定是 min 范围内的第一个节点 n = n.level[0].forward if !max.greater(n.Score) { return nil } return n }
插入节点
插入节点的操作比较多,我们以注释的方式进行说明:
func (skiplist *skiplist)insert(member string, score float64)*Node { // 寻找新节点的先驱节点,它们的 forward 将指向新节点 // 因为每层都有一个 forward 指针, 所以每层都会对应一个先驱节点 // 找到这些先驱节点并保存在 update 数组中 update := make([]*Node, maxLevel) rank := make([]int64, maxLevel) // 保存各层先驱节点的排名,用于计算span node := skiplist.header for i := skiplist.level - 1; i >= 0; i-- { // 从上层向下寻找 // 初始化 rank if i == skiplist.level - 1 { rank[i] = 0 } else { rank[i] = rank[i + 1] } if node.level[i] != nil { // 遍历搜索 for node.level[i].forward != nil && (node.level[i].forward.Score < score || (node.level[i].forward.Score == score && node.level[i].forward.Member < member)) { // same score, different key rank[i] += node.level[i].span node = node.level[i].forward } } update[i] = node } level := randomLevel() // 随机决定新节点的层数 // 可能需要创建新的层 if level > skiplist.level { for i := skiplist.level; i < level; i++ { rank[i] = 0 update[i] = skiplist.header update[i].level[i].span = skiplist.length } skiplist.level = level } // 创建新节点并插入跳表 node = makeNode(level, score, member) for i := int16(0); i < level; i++ { // 新节点的 forward 指向先驱节点的 forward node.level[i].forward = update[i].level[i].forward // 先驱节点的 forward 指向新节点 update[i].level[i].forward = node // 计算先驱节点和新节点的 span node.level[i].span = update[i].level[i].span - (rank[0] - rank[i]) update[i].level[i].span = (rank[0] - rank[i]) + 1 } // 新节点可能不会包含所有层 // 对于没有层,先驱节点的 span 会加1 (后面插入了新节点导致span+1) for i := level; i < skiplist.level; i++ { update[i].level[i].span++ } // 更新后向指针 if update[0] == skiplist.header { node.backward = nil } else { node.backward = update[0] } if node.level[0].forward != nil { node.level[0].forward.backward = node } else { skiplist.tail = node } skiplist.length++ return node }
randomLevel 用于随机决定新节点包含的层数,随机结果出现2的概率是出现1的25%, 出现3的概率是出现2的25%:
func randomLevel() int16 { level := int16(1) for float32(rand.Int31()&0xFFFF) < (0.25 * 0xFFFF) { level++ } if level < maxLevel { return level } return maxLevel }
删除节点
删除节点的思路与插入节点基本一致:
// 删除操作可能一次删除多个节点 func (skiplist *skiplist) RemoveRangeByRank(start int64, stop int64)(removed []*Element) { var i int64 = 0 // 当前指针的排名 update := make([]*Node, maxLevel) removed = make([]*Element, 0) // 从顶层向下寻找目标的先驱节点 node := skiplist.header for level := skiplist.level - 1; level >= 0; level-- { for node.level[level].forward != nil && (i+node.level[level].span) < start { i += node.level[level].span node = node.level[level].forward } update[level] = node } i++ node = node.level[0].forward // node 是目标范围内第一个节点 // 删除范围内的所有节点 for node != nil && i < stop { next := node.level[0].forward removedElement := node.Element removed = append(removed, &removedElement) skiplist.removeNode(node, update) node = next i++ } return removed }
接下来分析一下执行具体节点删除操作的removeNode函数:
// 传入目标节点和删除后的先驱节点 // 在批量删除时我们传入的 update 数组是相同的 func (skiplist *skiplist) removeNode(node *Node, update []*Node) { for i := int16(0); i < skiplist.level; i++ { // 如果先驱节点的forward指针指向了目标节点,则需要修改先驱的forward指针跳过要删除的目标节点 // 同时更新先驱的 span if update[i].level[i].forward == node { update[i].level[i].span += node.level[i].span - 1 update[i].level[i].forward = node.level[i].forward } else { update[i].level[i].span-- } } // 修改目标节点后继节点的backward指针 if node.level[0].forward != nil { node.level[0].forward.backward = node.backward } else { skiplist.tail = node.backward } // 必要时删除空白的层 for skiplist.level > 1 && skiplist.header.level[skiplist.level-1].forward == nil { skiplist.level-- } skiplist.length-- }
以上所述就是小编给大家介绍的《Golang 实现 Redis(5): 用跳表实现SortedSet》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- php如何实现session,自己实现session,laravel如何实现session
- AOP如何实现及实现原理
- webpack 实现 HMR 及其实现原理
- Docker实现原理之 - OverlayFS实现原理
- 为什么实现 .NET 的 ICollection 集合时需要实现 SyncRoot 属性?如何正确实现这个属性?
- 自己实现集合框架(十):顺序栈的实现
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。