设计模式之发布订阅模式(2) Redis实现发布订阅模式

栏目: 数据库 · 发布时间: 6年前

内容简介:Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了发布订阅模式。该功能提供两种信息机制, 分别是“发布订阅到频道”和“发布订阅到模式”。Redis 的Redis 的

目录

    • PUBLISH 命令和 SUBSCRIBE 命令
    • PSUBSCRIBE 模式订阅命令
  • Redis 发布/订阅的存储结构
  • Spring Data Redis 实现发布/订阅模式

Redis 发布/订阅命令

Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了发布订阅模式。该功能提供两种信息机制, 分别是“发布订阅到频道”和“发布订阅到模式”。

PUBLISH 命令和 SUBSCRIBE 命令

设计模式之发布订阅模式(2) Redis实现发布订阅模式
PUBLISH channel message
复制代码

Redis 的 PUBLISH 命令可以让客户端把指定的消息发送到指定的频道中。

SUBSCRIBE channel [channel …]
复制代码

Redis 的 SUBSCRIBE 命令可以让客户端订阅任意数量的频道, 每当有新信息发送到被订阅的频道时,信息就会被发送给所有订阅指定频道的客户端。

下面我们就演示一下 PUBLISH命令和SUBSCRIBE命令的用法:

首先是订阅单个频道:

设计模式之发布订阅模式(2) Redis实现发布订阅模式

然后是订阅多个频道:

设计模式之发布订阅模式(2) Redis实现发布订阅模式

PSUBSCRIBE 模式订阅命令

设计模式之发布订阅模式(2) Redis实现发布订阅模式

Redis 的发布与订阅实现支持模式匹配(pattern matching)。

客户端可以订阅一个带 * 号的模式,如果某个/某些频道的名字和这个模式匹配,那么当有信息发送给这个/这些频道的时候,客户端也会收到这个/这些频道的信息。

客户端订阅的模式里面可以包含多个 glob 风格的通配符, 比如 * 、 ? 和 [...] 等。

比如执行命令:

PSUBSCRIBE t.*
复制代码

客户端将收到来自 t.java、 t.db 等频道的信息。

设计模式之发布订阅模式(2) Redis实现发布订阅模式

Redis 发布/订阅的存储结构

每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构, 结构的 pubsub_channels 属性是一个字典, 这个字典就用于保存订阅频道的信息:

struct redisServer {
    // ...
    dict *pubsub_channels;
    // ...
}
复制代码

其中,字典的键为正在被订阅的频道, 而字典的值则是一个链表, 链表中保存了所有订阅这个频道的客户端。

当调用 PUBLISH channel message 命令的时候,程序首先根据 channel 定位到字典的键,然后将信息发送给字典值链表中的所有客户端。

Redis发布/订阅存储结构如下图所示:

设计模式之发布订阅模式(2) Redis实现发布订阅模式

Spring Data Redis 实现发布/订阅模式

下面带你一步步通过 Spring Data Redis 来实现发布与订阅。

示例项目基于SpringBoot搭建,你可以在这里找到 Spring Data Redis 实现发布/订阅模式的源码

由于篇幅原因下面就不再演示项目搭建和集成Redis的过程了,实现细节请参考 springboot redis demo project

MessagePublisher

首先定义一个发布者接口,接口只有一个 void publish(String message) 方法,用于发布消息。

public interface MessagePublisher {
    /**
     * publish message
     * @param message
     */
    void publish(String message);
}
复制代码

然后提供一个基于Redis的 MessagePublisher 实现。

其中最核心的是这个方法: redisTemplate.convertAndSend(topic.getTopic(), message) ,用于把消息发送到指定topic的channel之中。

import lombok.Setter;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;

/**
 * Redis message publisher
 *
 * @author ijiangtao
 * @create 2019-05-01 19:36
 **/
@Setter
public class RedisMessagePublisher implements MessagePublisher {

    private RedisTemplate<String, String> redisTemplate;

    private ChannelTopic topic;

    private RedisMessagePublisher() { }

    public RedisMessagePublisher(RedisTemplate<String, String> redisTemplate, ChannelTopic topic) {
        this.redisTemplate = redisTemplate;
        this.topic = topic;
    }

