内容简介:kafka在日志处理领域由于其一系列的工具,其他相关服务,基本上算得上是王者,但是缺点也很明显: 难以维护,性能低,需要大量机器,部署复杂,参数众多。前阵子Redis 5.0 Beta版本发布 , 随之而来的重大特性“关于stream的使用,主要的命令是XADD, XREAD,XREADGROUP, XGROUP等指令,具体使用方法可以参考上面的文章,不习惯英文的可以看看这里有篇文章写的也很仔细“
kafka在日志处理领域由于其一系列的工具,其他相关服务,基本上算得上是王者,但是缺点也很明显: 难以维护,性能低,需要大量机器,部署复杂,参数众多。
前阵子 Redis 5.0 Beta版本发布 , 随之而来的重大特性“ Introduction to Redis Streams ”, 似乎跟kafka在功能上有很多类似的地方,但也有不少不一样的点。
关于stream的使用,主要的命令是XADD, XREAD,XREADGROUP, XGROUP等指令,具体使用方法可以参考上面的文章,不习惯英文的可以看看这里有篇文章写的也很仔细“ 挑战Kafka!Redis5.0重量级特性Stream尝鲜 ”。 这里只分析代码实现方式,功能是怎么实现的。
一、总体介绍
先大概介绍一下几个最重要的点:
- stream是一个可持久化的消息队列, 对标kafka,解决了pubsub订阅发布功能不能持久化的问题;
- 支持分组多次消费,同样有group,消费者的概念。这样能通过多个group的形式,实现多次消费同一个消息队列,不过redis在这块实现的比较简单,并没有像kafka那样严格的partition,后者在partition上面有复杂的逻辑,比如consumer 挂掉后,其他消费者能一起再选举,选出谁来负责这个对应的partition,一个partition严格只属于某一个consumer来消费。redis在这方面实现比较轻量级;
- 支持position,能够消费历史消息,也能轻松移动消费id位置,这样给故障恢复带来了很多可以操作的空间,不至于消息丢了,没办法重复获取历史消息;
- 有ACK机制,能够一定程度保证消息的“at least once” 投递。当然带来的问题也显而易见: consumer需要能够支持重放,消息可能重复到来。
- 消息可以设置最大保存范围,这样不用担心塞满内存。但是kafka不会立即持久化消息到磁盘,这个依赖aof等常规机制,这一点也很轻量。消息内容是保存在rdb文件里面的,而不是一堆文件。
值得注意的一点是,redis虽然提供了XADD操作,但是不代表redis会主动重传,而是将没有收到XACK的消息保存到了PEL 树里面,等消费者再次来消费的时候,带着一个小id(通常是0-0)的时候,就会将PEL里面的消息发送给这个消费者。所以,没有自动重传的。
接下来了解一下主要的数据结构。
二、重要数据结构
下面先介绍几个重要的用到的数据结构。
0. streamID 消息ID
redis stream的消息ID比较特殊,是一个timestamp-sequencenum的结构,基本上可以理解为先比较时间戳,后比较序列号了。
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
1. stream 消息队列
redis的一个stream消息队列用stream结构表示,同样存储在db里面,队列名字就是db里面的key,type用OBJ_STREAM来表示。
里面最重要的衬衣就是rax了,redis引入了一个很重要的树结构,用来做字符串压缩和存储消息队列的,这块后面的文章在写其原理,先介绍一下整体实现方式。
从下面的结构体里面,还有一个rax 是cgroups,用来存储这个stream都有哪些group存在。
typedef struct stream {
//rax树存储真正的消息内容
rax *rax; /* The radix tree holding the stream. */
//这个stream的消息长度
uint64_t length; /* Number of elements inside this stream. */
//当前stream的最后一个id
streamID last_id; /* Zero if there are yet no items. */
//这个stream有哪些group, group里面会存储对应的有哪些consumers
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
2. streamCG 消费组
消费组存储在上面stream结构的cgroups里面,并且有几个重要的成员:
1. last_id 每个组的消费者共享一个last_id代表这个组消费到了什么位置,每次投递后会更新这个group。当然是必须指定group的时候会更新。
2. pel 已经发送给客户端,但是还没有收到XACK的消息都存储在pel树里面,顾名思义“Pending entries list”, 在文章开头也说了,redis不会主动重传而是依赖客户端主动来查询其未确认的消息列表。作者建议对于group消费者,每次启动的时候先用ID : 0-0 来查询其未确认消息,然后自主选择怎么处理。
3. consumers 同理,存储了我这个组有哪些消费者,后面介绍消费者结构。
/* Consumer group. */
typedef struct streamCG {
//当前我这个stream的最大id
streamID last_id;
//还没有收到ACK的消息列表
rax *pel;
//这个group里面对一个的consumers有哪些, 后者类型为 streamConsumer
rax *consumers;
} streamCG;
3. streamConsumer消费者结构
消费者比较简单,名字,活跃时间,以及pel列表,pel列表group里面也有一个,实际上指向都是一样的。
typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */
sds name; 、
//待ACK的消息列表,注意这个功能streamCG 里面的实际上指向一个。也就是说一条消息,两个地方都会记住
rax *pel;
} streamConsumer;
主要的数据结构介绍差不多了,接下来从新增一条消息开始了解一下实现方法。
三、xadd实现原理
XADD 指令由xaddCommand 实现,其语法为:
XADD key [MAXLEN
函数开始例行检查MAXLEN参数,ID参数,设置标记变量和id, maxlen值,如果指定了的话。 然后调用streamTypeLookupWriteOrCreate 来询到对应stream名的robj结构,从中获取stream结构. 如果stream不存在,会创建之。最后通过streamAppendItem 将参数里面的field列表插入到rax树里面去。
robj *o;
stream *s;
//查询到对应stream名的robj结构,从中获取stream结构. 如果stream不存在,会创建之
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
s = o->ptr;
/* Append using the low level function and return the ID. */
//将后面的参数写入stream中, stream使用rax树来组织的,然后 每个rax树里面的节点使用listpack列表来组织的,后者又有压缩在里面
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
&id, id_given ? &id : NULL)
== C_ERR)
{
addReplyError(c,"The ID specified in XADD is equal or smaller than the "
"target stream top item");
return;
}
//返回结果
addReplyStreamID(c,&id);
streamTypeLookupWriteOrCreate 比较简单, 调用createStreamObject 创建一个stream结构。
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
robj *o = lookupKeyWrite(c->db,key);
if (o == NULL) {
//不存在,创建一个stream结构
o = createStreamObject();
dbAdd(c->db,key,o);
} else {
if (o->type != OBJ_STREAM) {
addReply(c,shared.wrongtypeerr);
return NULL;
}
}
return o;
}
继续看一下stream是怎么创建的:
robj *createStreamObject(void) {
//创建一个stream结构,并且放到redis的通用robj里面、
stream *s = streamNew();
robj *o = createObject(OBJ_STREAM,s);
o->encoding = OBJ_ENCODING_STREAM;
return o;
}
stream *streamNew(void) {、
//新建stream, cgroups暂时置空,需要的时候再分配内存
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->length = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
s->cgroups = NULL; /* Created on demand to save memory when not used. */
return s;
}
可以看出,stream其实也是作为一个robj结构,类型为 OBJ_STREAM, 存储在数据库里面的,类似于其他list等结构,没有什么特殊的。
streamAppendItem 将后面的参数写入stream中, stream使用rax树来组织的,然后 每个rax树里面的节点使用listpack列表来组织的,后者又有压缩在里面。 这块略复杂,篇幅太长后面文章在继续写,里面最重要的就是有个listpack的结构,能够压缩field 列表的内存空间占用,避免重复的key占用太多空间。
listpack这个还挺期待能够加在其他所有key里面的。
接下来 如果传递了maxlen, 这回进行trim,不过暂时先不深入到里面了, 之后就是处理同步问题,重写一下对应的id字段,redis的上层调用函数会把这条指令propgate广播到从库,果从库也都自动创建id,那时间戳就不一样了,所以这里直接修改上层使用的参数了。
/* Let s rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c,i,idarg);
decrRefCount(idarg);
//通知等待在这个key上面的客户端,给他们新的内容
if (server.blocked_clients_by_type[BLOCKED_STREAM]) ///在这个阻塞类型上阻塞的客户端数目,用来加速,没必要的就 不需要进去了
signalKeyAsReady(c->db, c->argv[1]);
}
上面最重要的函数便是signalKeyAsReady, 会标记这个key有新事件,后面需要处理阻塞相关的事情。这块放最后一节说。
下一节开始写XREAD,XREADGROUP的操作原理。
以上所述就是小编给大家介绍的《Redis 5.0 重量级特性 Stream 实现源码分析(一)overview,XADD》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Redis 5.0 重量级特性 Stream 实现源码分析(二)XREAD 消费流程
- 开源作者痛斥京东重量级项目抄袭
- 开源作者痛斥京东重量级项目抄袭
- Java:锁的四中状态:无锁,偏向锁,轻量级锁,重量级锁
- 每日新知 - php7.4重量级特性:预加载
- [ Laravel 5.7 文档 ] 快速入门 —— 重量级开发环境:Homestead
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Ant Colony Optimization
Marco Dorigo、Thomas Stützle / A Bradford Book / 2004-6-4 / USD 45.00
The complex social behaviors of ants have been much studied by science, and computer scientists are now finding that these behavior patterns can provide models for solving difficult combinatorial opti......一起来看看 《Ant Colony Optimization》 这本书的介绍吧!