Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程

栏目: 数据库 · 发布时间: 6年前

上面的文章整体介绍了stream 实现方式,以及xadd生产端的流程,接下来继续写一下后面消费端的过程。

redis stream的消费方法有几种, XREAD、XREADGROUP, 还有xrange/xrevrange, 后者比较简单,主要就是准备参数然后调用streamReplyWithRange 来根据范围读取消息内容。

一、xrange 范围消息读取

xrange 语法为:

 XRANGE key start end [COUNT < n> ] 

先来看一下xrange的代码,前面部分例行检查,获取start,end id范围。

void xrangeGenericCommand(client *c, int rev) {
    //读取某一段消息
    robj *o;
    stream *s;
    streamID startid, endid;
    long long count = 0;
    robj *startarg = rev ? c->argv[3] : c->argv[2];
    robj *endarg = rev ? c->argv[2] : c->argv[3];

    if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return;
    if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return;

    /* Parse the COUNT option if any. */
    if (c->argc > 4) {
    //----
    }

    /* Return the specified range to the user. */
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
        || checkType(c,o,OBJ_STREAM)) return;
    s = o->ptr;
    streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
}

得到stream名字后,调用lookupKeyReadOrReply 来查询,注意lookupKeyReadOrReply 函数是任何redis key的查询函数,redis对stream的处理也没有特殊的,还是存储在db里面。

接下来调用streamReplyWithRange 根据group和consumer参数,读取start到end的最多count个元素,可以反向读取。 具体逻辑跟rax树相关,先放后面介绍。 来看看xread的处理方式。

二、XREAD/XREADGROUP group consumer机制实现原理

xread 和 xreadgroup 的处理函数都是 xreadCommand(), 以阻塞读取某个stream内容。两个命令的区别是 后者多带有这两组参数,并且必须同时出现:

XREAD [BLOCK < milliseconds >] [COUNT < count >] STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N

group read多包括这两组参数: [GROUP group-name consumer-name]

0. 参数检查

readCommand 函数相对比较复杂,首先进行参数检查,初始化数据结构,如果指定了groupname,则需要检查这个groupname是否在对应的stream里面存在,以及需要设置对应的每个stream上面的group数据结构 streamCG 指针。

这里要注意的时候,xread支持一次读取多个stream,但是groupname只能指定一个,但是之前的文章我们知道,每个stream其实都有独立的groupname,以及独立的消费者名,所以这里需要拷贝一下group,但是指针不一样。