    public void publish(String message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}
复制代码

MessageListener

RedisMessageSubscriber 是一个订阅者,它实现了 MessageListener 接口,并通过一个 messageList 来存/取监听到的消息。

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.List;

/**
 * Redis Message Subscriber
 * <p>
 * RedisMessageSubscriber implements the Spring Data Redis-provided MessageListener interface
 *
 * @author ijiangtao
 * @create 2019-05-01 19:39
 **/
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Data
@Component
public class RedisMessageSubscriber implements MessageListener {

    private List<String> messageList;

    public void onMessage(Message message, byte[] pattern) {
        messageList.add("[pattern:" + new String(pattern) + ",message:" + message.toString() + "]");
    }
}
复制代码

RedisPubSubConfig

下面定义了两个“topic”,并且通过两个“publisher`将“message”发布到“channel”指定的“topic”上。

然后我们定义了两个“subscriber”,“subscriber1”订阅了“topic1”和“topic2”,“subscriber2”只订阅了“topic2”。

最后我们将这些发布者和订阅者都注册到了 Spring Data Redis 提供的容器( RedisMessageListenerContainer )中。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;

import java.util.ArrayList;

/**
 * config
 *
 * @author ijiangtao
 * @create 2019-05-01 19:57
 **/
@Configuration
@ComponentScan("net.ijiangtao.tech.framework.spring.ispringboot.redis")
@EnableRedisRepositories(basePackages = "net.ijiangtao.tech.framework.spring.ispringboot")
@PropertySource("classpath:application.properties")
public class RedisPubSubConfig {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Bean
    RedisMessageListenerContainer redisContainer() {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();

        container.setConnectionFactory(redisTemplate.getConnectionFactory());

        container.addMessageListener(messageListenerAdapter1() , topic1());
        container.addMessageListener(messageListenerAdapter1() , topic2());

        container.addMessageListener(messageListenerAdapter2(), topic2());

        return container;
    }

    @Bean
    MessageListenerAdapter messageListenerAdapter1() {
        return new MessageListenerAdapter(messageListener1());
    }

    @Bean
    public RedisMessageSubscriber messageListener1() {
        return new RedisMessageSubscriber(new ArrayList<>());
    }

    @Bean
    MessageListenerAdapter messageListenerAdapter2() {
        return new MessageListenerAdapter(messageListener2());
    }

    @Bean
    public RedisMessageSubscriber messageListener2() {
        return new RedisMessageSubscriber(new ArrayList<>());
    }


    @Bean
    MessagePublisher redisPublisherForTopic1() {
        return new RedisMessagePublisher(redisTemplate, topic1());
    }

    @Bean
    MessagePublisher redisPublisherForTopic2() {
        return new RedisMessagePublisher(redisTemplate, topic2());
    }

    @Bean
    ChannelTopic topic1() {
        return new ChannelTopic("topic1");
    }

    @Bean
    ChannelTopic topic2() {
        return new ChannelTopic("topic2");
    }

}
复制代码

Unit Test

下面我们通过单元测试,往“topic1”和“topic2”分别发布了十条消息,然后遍历“subscriber1”和“subscriber2”监听到的消息内容。

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.UUID;

/**
 * Redis Pub/Sub tests
 *
 * @author ijiangtao
 * @create 2019-05-01 19:12
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RedisPubSub {

    @Autowired
    @Qualifier("redisPublisherForTopic1")
    private MessagePublisher redisPublisher1;

    @Autowired
    @Qualifier("redisPublisherForTopic2")
    private MessagePublisher redisPublisher2;

    @Autowired
    @Qualifier("messageListener1")
    private RedisMessageSubscriber subscriber1;

    @Autowired
    @Qualifier("messageListener2")
    private RedisMessageSubscriber subscriber2;


    @Test
    public void test1() {

        // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend
        for (int i = 0; i < 10; i++) {
            String message = "Topic1 Message : " + UUID.randomUUID();
            redisPublisher1.publish(message);
        }

        // 循环发布10次消息, 主要方法 redisTemplate.convertAndSend
        for (int i = 0; i < 10; i++) {
            String message = "Topic2 Message : " + UUID.randomUUID();
            redisPublisher2.publish(message);
        }

        // 获取存储的订阅消息
        List<String> messageList1 = subscriber1.getMessageList();
        for (int i = 0; i < messageList1.size(); i++) {
            log.info(messageList1.get(i));
        }

        // 获取存储的订阅消息
        List<String> messageList2 = subscriber2.getMessageList();
        for (int i = 0; i < messageList2.size(); i++) {
            log.info(messageList2.get(i));
        }

    }

}
复制代码

“subscriber1”监听到了“redisPublisher1”和“redisPublisher2”发布的共20条消息:

[pattern:topic1,message:Topic1 Message : 2239af04-8e91-4adf-8e1e-98261a44ff77]
[pattern:topic1,message:Topic1 Message : 85107f06-2cae-4d6c-8123-9e8dc6e7a608]
[pattern:topic1,message:Topic1 Message : 0b80b9b8-8eee-476e-8462-bb6cbbbcf863]
[pattern:topic1,message:Topic1 Message : 0983f28d-d220-4538-b15e-dc66c0d3e491]
[pattern:topic1,message:Topic1 Message : 0f2d863c-00b9-4406-8e49-020c78a3632d]
[pattern:topic1,message:Topic1 Message : b8a0bb35-6cc2-4393-9136-2390de80f709]
[pattern:topic1,message:Topic1 Message : 027f1ca5-39cc-42c6-a4d8-87dc138260b1]
[pattern:topic1,message:Topic1 Message : ff85595e-2864-4dec-96c1-9dd29c69f670]
[pattern:topic1,message:Topic1 Message : 77471855-f04b-437d-bd1b-afb801a33cf9]
[pattern:topic1,message:Topic1 Message : feba4b0f-70c1-4c14-8ecb-bf4c6956f374]
[pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
[pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
[pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
[pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
[pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
[pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
[pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
[pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
[pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
[pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
复制代码

“subscriber2”监听到了“redisPublisher2”发布的共10条消息:

[pattern:topic2,message:Topic2 Message : dd5e97a7-5ed3-4d0c-be04-0ec2076b14fc]
[pattern:topic2,message:Topic2 Message : c415c846-8597-49e5-a5e4-04c738dcdb77]
[pattern:topic2,message:Topic2 Message : 07981d33-c894-43e7-9305-702f967c0a12]
[pattern:topic2,message:Topic2 Message : 31a34aab-a363-4f5c-9409-3af12b0c3e61]
[pattern:topic2,message:Topic2 Message : 73a4e995-399a-4291-a777-67fc2e4eb3ad]
[pattern:topic2,message:Topic2 Message : 09ce7d6b-3257-49ca-a7b4-8ef900944596]
[pattern:topic2,message:Topic2 Message : 42018099-0d56-4137-98c4-57716d2fbdcd]
[pattern:topic2,message:Topic2 Message : 0b362883-ab76-413e-9c31-60650f3ed223]
[pattern:topic2,message:Topic2 Message : 7097d949-8184-40b4-b9af-582ab44e342e]
[pattern:topic2,message:Topic2 Message : 6967291d-aad4-4aa3-b44a-b988a9589700]
复制代码

总结

之前介绍了 发布/订阅模式的基本概念和设计原理 ,本文从 Redis 发布和订阅相关的命令开始,逐步讲解了 Redis 发布订阅的存储结构,以及如何通过 Spring Data Redis 实现发布订阅模式。

本文是 精通Redis系列精通 设计模式 系列 教程的一部分,欢迎关注我的公众号,和作者一起迭代成长。

设计模式之发布订阅模式(2) Redis实现发布订阅模式

以上所述就是小编给大家介绍的《设计模式之发布订阅模式(2) Redis实现发布订阅模式》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Web Analytics 2.0

Web Analytics 2.0

Avinash Kaushik / Sybex / 2009-10-26 / USD 39.99

The bestselling book Web Analytics: An Hour A Day was the first book in the analytics space to move beyond clickstream analysis. Web Analytics 2.0 will significantly evolve the approaches from the fir......一起来看看 《Web Analytics 2.0》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具