内容简介:LevelDB 源码分析(七):Skiplist
LevelDB 中的 memtable 只是一个封装类,它的底层实现是一个跳表。跳表是一种基于随机数的平衡数据结构,其他的平衡数据结构还有红黑树、AVL树,但跳表的原理比它们简单很多。跳表有点像链表,只不过每个节点是多层结构,通过在每个节点中增加向前的指针提高查找效率。
在 /leveldb/db
文件夹下有跳表的实现 skiplist.h
和跳表的测试程序 skiplist_test.cc
。
LevelDB 的 skiplist 是一个模板类,key 是跳表每个节点存储的信息类,跳表是一个顺序结构,comparator 是跳表 key 的比较器。
Skiplist 的写操作需要外部的同步信号,最有可能就是利用 mutex。读操作需要保证在读的过程中skiplist不会被销毁。除此之外,读操作不需要任何的内部锁或者同步信号。
分配的节点直到 skiplist 销毁的时候才会 delete 掉,是通过代码层面来保证这一点的,因为代码中没有 delete skiplist 节点的操作。
当一个节点连接到 skiplist 上后,除了 prev 和 next,节点上的其他内容都不能改变了。
template<typename Key, class Comparator>
class SkipList {
private:
struct Node;
public:
// 创建一个skiplist。cmp是比较函数。利用arena分配内存
explicit SkipList(Comparator cmp, Arena* arena);
// 插入新值到skiplist中
// REQUIRES: nothing that compares equal to key is currently in the list.
void Insert(const Key& key);
// 判断key是否存在
bool Contains(const Key& key) const;
// skiplist的迭代器
class Iterator {
public:
// 在特定的skiplist上初始化一个迭代器
// The returned iterator is not valid.
explicit Iterator(const SkipList* list);
// 迭代器是否定位在一个有效的节点上
bool Valid() const;
// 返回当前位置的key
const Key& key() const;
// 进行到下一个位置
void Next();
// 进行到前一个位置
void Prev();
// 进行到 key >= target的第一个位置
void Seek(const Key& target);
// 定位到头节点
void SeekToFirst();
// 定位到尾节点
void SeekToLast();
private:
const SkipList* list_;
Node* node_;
// Intentionally copyable
};
private:
enum { kMaxHeight = 12 }; // skiplist的最大层数,新增节点的随机层数不能大于此值
Comparator const compare_; // key值的比较函数,一旦初始化就不能变化了(当插入一些数据后,改变key,状态不可控)
Arena* const arena_; // Arena 用来给结点分配内存
Node* const head_; // skiplist的头结点
// 只会被 Insert() 修改。Read racily by readers, but stale values are ok.
// AtomicPointer是leveldb定义的一个原子指针,
// 它的读取和写入都设立了内存屏障,保证读取的值是即时的、最新的。
// 这里直接将int型转化为指针保存,因为不会对其取地址。
port::AtomicPointer max_height_; // 当前skiplist的最大层数
// 返回Skiplist的层数
inline int GetMaxHeight() const {
return static_cast<int>(reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load()));
}
// 产生随机的level层数,只有执行Insert()时才会被调用
Random rnd_;
Node* NewNode(const Key& key, int height);// 新建一个level=height,值为key的节点
int RandomHeight(); // 随机产生一个level层数
// 比较两个key是否相等
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
// key值是否比节点n的key值大
bool KeyIsAfterNode(const Key& key, Node* n) const;
// If prev is non-NULL, fills prev[level] with pointer to previous node at "level" for every level in [0..max_height_-1].
// 找到key对应的Node或是key后面紧邻的Node
Node* FindGreaterOrEqual(const Key& key, Node** prev) const;
// 找到key前面紧邻的Node,如果skiplist为空,则返回头节点
Node* FindLessThan(const Key& key) const;
// Skiplist最后一个Node,如果skiplist为空,则返回头节点
Node* FindLast() const;
// 拷贝构造和赋值构造操作不允许
SkipList(const SkipList&);
void operator=(const SkipList&);
};
具体的实现如下:
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node { // Skiplist节点Node定义
explicit Node(const Key& k) : key(k) { }
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
// 返回本节点第n层的下一个节点指针
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
}
// 重设n层的next指针
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
}
private:
// 数组的长度等于节点的层数。next_[0] 表示最低的一层。
port::AtomicPointer next_[1];
};
//建一个Node节点(指定key及level层数)
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
char* mem = arena_->AllocateAligned(sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
return new (mem) Node(key);
}
template<typename Key, class Comparator>
inline SkipList<Key,Comparator>::Iterator::Iterator(const SkipList* list) {
list_ = list;
node_ = NULL;
}
template<typename Key, class Comparator>
inline bool SkipList<Key,Comparator>::Iterator::Valid() const {
return node_ != NULL;
}
template<typename Key, class Comparator>
inline const Key& SkipList<Key,Comparator>::Iterator::key() const {
assert(Valid());
return node_->key;
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::Prev() {
// 不需要使用 prev 链接,只要寻找 key 的前一个节点就行了
assert(Valid());
node_ = list_->FindLessThan(node_->key);
if (node_ == list_->head_) {
node_ = NULL;
}
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, NULL);
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::SeekToLast() {
node_ = list_->FindLast();
if (node_ == list_->head_) {
node_ = NULL;
}
}
// 返回一个随机高度
template<typename Key, class Comparator>
int SkipList<Key,Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
height++;
}
assert(height > 0);
assert(height <= kMaxHeight);
return height;
}
// key值是否比节点n的值大。如果节点n是NULL,则值表示无穷大
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
return (n != NULL) && (compare_(n->key, key) < 0);
}
// 函数返回的前向节点指针数组是为了向跳表插入节点时用的
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)const {
Node* x = head_;
int level = GetMaxHeight() - 1; // 首先获得跳表的最高层数,减一是数组next最大下标
while (true) {
Node* next = x->Next(level);
if (KeyIsAfterNode(key, next)) { // 如果key在本节点node之后,继续前进
x = next; // 继续找
} else {
// 如果小于本节点node,把本节点的level层上的前节点指针记录进数组prev中,并跳向第一层的链表,
// 重复上述过程,直至来到最底层
if (prev != NULL)
prev[level] = x; // 当前level上,x为高度>=key节点高度,且正好排在其前面,插入和删除时使用
if (level == 0) {
return next;
} else {
level--;
}
}
}
}
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
assert(x == head_ || compare_(x->key, key) < 0);
Node* next = x->Next(level);
if (next == NULL || compare_(next->key, key) >= 0) { // 从最高level尽可能向后移动更远的距离
if (level == 0) { // 后面key>查找的key时,或next为空时,level--,直到level=0
return x;
} else {
level--; // 从最高层往下后续查找
}
} else {
x = next;
}
}
}
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()const {
//从最高level走到头,然后减少level继续走到头,一直到level=0
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
Node* next = x->Next(level);
if (next == NULL) {
if (level == 0) {
return x;
} else {
level--;
}
} else {
x = next;
}
}
}
template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena):compare_(cmp),arena_(arena),
head_(NewNode(0,kMaxHeight)),
max_height_(reinterpret_cast<void*>(1)),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++){
head_->SetNext(i, NULL);
}
}
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
Node* x = FindGreaterOrEqual(key, prev); //查找待插入的位置
// 不允许插入相同的数据
assert(x == NULL || !Equal(key, x->key));
int height = RandomHeight();
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
// 并发的读操作可以在没有同步的情况下修改 max_height_。
// 如果一个并发的读要观察 max_height_ 的新值,会看到两种情况中的一种:
// (1)来自head_的新level指针的旧值,也就是NULL
// (2)或者是下面的循环设置的新值
// 在第一种情况中,读操作会直接下降到下一层,因为NULL排在所有key的后面
// 在第二种情况中,读操作会使用这个新的节点
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, NULL);
if (x != NULL && Equal(key, x->key)) {
return true;
} else {
return false;
}
}
1.在 RandomHeight()
中 kBranching
和 (rnd_.Next() % kBranching
使得上层节点的数量约为下层的1/4。那么,当设定 MaxHeight=12 时,根节点为1时,约可均匀容纳 Key 的数量为 4^11= 4194304(约为400W)。当单独增大 MaxHeight 时,并不会使得 SkipList 的层级提升。MaxHeight=12为经验值,在百万数据规模时,尤为适用。
2.读值本身并不会改变SkipList的结构,因此多个读之间不存在并发问题。而当读、写同时存在时,SkipList 通过 AtomicPointer(原子指针)及结构调整上的小技巧达到“无锁”并发。一个节点一旦被添加到SkipList中,其层级结构将不再发生变化,Node中的唯一成员 port::AtomicPointer next_[1]
大小不会再发生改变。 port::AtomicPointer next_[1]
用于占位,实际的数组大小和本节点的 Height 一致。
3.跳表实际上是类似一种多层的有序链表,高层的链表比底层的链表节点更少,在更高层的链表上能更快的遍历完整个链表,跳到更底层的链表更利于精确的定位,以上便是skiplist利用空间换取时间的方法精髓。想首先从跳表头结点的最高层开始遍历,key值大于节点key值,则前往同一层的下一个节点,否则跳到节点的低一层并记录上一层的最后访问的节点,直到来到第一层(最底层)。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 以太坊源码分析(36)ethdb源码分析
- [源码分析] kubelet源码分析(一)之 NewKubeletCommand
- libmodbus源码分析(3)从机(服务端)功能源码分析
- [源码分析] nfs-client-provisioner源码分析
- [源码分析] kubelet源码分析(三)之 Pod的创建
- Spring事务源码分析专题(一)JdbcTemplate使用及源码分析
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Developer's Guide to Social Programming
Mark D. Hawker / Addison-Wesley Professional / 2010-8-25 / USD 39.99
In The Developer's Guide to Social Programming, Mark Hawker shows developers how to build applications that integrate with the major social networking sites. Unlike competitive books that focus on a s......一起来看看 《Developer's Guide to Social Programming》 这本书的介绍吧!