内容简介:流(Stream)是Redis从5.0.0版本新加入的一个数据结构,是一个类似于Kafka的消息系统。该结构相关的大部分命令使用字母在开始详细叙述之前,先说明一下:本文内容主要是结合官网文章Introduction to Redis Streams 和个人理解整理而成。另外想吐槽下自己,游戏玩多了也不太好,
流(Stream)是 Redis 从5.0.0版本新加入的一个数据结构,是一个类似于Kafka的消息系统。该结构相关的大部分命令使用字母 X
开头 如 XADD
, XLEN
, XRANGE
等。
在开始详细叙述之前,先说明一下:
本文内容主要是结合官网文章Introduction to Redis Streams 和个人理解整理而成。另外想吐槽下自己,游戏玩多了也不太好, Stream
总是不自觉的写成了 Steam
。
向Stream中添加数据
命令格式如
如
返回的 1541558444516-0
是消息id。 可以看到上面的命令ID参数传的是 *
, 代表由系统生成的ID,当然,你可以显式的指定消息ID(基本不太常用),对于这个ID有几点需要注意:
-
ID必须是由
-
连接的两部分,前一部分默认情况下是当前Server当前的毫秒时间戳,后一部分是一个无符号64位长整型序列号; -
同一个KEY下,后加入的ID一定要比已加入的ID大。
获取Stream长度
使用 XLEN
, 如
从Stream中获取数据
有三种方式从Stream中获取数据
按照范围查询
包括 XRANGE
和 XREVRANGE
两个命令 ,分别是正序和反序, 以正序 XRANGE
为例:
从Stream这个key中返回ID范围是 start
到 end
的 [前count个]
数据。 如
-
, +
分别表示最小和最大ID。
监听(XREAD)
XREAD
命令
如
执行完成后再执行:
发现Stream的长度没有变化,也就是说, XREAD
不会删除Stream里的数据。
上面的这个例子是一个非阻塞的方式监听。当使用 BLOCK
参数,并传递一个超时时间(0为永不超时),将启动一个阻塞方式的监听。 特殊的ID $
表示从最新的ID开始监听。如:
启动一个监听客户端:
该命令阻塞等待, 此时另起一个客户端:
等待的客户端收到消息:
如果有多个客户端都在监听同一个流,这些客户端都可以得到流中的数据。
消费者组 (Consumer Group)
机制说明
涉及三个命令 分别是
-
XGROUP
: 创建或者销毁一个 Consumer Group, 也可以从Consumer Group中删除一个 Consumer -
XREADGROUP
: 指定 Consumer Group 中的一个Consumer,消费一条消息 -
XACK
: 在XREADGROUP
调用时不指定NOACK
时需要显式调用XACK
命令 来确认该消息已被正确处理,可以删除。
消费者组的消费方式可以用下图表示
一个消息 msg 可通过 group1 和 group2 分发,并且 group1 中的 msg 会被 consumer1 或者 consumer2 消费,group2 中的 msg 会被 consumer3 或者 comsumer4 消费。
创建/消费/确认
使用 XGROUP
命令创建一个consumer group,如
这样就创建了一个名为foocg1的 consumer group, 其中 $
表示该组将要消费当前时间开始的消息,然后我们向Stream中添加一些消息:
此时,使用foocg1下的c1消费者来消费一条消息
其中最后的ID字段 指定为 >
, 表示只获取那些从来没有被分发的消息。
我们继续消费一条消息
然后,再消费历史上所有的数据
注意这里ID传的是 0-0
, 此时会发现消费的是第一条消息。也就是说,没有经过XACK的消息依旧会保留在队列中。
执行 XACK
操作:
此时再去消费历史数据
发现已经获取不到被 XACK
的消息了,当所有的历史数据全部被 XACK
后:
一个伪码表示的客户端
一个消费者组的实现的伪码表示可以写作:
XPENDING 和 XCLAIM
XPENDING
可以获取消息系统中已经分发但是未被 XACK
的消息的情况
如:
表示有8个未确认消息,最小ID是"1541573732130-0",最大ID是"1541581755413-0", 其中c1 消费者有8个未确认的消息。传递start, end, count参数可以获取指定范围指定数目的未确认消息的详细信息,传递consumer可获取指定consumer下未确认信息列表。
XCLAIM
可以将未被确认的消息重新声明给其他消费者
如下面命令可以获取到一条原属于c1的消息未被确认:
下面命令可以将将原本属于 消费者c1 的消息 1541581753377-0 在等待确认的时间>30000情况下重新声明给c2
此时
XPENDING
和 XCLAIM
可以用来处理当一个消费者获取到一个消息后,运行失败导致无法执行 XACK
,此时这个消息就永远不会进行 确认已消费
操作的情形。
其他命令
-
XINFO
可以查看流的一些信息 -
XTRIM
可以获得一个有长度上限的Stream -
XDEL
可以从Stream中删除消息
这些命令可以在官网找到详细的说明,这里就不再赘述了。
其他说明
-
Stream支持AOF和RDB格式的持久化
-
当调用XDEL等造成Stream长度为0时,为了保留可能存在的Consumer Group信息,Stream不会被删除。
-
Redis Cluster场景下,由于Key存在于单节点下,所以同一个流的所有消息也会位于同一个节点下。
-
由于同一个Stream(Key)下的所有消息位于同一节点,类比Kafka分区更像是使用多个Key形成多个Stream来处理本质上是同一类消息的一个Stream,而不是Stream下的Consumer Group。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Redis设计与实现
黄健宏 / 机械工业出版社 / 2014-6 / 79.00
【官方网站】 本书的官方网站 www.RedisBook.com 提供了书本试读、相关源码下载和勘误回报等服务,欢迎读者浏览和使用。 【编辑推荐】 系统而全面地描述了 Redis 内部运行机制 图示丰富,描述清晰,并给出大量参考信息,是NoSQL数据库开发人员案头必备 包括大部分Redis单机特征,以及所有多机特性 【读者评价】 这本书描述的知识点很丰富,......一起来看看 《Redis设计与实现》 这本书的介绍吧!