网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

栏目: Java · 发布时间: 8年前

内容简介:网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-websocket-routing/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Spring-Cloud-Gateway 2.0.X M4

网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

关注 微信公众号:【芋道源码】 有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 WebsocketRoutingFilter 的代码实现

WebsocketRoutingFilter ,Websocket 路由 网关过滤器。其根据 ws:// / wss:// 前缀( Scheme )过滤处理, 代理后端 Websocket 服务 ,提供给客户端连接。如下图 :

网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

  • 目前 一个 RouteDefinition 只能指定 一个 后端 WebSocket 服务。官方正在计划在 LoadBalancerClientFilter 上实现 Websocket 的负载均衡功能。也就说,未来 一个 RouteDefinition 能够指定 多个 后端 WebSocket 服务。

Websocket 的 RouteDefinition 配置如下 :

cloud:
gateway:
routes:
- id: websocket_test
uri: ws://localhost:9000
order: 8000
predicates:
- Path=/echo
  • uri 使用 ws:// 或者 wss:// 为前缀。

推荐 Spring Cloud 书籍:

2. 环境搭建

在解析源码之前,我们先以 wscat 搭建一个 WebSocket 服务。

第一步,安装 wscat 。

npm install -g wscat

第二步,启动 wscat 。

wscat --listen 9000

第三步,连接 wscat 。

wscat --listen 9000

第四步,配置 RouteDefinition ,并启动 Spring Cloud Gateway 。

cloud:
 gateway:
 routes:
 - id: websocket_test
 uri: ws://localhost:9000
 order: 8000
 predicates:
 - Path=/echo

第五步,通过 Gateway 连接 wscat 。

wscat --connect ws://localhost:8080/echo

大功告成。

注意,wscat 同一时间仅允许一个客户端连接。

3. WebsocketRoutingFilter

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter ,Websocket 路由 网关过滤器。

构造方法,代码如下 :

public class WebsocketRoutingFilter implements GlobalFilter, Ordered{
 public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

 private final WebSocketClient webSocketClient;
 private final WebSocketService webSocketService;

 public WebsocketRoutingFilter(WebSocketClient webSocketClient){
 this(webSocketClient, new HandshakeWebSocketService());
 }

 public WebsocketRoutingFilter(WebSocketClient webSocketClient,
WebSocketService webSocketService){
 this.webSocketClient = webSocketClient;
 this.webSocketService = webSocketService;
 }
 
}

#getOrder() 方法,代码如下 :

@Override
public int getOrder(){
 return Ordered.LOWEST_PRECEDENCE;
}

#filter(ServerWebExchange, GatewayFilterChain) 方法,代码如下 :

 1: @Override
 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain){
 3: // 获得 requestUrl
 4: URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
 5: 
 6: // 判断是否能够处理
 7: String scheme = requestUrl.getScheme();
 8: if (isAlreadyRouted(exchange) || (!scheme.equals("ws") && !scheme.equals("wss"))) {
 9: return chain.filter(exchange);
10: }
11: 
12: // 设置已经路由
13: setAlreadyRouted(exchange);
14: 
15: // 处理连接请求
16: return this.webSocketService.handleRequest(exchange,
17: new ProxyWebSocketHandler(requestUrl, this.webSocketClient, exchange.getRequest().getHeaders()));
18: }
  • 第 4 行 :获得 requestUrl
  • 第 7 至 10 行 :判断 ForwardRoutingFilter 是否能够处理该请求,需要满足两个条件 :

    • ws:// 或者 wss:// 前缀( Scheme ) 。
    • 调用 ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange) 方法,判断该请求暂未被其他 Routing 网关处理。代码如下 :

      public static boolean isAlreadyRouted(ServerWebExchange exchange){
       return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
      }
      
  • 第 13 行 :设置该请求已经被处理。代码如下 :

    public static void setAlreadyRouted(ServerWebExchange exchange){
     exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
    }
    
  • 第 15 至 16 行 :调用 WebSocketService#hanldeRequest(ServerWebExchange, WebSocketHandler) 方法,处理客户端发起的连接请求( Handshake Request ) 。这个方法的实现不在本文范围内,但是良心如笔者,大概讲下涉及到的类 :