//如果指定了group,需要为每个stream申请一个streamCG结构
    if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);

    //循环遍历所有的stream 名字和后面的id字段,进行stream存在性和group存在性检查
    //同时会根据ID来设置对应stream ids的读取起始位置
    for (int i = streams_arg + streams_count; i < c->argc; i++) {
        //i指向遍历的后面的id部分
        //id_idx 是后面的每个stream对应的开始ID, 第几个stream的id
        int id_idx = i - streams_arg - streams_count;
        //key是对应i的stream位置
        robj *key = c->argv[i-streams_count];
        robj *o = lookupKeyRead(c->db,key); //查到对应stream结构
        if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
        streamCG *group = NULL;

        /* If a group was specified, than we need to be sure that the
         * key and group actually exist. */
        if (groupname) {
            if (o == NULL || (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
            { //指定了群组,并且stream不存在,或者stream对应的groupname不存在,报错
                //所以这里有意思,read的时候,如果是readgroup, 那么所哟的stream必须存在,否则单纯的read时stream可以不存在
                addReplyErrorFormat(c, "-NOGROUP ...",  (char*)key->ptr,(char*)groupname->ptr);
                goto cleanup;
            }
            //记录对应的group,也就是对应stream的某个消费组
            groups[id_idx] = group;
        }

接下来是处理参数ID, 如果是"$" 代表当前stream的最后,最大last_id,">" 代表本消费组的最大id. 将结果放在ids[]数组里面。

if (strcmp(c->argv[i]->ptr,"$") == 0) {
            if (o) {
                stream *s = o->ptr;
                ids[id_idx] = s->last_id; //指定的是$, 那么就设置为当前stream的最后一个id
            } else {
                ids[id_idx].ms = 0; //如果这个stream不存在,就为0,什么都要
                ids[id_idx].seq = 0;
            }
            continue;
        } else if (strcmp(c->argv[i]->ptr,">") == 0) {
            //从本群的最后一个开始,
            if (!xreadgroup || groupname == NULL) {
                addReplyError(c,"The > ID can be specified only when calling "
                                "XREADGROUP using the GROUP <group> "
                                "<consumer> option.");
                goto cleanup;
            }
            ids[id_idx] = group->last_id;
            continue;
        }
        if (streamParseIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
            goto cleanup;
    }

1. 同步读取已有消息

上面初始化groups数组和id数组后,找到了streams_count 个key需要处理,于是循环遍历每一个key,去读取消息。先找到对应stream的结构,然后看对应的stream的最大的id是否比参数里的id要大 , 果是,就有内容,否则没有新的内容,直接跳过继续。

注意count代表的是每个stream最大读取数目,而不是总和。redis不保证返回的数目条数。

看一下代码,具体看注释。

//遍历每一个stream, 每个stream都最多读取 count个元素
    for (int i = 0; i < streams_count; i++) {
        //先找到对应stream的结构,然后看对应的stream的最大的id是否比参数里的id要大,
        //如果是,就有内容,否则没有新的内容,直接跳过继续
        robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
        if (o == NULL) continue;
        stream *s = o->ptr;
        streamID *gt = ids+i; /* ID must be greater than this. */
        if (s->last_id.ms > gt->ms ||
            (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
        {//当前stream的最大id要大于参数的最大id,有心内容
            arraylen++;
            if (arraylen == 1) arraylen_ptr = addDeferredMultiBulkLength(c);
            /* streamReplyWithRange() handles the  start  ID as inclusive,
             * so start from the next ID, since we want only messages with
             * IDs greater than start. */
            streamID start = *gt;//这是开始位置
            start.seq++; /* uint64_t can t overflow in this context. */

            /* Emit the two elements sub-array consisting of the name
             * of the stream and the data we extracted from it. */

            //组成返回数据结构,bulk
            addReplyMultiBulkLen(c,2);
            addReplyBulk(c,c->argv[streams_arg+i]);
            streamConsumer *consumer = NULL;
            if (groups){
                //查找这个group里面的consumer, 根据参数的consumername , 如果没有,就会默认创建一个
                consumer = streamLookupConsumer(groups[i], consumername->ptr,1);
            }
            streamPropInfo spi = {c->argv[i+streams_arg], groupname};

            //传入group消费组和消费者id, 读取start开始的count个元素, 放到client的发送缓冲区里面
            streamReplyWithRange(c,s,&start,NULL,count,0,
                                 groups ? groups[i] : NULL,
                                 consumer, noack, &spi);
            if (groups) server.dirty++;
        }
    }

上面streamLookupConsumer 函数会读取对应的消费者consumer结构 streamConsumer, 如果没有,就会自动创建,但是 redis 不会自动删除,因为有消费者id的pel列表存在,需要手动删除。具体可以看看作者这文章( Introduction to Redis Streams )后面“Consumer groups”的说明 ,还是挺复杂的。

streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
    //查找这个group里面的consumer, 根据参数的consumername , 如果没有,就会默认创建一个
    //group对应的consumers列表放在cg->consumers 的rax树里面 
    streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
                               sdslen(name));
    if (consumer == raxNotFound) {
        if (!create) return NULL;
        consumer = zmalloc(sizeof(*consumer));
        consumer->name = sdsdup(name);
        consumer->pel = raxNew();

        //group对应的consumer列表放在rax树里面
        raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
                  consumer,NULL);
    }
    consumer->seen_time = mstime();
    return consumer;
}

streamReplyWithRange 传入group消费组和消费者id, 读取start开始的count个元素, 放到client的发送缓冲区里面。

接下来判断一下是否读取到了消息内容,如果读取到了,就会立即给客户端返回,但是数目不保证。如果一条都没有读取到,注意是一条都没有读取到,就会考虑进行sleep阻塞, arraylen 代表有多少个key有事件,并不是有多少消息。

/* We replied synchronously! Set the top array len and return to caller. */
    //只要成功读取到了内容,就不用等待,因此这里的意思是,只要读取到了一条 就不等待,而不是读取到count条就等待
    //这个一定要注意,不是读取满count条, 而是只要有一条就返回
    if (arraylen) {
        setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
        goto cleanup;
    }

2. BLOCK阻塞等待消息

如果上面同步检查所有stream都没有消息,就会进入到下面的timeout流程,检查是否要等待。如果提供了BLOCK < milisecond >参数,表示最多等多久。

检查timeout 大于0后,会调用blockForKeys 登记这个客户端到每个等待的key的block列表里面, 等有新事件发生的时候,xadd会调用 signalKeyAsReady 来找一个对应的客户端然后通知他的. 果有其他客户端需改对应的key,就会触发handleClientsBlockedOnKeys 函数进行处理。

//还没有读满,需要继续等待
    /* Block if needed. */
    if (timeout != -1) {
        /* If we are inside a MULTI/EXEC and the list is empty the only thing
         * we can do is treating it as a timeout (even with timeout 0). */
        if (c->flags & CLIENT_MULTI) {
            addReply(c,shared.nullmultibulk);
            goto cleanup;
        }
        //登记这个客户端到每个等待的key的block列表里面 , 
        //等有新事件发生的时候,xadd会调用 signalKeyAsReady 来找一个对应的客户端然后通知他的
        //如果有其他客户端需改对应的key,就会触发handleClientsBlockedOnKeys 函数进行处理
        blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, timeout, NULL, ids);
        /* If no COUNT is given and we block, set a relatively small count:
         * in case the ID provided is too low, we do not want the server to
         * block just to serve this client a huge stream of messages. */
        c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;                                                       

        /* If this is a XREADGROUP + GROUP we need to remember for which
         * group and consumer name we are blocking, so later when one of the
         * keys receive more data, we can call streamReplyWithRange() passing
         * the right arguments. */
        if (groupname) {
            //记录等待的group 名字和消费者名字,在 触发事件后,需要更新group的位置
            incrRefCount(groupname);
            incrRefCount(consumername);
            c->bpop.xread_group = groupname;
            c->bpop.xread_consumer = consumername;
        } else {
            c->bpop.xread_group = NULL;
            c->bpop.xread_consumer = NULL;
        }
        goto cleanup;
    }

关于block逻辑,blockForKeys 主要涉及到redis之前的block方式,暂时不进入了,主要来介绍一下当key有新消息到来后,stream部分是怎么处理的。 这块涉及到 handleClientsBlockedOnKeys 里面对应不同类型的key的唤醒逻辑。

三、同步阻塞等待读取消息

上面我们知道,redis会调用blockForKeys 来设置当前客户端阻塞在某个key上面,之后请求就会返回,服务器不会真正阻塞的,而只是记录相关的等待结构,然后继续处理别的请求。

当有生产者 XADD 后,就会调用signalKeyAsReady 来触发消息的投递任务。也就是handleClientsBlockedOnKeys 的工作。

processCommand() 里面调用call()处理结束后,判断ready_keys 调用这里来处理这条指令的block列表, 话说这里虽然用了list来传递数据,但其实还是阻塞的形式,要是异步的就好了,对于等待在某个key的客户端列表,使用的是先进后出的优先顺序,当然stream会有group的逻辑在里面。

来看一下handleClientsBlockedOnKeys 代码。

void handleClientsBlockedOnKeys(void) {
    while(listLength(server.ready_keys) != 0) {
        //这个循环上面注释说了,为了避免BRPOPLPUSH 会再产生新的触发事件,所以得再次判断
        list *l;
        //挪出来,以便新的二次触发事件放到ready_keys里面
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) { //下面根据这个阻塞的key的类型来分别进行对应的处理
            listNode *ln = listFirst(l);
            readyList *rl = ln->value;

上面简单初始化,然后遍历所有有更新事件的key,所谓的 readyList , 然后将这个key用lookupKeyWrite获取对应等待的key的类型, robj,从中我们可以获取对应key的类型了。

/* Serve clients blocked on list key. */
            robj *o = lookupKeyWrite(rl->db,rl->key);
            if (o != NULL && o->type == OBJ_LIST) {
            }else if (o != NULL && o->type == OBJ_ZSET) {
            }else if (o != NULL && o->type == OBJ_STREAM) {
                //处理stream类型的等待事件, 有客户端等在这上面 , 先取出在这个key上等待的客户端列表
                //然后
                dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
                stream *s = o->ptr;

如果是OBJ_STREAM, 理stream类型的等待事件, 有客户端等在这上面 , 先取出在这个key上等待的客户端列表。 然后一个个判断是否有符合这个客户端的id要求。

list *clients = dictGetVal(de);
    listNode *ln;
    listIter li;
    listRewind(clients,&li);

    //下面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,
    //那么调用streamReplyWithRange 去扫描发送消息。同时,如果有多个consumer等待在一个key上的情况
    //怎么处理呢? 不会发生重复吗? 答案当然不会,因为streamReplyWithRange每次回更新最大的last_id,
    //然后下回进来的时候第二个消费者其实不会有什么实际的操作发生
    while((ln = listNext(&li))) {
        client *receiver = listNodeValue(ln);
        //一个客户端可能block在多个key上? 是的
        //每个客户端的bpop结构里面记录了我都阻塞在了哪些key里面
        //但是,同时只能等待在某一类key上面,不能是多种。 因为blockClient 里面会重置btype, 且等待状态会拒绝其他请求
        if (receiver->btype != BLOCKED_STREAM) continue;
        //从这个客户端的等待的key列表里面找到这个key,然后比较id是否有新的,如果有新的就发送数据并且解开等待状态
        streamID *gt = dictFetchValue(receiver->bpop.keys, rl->key);
        if (s->last_id.ms > gt->ms ||
            (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
        {
            streamID start = *gt;//这个客户端等待的开始ID
            start.seq++; /* Can t overflow, it s an uint64_t */

            /* If we blocked in the context of a consumer
             * group, we need to resolve the group and
             * consumer here. */
            streamCG *group = NULL;
            streamConsumer *consumer = NULL;
            //如果记录了group信息,那么查找一下这个group和consumer是不是存在
            if (receiver->bpop.xread_group) {
                group = streamLookupCG(s, receiver->bpop.xread_group->ptr);

重点说一下,上面循环所有等待的客户端列表,一个个判断他们是否跟当前key匹配,如果id等都合适,那么调用streamReplyWithRange 去扫描发送消息。

同时,如果有多个consumer等待在一个key上的情况, 怎么处理呢? 不会发生重复吗?

答案当然不会。

因为streamReplyWithRange每次回更新最大的last_id,后下回进来的时候第二个消费者其实不会有什么实际的操作发生。 这就是同组group内消费的时候,之后发生一次消费的原因。但是,这里也能看出来,redis没有kafka的partition方式的核心点,就在于消息是没有分partition的,他们在触发的时候从队列头部取一个客户端后发送出去。

同时,发送给一个客户端后这条消息的ID其实就到了group和consumer的 PEL列表里面了,之后就不会再投递给别人了。怎么实现的呢?后面看streamReplyWithRange 前面部分。

最后调用streamReplyWithRange 将制定范围的消息发送给这个消费者。

}
            if (group) {
                consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr, 1);
            }

            /* Emit the two elements sub-array consisting of
             * the name of the stream and the data we
             * extracted from it. Wrapped in a single-item
             * array, since we have just one key. */
            addReplyMultiBulkLen(receiver,1);
            addReplyMultiBulkLen(receiver,2);
            addReplyBulk(receiver,rl->key);

            streamPropInfo pi = {
                rl->key,
                receiver->bpop.xread_group
            };
            //调用函数给这个group和consumer 发送指定位置的信息
            streamReplyWithRange(receiver,s,&start,NULL,
                                 receiver->bpop.xread_count,
                                 0, group, consumer, 0, &pi);

            /* Note that after we unblock the client,  gt 
             * and other receiver->bpop stuff are no longer
             * valid, so we must do the setup above before
             * this call. */
            //解锁客户端,这样会触发去处理这客户端阻塞期间的其他命令。也会从等待队列等数据中移除
            unblockClient(receiver);
        }
    }

streamReplyWithRange函数给这个group和consumer 发送指定位置的信息, 之后通过调用 unblockClient解锁客户端,这样会触发去处理这客户端阻塞期间的其他命令。也会从等待队列等数据中移除。

这样如果客户端还需要阻塞读取,就需要继续调用XREAD BLOCK timeout 读取。

四、总结

到这里阻塞读取消息队列的方式也讲完了,redis比较巧妙的利用了自己的db数据结构,实现了kafka大量重要的消息队列特性。

包括ACK, group消费, 消息ID支持回溯消费旧消息, 支持多消费者消费同一个队列且互相不重复, 支持多个消费者消费多次同一个队列。

也支持内存保存一部分队列内容。

期待Redis能继续完善Stream, 改进目前的一些限制,让大家用的更加方便更加爽,能一定程度代替kafka ^.^ 。

后面分个文章讲一下block的原理。

Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

网站重构

网站重构

[美] Jeffrey Zeldman / 傅捷、王宗义、祝军 / 电子工业出版社 / 2005-4 / 38.00元

这本书是为了希望自己的网站成本变得更低,运行得更好,访问者更多的网页设计师、开发者、网站所有者及管理者写的。 书中着重分析了目前网站建设中存在的一些问题,以及“Web标准”思想的产生、发展和推广,并从技术细节上讲解了网站实际制作和开发的过程中如何向Web标准过渡,如何采用和符合Web标准。本书的出版目的就是帮助读者理解Web标准,创建出用最低的费用达到最多的用户,并维持最长时间的网站,并且提......一起来看看 《网站重构》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器