内容简介:Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了发布订阅模式。该功能提供两种信息机制, 分别是“发布订阅到频道”和“发布订阅到模式”。Redis 的Redis 的
目录
-
- PUBLISH 命令和 SUBSCRIBE 命令
- PSUBSCRIBE 模式订阅命令
- Redis 发布/订阅的存储结构
- Spring Data Redis 实现发布/订阅模式
Redis 发布/订阅命令
Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了发布订阅模式。该功能提供两种信息机制, 分别是“发布订阅到频道”和“发布订阅到模式”。
PUBLISH 命令和 SUBSCRIBE 命令
PUBLISH channel message 复制代码
Redis 的 PUBLISH
命令可以让客户端把指定的消息发送到指定的频道中。
SUBSCRIBE channel [channel …] 复制代码
Redis 的 SUBSCRIBE
命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时,信息就会被发送给所有订阅指定频道的客户端。
下面我们就演示一下 PUBLISH命令和SUBSCRIBE命令的用法:
首先是订阅单个频道:
然后是订阅多个频道:
PSUBSCRIBE 模式订阅命令
Redis 的发布与订阅实现支持模式匹配(pattern matching)。
客户端可以订阅一个带 * 号的模式,如果某个/某些频道的名字和这个模式匹配,那么当有信息发送给这个/这些频道的时候,客户端也会收到这个/这些频道的信息。
客户端订阅的模式里面可以包含多个 glob 风格的通配符, 比如 * 、 ? 和 [...] 等。
比如执行命令:
PSUBSCRIBE t.* 复制代码
客户端将收到来自 t.java、 t.db 等频道的信息。
Redis 发布/订阅的存储结构
每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer
结构, 结构的 pubsub_channels
属性是一个字典, 这个字典就用于保存订阅频道的信息:
struct redisServer { // ... dict *pubsub_channels; // ... } 复制代码
其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。
当调用 PUBLISH channel message
命令的时候,程序首先根据 channel 定位到字典的键,然后将信息发送给字典值链表中的所有客户端。
Redis发布/订阅存储结构如下图所示:
Spring Data Redis 实现发布/订阅模式
下面带你一步步通过 Spring Data Redis 来实现发布与订阅。
示例项目基于SpringBoot搭建,你可以在这里找到 Spring Data Redis 实现发布/订阅模式的源码 。
由于篇幅原因下面就不再演示项目搭建和集成Redis的过程了,实现细节请参考 springboot redis demo project 。
MessagePublisher
首先定义一个发布者接口,接口只有一个 void publish(String message)
方法,用于发布消息。
public interface MessagePublisher { /** * publish message * @param message */ void publish(String message); } 复制代码
然后提供一个基于Redis的 MessagePublisher
实现。
其中最核心的是这个方法: redisTemplate.convertAndSend(topic.getTopic(), message)
,用于把消息发送到指定topic的channel之中。
import lombok.Setter; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; /** * Redis message publisher * * @author ijiangtao * @create 2019-05-01 19:36 **/ @Setter public class RedisMessagePublisher implements MessagePublisher { private RedisTemplate<String, String> redisTemplate; private ChannelTopic topic; private RedisMessagePublisher() { } public RedisMessagePublisher(RedisTemplate<String, String> redisTemplate, ChannelTopic topic) { this.redisTemplate = redisTemplate; this.topic = topic; } public void publish(String message) { redisTemplate.convertAndSend(topic.getTopic(), message); } } 复制代码
MessageListener
RedisMessageSubscriber
是一个订阅者,它实现了 MessageListener
接口,并通过一个 messageList
来存/取监听到的消息。
import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.util.List; /** * Redis Message Subscriber * <p> * RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface * * @author ijiangtao * @create 2019-05-01 19:39 **/ @AllArgsConstructor @NoArgsConstructor(access = AccessLevel.PRIVATE) @Data @Component public class RedisMessageSubscriber implements MessageListener { private List<String> messageList; public void onMessage(Message message, byte[] pattern) { messageList.add("[pattern:" + new String(pattern) + ",message:" + message.toString() + "]"); } } 复制代码
RedisPubSubConfig
下面定义了两个“topic”,并且通过两个“publisher`将“message”发布到“channel”指定的“topic”上。
然后我们定义了两个“subscriber”,“subscriber1”订阅了“topic1”和“topic2”,“subscriber2”只订阅了“topic2”。
最后我们将这些发布者和订阅者都注册到了 Spring Data Redis 提供的容器( RedisMessageListenerContainer
)中。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.repository.configuration.EnableRedisRepositories; import java.util.ArrayList; /** * config * * @author ijiangtao * @create 2019-05-01 19:57 **/ @Configuration @ComponentScan("net.ijiangtao.tech.framework.spring.ispringboot.redis") @EnableRedisRepositories(basePackages = "net.ijiangtao.tech.framework.spring.ispringboot") @PropertySource("classpath:application.properties") public class RedisPubSubConfig { @Autowired private RedisTemplate<String, String> redisTemplate; @Bean RedisMessageListenerContainer redisContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisTemplate.getConnectionFactory()); container.addMessageListener(messageListenerAdapter1() , topic1()); container.addMessageListener(messageListenerAdapter1() , topic2()); container.addMessageListener(messageListenerAdapter2(), topic2()); return container; } @Bean MessageListenerAdapter messageListenerAdapter1() { return new MessageListenerAdapter(messageListener1()); } @Bean public RedisMessageSubscriber messageListener1() { return new RedisMessageSubscriber(new ArrayList<>()); } @Bean MessageListenerAdapter messageListenerAdapter2() { return new MessageListenerAdapter(messageListener2()); } @Bean public RedisMessageSubscriber messageListener2() { return new RedisMessageSubscriber(new ArrayList<>()); } @Bean MessagePublisher redisPublisherForTopic1() { return new RedisMessagePublisher(redisTemplate, topic1()); } @Bean MessagePublisher redisPublisherForTopic2() { return new RedisMessagePublisher(redisTemplate, topic2()); } @Bean ChannelTopic topic1() { return new ChannelTopic("topic1"); } @Bean ChannelTopic topic2() { return new ChannelTopic("topic2"); } } 复制代码
Unit Test
下面我们通过单元测试,往“topic1”和“topic2”分别发布了十条消息,然后遍历“subscriber1”和“subscriber2”监听到的消息内容。
import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.List; import java.util.UUID; /** * Redis Pub/Sub tests * * @author ijiangtao * @create 2019-05-01 19:12 **/ @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class RedisPubSub { @Autowired @Qualifier("redisPublisherForTopic1") private MessagePublisher redisPublisher1; @Autowired @Qualifier("redisPublisherForTopic2") private MessagePublisher redisPublisher2; @Autowired @Qualifier("messageListener1") private RedisMessageSubscriber subscriber1; @Autowired @Qualifier("messageListener2") private RedisMessageSubscriber subscriber2; @Test public void test1() { // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend for (int i = 0; i < 10; i++) { String message = "Topic1 Message : " + UUID.randomUUID(); redisPublisher1.publish(message); } // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend for (int i = 0; i < 10; i++) { String message = "Topic2 Message : " + UUID.randomUUID(); redisPublisher2.publish(message); } // 获取存储的订阅消息 List<String> messageList1 = subscriber1.getMessageList(); for (int i = 0; i < messageList1.size(); i++) { log.info(messageList1.get(i)); } // 获取存储的订阅消息 List<String> messageList2 = subscriber2.getMessageList(); for (int i = 0; i < messageList2.size(); i++) { log.info(messageList2.get(i)); } } } 复制代码
“subscriber1”监听到了“redisPublisher1”和“redisPublisher2”发布的共20条消息:
[pattern:topic1,message:Topic1 Message : 2239af04-8e91-4adf-8e1e-98261a44ff77] [pattern:topic1,message:Topic1 Message : 85107f06-2cae-4d6c-8123-9e8dc6e7a608] [pattern:topic1,message:Topic1 Message : 0b80b9b8-8eee-476e-8462-bb6cbbbcf863] [pattern:topic1,message:Topic1 Message : 0983f28d-d220-4538-b15e-dc66c0d3e491] [pattern:topic1,message:Topic1 Message : 0f2d863c-00b9-4406-8e49-020c78a3632d] [pattern:topic1,message:Topic1 Message : b8a0bb35-6cc2-4393-9136-2390de80f709] [pattern:topic1,message:Topic1 Message : 027f1ca5-39cc-42c6-a4d8-87dc138260b1] [pattern:topic1,message:Topic1 Message : ff85595e-2864-4dec-96c1-9dd29c69f670] [pattern:topic1,message:Topic1 Message : 77471855-f04b-437d-bd1b-afb801a33cf9] [pattern:topic1,message:Topic1 Message : feba4b0f-70c1-4c14-8ecb-bf4c6956f374] [pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc] [pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77] [pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12] [pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61] [pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad] [pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596] [pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd] [pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223] [pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e] [pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700] 复制代码
“subscriber2”监听到了“redisPublisher2”发布的共10条消息:
[pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc] [pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77] [pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12] [pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61] [pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad] [pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596] [pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd] [pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223] [pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e] [pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700] 复制代码
总结
之前介绍了 发布/订阅模式的基本概念和设计原理 ,本文从 Redis 发布和订阅相关的命令开始,逐步讲解了 Redis 发布订阅的存储结构,以及如何通过 Spring Data Redis 实现发布订阅模式。
本文是 精通Redis系列 和 精通 设计模式 系列 教程的一部分,欢迎关注我的公众号,和作者一起迭代成长。
以上所述就是小编给大家介绍的《设计模式之发布订阅模式(2) Redis实现发布订阅模式》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 设计模式之发布订阅模式(1) 一文搞懂发布订阅模式
- 设计模式——订阅模式(观察者模式)
- 发布订阅模式与观察者模式
- TypeScript 设计模式之发布:订阅模式
- 观察者模式 vs 发布订阅模式
- 观察者模式 vs 发布订阅模式
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Go程序设计语言
艾伦 A. A. 多诺万 / 李道兵、高博、庞向才、金鑫鑫、林齐斌 / 机械工业出版社 / 2017-5 / 79
本书由《C程序设计语言》的作者Kernighan和谷歌公司Go团队主管Alan Donovan联袂撰写,是学习Go语言程序设计的指南。本书共13章,主要内容包括:Go的基础知识、基本结构、基本数据类型、复合数据类型、函数、方法、接口、goroutine、通道、共享变量的并发性、包、go工具、测试、反射等。 本书适合作为计算机相关专业的教材,也可供Go语言爱好者阅读。一起来看看 《Go程序设计语言》 这本书的介绍吧!