3.1 ProxyWebSocketHandler

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter.ProxyWebSocketHandler代理 后端 WebSocket 服务处理器。

构造方法,代码如下 :

 1: private static class ProxyWebSocketHandler implements WebSocketHandler{
 2: 
 3: private final WebSocketClient client;
 4: private final URI url;
 5: private final HttpHeaders headers;
 6: private final List<String> subProtocols;
 7: 
 8: public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers){
 9: this.client = client;
10: this.url = url;
11: this.headers = new HttpHeaders();//headers;
12: //TODO: better strategy to filter these headers?
13: headers.entrySet().forEach(header -> {
14: if (!header.getKey().toLowerCase().startsWith("sec-websocket")
15: && !header.getKey().equalsIgnoreCase("upgrade")
16: && !header.getKey().equalsIgnoreCase("connection")) {
17: this.headers.addAll(header.getKey(), header.getValue());
18: }
19: });
20: List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
21: if (protocols != null) {
22: this.subProtocols = protocols;
23: } else {
24: this.subProtocols = Collections.emptyList();
25: }
26: }
27: }

#handle(WebSocketSession) 方法,代码如下 :

 1: @Override
 2: public Mono<Void> handle(WebSocketSession session){
 3: // pass headers along so custom headers can be sent through
 4: return client.execute(url, this.headers, new WebSocketHandler() {
 5: @Override
 6: public Mono<Void> handle(WebSocketSession proxySession){
 7: // Use retain() for Reactor Netty
 8: // 转发消息 客户端 =》后端服务
 9: Mono<Void> proxySessionSend = proxySession
10: .send(session.receive().doOnNext(WebSocketMessage::retain));
11: // 转发消息 后端服务=》客户端
12: // .log("proxySessionSend", Level.FINE);
13: Mono<Void> serverSessionSend = session
14: .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
15: // .log("sessionSend", Level.FINE);
16: 
17: //
18: return Mono.when(proxySessionSend, serverSessionSend).then();
19: }
20: 
21: /**
22: * Copy subProtocols so they are available downstream.
23: * @return
24: */
25: @Override
26: public List<String> getSubProtocols(){
27: return ProxyWebSocketHandler.this.subProtocols;
28: }
29: });
30: }
  • 第 6 行 :调用 WebSocketClient#execute(URI, HttpHeaders, WebSocketHandler) 方法, 连接后端【被代理】的 WebSocket 服务 。连接成功后,回调 WebSocketHandler 实现的内部类的 #handle(WebSocketSession) 方法。
  • WebSocketHandler 实现的内部类
    • 第 9 至 10 行 :转发消息,客户端 => 后端服务。
    • 第 13 至 14 行 :转发消息,后端服务 => 客户端。
    • 第 18 行 :调用 Mono#when() 方法,合并 proxySessionSend / serverSessionSend 两个 Mono 。调用 Mono#then() 方法, 参数为空 ,合并的 Mono 不发射数据出来。RxJava 和 Reactor 类似,可以参考 《ReactiveX文档中文翻译 —— And/Then/When》 学习下 when / and / then 操作符。
    • 下图可以帮助理解下这个类的用途 : 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

666. 彩蛋

:smiling_imp: 限于对 Reactor 和 Netty 了解不够深入,写的不够透彻。回头深入理解下它们。

网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

胖友,分享一波朋友圈可好!


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

The Haskell School of Music

The Haskell School of Music

Paul Hudak、Donya Quick / Cambridge University Press / 2018-10-4 / GBP 42.99

This book teaches functional programming through creative applications in music and sound synthesis. Readers will learn the Haskell programming language and explore numerous ways to create music and d......一起来看看 《The Haskell School of Music》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

MD5 加密
MD5 加密

MD5 加密工具

html转js在线工具
html转js在线工具

html转js在线工具