内容简介:Codis是基于proxy架构的redis集群方案,如图1所示,即客户端的请求会先发送到proxy,由proxy做sharding后转发到后端redis实例。这个sharding的规则(常称之为路由表、转发表、slot表等)保存在集中化的组件(比如zookeeper、文件系统等)上,然后由Dashboard统一配置到所有Proxy上。相比而言,redis自己的集群方案redis-cluster则是无中心化的架构,如图2所示,它没有集中化的控制组件和proxy,客户端可以向集群内的任意一台节点发送请求,然后根
概述
Codis是基于proxy架构的 redis 集群方案,如图1所示,即客户端的请求会先发送到proxy,由proxy做sharding后转发到后端redis实例。这个sharding的规则(常称之为路由表、转发表、slot表等)保存在集中化的组件(比如zookeeper、文件系统等)上,然后由Dashboard统一配置到所有Proxy上。相比而言,redis自己的集群方案redis-cluster则是无中心化的架构,如图2所示,它没有集中化的控制组件和proxy,客户端可以向集群内的任意一台节点发送请求,然后根据节点的返回值做重定向(MOVE或ASK)操作,客户端本地也会缓存slot表,并根据每次的重定向信息来更新这个表。由于没有中心化组件存储或配置路由表,因此redis-cluster使用gossip在集群间同步路由表和集群拓补信息,在经过一段时间时候,理想情况下集群中每个节点都掌握了整个集群的路由信息。
图1 Codis架构图
图2 redis-cluster
对nosql数据库而言,水平扩缩容(Scale in/out)是一项基本的能力。Scale in/out是指可以动态的添加或删除集群中的节点,来水平扩展或收缩集群容量和CPU算力,它和纵向扩缩容(Scale up/down)是相对的。由于nosql是没有schema的,一般都是简单的kv结构(或者是kkv结构),因此做Scale in/out还是相对而言比较容易的。因为key是按照slot为单位进行sharding的(常见公式有:crc16(key) % slot_num,如图3 ),因此只要将一个实例上的某些slots迁移到其它节点上,再把路由表(即slot和node的映射关系)更新即可。虽然Codis和redis-cluster都支持这种slot迁移的Scale in/out,但是他们的实现方式还是有诸多区别的,接下来本文会阐述它们的不同。
图3 key-slot-node映射关系
Slot迁移难点
将一个redis上指定slot下的所有key迁移到其他redis上并不麻烦。其实只要两步,第一步先获取这个slot下所有key,然后对每个key发送迁移命令即可。由于redis本身没有slot的概念,更不维护key与slot的映射关系,因此第一步是需要改造redis引擎,使其可以维护key与slot的映射关系,这一点redis-cluster和Codis都是这么做的(比如使用一个单独的dict数组来维护这种索引关系,每个数组的下标就是slot num,每个数组元素是一个dick,里面存放的是<key、crc pair)。第二步发送就比较简单了,redis原生支持对一些key进行迁移的命令:MIGRATE,如下:
MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1 key2 ... keyN
redis-cluster的确就是直接使用MIGRATE 命令进行key的迁移,但是这个命令是同步阻塞的,鉴于redis单线程的特性,当MIGRATE耗时太久(比如网络较慢、迁移bigkey)时会导致主线程无法处理用户请求,从而导致用户RT变大甚至超时。因此,直接使用MIGRATE命令虽然方便,但是有诸多限制。Codis自己修改了redis引擎,加入了slots同步迁移和异步迁移的功能(同步迁移比较简单,本文不再赘述)。
因此,要想做到平滑的、用户基本无感的scale in/out,slot迁移需要解决以下几个难点:
- 不能使用同步阻塞的迁移方式,否则对于bigkey或者慢网络迁移会阻塞主线程正常服务
- 对bigkey的迁移需要特殊处理,否则在bigkey的序列化、发送、反序列化时都可能导致源redis实例和目标redis实例主线程阻塞
- 单个key的迁移过程需要保证原子性,即要么一个key全部迁移成功,要么全部迁移失败,不存在中间状态
- 对迁移中的slot的读写处理,即一个slot如果正处于迁移过程中,其中的key(已迁移走、迁移中或等待迁移)是否可以被正常读写
Redis-Cluster实现
图4 redis-cluster slot迁移
slot分配
如图4所述,redis-cluster为了支持slot迁移,改造引擎加入了key和slot的映射关系。redis-cluster使用rax树来维护这个关系,因此在新建集群、集群扩缩容的时候,都会涉及到slot分配、删除等操作,这些操作主要通过以下命令实现:
- cluster addslots <slot> [slot ...] 将一个或多个槽(slot)指派(assign)给当前节点。
- cluster delslots <slot> [slot ...] 移除一个或多个槽对当前节点的指派。
- cluster flushslots 移除指派给当前节点的所有槽,让当前节点变成一个没有指派任何槽的节点。
- cluster setslot <slot> node <node_id> 将槽 slot 指派给 node_id 指定的节点,如果槽已经指派给另一个节点,那么先让另一个节点删除该槽>,然后再进行指派。
key-slot操作
一旦映射关系建立好,接下来就可以执行key相关的slot命令,redis-cluster提供了以下几个命令:
- cluster slot <key> 计算键 key 应该被放置在哪个槽上。
- cluster countkeysinslot <slot> 返回槽 slot 目前包含的键值对数量。
- cluster getkeysinslot <slot> <count> 返回 count 个 slot 槽中的键。
slot迁移流程
redis-cluster在迁移一个slot的时候具体流程如下:
- 对目标节点发送 cluster setslot <slot> importing <sourceNodeId> 命令,让目标节点准备导入槽的数据。
- 对源节点发送 cluster setslot <slot> migrating <targetNodeId> 命令,让源节点准备迁出槽的数据。
- 源节点循环执行 cluster getkeysinslot <slot> <count> 命令,获取count个属于槽slot的键。
- 在源节点上执行 migrate <targetIp> <targetPort> "" 0 <timeout> keys <keys...> 命令,把获取的键通过流水线(pipeline)机制批量迁移到目标节点。
- 重复执行步骤3和步骤4直到槽下所有的键值数据迁移到目标节点。
- 向集群内所有主节点发送cluster setslot <slot> node <targetNodeId>命令,通知槽分配给目标节点。为了保证槽节点映射变更及时传播,需要遍历发送给所有主节点更新被迁移的槽指向新节点。
如果中途想取消一个迁移,可以向节点发送 cluster setslot <slot> stable 取消对槽 slot 的导入(import)或者迁移(migrate)状态。
key迁移的原子性:
由于migrate命令是同步阻塞的(同步发送并同步接收),迁移过程会阻塞该引擎上的所有key的读写,只有在迁移响应成功之后才会将本地key删除,因此迁移是原子的。
迁移中的读写冲突:
因为MIGRATE命令是同步阻塞的,因此不会存在一个key正在被迁移又同时被读写的情况,但是由于一个slot下可能有部分key被迁移完成,部分key正在等待迁移的情况,为此如果读写的一个key所属的slot正在被迁移,redis-cluster做如下处理:
- 客户端根据本地slots缓存发送命令到源节点,如果存在键对象则直接执行并返回结果给客户端。
- 如果键对象不存在,但是key所在的slot属于本节点,则可能存在于目标节点,这时源节点会回复ASK重定向异常。格式如下:(error)ASK <slot> <targetIP>:<targetPort>。
- 客户端从ASK重定向异常提取出目标节点信息,发送asking命令到目标节点打开客户端连接标识,再执行键命令。如果存在则执行,不存在则返回不存在信息
- 如果key所在的slot不属于本节点,则返回MOVE重定向。格式如下:(error)MOVED <slot> <targetIP>:<targetPort>。
总结
redis-cluster让redis集群化,Scale能力拓展了分布式的灵活性。但是也给redis带来了一些限制,其实这些限制也是其他redis集群方案基本都有的。比如,由于redis追求简单、高性能,并不支持跨节点(分布式)事务,因此一些涉及到可能跨节点的操作都将被限制,主要有:
- key批量操作支持有限。如mset、mget,目前只支持具有相同slot值的key执行批量操作。对于映射为不同slot值的key由于执行mget、mget等操作可能存在于多个节点上因此不被支持。
- key事务操作支持有限。同理只支持多key在同一节点上的事务操作,当多个key分布在不同的节点上时无法使用事务功能。
- key作为数据分区的最小粒度,因此不能将一个大的键值对象如hash、list等映射到不同的节点。
- 不支持多数据库空间。单机下的Redis可以支持16个数据库,集群模式下只能使用一个数据库空间,即db 0。
- 复制结构只支持一层,从节点只能复制主节点,不支持嵌套树状复制结构。
- migrate是同步阻塞迁移,对于bigkey迁移会导致引擎阻塞,从而影响对该引擎的所有key的读写。
Codis实现
slot分配
和redis-cluster不同,codis的redis上不会维护slot表信息,每个redis都默认自己负责1024个slot,slot表是维护在Dashboard并被Proxy感知的,这一点算是Codis的架构一个较大的特点。
key相关命令:
Codis只提供了一个key相关的slot命令:slotshashkey [key1 key2...] , 获取key所对应的hashslot。
迁移过程:
- Dashboard制定迁移计划,并主动发起迁移
- 期间Proxy也可以发起被动迁移(对于同步迁移而言)
- Dashboard向目标redis循环发送 slotsmgrttagslot-async $host $port $timeout $maxbulks $maxbytes $slot $numkeys 命令开启异步迁移(每次随机迁移最多numkeys个key,不指定默认为100)
- 直到被迁移的slot内所有的key已经被迁移成功,则迁移结束
具体流程可见图5。
图5 coids slot迁移流程
key迁移的原子性:
由于codis使用异步迁移slotsmgrttagslot-async命令,因此无法像redis-cluster那样利用MIGRATE命令同步阻塞的特性保证key迁移的原子性。为此,Codis做了以下手段来保证key的原子性:
- 迁移中的key为只读状态,对于写命令则返回TRAGIN错误友Proxy进行重试
- 对于bigkey进行拆分迁移,每个拆分指令会在目标redis上设置临时TTL(迁移完成再修正),如果中途迁移失败,那么最终目标redis上的key会被删除
- 只有key整个迁移成功正确收到响应,才会将本地key删除
迁移中的读写冲突:
和redis-cluster同步迁移不同,Codis由于使用异步迁移,因此一个正处于迁移状态的key(即key已经被发送或者被部分发送,还没有得到最终响应)是可能被用户继续读写的,为此除了像redis-cluster那样要考虑迁移中的slot,Codis还需要考虑迁移中的key的读写冲突处理。
对于一个读写请求,如果key所在的slot正在被迁移 ,proxy会使用slotsmgrt-exec-wrapper $hashkey $command [$arg1 ...] 命令对原始请求进行包装一下再发送给redis,如果原始命令是读操作则可以正常响应,如果是写操作则redis返回TRYAGIN错误,由Prxoy进行重试。如果key已经迁移走,则引擎返回MOVED错误,Proxy需要更新路由表,具体过程如图6所示。
图6 codis对迁移中的key的读写处理
同步迁移与异步迁移
本文将详细描述同步迁移和异步迁移的实现原理。
同步迁移
图7所示的就是同步迁移的流程,源端会将key进行序列化,然后使用socket将数据发送到目标redis(其实就是调用restore命令),目标redis收到restore命令后会对key进行反序列化,存储到DB之后回复ACK,源端redis收到ACK之后就将本地的key删除。可以看到,整个过程,源端redis都是阻塞的,如果迁移的key是一个bigkey,会导致源端序列化、网络传输、目标端反序列化、源端同步删除非常耗时,由于redis的单线程特性,时间循环(eventloop)无法及时处理用户的读写事件,从而导致用户RT增高甚至超时。
图7 同步迁移流程
由于redis支持list、set、zset、hash等复合数据结构,因此会有bigkey的问题。图8所示的就是MIGRATE命令实现原理,在MIGRATE中,所谓的序列化其实就是将key对应的value进行RDB格式化,在目标端redis按照RDB格式进行加载。如果list、set、zset、hash成员很多(比如几千个甚至几万个),那么RDB格式化和加载就会非常耗时。
图8 MIGRATE命令原理
异步迁移
既然同步迁移会阻塞主线程,那么很容易想到的解决方案就是使用一个独立线程做迁移,如图9所示。由于多线程会设计到对共享数据(比如DB)的访问,因此需要加同步原语,这对redis单线程、几乎无锁的架构而言,改动起来是比较复杂的。
图9 独立线程实现异步迁移
另一种异步迁移实现思路,是依然采用单线程模型,即对象的序列化(在源redis端)和反序列化(在目标redis端)依然会阻塞主线程,但是和MIGRATE同步迁移不同,异步迁移不会同步等待restore的返回,restore完成之后目标端redis会向源端redis发送一个restore-ack命令(类似于回调机制)来通知源端redis迁移的状态。因此这样大大的减少了源端redis迁移的阻塞时间,可以让事件循环(eventloop)尽快的处理下一个就绪事件。
由于这种方案依然依赖于主线程做序列化和反序列化,因此,为了进一步降低序列化和反序列化的耗时,Codis使用拆分指令(chunked)的方式对bigkey做迁移处理。如图10所示,对于一个list而言,假设其包含非常多的elem,如果一次性将其全部序列化则非常耗时,如果将其等价拆分成一条条RPUSH指令,则每一条指令则非常的轻量。
图10 指令拆分
使用指令拆分之后,原本一个key只需要一条restore命令的迁移,现在变成很多条,因此为了保证迁移的原子性(即不会存在一些elem迁移成功,一些elem迁移失败),Codis会在每一个拆分指令中加上一个临时TTL,由于只有全部前已成功才会删除本地的key,因此即使中途迁移失败,已迁移成功的elem也会超时自动删除,最终效果就好比迁移没有发生一样。elem全部迁移成功之后,Codis会再单独发送一个修正TTL的命令并删除本地的key。
图11 临时TTL
异步迁移的第一步,就是先发一条DEL命令删除目标redis上的key,如图12所示。
图12 第一步先删除目标key
如图13所示,接下来收到目标redis的ACK之后会继续发送后续的拆分指令,每次发送的拆分指令的个数是可以参数控制的。
图13 临时TTL
所有的拆分指令全部发送完成之后,会再发一个修成TTL的指令,最后删除本地的key。
图14 迁移完成删除本地的key
并不是所有的key都会采用chunked的方式迁移,对于string对象、小对象依然可以直接使用RDB格式序列化,只有对于大对象(bigkey)才会触发chunked方式迁移。
图15 针对不同对象使用不同迁移方式
异步迁移源码解读
前文主要论述了redis-cluster同步迁移和Codis异步迁移的异同和原理,redis-cluster同步迁移可以参考redis源码中cluster.c中关于migrateCommand和restoreCommand实现,源码还是非常简单的。Codis的slot迁移提供了同步和异步两种,同步迁移的代码在slots.c中,其代码和redis原生的migrateCommand基本一致,因此两者观其一即可。异步迁移代码在slots_async.c中,这块的原创性就比较高了,由于原作者对代码基本没有加注释,因此为了便于理解,我在阅读源码的时候简单的加了一些中文注释,就贴在这里吧。原理如前文所述,向看实现的可以看下面的代码,我就不一一拆分解释了,因为太多了。。。
#include "server.h"
/* ============================ Worker Thread for Lazy Release ============================= */
typedef struct {
pthread_t thread;/* lazy工作线程 */
pthread_mutex_t mutex;/* 互斥信号量 */
pthread_cond_t cond;/* 条件变量 */
list *objs; /* 要被lazy释放的对象链表 */
} lazyReleaseWorker;
/* lazy释放主线程 */
static void *
lazyReleaseWorkerMain(void *args) {
lazyReleaseWorker *p = args;
while (1) {
/* 等待在条件变量上,条件为待释放对象链表长度为0 */
pthread_mutex_lock(&p->mutex);
while (listLength(p->objs) == 0) {
pthread_cond_wait(&p->cond, &p->mutex);
}
/* 取出链表的第一个节点 */
listNode *head = listFirst(p->objs);
/* 节点值为要释放的对象 */
robj *o = listNodeValue(head);
/* 从链表中删除这个节点 */
listDelNode(p->objs, head);
pthread_mutex_unlock(&p->mutex);
/* 释放对象 */
decrRefCount(o);
}
return NULL;
}
/* lazy释放一个对象 */
static void
lazyReleaseObject(robj *o) {
/* 对象当前的refcount必须已经为1,即已经没有任何人引用这个对象 */
serverAssert(o->refcount == 1);
/* 获取lazyReleaseWorker */
lazyReleaseWorker *p = server.slotsmgrt_lazy_release;
/* 上锁 */
pthread_mutex_lock(&p->mutex);
if (listLength(p->objs) == 0) {
/* 如果待释放队列长度为0,则唤醒释放线程 */
pthread_cond_broadcast(&p->cond);
}
/* 将待释放对象加入释放链表 */
listAddNodeTail(p->objs, o);
/* 解锁 */
pthread_mutex_unlock(&p->mutex);
}
/* 创建lazy释放工作线程 */
static lazyReleaseWorker *
createLazyReleaseWorkerThread() {
lazyReleaseWorker *p = zmalloc(sizeof(lazyReleaseWorker));
pthread_mutex_init(&p->mutex, NULL);
pthread_cond_init(&p->cond, NULL);
p->objs = listCreate();
/* 创建线程 */
if (pthread_create(&p->thread, NULL, lazyReleaseWorkerMain, p) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Worker Thread for Lazy Release Jobs.");
exit(1);
}
return p;
}
/* 初始化Lazy释放工作线程 */
void
slotsmgrtInitLazyReleaseWorkerThread() {
server.slotsmgrt_lazy_release = createLazyReleaseWorkerThread();
}
/* ============================ Iterator for Data Migration ================================ */
#define STAGE_PREPARE 0
#define STAGE_PAYLOAD 1
#define STAGE_CHUNKED 2
#define STAGE_FILLTTL 3
#define STAGE_DONE 4
/* 单对象迭代器 */
typedef struct {
int stage;
robj *key;/* 单对象对应的的key */
robj *val;/* 单对象对应的的值 */
long long expire;/* 该对象对应的过期设置 */
unsigned long cursor;/* 游标,用于dictScan */
unsigned long lindex;/* 索引,listTypeInitIterator时用到 */
unsigned long zindex;/* 索引,遍历zset时用到 */
unsigned long chunked_msgs;/* 该对象chunked消息个数 */
} singleObjectIterator;
/* 创建单对象迭代 */
static singleObjectIterator *
createSingleObjectIterator(robj *key) {
/* 分配空间 */
singleObjectIterator *it = zmalloc(sizeof(singleObjectIterator));
/* 初始化阶段 */
it->stage = STAGE_PREPARE;
/* 设置key */
it->key = key;
/* 引用计数 */
incrRefCount(it->key);
it->val = NULL;
it->expire = 0;
it->cursor = 0;
it->lindex = 0;
it->zindex = 0;
it->chunked_msgs = 0;
return it;
}
/* 释放SingleObjectIterator */
static void
freeSingleObjectIterator(singleObjectIterator *it) {
if (it->val != NULL) {
/* 对val解引用 */
decrRefCount(it->val);
}
/* 对key解引用 */
decrRefCount(it->key);
/* 释放结构 */
zfree(it);
}
static void
freeSingleObjectIteratorVoid(void *it) {
freeSingleObjectIterator(it);
}
/* 判断单个对象是否还有下一个阶段需要处理 */
static int
singleObjectIteratorHasNext(singleObjectIterator *it) {
/* 只要状态不是STAGE_DONE就还需要继续处理 */
return it->stage != STAGE_DONE;
}
/* 如果是sds编码的字符串对象就返回sds底层字符换的长度,否则返回默认长度len */
static size_t
sdslenOrElse(robj *o, size_t len) {
return sdsEncodedObject(o) ? sdslen(o->ptr) : len;
}
/* 如果val类型为dict时执行dictScan操作的回调 */
static void
singleObjectIteratorScanCallback(void *data, const dictEntry *de) {
/* 提取privdata {ll, val, &len}*/
void **pd = (void **)data;
list *l = pd[0];/* 链表,用于存放scan出来的元素 */
robj *o = pd[1];/* 被迭代的对象值val */
long long *n = pd[2];/* 返回字节数的指针 */
robj *objs[2] = {NULL, NULL};
switch (o->type) {
case OBJ_HASH:
/* 如果原对象是hash,则分别将hash的key和value按顺序方式链表 */
objs[0] = dictGetKey(de);
objs[1] = dictGetVal(de);
break;
case OBJ_SET:
/* 如果原对象是set,则只将hash的key放入链表 */
objs[0] = dictGetKey(de);
break;
}
/* 将扫出来的对象添加到链表 */
for (int i = 0; i < 2; i ++) {
if (objs[i] != NULL) {
/* 引用计数 */
incrRefCount(objs[i]);
/* 这个对象的大小,对于string对象就是string长度,其他对象就按8字节算 */
*n += sdslenOrElse(objs[i], 8);
listAddNodeTail(l, objs[i]);
}
}
}
/* 将double转为内存二进制表示 */
static uint64_t
convertDoubleToRawBits(double value) {
union {
double d;
uint64_t u;
} fp;
fp.d = value;
return fp.u;
}
/* 将内存二进制表示转为double值 */
static double
convertRawBitsToDouble(uint64_t value) {
union {
double d;
uint64_t u;
} fp;
fp.u = value;
return fp.d;
}
/* 从Uint64创建RawString对象 */
static robj *
createRawStringObjectFromUint64(uint64_t v) {
uint64_t p = intrev64ifbe(v);
return createRawStringObject((char *)&p, sizeof(p));
}
/* 从RawString获取Uint64 */
static int
getUint64FromRawStringObject(robj *o, uint64_t *p) {
if (sdsEncodedObject(o) && sdslen(o->ptr) == sizeof(uint64_t)) {
*p = intrev64ifbe(*(uint64_t *)(o->ptr));
return C_OK;
}
return C_ERR;
}
/* 计算一个对象需要的restore命令的个数,单个restore上只能携带maxbulks个Bulk
Bulk:$6\r\nfoobar\r\n
Multi-bulk :"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"
*/
static long
numberOfRestoreCommandsFromObject(robj *val, long long maxbulks) {
long long numbulks = 0;
switch (val->type) {
case OBJ_LIST:
if (val->encoding == OBJ_ENCODING_QUICKLIST) {
/* list的长度就是需要的Bulk的数目 */
numbulks = listTypeLength(val);
}
break;
case OBJ_HASH:
if (val->encoding == OBJ_ENCODING_HT) {
/* hash表中每个元素需要2个Bulk */
numbulks = hashTypeLength(val) * 2;
}
break;
case OBJ_SET:
if (val->encoding == OBJ_ENCODING_HT) {
/* set中每个元素需要1个Bulk */
numbulks = setTypeSize(val);
}
break;
case OBJ_ZSET:
if (val->encoding == OBJ_ENCODING_SKIPLIST) {
/* zset中每个元素需要2个Bulk */
numbulks = zsetLength(val) * 2;
}
break;
}
/* 如果实际的numbulks比要求的maxbulks小,则使用一条restore命令 */
if (numbulks <= maxbulks) {
return 1;
}
/* 计算需要的restore命令个数 */
return (numbulks + maxbulks - 1) / maxbulks;
}
/* 估计Restore命令的个数 */
static long
estimateNumberOfRestoreCommands(redisDb *db, robj *key, long long maxbulks) {
/* 查找key对应的val */
robj *val = lookupKeyWrite(db, key);
if (val != NULL) {
return numberOfRestoreCommandsFromObject(val, maxbulks);
}
return 0;
}
extern void createDumpPayload(rio *payload, robj *o);
extern zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank);
static slotsmgrtAsyncClient *getSlotsmgrtAsyncClient(int db);
/* 单对象迭代,返回值为命令个数(Bulks) */
static int
singleObjectIteratorNext(client *c, singleObjectIterator *it,
long long timeout, unsigned int maxbulks, unsigned int maxbytes) {
/* *
* STAGE_PREPARE ---> STAGE_PAYLOAD ---> STAGE_DONE
* | A
* V |
* +------------> STAGE_CHUNKED ---> STAGE_FILLTTL
* A |
* | V
* +-------+
* */
/* 本次迭代的key */
robj *key = it->key;
/* 但对象迁移的准备阶段 */
if (it->stage == STAGE_PREPARE) {
/* 以写的方式查找key,与lookupKeyRead区别是没有命中率更新 */
robj *val = lookupKeyWrite(c->db, key);
if (val == NULL) {
/* 如果key没有找到,则结束 */
it->stage = STAGE_DONE;
return 0;
}
/* 设置值 */
it->val = val;
/* 增加引用 */
incrRefCount(it->val);
/* 设置过期时间 */
it->expire = getExpire(c->db, key);
/* 前导消息 */
int leading_msgs = 0;
/* 获取db对应的slotsmgrtAsyncClient */
slotsmgrtAsyncClient *ac = getSlotsmgrtAsyncClient(c->db->id);
if (ac->c == c) {
/* 只有slotsmgrtAsyncClient未被使用的时候 */
if (ac->used == 0) {
/* 表示已经被使用 */
ac->used = 1;
/* 如果需要验证 */
if (server.requirepass != NULL) {
/* SLOTSRESTORE-ASYNC-AUTH $password */
addReplyMultiBulkLen(c, 2);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-AUTH");
addReplyBulkCString(c, server.requirepass);
leading_msgs += 1;
}
/* SELECT DB操作 */
do {
/* SLOTSRESTORE-ASYNC-SELECT $db */
addReplyMultiBulkLen(c, 2);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC-SELECT");
addReplyBulkLongLong(c, c->db->id);
leading_msgs += 1;
} while (0);
}
}
/* SLOTSRESTORE-ASYNC delete $key */
addReplyMultiBulkLen(c, 3);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, "delete");
addReplyBulk(c, key);
/* 计算需要的restore命令个数,maxbulks表示一个restore命令可承载的bulk最大数目 */
long n = numberOfRestoreCommandsFromObject(val, maxbulks);
if (n >= 2) {
/* 如果需要2个及以上,则进入CHUNKED阶段,即启用分块传输 */
it->stage = STAGE_CHUNKED;
/* chunked消息个数 */
it->chunked_msgs = n;
} else {
/* 否则一个restore可以承载,则直接进入PAYLOAD阶段 */
it->stage = STAGE_PAYLOAD;
it->chunked_msgs = 0;
}
/* 这里的1为delete命令,再加上其他的前导命令(如果有),作为命令个数返回 */
return 1 + leading_msgs;
}
/* 取出key对应的值 */
robj *val = it->val;
long long ttl = 0;
if (it->stage == STAGE_CHUNKED) {
/* 如果是CHUNKED阶段,则设置一个临时ttl */
ttl = timeout * 3;
} else if (it->expire != -1) {
/* 否则如果val上有过期时间,则重新计算ttl */
ttl = it->expire - mstime();
if (ttl < 1) {
ttl = 1;
}
}
/* 当一个CHUNKED对象全部序列化完成之后会到这个阶段 */
if (it->stage == STAGE_FILLTTL) {
/* SLOTSRESTORE-ASYNC expire $key $ttl */
addReplyMultiBulkLen(c, 4);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, "expire");
addReplyBulk(c, key);
/* 设置真实的ttl */
addReplyBulkLongLong(c, ttl);
/* 迭代结束 */
it->stage = STAGE_DONE;
/* 该阶段只有一个命令 */
return 1;
}
/* 如果是PAYLOAD阶段切val类型不是OBJ_STRING */
if (it->stage == STAGE_PAYLOAD && val->type != OBJ_STRING) {
/* 负载缓冲区 */
rio payload;
/* 将val序列化为RDB格式 */
createDumpPayload(&payload, val);
/* SLOTSRESTORE-ASYNC object $key $ttl $payload */
addReplyMultiBulkLen(c, 5);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
/* 对象类型 */
addReplyBulkCString(c, "object");
addReplyBulk(c, key);
addReplyBulkLongLong(c, ttl);
/* 添加payload */
addReplyBulkSds(c, payload.io.buffer.ptr);
/* 迭代结束 */
it->stage = STAGE_DONE;
/* 该阶段只有一个命令 */
return 1;
}
/* 如果是PAYLOAD阶段切val类型为OBJ_STRING */
if (it->stage == STAGE_PAYLOAD && val->type == OBJ_STRING) {
/* SLOTSRESTORE-ASYNC string $key $ttl $payload */
addReplyMultiBulkLen(c, 5);
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, "string");
addReplyBulk(c, key);
addReplyBulkLongLong(c, ttl);
addReplyBulk(c, val);
/* 迭代结束 */
it->stage = STAGE_DONE;
/* 该阶段只有一个命令 */
return 1;
}
/* 如果是CHUNKED类型 */
if (it->stage == STAGE_CHUNKED) {
const char *cmd = NULL;
/* 根据val的类型使用不同的子命令 */
switch (val->type) {
case OBJ_LIST:
cmd = "list";
break;
case OBJ_HASH:
cmd = "hash";
break;
case OBJ_SET:
cmd = "dict";
break;
case OBJ_ZSET:
cmd = "zset";
break;
default:
serverPanic("unknown object type");
}
/* 是否还有更多需要序列化 */
int more = 1;
/* ll链表用于存放本次SLOTSRESTORE-ASYNC命令携带的args */
list *ll = listCreate();
/* 设置是否函数,本质就是调用decrRefCount */
listSetFreeMethod(ll, decrRefCountVoid);
long long hint = 0, len = 0;
if (val->type == OBJ_LIST) {
/* 如果val类型为OBJ_LIST,则创建list迭代 */
listTypeIterator *li = listTypeInitIterator(val, it->lindex, LIST_TAIL);
do {
/* 表示list每一项 */
listTypeEntry entry;
/* 遍历 */
if (listTypeNext(li, &entry)) {
quicklistEntry *e = &(entry.entry);
robj *obj;
if (e->value) {
/* */
obj = createStringObject((const char *)e->value, e->sz);
} else {
/* */
obj = createStringObjectFromLongLong(e->longval);
}
/* 累计字节数 */
len += sdslenOrElse(obj, 8);
/* 添加到ll */
listAddNodeTail(ll, obj);
/* 索引加1 */
it->lindex ++;
} else {
/* 没有更多了 */
more = 0;
}
/* 当还有更多要发送且ll现有元素个数小于maxbulks且字节数小于 maxbytes */
} while (more && listLength(ll) < maxbulks && len < maxbytes);
/* 释放迭代器 */
listTypeReleaseIterator(li);
/* 原list的总长度 */
hint = listTypeLength(val);
}
if (val->type == OBJ_HASH || val->type == OBJ_SET) {
/* 控制循环次数 */
int loop = maxbulks * 10;
/* 默认最大循环次数 */
if (loop < 100) {
loop = 100;
}
dict *ht = val->ptr;
void *pd[] = {ll, val, &len};
do {
it->cursor = dictScan(ht, it->cursor, singleObjectIteratorScanCallback, pd);
if (it->cursor == 0) {
/* 没有更多了 */
more = 0;
}
/* 如果还有更多且ll现有元素个数小于maxbulks且本次发送字节数小于maxbytes且loop不为0 */
} while (more && listLength(ll) < maxbulks && len < maxbytes && (-- loop) >= 0);
/* 原hash的总大小 */
hint = dictSize(ht);
}
if (val->type == OBJ_ZSET) {
/* 如果是ZSET类型 */
zset *zs = val->ptr;
dict *ht = zs->dict;
long long rank = (long long)zsetLength(val) - it->zindex;
zskiplistNode *node = (rank >= 1) ? zslGetElementByRank(zs->zsl, rank) : NULL;
do {
if (node != NULL) {
robj *field = node->obj;
incrRefCount(field);
len += sdslenOrElse(field, 8);
listAddNodeTail(ll, field);
uint64_t bits = convertDoubleToRawBits(node->score);
robj *score = createRawStringObjectFromUint64(bits);
len += sdslenOrElse(score, 8);
listAddNodeTail(ll, score);
node = node->backward;
it->zindex ++;
} else {
/* 没有更多了 */
more = 0;
}
/* 如果还有更多元素且bulks没有超过maxbulks且产生的字节数没有超过maxbytes */
} while (more && listLength(ll) < maxbulks && len < maxbytes);
/* 原hash总大小 */
hint = dictSize(ht);
}
/* SLOTSRESTORE-ASYNC list/hash/zset/dict $key $ttl $hint [$arg1 ...] */
addReplyMultiBulkLen(c, 5 + listLength(ll));/* MultiBulk总长度 */
addReplyBulkCString(c, "SLOTSRESTORE-ASYNC");
addReplyBulkCString(c, cmd);/* list?hash? */
addReplyBulk(c, key);
addReplyBulkLongLong(c, ttl);/* ttl */
addReplyBulkLongLong(c, hint);/* 总大小 */
/* 遍历ll,ll里面存放了本地要发送的args */
while (listLength(ll) != 0) {
/* 取出头结点 */
listNode *head = listFirst(ll);
/* 取出值对象 */
robj *obj = listNodeValue(head);
/* 添加回复 */
addReplyBulk(c, obj);
/* 删除该节点 */
listDelNode(ll, head);
}
/* 释放ll */
listRelease(ll);
if (!more) {
/* 如果对象所有元素都被序列换完毕,则进入FILLTTL阶段 */
it->stage = STAGE_FILLTTL;
}
/* 该阶段只有一个命令 */
return 1;
}
if (it->stage != STAGE_DONE) {
serverPanic("invalid iterator stage");
}
serverPanic("use of empty iterator");
}
/* ============================ Iterator for Data Migration (batched) ====================== */
typedef struct {
struct zskiplist *tags;/* 标识一个hashtag有没有被添加过 */
dict *keys;/* 批处理的Keys */
list *list; /* 每个节点的值都是singleObjectIterator */
dict *hash_slot;/* hash数组,数组的下标为slot_num,每个数组元素的字典为key、crc对 */
struct zskiplist *hash_tags;/* 用于保存具有hashtag的key,score为key的crc,值为key */
long long timeout;/* 进程chunked restore时会指定临时ttl,值为timeout*3 */
unsigned int maxbulks;/* 单次restore最多发送多少个bulks */
unsigned int maxbytes;/* 单次发送最多发送多少字节数 */
list *removed_keys;/* 一个key被发送完成之后会加入这个链表 */
list *chunked_vals;/* 用于存放使用chunked方式发生的val */
long estimate_msgs;/* 估算的restore命令的个数 */
} batchedObjectIterator;
/* 创建batchedObjectIterator */
static batchedObjectIterator *
createBatchedObjectIterator(dict *hash_slot, struct zskiplist *hash_tags,
long long timeout, unsigned int maxbulks, unsigned int maxbytes) {
batchedObjectIterator *it = zmalloc(sizeof(batchedObjectIterator));
it->tags = zslCreate();
it->keys = dictCreate(&setDictType, NULL);
it->list = listCreate();
listSetFreeMethod(it->list, freeSingleObjectIteratorVoid);
it->hash_slot = hash_slot;
it->hash_tags = hash_tags;
it->timeout = timeout;
it->maxbulks = maxbulks;
it->maxbytes = maxbytes;
it->removed_keys = listCreate();
listSetFreeMethod(it->removed_keys, decrRefCountVoid);
it->chunked_vals = listCreate();
listSetFreeMethod(it->chunked_vals, decrRefCountVoid);
it->estimate_msgs = 0;
return it;
}
/* 释放BatchedObjectIterator */
static void
freeBatchedObjectIterator(batchedObjectIterator *it) {
zslFree(it->tags);
dictRelease(it->keys);
listRelease(it->list);
listRelease(it->removed_keys);
listRelease(it->chunked_vals);
zfree(it);
}
/* 批处理迭代(即一次处理多个key) */
static int
batchedObjectIteratorHasNext(batchedObjectIterator *it) {
/* list链表不为空,每个节点的值都是singleObjectIterator */
while (listLength(it->list) != 0) {
/* 每个节点的值都是singleObjectIterator */
listNode *head = listFirst(it->list);
/* 每个节点的值都是singleObjectIterator */
singleObjectIterator *sp = listNodeValue(head);
/* 判断单个对象是否已经处于STAGE_DONE */
if (singleObjectIteratorHasNext(sp)) {
/* 不处于STAGE_DONE,即单对象迭代还没结束,则直接返回1,下次还会迭代这个对象 */
return 1;
}
/* 否则当前单对象已经迭代结束 */
if (sp->val != NULL) {
/* 如果当前单对象的value不为空,就把单对象的key添加到removed_keys链表 */
incrRefCount(sp->key);
listAddNodeTail(it->removed_keys, sp->key);
if (sp->chunked_msgs != 0) {
/* 如果chunked的消息个数不为0 */
incrRefCount(sp->val);
/* 就把val加入到chunked_vals链表 */
listAddNodeTail(it->chunked_vals, sp->val);
}
}
/* 删除这个节点 */
listDelNode(it->list, head);
}
return 0;
}
/* 批处理对象迭代,返回值为本地迭代产生的SLOTSRESTORE系列命令的个数 */
static int
batchedObjectIteratorNext(client *c, batchedObjectIterator *it) {
/* 遍历链表 */
if (listLength(it->list) != 0) {
/* 取出头结点 */
listNode *head = listFirst(it->list);
/* 节点值为singleObjectIterator */
singleObjectIterator *sp = listNodeValue(head);
/* maxbytes减去客户端输出缓冲区当前已有的大小就是本次能发送的最大字节数 */
long long maxbytes = (long long)it->maxbytes - getClientOutputBufferMemoryUsage(c);
/* 单对象迭代,迭代超时timeout,迭代单词最大maxbulks,单次最大maxbytes */
return singleObjectIteratorNext(c, sp, it->timeout, it->maxbulks, maxbytes > 0 ? maxbytes : 0);
}
serverPanic("use of empty iterator");
}
/* 批处理里面是否包含key,返回1表示存在,返回0表示不存在 */
static int
batchedObjectIteratorContains(batchedObjectIterator *it, robj *key, int usetag) {
/* 如果在keys中找到,则存在 */
if (dictFind(it->keys, key) != NULL) {
return 1;
}
/* 如果没有使用hashtag则结束查找 */
if (!usetag) {
return 0;
}
uint32_t crc;
int hastag;
/* 计算key的crc和hashtag */
slots_num(key->ptr, &crc, &hastag);
if (!hastag) {
/* 如果key没有hashtag则结束查找 */
return 0;
}
/* 否则填充range */
zrangespec range;
range.min = (double)crc;
range.minex = 0;
range.max = (double)crc;
range.maxex = 0;
/* 以crc为范围在跳表tags中查找,每一个hashtag被添加都会在tags跳表中添加一个节点 */
return zslFirstInRange(it->tags, ⦥) != NULL;
}
/* 向批处理添加一个key,返回值为本次新添加的key的个数 */
static int
batchedObjectIteratorAddKey(redisDb *db, batchedObjectIterator *it, robj *key) {
/* 添加到keys字典 */
if (dictAdd(it->keys, key, NULL) != C_OK) {
return 0;
}
/* 引用计数 */
incrRefCount(key);
/* 创建createSingleObjectIterator */
listAddNodeTail(it->list, createSingleObjectIterator(key));
/* 对该对象需要的restore命令个数进行预估 */
it->estimate_msgs += estimateNumberOfRestoreCommands(db, key, it->maxbulks);
/* 当前批处理的key个数 */
int size = dictSize(it->keys
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。