上面的文章整体介绍了stream 实现方式,以及xadd生产端的流程,接下来继续写一下后面消费端的过程。
redis stream的消费方法有几种, XREAD、XREADGROUP, 还有xrange/xrevrange, 后者比较简单,主要就是准备参数然后调用streamReplyWithRange 来根据范围读取消息内容。
一、xrange 范围消息读取
xrange 语法为:
XRANGE key start end [COUNT < n> ]
先来看一下xrange的代码,前面部分例行检查,获取start,end id范围。
void xrangeGenericCommand(client *c, int rev) { //读取某一段消息 robj *o; stream *s; streamID startid, endid; long long count = 0; robj *startarg = rev ? c->argv[3] : c->argv[2]; robj *endarg = rev ? c->argv[2] : c->argv[3]; if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return; if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return; /* Parse the COUNT option if any. */ if (c->argc > 4) { //---- } /* Return the specified range to the user. */ if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL); }
得到stream名字后,调用lookupKeyReadOrReply 来查询,注意lookupKeyReadOrReply 函数是任何redis key的查询函数,redis对stream的处理也没有特殊的,还是存储在db里面。
接下来调用streamReplyWithRange 根据group和consumer参数,读取start到end的最多count个元素,可以反向读取。 具体逻辑跟rax树相关,先放后面介绍。 来看看xread的处理方式。
二、XREAD/XREADGROUP group consumer机制实现原理
xread 和 xreadgroup 的处理函数都是 xreadCommand(), 以阻塞读取某个stream内容。两个命令的区别是 后者多带有这两组参数,并且必须同时出现:
XREAD [BLOCK < milliseconds >] [COUNT < count >] STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
group read多包括这两组参数: [GROUP group-name consumer-name]
0. 参数检查
readCommand 函数相对比较复杂,首先进行参数检查,初始化数据结构,如果指定了groupname,则需要检查这个groupname是否在对应的stream里面存在,以及需要设置对应的每个stream上面的group数据结构 streamCG 指针。
//如果指定了group,需要为每个stream申请一个streamCG结构 if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count); //循环遍历所有的stream 名字和后面的id字段,进行stream存在性和group存在性检查 //同时会根据ID来设置对应stream ids的读取起始位置 for (int i = streams_arg + streams_count; i < c->argc; i++) { //i指向遍历的后面的id部分 //id_idx 是后面的每个stream对应的开始ID, 第几个stream的id int id_idx = i - streams_arg - streams_count; //key是对应i的stream位置 robj *key = c->argv[i-streams_count]; robj *o = lookupKeyRead(c->db,key); //查到对应stream结构 if (o && checkType(c,o,OBJ_STREAM)) goto cleanup; streamCG *group = NULL; /* If a group was specified, than we need to be sure that the * key and group actually exist. */ if (groupname) { if (o == NULL || (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) { //指定了群组,并且stream不存在,或者stream对应的groupname不存在,报错 //所以这里有意思,read的时候,如果是readgroup, 那么所哟的stream必须存在,否则单纯的read时stream可以不存在 addReplyErrorFormat(c, "-NOGROUP ...", (char*)key->ptr,(char*)groupname->ptr); goto cleanup; } //记录对应的group,也就是对应stream的某个消费组 groups[id_idx] = group; }
接下来是处理参数ID, 如果是"$" 代表当前stream的最后,最大last_id,">" 代表本消费组的最大id. 将结果放在ids[]数组里面。
if (strcmp(c->argv[i]->ptr,"$") == 0) { if (o) { stream *s = o->ptr; ids[id_idx] = s->last_id; //指定的是$, 那么就设置为当前stream的最后一个id } else { ids[id_idx].ms = 0; //如果这个stream不存在,就为0,什么都要 ids[id_idx].seq = 0; } continue; } else if (strcmp(c->argv[i]->ptr,">") == 0) { //从本群的最后一个开始, if (!xreadgroup || groupname == NULL) { addReplyError(c,"The > ID can be specified only when calling " "XREADGROUP using the GROUP <group> " "<consumer> option."); goto cleanup; } ids[id_idx] = group->last_id; continue; } if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK) goto cleanup; }
1. 同步读取已有消息
上面初始化groups数组和id数组后,找到了streams_count 个key需要处理,于是循环遍历每一个key,去读取消息。先找到对应stream的结构,然后看对应的stream的最大的id是否比参数里的id要大 , 果是,就有内容,否则没有新的内容,直接跳过继续。
//遍历每一个stream, 每个stream都最多读取 count个元素 for (int i = 0; i < streams_count; i++) { //先找到对应stream的结构,然后看对应的stream的最大的id是否比参数里的id要大, //如果是,就有内容,否则没有新的内容,直接跳过继续 robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]); if (o == NULL) continue; stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ if (s->last_id.ms > gt->ms || (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) {//当前stream的最大id要大于参数的最大id,有心内容 arraylen++; if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c); /* streamReplyWithRange() handles the start ID as inclusive, * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt;//这是开始位置 start.seq++; /* uint64_t can t overflow in this context. */ /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ //组成返回数据结构,bulk addReplyMultiBulkLen(c,2); addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups){ //查找这个group里面的consumer, 根据参数的consumername , 如果没有,就会默认创建一个 consumer = streamLookupConsumer(groups[i], consumername->ptr,1); } streamPropInfo spi = {c->argv[i+streams_arg], groupname}; //传入group消费组和消费者id, 读取start开始的count个元素, 放到client的发送缓冲区里面 streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, consumer, noack, &spi); if (groups) server.dirty++; } }
上面streamLookupConsumer 函数会读取对应的消费者consumer结构 streamConsumer, 如果没有,就会自动创建,但是 redis 不会自动删除,因为有消费者id的pel列表存在,需要手动删除。具体可以看看作者这文章( Introduction to Redis Streams )后面“Consumer groups”的说明 ,还是挺复杂的。
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { //查找这个group里面的consumer, 根据参数的consumername , 如果没有,就会默认创建一个 //group对应的consumers列表放在cg->consumers 的rax树里面 streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { if (!create) return NULL; consumer = zmalloc(sizeof(*consumer)); consumer->name = sdsdup(name); consumer->pel = raxNew(); //group对应的consumer列表放在rax树里面 raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); } consumer->seen_time = mstime(); return consumer; }
streamReplyWithRange 传入group消费组和消费者id, 读取start开始的count个元素, 放到client的发送缓冲区里面。
接下来判断一下是否读取到了消息内容,如果读取到了,就会立即给客户端返回,但是数目不保证。如果一条都没有读取到,注意是一条都没有读取到,就会考虑进行sleep阻塞, arraylen 代表有多少个key有事件,并不是有多少消息。
/* We replied synchronously! Set the top array len and return to caller. */ //只要成功读取到了内容,就不用等待,因此这里的意思是,只要读取到了一条 就不等待,而不是读取到count条就等待 //这个一定要注意,不是读取满count条, 而是只要有一条就返回 if (arraylen) { setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); goto cleanup; }
2. BLOCK阻塞等待消息
如果上面同步检查所有stream都没有消息,就会进入到下面的timeout流程,检查是否要等待。如果提供了BLOCK < milisecond >参数,表示最多等多久。
检查timeout 大于0后,会调用blockForKeys 登记这个客户端到每个等待的key的block列表里面, 等有新事件发生的时候,xadd会调用 signalKeyAsReady 来找一个对应的客户端然后通知他的. 果有其他客户端需改对应的key,就会触发handleClientsBlockedOnKeys 函数进行处理。
//还没有读满,需要继续等待 /* Block if needed. */ if (timeout != -1) { /* If we are inside a MULTI/EXEC and the list is empty the only thing * we can do is treating it as a timeout (even with timeout 0). */ if (c->flags & CLIENT_MULTI) { addReply(c,shared.nullmultibulk); goto cleanup; } //登记这个客户端到每个等待的key的block列表里面 , //等有新事件发生的时候,xadd会调用 signalKeyAsReady 来找一个对应的客户端然后通知他的 //如果有其他客户端需改对应的key,就会触发handleClientsBlockedOnKeys 函数进行处理 blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, NULL, ids); /* If no COUNT is given and we block, set a relatively small count: * in case the ID provided is too low, we do not want the server to * block just to serve this client a huge stream of messages. */ c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT; /* If this is a XREADGROUP + GROUP we need to remember for which * group and consumer name we are blocking, so later when one of the * keys receive more data, we can call streamReplyWithRange() passing * the right arguments. */ if (groupname) { //记录等待的group 名字和消费者名字,在 触发事件后,需要更新group的位置 incrRefCount(groupname); incrRefCount(consumername); c->bpop.xread_group = groupname; c->bpop.xread_consumer = consumername; } else { c->bpop.xread_group = NULL; c->bpop.xread_consumer = NULL; } goto cleanup; }
关于block逻辑,blockForKeys 主要涉及到redis之前的block方式,暂时不进入了,主要来介绍一下当key有新消息到来后,stream部分是怎么处理的。 这块涉及到 handleClientsBlockedOnKeys 里面对应不同类型的key的唤醒逻辑。
上面我们知道,redis会调用blockForKeys 来设置当前客户端阻塞在某个key上面,之后请求就会返回,服务器不会真正阻塞的,而只是记录相关的等待结构,然后继续处理别的请求。
当有生产者 XADD 后,就会调用signalKeyAsReady 来触发消息的投递任务。也就是handleClientsBlockedOnKeys 的工作。
processCommand() 里面调用call()处理结束后,判断ready_keys 调用这里来处理这条指令的block列表, 话说这里虽然用了list来传递数据,但其实还是阻塞的形式,要是异步的就好了,对于等待在某个key的客户端列表,使用的是先进后出的优先顺序,当然stream会有group的逻辑在里面。
来看一下handleClientsBlockedOnKeys 代码。
void handleClientsBlockedOnKeys(void) { while(listLength(server.ready_keys) != 0) { //这个循环上面注释说了,为了避免BRPOPLPUSH 会再产生新的触发事件,所以得再次判断 list *l; //挪出来,以便新的二次触发事件放到ready_keys里面 l = server.ready_keys; server.ready_keys = listCreate(); while(listLength(l) != 0) { //下面根据这个阻塞的key的类型来分别进行对应的处理 listNode *ln = listFirst(l); readyList *rl = ln->value;
上面简单初始化,然后遍历所有有更新事件的key,所谓的 readyList , 然后将这个key用lookupKeyWrite获取对应等待的key的类型, robj,从中我们可以获取对应key的类型了。
/* Serve clients blocked on list key. */ robj *o = lookupKeyWrite(rl->db,rl->key); if (o != NULL && o->type == OBJ_LIST) { }else if (o != NULL && o->type == OBJ_ZSET) { }else if (o != NULL && o->type == OBJ_STREAM) { //处理stream类型的等待事件, 有客户端等在这上面 , 先取出在这个key上等待的客户端列表 //然后 dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); stream *s = o->ptr;
如果是OBJ_STREAM, 理stream类型的等待事件, 有客户端等在这上面 , 先取出在这个key上等待的客户端列表。 然后一个个判断是否有符合这个客户端的id要求。
list *clients = dictGetVal(de); listNode *ln; listIter li; listRewind(clients,&li); //下面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适, //那么调用streamReplyWithRange 去扫描发送消息。同时,如果有多个consumer等待在一个key上的情况 //怎么处理呢? 不会发生重复吗? 答案当然不会,因为streamReplyWithRange每次回更新最大的last_id, //然后下回进来的时候第二个消费者其实不会有什么实际的操作发生 while((ln = listNext(&li))) { client *receiver = listNodeValue(ln); //一个客户端可能block在多个key上? 是的 //每个客户端的bpop结构里面记录了我都阻塞在了哪些key里面 //但是,同时只能等待在某一类key上面,不能是多种。 因为blockClient 里面会重置btype, 且等待状态会拒绝其他请求 if (receiver->btype != BLOCKED_STREAM) continue; //从这个客户端的等待的key列表里面找到这个key,然后比较id是否有新的,如果有新的就发送数据并且解开等待状态 streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key); if (s->last_id.ms > gt->ms || (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) { streamID start = *gt;//这个客户端等待的开始ID start.seq++; /* Can t overflow, it s an uint64_t */ /* If we blocked in the context of a consumer * group, we need to resolve the group and * consumer here. */ streamCG *group = NULL; streamConsumer *consumer = NULL; //如果记录了group信息,那么查找一下这个group和consumer是不是存在 if (receiver->bpop.xread_group) { group = streamLookupCG(s, receiver->bpop.xread_group->ptr);
重点说一下,上面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,那么调用streamReplyWithRange 去扫描发送消息。
同时,如果有多个consumer等待在一个key上的情况, 怎么处理呢? 不会发生重复吗?
因为streamReplyWithRange每次回更新最大的last_id,后下回进来的时候第二个消费者其实不会有什么实际的操作发生。 这就是同组group内消费的时候,之后发生一次消费的原因。但是,这里也能看出来,redis没有kafka的partition方式的核心点,就在于消息是没有分partition的,他们在触发的时候从队列头部取一个客户端后发送出去。
同时,发送给一个客户端后这条消息的ID其实就到了group和consumer的 PEL列表里面了,之后就不会再投递给别人了。怎么实现的呢?后面看streamReplyWithRange 前面部分。
最后调用streamReplyWithRange 将制定范围的消息发送给这个消费者。
} if (group) { consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr, 1); } /* Emit the two elements sub-array consisting of * the name of the stream and the data we * extracted from it. Wrapped in a single-item * array, since we have just one key. */ addReplyMultiBulkLen(receiver,1); addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,rl->key); streamPropInfo pi = { rl->key, receiver->bpop.xread_group }; //调用函数给这个group和consumer 发送指定位置的信息 streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count, 0, group, consumer, 0, &pi); /* Note that after we unblock the client, gt * and other receiver->bpop stuff are no longer * valid, so we must do the setup above before * this call. */ //解锁客户端,这样会触发去处理这客户端阻塞期间的其他命令。也会从等待队列等数据中移除 unblockClient(receiver); } }
streamReplyWithRange函数给这个group和consumer 发送指定位置的信息, 之后通过调用 unblockClient解锁客户端,这样会触发去处理这客户端阻塞期间的其他命令。也会从等待队列等数据中移除。
这样如果客户端还需要阻塞读取,就需要继续调用XREAD BLOCK timeout 读取。
包括ACK, group消费, 消息ID支持回溯消费旧消息, 支持多消费者消费同一个队列且互相不重复, 支持多个消费者消费多次同一个队列。
期待Redis能继续完善Stream, 改进目前的一些限制,让大家用的更加方便更加爽,能一定程度代替kafka ^.^ 。
