内容简介:网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.7) 之 NettyRoutingFilter
摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-netty-routing/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 Spring-Cloud-Gateway 2.0.X M4
关注**微信公众号:【芋道源码】**有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
- 您对于源码的疑问每条留言 都 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢 。
- 新的 源码解析文章 实时 收到通知。 每周更新一篇左右 。
- 认真的 源码交流微信群。
1. 概述
本文主要分享 NettyRoutingFilter 的代码实现 。
NettyRoutingFilter ,Netty 路由 网关过滤器。其根据 http://
或 https://
前缀( Scheme )过滤处理,使用基于 Netty 实现的 HttpClient 请求后端 Http 服务。
NettyWriteResponseFilter ,与 NettyRoutingFilter 成对使用 的网关过滤器。其将 NettyRoutingFilter 请求后端 Http 服务的 响应 写回客户端。
大体流程如下 :
另外,Spring Cloud Gateway 实现了 WebClientHttpRoutingFilter / WebClientWriteResponseFilter ,功能上和 NettyRoutingFilter / NettyWriteResponseFilter 相同 ,差别在于基于 org.springframework.cloud.gateway.filter.WebClient
实现的 HttpClient 请求后端 Http 服务。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter》 ,我们会详细解析。
推荐 Spring Cloud 书籍:
- 请支持正版。下载盗版, 等于主动编写低级 BUG 。
- 程序猿DD —— 《Spring Cloud微服务实战》
- 周立 —— 《Spring Cloud与 Docker 微服务架构实战》
- 两书齐买,京东包邮。
2. NettyRoutingFilter
org.springframework.cloud.gateway.filter.NettyRoutingFilter
,Netty 路由 网关过滤器。
构造方法,代码如下 :
public class NettyRoutingFilter implements GlobalFilter, Ordered { private final HttpClient httpClient; public NettyRoutingFilter(HttpClient httpClient) { this.httpClient = httpClient; } }
-
httpClient
属性,基于 Netty 实现的 HttpClient 。通过该属性, 请求后端的 Http 服务 。
#getOrder()
方法,代码如下 :
@Override public int getOrder(){ return Ordered.LOWEST_PRECEDENCE; }
- 返回顺序为
Integer.MAX_VALUE
。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.1) 之 GatewayFilter 一览》「3. GlobalFilter」 ,我们列举了所有 GlobalFilter 的顺序。
#filter(ServerWebExchange, GatewayFilterChain)
方法,代码如下 :
1: @Override 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain){ 3: // 获得 requestUrl 4: URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); 5: 6: // 判断是否能够处理 7: String scheme = requestUrl.getScheme(); 8: if (isAlreadyRouted(exchange) || (!scheme.equals("http") && !scheme.equals("https"))) { 9: return chain.filter(exchange); 10: } 11: 12: // 设置已经路由 13: setAlreadyRouted(exchange); 14: 15: ServerHttpRequest request = exchange.getRequest(); 16: 17: // Request Method 18: final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString()); 19: 20: // 获得 url 21: final String url = requestUrl.toString(); 22: 23: // Request Header 24: final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); 25: request.getHeaders().forEach(httpHeaders::set); 26: 27: // 请求 28: return this.httpClient.request(method, url, req -> { 29: final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach) 30: .failOnClientError(false) // // 是否请求失败,抛出异常 31: .headers(httpHeaders); 32: 33: // Request Form 34: if (MediaType.APPLICATION_FORM_URLENCODED.includes(request.getHeaders().getContentType())) { 35: return exchange.getFormData() 36: .flatMap(map -> proxyRequest.sendForm(form -> { 37: for (Map.Entry<String, List<String>> entry: map.entrySet()) { 38: for (String value : entry.getValue()) { 39: form.attr(entry.getKey(), value); 40: } 41: } 42: }).then()) 43: .then(chain.filter(exchange)); 44: } 45: 46: // Request Body 47: return proxyRequest.sendHeaders() //I shouldn't need this 48: .send(request.getBody() 49: .map(DataBuffer::asByteBuffer) // Flux<DataBuffer> => ByteBuffer 50: .map(Unpooled::wrappedBuffer)); // ByteBuffer => Flux<DataBuffer> 51: }).doOnNext(res -> { 52: ServerHttpResponse response = exchange.getResponse(); 53: // Response Header 54: // put headers and status so filters can modify the response 55: HttpHeaders headers = new HttpHeaders(); 56: res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); 57: response.getHeaders().putAll(headers); 58: 59: // Response Status 60: response.setStatusCode(HttpStatus.valueOf(res.status().code())); 61: 62: // 设置 Response 到 CLIENT_RESPONSE_ATTR 63: // Defer committing the response until all route filters have run 64: // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter 65: exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); 66: }).then(chain.filter(exchange)); 67: }
-
第 4 行 :获得
requestUrl
。 -
第 7 至 10 行 :判断 ForwardRoutingFilter 是否能够处理该请求,需要满足两个条件 :
-
http://
或者https://
前缀( Scheme ) 。 -
调用
ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange)
方法,判断该请求暂未被其他 Routing 网关处理。代码如下 :public static boolean isAlreadyRouted(ServerWebExchange exchange){ return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false); }
-
-
第 13 行 :设置该请求已经被处理。代码如下 :
public static void setAlreadyRouted(ServerWebExchange exchange){ exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true); }
-
第 18 行 :创建 Netty Request Method 对象。
request#getMethod()
返回的不是io.netty.handler.codec.http.HttpMethod
,所以需要进行转换。 -
第 21 行 :获得
url
。 -
第 24 至 25 行 :创建 Netty Request Header 对象(
io.netty.handler.codec.http.DefaultHttpHeaders
),将请求的 Header 设置给它。 -
--------- 第 28 至 50 行 :调用
HttpClient#request(HttpMethod, String, Function)
方法,请求后端 Http 服务。 -
第 29 至 31 行 :创建 Netty Request 对象(
reactor.ipc.netty.http.client.HttpClientRequest
)。-
第 29 行 :TODO 【3024】 NettyPipeline.SendOptions::flushOnEach
-
第 30 行 :设置请求失败( 后端服务返回响应状体码
>= 400
)时,不抛出异常。相关代码如下 :// HttpClientOperations#checkResponseCode(HttpResponse response) // ... 省略无关代码 if (code >= 400) { if (clientError) { if (log.isDebugEnabled()) { log.debug("{} Received Request Error, stop reading: {}", channel(), response.toString()); } Exception ex = new HttpClientException(uri(), response); parentContext().fireContextError(ex); receive().subscribe(); return false; } return true; }
- 通过设置
clientError = false
,第 51 行可以调用Mono#doNext(Consumer)
方法, 统一订阅处理 返回的reactor.ipc.netty.http.client.HttpClientResponse
对象。
- 通过设置
-
第 31 行 :设置 Netty Request 对象的 Header 。
-
-
第 34 至 44 行 :【TODO 3025】目前是一个 BUG ,在 2.0.X 版本修复。见 FormIntegrationTests#formUrlencodedWorks() 单元测试的注释说明。
-
第 47 至 50 行 :请求后端的 Http 服务。
- 第 47 行 :发送请求 Header 。
- 第 48 至 50 行 :发送请求 Body 。其中中间的
#map(...)
的过程为Flux<DataBuffer> => ByteBuffer => Flux<DataBuffer>
。
-
--------- 第 51 至 65 行 :请求后端 Http 服务 完成 ,将 Netty Response 赋值给响应
response
。 -
第 53 至 57 行 :创建
org.springframework.http.HttpHeaders
对象,将 Netty Response Header 设置给它,而后设置回给响应response
。 -
第 60 行 :设置响应
response
的状态码。 -
第 65 行 :设置 Netty Response 到
CLIENT_RESPONSE_ATTR
。后续 NettyWriteResponseFilter 将 Netty Response 写回给客户端。 -
--------- 第 66 行 :提交过滤器链继续过滤。
3. NettyWriteResponseFilter
org.springframework.cloud.gateway.filter.NettyWriteResponseFilter
,Netty 回写 响应 网关过滤器。
#getOrder()
方法,代码如下 :
public static final int WRITE_RESPONSE_FILTER_ORDER = -1; @Override public int getOrder(){ return WRITE_RESPONSE_FILTER_ORDER; }
- 返回顺序为
-1
。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.1) 之 GatewayFilter 一览》「3. GlobalFilter」 ,我们列举了所有 GlobalFilter 的顺序。
#filter(ServerWebExchange, GatewayFilterChain)
方法,代码如下 :
1: @Override 2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain){ 3: // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added 4: // until the WebHandler is run 5: return chain.filter(exchange).then(Mono.defer(() -> { 6: // 获得 Response 7: HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); 8: // HttpClientResponse clientResponse = getAttribute(exchange, CLIENT_RESPONSE_ATTR, HttpClientResponse.class); 9: if (clientResponse == null) { 10: return Mono.empty(); 11: } 12: log.trace("NettyWriteResponseFilter start"); 13: ServerHttpResponse response = exchange.getResponse(); 14: 15: // 将 Netty Response 写回给客户端。 16: NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); 17: //TODO: what if it's not netty 18: final Flux<NettyDataBuffer> body = clientResponse.receive() 19: .retain() // ByteBufFlux => ByteBufFlux 20: .map(factory::wrap); // ByteBufFlux => Flux<NettyDataBuffer> 21: return response.writeWith(body); 22: })); 23: }
- 第 5 行 :调用
#then(Mono)
方法,实现 After Filter 逻辑。 - 第 7 至 11 行 :从
CLIENT_RESPONSE_ATTR
中,获得 Netty Response 。 - 第 15 至 21 行 :将 Netty Response 写回给客户端。因为
org.springframework.http.server.reactive#writeWith(Publisher<? extends DataBuffer>)
需要的参数类型是Publisher<? extends DataBuffer>
,所以【第 18 至 20 行】的转换过程是ByteBufFlux => Flux<NettyDataBuffer>
。- 第 19 行 :TODO 【3024】ByteBufFlux#retain()
666. 彩蛋
下一篇 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter》 走起!
胖友,分享一波朋友圈可好!
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网
猜你喜欢:- Spring Cloud(七):服务网关zuul过滤器
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.3) 之 RouteToRequestUrlFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.5) 之 ForwardRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter
- 网关 Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.4) 之 LoadBalancerClientFilter 负载均衡
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。