Redis 内置了一个 Kafka:Stream

栏目: 数据库 · 发布时间: 5年前

内容简介:流(Stream)是Redis从5.0.0版本新加入的一个数据结构,是一个类似于Kafka的消息系统。该结构相关的大部分命令使用字母在开始详细叙述之前,先说明一下:本文内容主要是结合官网文章Introduction to Redis Streams 和个人理解整理而成。另外想吐槽下自己,游戏玩多了也不太好,

流(Stream)是 Redis 从5.0.0版本新加入的一个数据结构,是一个类似于Kafka的消息系统。该结构相关的大部分命令使用字母 X 开头 如 XADD , XLEN , XRANGE 等。

在开始详细叙述之前,先说明一下:

本文内容主要是结合官网文章Introduction to Redis Streams 和个人理解整理而成。另外想吐槽下自己,游戏玩多了也不太好, Stream 总是不自觉的写成了 Steam

向Stream中添加数据

命令格式如

返回的 1541558444516-0 是消息id。 可以看到上面的命令ID参数传的是 * , 代表由系统生成的ID,当然,你可以显式的指定消息ID(基本不太常用),对于这个ID有几点需要注意:

  • ID必须是由  - 连接的两部分,前一部分默认情况下是当前Server当前的毫秒时间戳,后一部分是一个无符号64位长整型序列号;

  • 同一个KEY下,后加入的ID一定要比已加入的ID大。

获取Stream长度

使用 XLEN , 如

从Stream中获取数据

有三种方式从Stream中获取数据

按照范围查询

包括 XRANGEXREVRANGE 两个命令 ,分别是正序和反序, 以正序 XRANGE 为例:

从Stream这个key中返回ID范围是 startend[前count个] 数据。 如

- , + 分别表示最小和最大ID。

监听(XREAD)

XREAD 命令

执行完成后再执行:

发现Stream的长度没有变化,也就是说, XREAD 不会删除Stream里的数据。

上面的这个例子是一个非阻塞的方式监听。当使用 BLOCK 参数,并传递一个超时时间(0为永不超时),将启动一个阻塞方式的监听。 特殊的ID $ 表示从最新的ID开始监听。如:

启动一个监听客户端:

该命令阻塞等待, 此时另起一个客户端:

等待的客户端收到消息:

如果有多个客户端都在监听同一个流,这些客户端都可以得到流中的数据。

消费者组 (Consumer Group)

机制说明

涉及三个命令 分别是

  • XGROUP : 创建或者销毁一个 Consumer Group, 也可以从Consumer Group中删除一个 Consumer

  • XREADGROUP : 指定 Consumer Group 中的一个Consumer,消费一条消息

  • XACK : 在  XREADGROUP 调用时不指定  NOACK 时需要显式调用  XACK 命令 来确认该消息已被正确处理,可以删除。

消费者组的消费方式可以用下图表示

一个消息 msg 可通过 group1 和 group2 分发,并且 group1 中的 msg 会被 consumer1 或者 consumer2 消费,group2 中的 msg 会被 consumer3 或者 comsumer4 消费。

创建/消费/确认

使用 XGROUP 命令创建一个consumer group,如

这样就创建了一个名为foocg1的 consumer group, 其中 $ 表示该组将要消费当前时间开始的消息,然后我们向Stream中添加一些消息:

此时,使用foocg1下的c1消费者来消费一条消息

其中最后的ID字段 指定为 > , 表示只获取那些从来没有被分发的消息。

我们继续消费一条消息

然后,再消费历史上所有的数据

注意这里ID传的是 0-0 , 此时会发现消费的是第一条消息。也就是说,没有经过XACK的消息依旧会保留在队列中。

执行 XACK 操作:

此时再去消费历史数据

发现已经获取不到被 XACK 的消息了,当所有的历史数据全部被 XACK 后:

一个伪码表示的客户端

一个消费者组的实现的伪码表示可以写作:

XPENDING 和 XCLAIM

XPENDING 可以获取消息系统中已经分发但是未被 XACK 的消息的情况

如:

表示有8个未确认消息,最小ID是"1541573732130-0",最大ID是"1541581755413-0", 其中c1 消费者有8个未确认的消息。传递start, end, count参数可以获取指定范围指定数目的未确认消息的详细信息,传递consumer可获取指定consumer下未确认信息列表。

XCLAIM 可以将未被确认的消息重新声明给其他消费者

如下面命令可以获取到一条原属于c1的消息未被确认:

下面命令可以将将原本属于 消费者c1 的消息 1541581753377-0 在等待确认的时间>30000情况下重新声明给c2

此时

XPENDINGXCLAIM 可以用来处理当一个消费者获取到一个消息后,运行失败导致无法执行 XACK ,此时这个消息就永远不会进行 确认已消费 操作的情形。

其他命令

  • XINFO 可以查看流的一些信息

  • XTRIM 可以获得一个有长度上限的Stream

  • XDEL 可以从Stream中删除消息

这些命令可以在官网找到详细的说明,这里就不再赘述了。

其他说明

  1. Stream支持AOF和RDB格式的持久化

  2. 当调用XDEL等造成Stream长度为0时,为了保留可能存在的Consumer Group信息,Stream不会被删除。

  3. Redis Cluster场景下,由于Key存在于单节点下,所以同一个流的所有消息也会位于同一个节点下。

  4. 由于同一个Stream(Key)下的所有消息位于同一节点,类比Kafka分区更像是使用多个Key形成多个Stream来处理本质上是同一类消息的一个Stream,而不是Stream下的Consumer Group。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Redis设计与实现

Redis设计与实现

黄健宏 / 机械工业出版社 / 2014-6 / 79.00

【官方网站】 本书的官方网站 www.RedisBook.com 提供了书本试读、相关源码下载和勘误回报等服务,欢迎读者浏览和使用。 【编辑推荐】 系统而全面地描述了 Redis 内部运行机制 图示丰富,描述清晰,并给出大量参考信息,是NoSQL数据库开发人员案头必备 包括大部分Redis单机特征,以及所有多机特性 【读者评价】 这本书描述的知识点很丰富,......一起来看看 《Redis设计与实现》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码