内容简介:网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter
摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-websocket-routing/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Spring-Cloud-Gateway 2.0.X M4
关注 微信公众号:【芋道源码】 有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言 都 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢 。
- 新的 源码解析文章 实时 收到通知。 每周更新一篇左右 。
- 认真的 源码交流微信群。
1. 概述
本文主要分享 WebsocketRoutingFilter 的代码实现 。
WebsocketRoutingFilter ,Websocket 路由 网关过滤器。其根据 ws:// / wss:// 前缀( Scheme )过滤处理, 代理后端 Websocket 服务 ,提供给客户端连接。如下图 :
- 目前 一个 RouteDefinition 只能指定 一个 后端 WebSocket 服务。官方正在计划在 LoadBalancerClientFilter 上实现 Websocket 的负载均衡功能。也就说,未来 一个 RouteDefinition 能够指定 多个 后端 WebSocket 服务。
Websocket 的 RouteDefinition 配置如下 :
cloud: gateway: routes: - id: websocket_test uri: ws://localhost:9000 order: 8000 predicates: - Path=/echo
-
uri使用ws://或者wss://为前缀。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版, 等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与 Docker 微服务架构实战》
- 两书齐买,京东包邮。
2. 环境搭建
在解析源码之前,我们先以 wscat 搭建一个 WebSocket 服务。
第一步,安装 wscat 。
npm install -g wscat
第二步,启动 wscat 。
wscat --listen 9000
第三步,连接 wscat 。
wscat --listen 9000
第四步,配置 RouteDefinition ,并启动 Spring Cloud Gateway 。
cloud: gateway: routes: - id: websocket_test uri: ws://localhost:9000 order: 8000 predicates: - Path=/echo
第五步,通过 Gateway 连接 wscat 。
wscat --connect ws://localhost:8080/echo
大功告成。
注意,wscat 同一时间仅允许一个客户端连接。
3. WebsocketRoutingFilter
org.springframework.cloud.gateway.filter.WebsocketRoutingFilter ,Websocket 路由 网关过滤器。
构造方法,代码如下 :
public class WebsocketRoutingFilter implements GlobalFilter, Ordered{
public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
private final WebSocketClient webSocketClient;
private final WebSocketService webSocketService;
public WebsocketRoutingFilter(WebSocketClient webSocketClient){
this(webSocketClient, new HandshakeWebSocketService());
}
public WebsocketRoutingFilter(WebSocketClient webSocketClient,
WebSocketService webSocketService){
this.webSocketClient = webSocketClient;
this.webSocketService = webSocketService;
}
}
-
webSocketClient属性,在 《Spring-Cloud-Gateway 源码解析 —— 网关初始化》「5.2 初始化 NettyConfiguration」 一文中,我们可以看到使用的是org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient实现类。通过该属性, 连接后端【被代理】的 WebSocket 服务 。 -
webSocketService属性,在 《Spring-Cloud-Gateway 源码解析 —— 网关初始化》「5.3 初始化 GlobalFilter」 一文中,我们可以看到使用的是org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService实现类。通过该属性,处理客户端发起的连接请求( Handshake Request ) 。
#getOrder() 方法,代码如下 :
@Override
public int getOrder(){
return Ordered.LOWEST_PRECEDENCE;
}
- 返回顺序为
Integer.MAX_VALUE。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.1) 之 GatewayFilter 一览》「3. GlobalFilter」 ,我们列举了所有 GlobalFilter 的顺序。
#filter(ServerWebExchange, GatewayFilterChain) 方法,代码如下 :
1: @Override
2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain){
3: // 获得 requestUrl
4: URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
5:
6: // 判断是否能够处理
7: String scheme = requestUrl.getScheme();
8: if (isAlreadyRouted(exchange) || (!scheme.equals("ws") && !scheme.equals("wss"))) {
9: return chain.filter(exchange);
10: }
11:
12: // 设置已经路由
13: setAlreadyRouted(exchange);
14:
15: // 处理连接请求
16: return this.webSocketService.handleRequest(exchange,
17: new ProxyWebSocketHandler(requestUrl, this.webSocketClient, exchange.getRequest().getHeaders()));
18: }
- 第 4 行 :获得
requestUrl。 -
第 7 至 10 行 :判断 ForwardRoutingFilter 是否能够处理该请求,需要满足两个条件 :
-
ws://或者wss://前缀( Scheme ) 。 -
调用
ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange)方法,判断该请求暂未被其他 Routing 网关处理。代码如下 :public static boolean isAlreadyRouted(ServerWebExchange exchange){ return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false); }
-
-
第 13 行 :设置该请求已经被处理。代码如下 :
public static void setAlreadyRouted(ServerWebExchange exchange){ exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true); } -
第 15 至 16 行 :调用
WebSocketService#hanldeRequest(ServerWebExchange, WebSocketHandler)方法,处理客户端发起的连接请求( Handshake Request ) 。这个方法的实现不在本文范围内,但是良心如笔者,大概讲下涉及到的类 :- 主要逻辑在
org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy类里。 - 【第一步】 ReactorNettyRequestUpgradeStrategy 调用
reactor.ipc.netty.http.server.HttpServerWSOperations,处理客户端发起的连接请求。处理成功,告知客户端连接成功。 - 【第二步】ReactorNettyRequestUpgradeStrategy 调用
org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy接口 的#handle(WebSocketSession)方法,处理客户端 WebSocket Session 。ProxyWebSocketHandler 是 WebSocketHandler 的 实现类 ,在「3.1 ProxyWebSocketHandler」来详细解析#handle(WebSocketSession)实现了什么逻辑。
- 主要逻辑在
3.1 ProxyWebSocketHandler
org.springframework.cloud.gateway.filter.WebsocketRoutingFilter.ProxyWebSocketHandler , 代理 后端 WebSocket 服务处理器。
构造方法,代码如下 :
1: private static class ProxyWebSocketHandler implements WebSocketHandler{
2:
3: private final WebSocketClient client;
4: private final URI url;
5: private final HttpHeaders headers;
6: private final List<String> subProtocols;
7:
8: public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers){
9: this.client = client;
10: this.url = url;
11: this.headers = new HttpHeaders();//headers;
12: //TODO: better strategy to filter these headers?
13: headers.entrySet().forEach(header -> {
14: if (!header.getKey().toLowerCase().startsWith("sec-websocket")
15: && !header.getKey().equalsIgnoreCase("upgrade")
16: && !header.getKey().equalsIgnoreCase("connection")) {
17: this.headers.addAll(header.getKey(), header.getValue());
18: }
19: });
20: List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
21: if (protocols != null) {
22: this.subProtocols = protocols;
23: } else {
24: this.subProtocols = Collections.emptyList();
25: }
26: }
27: }
-
client属性,在 《Spring-Cloud-Gateway 源码解析 —— 网关初始化》「5.2 初始化 NettyConfiguration」 一文中,我们可以看到使用的是org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient实现类。通过该属性, 连接后端【被代理】的 WebSocket 服务 。 -
url属性,后端【被代理】的 WebSocket 服务的地址。 -
header属性,请求头,在 《 【计网】HTTP与WebSocket的区别》 有详细解析,包括为什么【第 14 至 18 行】的代码这样处理。 -
subProtocols属性,最终通信使用的协议。
#handle(WebSocketSession) 方法,代码如下 :
1: @Override
2: public Mono<Void> handle(WebSocketSession session){
3: // pass headers along so custom headers can be sent through
4: return client.execute(url, this.headers, new WebSocketHandler() {
5: @Override
6: public Mono<Void> handle(WebSocketSession proxySession){
7: // Use retain() for Reactor Netty
8: // 转发消息 客户端 =》后端服务
9: Mono<Void> proxySessionSend = proxySession
10: .send(session.receive().doOnNext(WebSocketMessage::retain));
11: // 转发消息 后端服务=》客户端
12: // .log("proxySessionSend", Level.FINE);
13: Mono<Void> serverSessionSend = session
14: .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
15: // .log("sessionSend", Level.FINE);
16:
17: //
18: return Mono.when(proxySessionSend, serverSessionSend).then();
19: }
20:
21: /**
22: * Copy subProtocols so they are available downstream.
23: * @return
24: */
25: @Override
26: public List<String> getSubProtocols(){
27: return ProxyWebSocketHandler.this.subProtocols;
28: }
29: });
30: }
- 第 6 行 :调用
WebSocketClient#execute(URI, HttpHeaders, WebSocketHandler)方法, 连接后端【被代理】的 WebSocket 服务 。连接成功后,回调 WebSocketHandler 实现的内部类的#handle(WebSocketSession)方法。 - WebSocketHandler 实现的内部类
- 第 9 至 10 行 :转发消息,客户端
=>后端服务。 - 第 13 至 14 行 :转发消息,后端服务
=>客户端。 - 第 18 行 :调用
Mono#when()方法,合并proxySessionSend/serverSessionSend两个 Mono 。调用Mono#then()方法, 参数为空 ,合并的 Mono 不发射数据出来。RxJava 和 Reactor 类似,可以参考 《ReactiveX文档中文翻译 —— And/Then/When》 学习下when / and / then操作符。 - 下图可以帮助理解下这个类的用途 :
- 第 9 至 10 行 :转发消息,客户端
666. 彩蛋
:smiling_imp: 限于对 Reactor 和 Netty 了解不够深入,写的不够透彻。回头深入理解下它们。
胖友,分享一波朋友圈可好!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Spring Cloud(七):服务网关zuul过滤器
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.3) 之 RouteToRequestUrlFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.5) 之 ForwardRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.7) 之 NettyRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.4) 之 LoadBalancerClientFilter 负载均衡
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Haskell School of Music
Paul Hudak、Donya Quick / Cambridge University Press / 2018-10-4 / GBP 42.99
This book teaches functional programming through creative applications in music and sound synthesis. Readers will learn the Haskell programming language and explore numerous ways to create music and d......一起来看看 《The Haskell School of Music》 这本书的介绍吧!