Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

栏目: Html5 · 发布时间: 6年前

内容简介:上一篇文章第三个方案如下图在方案一的基础进行如下修改,新的架构图流程如下:

上一篇文章 Spring Boot系列21 Spring Websocket实现websocket集群方案讨论 里详细介绍了WebSocket集群的三种方案,并得出结论第三个方案是最好的,本文我们实现第三个方案。

第三个方案如下图

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

在方案一的基础进行如下修改,新的架构图流程如下:

  1. 服务A增加WS模块,当websocket连接过来时,将此用户的连接信息(主要是websocket sesionId值)存储 redis
  2. 消息生产者发送消息到的交换机,这些服务不直接推送服务A/B
  3. 增加新的模块dispatch,此模块接收推送过来的信息,并从redis中读取消息接收用户对应的websocket sesionId值,然后根据上面的规则计算出用户对应的路由键,然后将消息发送到用户订阅的队列上
  4. 前端接收消息

详细实现的代码

工程名称:mvc 本文在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础进行修改。

在pom.xml中引入redis,rabbitmq相关的jar

<!--  webscoekt 集群 需要 引入支持RabbitMQ, redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
复制代码

rabbitmq, redis的配置

application-wscluster.properties

# websocket集群需要配置RabbitMQ
spring.rabbitmq.host:192.168.21.3
spring.rabbitmq.virtual-host: /icc-local
spring.rabbitmq.username: icc-dev
spring.rabbitmq.password: icc-dev

# 配置redis
spring.redis.database=0
spring.redis.host=192.168.21.4
# spring.redis.password=
spring.redis.port=7001
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0  
spring.redis.pool.max-active=8  
spring.redis.pool.max-wait=-1
复制代码

IRedisSessionService及实现

接口IRedisSessionService定义了对redis的操作 IRedisSessionService实现类将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询 IRedisSessionService

public interface IRedisSessionService {
    void add(String name, String wsSessionId);
    boolean del(String name);
    String get(String name);
}
复制代码

SimulationRedisSessionServiceImpl将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询

@Component
public class SimulationRedisSessionServiceImpl implements IRedisSessionService {

    @Autowired
    private RedisTemplate<String, String> template;

    // key = 登录用户名称, value=websocket的sessionId
    private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32);

    /**
     * 在缓存中保存用户和websocket sessionid的信息
     * @param name
     * @param wsSessionId
     */
    public void add(String name, String wsSessionId){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS);
    }

    /**
     * 从缓存中删除用户的信息
     * @param name
     */
    public boolean del(String name){
        return template.execute(new RedisCallback<Boolean>() {

            @Override
            public Boolean doInRedis(RedisConnection connection)
                    throws DataAccessException {
                byte[] rawKey = template.getStringSerializer().serialize(name);
                return connection.del(rawKey) > 0;
            }
        }, true);
    }

    /**
     * 根据用户id获取用户对应的sessionId值
     * @param name
     * @return
     */
    public String get(String name){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        return boundValueOperations.get();
    }
}
复制代码

AuthWebSocketHandlerDecoratorFactory

装饰WebSocketHandlerDecorator对象,在连接建立时,保存websocket的session id,其中key为帐号名称;在连接断开时,从缓存中删除用户的sesionId值。此websocket sessionId值用于创建消息的路由键。

@Component
public class AuthWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory {
    private static final Logger log = LoggerFactory.getLogger(AuthWebSocketHandlerDecoratorFactory.class);

    @Autowired
    private IRedisSessionService redisSessionService;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
                // 客户端与服务器端建立连接后,此处记录谁上线了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = principal.getName();
                    log.info("websocket online: " + username + " session " + session.getId());
                    redisSessionService.add(username, session.getId());
                }
                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                // 客户端与服务器端断开连接后,此处记录谁下线了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = session.getPrincipal().getName();
                    log.info("websocket offline: " + username);
                    redisSessionService.del(username);
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}
复制代码

WebSocketRabbitMQMessageBrokerConfigurer

Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上增加如下功能,将myWebSocketHandlerDecoratorFactory配置到websocket

