内容简介:上一篇文章第三个方案如下图在方案一的基础进行如下修改,新的架构图流程如下:
上一篇文章 Spring Boot系列21 Spring Websocket实现websocket集群方案讨论 里详细介绍了WebSocket集群的三种方案,并得出结论第三个方案是最好的,本文我们实现第三个方案。
第三个方案如下图
在方案一的基础进行如下修改,新的架构图流程如下:
- 服务A增加WS模块,当websocket连接过来时,将此用户的连接信息(主要是websocket sesionId值)存储 redis 中
- 消息生产者发送消息到的交换机,这些服务不直接推送服务A/B
- 增加新的模块dispatch,此模块接收推送过来的信息,并从redis中读取消息接收用户对应的websocket sesionId值,然后根据上面的规则计算出用户对应的路由键,然后将消息发送到用户订阅的队列上
- 前端接收消息
详细实现的代码
工程名称:mvc 本文在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础进行修改。
在pom.xml中引入redis,rabbitmq相关的jar
<!-- webscoekt 集群 需要 引入支持RabbitMQ, redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 复制代码
rabbitmq, redis的配置
application-wscluster.properties
# websocket集群需要配置RabbitMQ spring.rabbitmq.host:192.168.21.3 spring.rabbitmq.virtual-host: /icc-local spring.rabbitmq.username: icc-dev spring.rabbitmq.password: icc-dev # 配置redis spring.redis.database=0 spring.redis.host=192.168.21.4 # spring.redis.password= spring.redis.port=7001 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 复制代码
IRedisSessionService及实现
接口IRedisSessionService定义了对redis的操作 IRedisSessionService实现类将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询 IRedisSessionService
public interface IRedisSessionService { void add(String name, String wsSessionId); boolean del(String name); String get(String name); } 复制代码
SimulationRedisSessionServiceImpl将用户名称和websocket sessionId的关系存储到redis,提供添加、删除、查询
@Component public class SimulationRedisSessionServiceImpl implements IRedisSessionService { @Autowired private RedisTemplate<String, String> template; // key = 登录用户名称, value=websocket的sessionId private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32); /** * 在缓存中保存用户和websocket sessionid的信息 * @param name * @param wsSessionId */ public void add(String name, String wsSessionId){ BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name); boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS); } /** * 从缓存中删除用户的信息 * @param name */ public boolean del(String name){ return template.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection connection) throws DataAccessException { byte[] rawKey = template.getStringSerializer().serialize(name); return connection.del(rawKey) > 0; } }, true); } /** * 根据用户id获取用户对应的sessionId值 * @param name * @return */ public String get(String name){ BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name); return boundValueOperations.get(); } } 复制代码
AuthWebSocketHandlerDecoratorFactory
装饰WebSocketHandlerDecorator对象,在连接建立时,保存websocket的session id,其中key为帐号名称;在连接断开时,从缓存中删除用户的sesionId值。此websocket sessionId值用于创建消息的路由键。
@Component public class AuthWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory { private static final Logger log = LoggerFactory.getLogger(AuthWebSocketHandlerDecoratorFactory.class); @Autowired private IRedisSessionService redisSessionService; @Override public WebSocketHandler decorate(WebSocketHandler handler) { return new WebSocketHandlerDecorator(handler) { @Override public void afterConnectionEstablished(final WebSocketSession session) throws Exception { // 客户端与服务器端建立连接后,此处记录谁上线了 Principal principal = session.getPrincipal(); if(principal != null){ String username = principal.getName(); log.info("websocket online: " + username + " session " + session.getId()); redisSessionService.add(username, session.getId()); } super.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { // 客户端与服务器端断开连接后,此处记录谁下线了 Principal principal = session.getPrincipal(); if(principal != null){ String username = session.getPrincipal().getName(); log.info("websocket offline: " + username); redisSessionService.del(username); } super.afterConnectionClosed(session, closeStatus); } }; } } 复制代码
WebSocketRabbitMQMessageBrokerConfigurer
在 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上增加如下功能,将myWebSocketHandlerDecoratorFactory配置到websocket
@Configuration // 此注解开使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping @EnableWebSocketMessageBroker public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer { @Autowired private MyPrincipalHandshakeHandler myDefaultHandshakeHandler; @Autowired private AuthHandshakeInterceptor sessionAuthHandshakeInterceptor; @Autowired private AuthWebSocketHandlerDecoratorFactory myWebSocketHandlerDecoratorFactory; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { …. } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { … } /** * 这时实际spring weboscket集群的新增的配置,用于获取建立websocket时获取对应的sessionid值 * @param registration */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { registration.addDecoratorFactory(myWebSocketHandlerDecoratorFactory); super.configureWebSocketTransport(registration); } } 复制代码
TestMQCtl:
在上文 Spring Boot系列20 Spring Websocket实现向指定的用户发送消息 的基础上,对此类进行修改
- sendMq2User()方法根据用户的帐号和websocket sessionId根据["web订阅队列名称+'-user'+websocket sessionId"]组合路由键。然后通过AmqpTemplate 实例向amq.topic交换机发送消息,路由键为["web订阅队列名称+'-user'+websocket sessionId"]。方法中websocket sessionId是从根据帐号名称从redis中获取 其它的方法,这里不一一列出
@Controller @RequestMapping(value = "/ws") public class TestMQCtl { private static final Logger log = LoggerFactory.getLogger(TestMQCtl.class); @Autowired private AmqpTemplate amqpTemplate; @Autowired private IRedisSessionService redisSessionService; /** * 向执行用户发送请求 * @param msg * @param name * @return */ @RequestMapping(value = "send2user") @ResponseBody public int sendMq2User(String msg, String name){ // 根据用户名称获取用户对应的session id值 String wsSessionId = redisSessionService.get(name); RequestMessage demoMQ = new RequestMessage(); demoMQ.setName(msg); // 生成路由键值,生成规则如下: websocket订阅的目的地 + "-user" + websocket的sessionId值。生成值类似: String routingKey = getTopicRoutingKey("demo", wsSessionId); // 向amq.topi交换机发送消息,路由键为routingKey log.info("向用户[{}]sessionId=[{}],发送消息[{}],路由键[{}]", name, wsSessionId, wsSessionId, routingKey); amqpTemplate.convertAndSend("amq.topic", routingKey, JSON.toJSONString(demoMQ)); return 0; } /** * 获取Topic的生成的路由键 * * @param actualDestination * @param sessionId * @return */ private String getTopicRoutingKey(String actualDestination, String sessionId){ return actualDestination + "-user" + sessionId; } …. } 复制代码
测试
以不同端口启动两个服务启动服务类:WebSocketClusterApplication 以“--spring.profiles.active=wscluster --server.port=8081”参数启动服务A 以“--spring.profiles.active=wscluster --server.port=8082”参数启动服务B
登录模拟帐号:xiaoming登录服务A,xiaoming2登录服务B使用xiaoming登录服务A,并登录websocket http://127.0.0.1:8081/ws/login 使用xiaoming登录,并提交
点击连接,如果连接变灰色,则登录websocket成功
打开另一个浏览器,使用xiaoming2登录服务B,并登录websocket http://127.0.0.1:8082/ws/login 使用xiaoming2登录并提交,最后登录websocket
登录服务A模拟发送页面登录http://127.0.0.1:8081/ws/send,发送消息
- 向帐号xiaoming发送消息xiaoming-receive,只能被连接服务A的服务websocket收到 §
- 向帐号xiaoming2发送消息xiaoming2-receive,只能被连接服务B的服务websocket收到
此时两个页面收到信息:
xiaoming帐号只收到xiaoming-receive xiaoming2帐号只收到xiaoming2-receive
登录服务B模拟发送页面登录http://127.0.0.1:8082/ws/send,发送消息,和http://127.0.0.1:8081/ws/send 一样发送相同消息,结果是一样
结论无论用户登录的服务A,还是服务B,我们通过以上的代码,我们都可以发送消息到指定的用户,所以我们已经实现websocket集群
以上所述就是小编给大家介绍的《Spring Boot系列22 Spring Websocket实现websocket集群方案的Demo》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- Redis集群实现原理探讨
- Dubbo(五):集群容错的实现
- SpringSession+Redis实现集群会话共享
- 快速实现 Tomcat 集群 Session 共享
- 基于 ZooKeeper 实现爬虫集群的监控
- 如何使用 Prometheus 轻松实现集群监控?
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Web Designer's Idea Book
Patrick Mcneil / How / 2008-10-6 / USD 25.00
The Web Designer's Idea Book includes more than 700 websites arranged thematically, so you can find inspiration for layout, color, style and more. Author Patrick McNeil has cataloged more than 5,000 s......一起来看看 《The Web Designer's Idea Book》 这本书的介绍吧!