内容简介:这里是本篇将基于Spring-Cloud-Gateway 基于过滤器实现,同 zuul 类似,有
概述
这里是 SpringCloud Gateway
实践的第一篇,主要讲过滤器的相关实现。Spring-Cloud-Gateway 是以 WebFlux
为基础的响应式架构设计, 是异步非阻塞式的,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。
本篇将基于 spring-cloud-gateway 简介 基础环境进行改造。
工作原理
Spring-Cloud-Gateway 基于过滤器实现,同 zuul 类似,有 pre 和 post 两种方式的 filter,分别处理 前置逻辑 和 后置逻辑 。客户端的请求先经过 pre 类型的 filter,然后将请求转发到具体的业务服务,收到业务服务的响应之后,再经过 post 类型的 filter 处理,最后返回响应到客户端。
过滤器执行流程如下, order 越大,优先级越低
接下来我们来验证下 filter
执行顺序。
这里创建 3 个过滤器,分别配置不同的优先级
@Slf4j public class AFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("AFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("AFilter后置逻辑"); })); } } @Slf4j public class BFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("BFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("BFilter后置逻辑"); })); } } @Slf4j public class CFilter implements GlobalFilter { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { log.info("CFilter前置逻辑"); return chain.filter(exchange).then(Mono.fromRunnable(() -> { log.info("CFilter后置逻辑"); })); } } @Configuration public class FilterConfig { @Bean @Order(-1) public GlobalFilter a() { return new AFilter(); } @Bean @Order(0) public GlobalFilter b() { return new BFilter(); } @Bean @Order(1) public GlobalFilter c() { return new CFilter(); } }
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1 curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
查看网关输出日志
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter前置逻辑 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter前置逻辑 2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter前置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter后置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter后置逻辑 2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter后置逻辑
自定义过滤器
现在假设我们要统计某个服务的响应时间,我们可以在代码中
long beginTime = System.currentTimeMillis(); // do something... long elapsed = System.currentTimeMillis() - beginTime; log.info("elapsed: {}ms", elapsed);
每次都要这么写是不是很烦?Spring 告诉我们有个东西叫 AOP。但是我们是微服务啊,在每个服务里都写也很烦。这时候就该网关的过滤器登台表演了。
自定义过滤器需要实现 GatewayFilter
和 Ordered
。其中 GatewayFilter
中的这个方法就是用来实现你的自定义的逻辑的
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
而 Ordered
中的 int getOrder()
方法是来给过滤器设定优先级别的,值越大则优先级越低。
好了,让我们来撸代码吧.
/** * 此过滤器功能为计算请求完成时间 */ public class ElapsedFilter implements GatewayFilter, Ordered { private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin"; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms"); } }) ); } /* *过滤器存在优先级,order越大,优先级越低 */ @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } }
我们在请求刚刚到达时,往 ServerWebExchange
中放入了一个属性 elapsedTimeBegin
,属性值为当时的毫秒级时间戳。然后在请求执行结束后,又从中取出我们之前放进去的那个时间戳,与当前时间的差值即为该请求的耗时。因为这是与业务无关的日志所以将 Ordered
设为 Integer.MAX_VALUE
以降低优先级。
现在再来看我们之前的问题:怎么来区分是 “pre” 还是 “post” 呢?其实就是 chain.filter(exchange)
之前的就是 “pre” 部分,之后的也就是 then
里边的是 “post” 部分。
创建好 Filter 之后我们将它添加到我们的 Filter Chain 里边
@Configuration public class FilterConfig { /** * http://localhost:8100/filter/provider * @param builder * @return */ @Bean public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) { // @formatter:off // 可以对比application.yml中关于路由转发的配置 return builder.routes() .route(r -> r.path("/filter/**") .filters(f -> f.stripPrefix(1) .filter(new ElapsedFilter())) .uri("lb://idc-cloud-provider") .order(0) .id("filter") ) .build(); // @formatter:on } }
基于全局过滤器实现审计功能
// AdaptCachedBodyGlobalFilter @Component public class LogFilter implements GlobalFilter, Ordered { private Logger log = LoggerFactory.getLogger(LogFilter.class); private final ObjectMapper objectMapper = new ObjectMapper(); private static final String START_TIME = "startTime"; private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders(); @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); // 请求路径 String path = request.getPath().pathWithinApplication().value(); // 请求schema: http/https String scheme = request.getURI().getScheme(); // 请求方法 HttpMethod method = request.getMethod(); // 路由服务地址 URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR); // 请求头 HttpHeaders headers = request.getHeaders(); // 设置startTime exchange.getAttributes().put(START_TIME, System.currentTimeMillis()); // 获取请求地址 InetSocketAddress remoteAddress = request.getRemoteAddress(); MultiValueMap<String, String> formData = null; AccessRecord accessRecord = new AccessRecord(); accessRecord.setPath(path); accessRecord.setSchema(scheme); accessRecord.setMethod(method.name()); accessRecord.setTargetUri(targetUri.toString()); accessRecord.setRemoteAddress(remoteAddress.toString()); accessRecord.setHeaders(headers); if (method == HttpMethod.GET) { formData = request.getQueryParams(); accessRecord.setFormData(formData); writeAccessRecord(accessRecord); } if (method == HttpMethod.POST) { Mono<Void> voidMono = null; if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) { // JSON voidMono = readBody(exchange, chain, accessRecord); } if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) { // x-www-form-urlencoded voidMono = readFormData(exchange, chain, accessRecord); } if (voidMono != null) { return voidMono; } } return chain.filter(exchange); } private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) { return null; } private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) { return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> { byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); Flux<DataBuffer> cachedFlux = Flux.defer(() -> { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); }); // 重写请求体,因为请求体数据只能被消费一次 ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public Flux<DataBuffer> getBody() { return cachedFlux; } }; ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build(); return ServerRequest.create(mutatedExchange, messageReaders) .bodyToMono(String.class) .doOnNext(objectValue -> { accessRecord.setBody(objectValue); writeAccessRecord(accessRecord); }).then(chain.filter(mutatedExchange)); }); } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } /** * TODO 异步日志 * @param accessRecord */ private void writeAccessRecord(AccessRecord accessRecord) { log.info("\n\n start------------------------------------------------- \n " + "请求路径:{}\n " + "scheme:{}\n " + "请求方法:{}\n " + "目标服务:{}\n " + "请求头:{}\n " + "远程IP地址:{}\n " + "表单参数:{}\n " + "请求体:{}\n " + "end------------------------------------------------- \n ", accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody()); } }
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1 curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
输出结果
start------------------------------------------------- 请求路径:/provider1 scheme:http 请求方法:POST 目标服务:http://192.168.124.5:2001/provider1 请求头:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"] 远程IP地址:/192.168.124.5:49969 表单参数:null 请求体:{"name":"admin"} end-------------------------------------------------
接下来,我们来配置日志,方便日志系统提取日志。SpringBoot 默认的日志为 logback。
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" /> <appender name="Console"> <layout> <Pattern> %black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable </Pattern> </layout> </appender> <appender name="RollingFile"> <file>${LOGS}/spring-boot-logger.log</file> <encoder > <Pattern>%d %p %C{1.} [%t] %m%n</Pattern> </encoder> <rollingPolicy > <!-- rollover daily and when the file reaches 10 MegaBytes --> <fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log </fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy > <maxFileSize>10MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> </rollingPolicy> </appender> <!-- LOG everything at INFO level --> <root level="info"> <!--<appender-ref ref="RollingFile" />--> <appender-ref ref="Console" /> </root> <!-- LOG "cn.idea360*" at TRACE level additivity:是否向上级loger传递打印信息。默认是true--> <logger name="cn.idea360.gateway" level="info" additivity="false"> <appender-ref ref="RollingFile" /> <appender-ref ref="Console" /> </logger> </configuration>
这样 console 和日志目录下就都有日志了。
自定义过滤器工厂
如果你看过静态路由的配置,你应该对如下配置有印象。
filters: - StripPrefix=1 - AddResponseHeader=X-Response-Default-Foo, Default-Bar
StripPrefix
、 AddResponseHeader
这两个实际上是两个过滤器工厂(GatewayFilterFactory),用这种配置的方式更灵活方便。
我们就将之前的那个 ElapsedFilter
改造一下,让它能接收一个 boolean
类型的参数,来决定是否将请求参数也打印出来。
public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> { private static final Log log = LogFactory.getLog(GatewayFilter.class); private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin"; private static final String KEY = "withParams"; public List<String> shortcutFieldOrder() { return Arrays.asList(KEY); } public ElapsedGatewayFilterFactory() { super(Config.class); } public GatewayFilter apply(Config config) { return (exchange, chain) -> { exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis()); return chain.filter(exchange).then( Mono.fromRunnable(() -> { Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN); if (startTime != null) { StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath()) .append(": ") .append(System.currentTimeMillis() - startTime) .append("ms"); if (config.isWithParams()) { sb.append(" params:").append(exchange.getRequest().getQueryParams()); } log.info(sb.toString()); } }) ); }; } public static class Config { private boolean withParams; public boolean isWithParams() { return withParams; } public void setWithParams(boolean withParams) { this.withParams = withParams; } } }
过滤器工厂的顶级接口是 GatewayFilterFactory
,我们可以直接继承它的两个抽象类来简化开发 AbstractGatewayFilterFactory
和 AbstractNameValueGatewayFilterFactory
,这两个抽象类的区别就是前者接收一个参数(像 StripPrefix
和我们创建的这种),后者接收两个参数(像 AddResponseHeader
)。
GatewayFilter apply(Config config)
方法内部实际上是创建了一个 GatewayFilter
的匿名类,具体实现和之前的几乎一样,就不解释了。
静态内部类 Config
就是为了接收那个 boolean
类型的参数服务的,里边的变量名可以随意写,但是要重写 List shortcutFieldOrder()
这个方法。
这里注意一下,一定要调用一下父类的构造器把 Config
类型传过去,否则会报 ClassCastException
public ElapsedGatewayFilterFactory() { super(Config.class); }
工厂类我们有了,再把它注册到 Spring 当中
@Bean public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() { return new ElapsedGatewayFilterFactory(); }
然后添加配置(主要改动在 default-filters
配置)
server: port: 2000 spring: application: name: idc-gateway redis: host: localhost port: 6379 timeout: 6000ms # 连接超时时长(毫秒) jedis: pool: max-active: 1000 # 连接池最大连接数(使用负值表示没有限制) max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制) max-idle: 10 # 连接池中的最大空闲连接 min-idle: 5 # 连接池中的最小空闲连接 cloud: consul: host: localhost port: 8500 gateway: discovery: locator: enabled: true # 修改在这里。gateway可以通过开启以下配置来打开根据服务的serviceId来匹配路由,默认是大写 default-filters: - Elapsed=true routes: - id: provider # 路由 ID,保持唯一 uri: lb://idc-provider1 # uri指目标服务地址,lb代表从注册中心获取服务 predicates: # 路由条件。Predicate 接受一个输入参数,返回一个布尔值结果。该接口包含多种默认方法来将 Predicate 组合成其他复杂的逻辑(比如:与,或,非) - Path=/p/** filters: - StripPrefix=1 # 过滤器StripPrefix,作用是去掉请求路径的最前面n个部分截取掉。StripPrefix=1就代表截取路径的个数为1,比如前端过来请求/test/good/1/view,匹配成功后,路由到后端的请求路径就会变成http://localhost:8888/good/1/view
结语
本文到此结束。关于 Webflux
的学习刚入门,觉得可以像 Rxjava
那样在 onNext
中拿到异步数据,然而在 post
获取 body 中没生效。经测试可知 getBody
获得的数据输出为 null,而自己通过 Flux.create
创建的数据可以在订阅者中获取到。此处还有待研究,希望抛砖引玉,大家有研究出来的不吝赐教。同时,希望大家关注公众号【当我遇上你】。
参考
以上所述就是小编给大家介绍的《spring-cloud-gateway过滤器实践》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Intersectional Internet
Safiya Umoja Noble、Brendesha M. Tynes / Peter Lang Publishing / 2016
From race, sex, class, and culture, the multidisciplinary field of Internet studies needs theoretical and methodological approaches that allow us to question the organization of social relations that ......一起来看看 《The Intersectional Internet》 这本书的介绍吧!
URL 编码/解码
URL 编码/解码
RGB CMYK 转换工具
RGB CMYK 互转工具