@Configuration
// 此注解开使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping
@EnableWebSocketMessageBroker
public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {

    @Autowired
    private MyPrincipalHandshakeHandler myDefaultHandshakeHandler;
    @Autowired
    private AuthHandshakeInterceptor sessionAuthHandshakeInterceptor;

    @Autowired
    private AuthWebSocketHandlerDecoratorFactory myWebSocketHandlerDecoratorFactory;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        ….
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
          …       
    }

    /**
     * 这时实际spring weboscket集群的新增的配置,用于获取建立websocket时获取对应的sessionid值
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(myWebSocketHandlerDecoratorFactory);
        super.configureWebSocketTransport(registration);
    }
}

复制代码

TestMQCtl:

在上文 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上,对此类进行修改

  • sendMq2User()方法根据用户的帐号和websocket sessionId根据["web订阅队列名称+'-user'+websocket sessionId"]组合路由键。然后通过AmqpTemplate 实例向amq.topic交换机发送消息,路由键为["web订阅队列名称+'-user'+websocket sessionId"]。方法中websocket sessionId是从根据帐号名称从redis中获取 其它的方法,这里不一一列出
@Controller
@RequestMapping(value = "/ws")
public class TestMQCtl {
    private  static final Logger log = LoggerFactory.getLogger(TestMQCtl.class);

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private IRedisSessionService redisSessionService;

     /**
     * 向执行用户发送请求
     * @param msg
     * @param name
     * @return
     */
    @RequestMapping(value = "send2user")
    @ResponseBody
    public int sendMq2User(String msg, String name){
        // 根据用户名称获取用户对应的session id值
        String wsSessionId = redisSessionService.get(name);
        RequestMessage demoMQ = new RequestMessage();
        demoMQ.setName(msg);

        // 生成路由键值,生成规则如下: websocket订阅的目的地 + "-user" + websocket的sessionId值。生成值类似:
        String routingKey = getTopicRoutingKey("demo", wsSessionId);
        // 向amq.topi交换机发送消息,路由键为routingKey
        log.info("向用户[{}]sessionId=[{}],发送消息[{}],路由键[{}]", name, wsSessionId, wsSessionId, routingKey);
        amqpTemplate.convertAndSend("amq.topic", routingKey,  JSON.toJSONString(demoMQ));
        return 0;
    }

    /**
     * 获取Topic的生成的路由键
     *
     * @param actualDestination
     * @param sessionId
     * @return
     */
    private String getTopicRoutingKey(String actualDestination, String sessionId){
        return actualDestination + "-user" + sessionId;
    }
   ….
}
复制代码

测试

以不同端口启动两个服务启动服务类:WebSocketClusterApplication 以“--spring.profiles.active=wscluster --server.port=8081”参数启动服务A 以“--spring.profiles.active=wscluster --server.port=8082”参数启动服务B

登录模拟帐号:xiaoming登录服务A,xiaoming2登录服务B使用xiaoming登录服务A,并登录websocket http://127.0.0.1:8081/ws/login 使用xiaoming登录,并提交

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

点击连接,如果连接变灰色,则登录websocket成功

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

打开另一个浏览器,使用xiaoming2登录服务B,并登录websocket http://127.0.0.1:8082/ws/login 使用xiaoming2登录并提交,最后登录websocket

登录服务A模拟发送页面登录http://127.0.0.1:8081/ws/send,发送消息

  1. 向帐号xiaoming发送消息xiaoming-receive,只能被连接服务A的服务websocket收到 §
  2. 向帐号xiaoming2发送消息xiaoming2-receive,只能被连接服务B的服务websocket收到

此时两个页面收到信息:

Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo

xiaoming帐号只收到xiaoming-receive xiaoming2帐号只收到xiaoming2-receive

登录服务B模拟发送页面登录http://127.0.0.1:8082/ws/send,发送消息,和http://127.0.0.1:8081/ws/send 一样发送相同消息,结果是一样

结论无论用户登录的服务A,还是服务B,我们通过以上的代码,我们都可以发送消息到指定的用户,所以我们已经实现websocket集群


以上所述就是小编给大家介绍的《Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Haskell School of Expression

The Haskell School of Expression

Paul Hudak / Cambridge University Press / 2000-01 / USD 95.00

Functional programming is a style of programming that emphasizes the use of functions (in contrast to object-oriented programming, which emphasizes the use of objects). It has become popular in recen......一起来看看 《The Haskell School of Expression》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具