要求转发是 Gateway 最核心的功能之一 , 它涉及到三个紧张的观点 :
Route(路由): 路由是网关的基本单元,由ID、URI、一组Predicate、一组Filter组成,如果 Predicate 匹配 True ,则进行转发Predicate(谓语、断言): 路由转发的判断条件,这是一个 Java 8函数断言, 输入类型是 Spring Framework ServerWebExchange , 目前SpringCloud Gateway支持多种办法,常见如:Path、Query、Method、Header等,写法必须遵照 key=vlue的形式 Filter(过滤器): 过滤器是路由转发要求时所经由的过滤逻辑,利用特定工厂构建的 GatewayFilter 实例 , 可用于修正要求、相应内容
// - After=2017-01-20T17:42:47.789-07:00[America/Denver]// - Before=2017-01-20T17:42:47.789-07:00[America/Denver]//- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]// - Cookie=chocolate, ch.p复制代码
2.2 Mono 和 Flux
Mono 和 Flux 是贯穿了全体流程的核心工具 ,根据 reactive-streams 规范,发布做事器供应了数量可能无限的有序元素,并根据从其订阅做事器吸收到的需求发布这些元素。Reactor-core 有一组词 Publisher 接口的实现。我们将要创建序列的两个主要实现是 Mono 和 Flux。

> SpringGateway 是利用 webflux 作为底层调用框架的 , 个中涉及到 mono 和 Flux 工具
> 该序列中可以包含 3 种关照 :
正常的包含元素的序列结束的序列出错的Flux
Flux是一个标准Publisher,表示0到N个发射项的异步序列,选择性地以完成或缺点旗子暗记终止。与Reactive Streams规范中一样,这三种类型的旗子暗记转换为对下贱订阅者的onNext、onComplete或onError方法的调用。Mono
Mono 是 Publisher 的另一个实现。它最多发出一个条款,然后(可选)以 onComplete 旗子暗记或 onError 旗子暗记终止 , Mono 在实质上也是异步的它只供应了可用于Flux的操作符的子集,并且一些操作符(特殊是那些将Mono与另一个发布者组合的操作符)切换到Flux。例如,Mono#concatWith(Publisher)返回一个Flux ,而Mono#then(Mono)则返回另一个Mono。常见的方法如下 :
create : 以编程办法创建具有多次发射能力的Flux,empty : 发出0元素或返回空 Flux < t >just : 创建一个根本error : 创建一个Flux,它在订阅之后立即以指定的缺点终止PS : 这一块就不深入看了 , 先看完 Gateway 的主流程
三 . 拦截深入3.1 事理图首先来看一下 SpringGateway 的事理图
四 . 调用的入口4.1 调用流程Step 1 : HttpWebHandlerAdapter # handle : 构建 ServerWebExchange , 发起 Handler 处理Step 2 : DispatcherHandler # handle : 发起要求处理Step 3 : RoutePredicateHandlerMapping # getHandlerInternal : route 判断处理4.2. getHandlerInternal 逻辑
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) { // don't handle requests on management port if set and different than server port if (this.managementPortType == DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) { return Mono.empty(); } exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName()); return lookupRoute(exchange) // .log("route-predicate-handler-mapping", Level.FINER) //name this .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isDebugEnabled()) { logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r); } exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); return Mono.just(webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } })));}复制代码
3.2. lookupRoute
protected Mono<Route> lookupRoute(ServerWebExchange exchange) { return this.routeLocator.getRoutes() // individually filter routes so that filterWhen error delaying is not a // problem .concatMap(route -> Mono.just(route).filterWhen(r -> { // add the current route we are testing exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); return r.getPredicate().apply(exchange); }) // instead of immediately stopping main flux due to error, log and // swallow it .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e)) .onErrorResume(e -> Mono.empty())) // .defaultIfEmpty() put a static Route not found // or .switchIfEmpty() // .switchIfEmpty(Mono.<Route>empty().log("noroute")) .next() // TODO: error handling .map(route -> { validateRoute(route, exchange); return route; });}复制代码
会遍历所有的 route
五. 发送的流程5.1 FilteringWebHandler 体系
此处的 webHandler 为 FilteringWebHandler 工具 , 来看一下这个工具的浸染
这里涉及到以下的 Filter :
C- ForwardPathFilter :C- ForwardRoutingFilter : 用来做本地forward的C- GatewayMetricsFilter : 与 Prometheus 整合,从而创建一个 Grafana dashboardC- LoadBalancerClientFilter : 用来整合Ribbon的 , 先获取微做事的名称,然后再通过Ribbon获取实际的调用地址C- NettyRoutingFilter : http 或 https ,利用 Netty 的 HttpClient 向下贱的做事发送代理要求C- NettyWriteResponseFilter : 用于将代理相应写回网关的客户端侧,以是该过滤器会在所有其他过滤器实行完成后才实行C- OrderedGatewayFilter :C- RouteToRequestUrlFilter : 将从request里获取的 原始url转换成Gateway进行要求转发时所利用的urlC- WebClientHttpRoutingFilter :C- WebClientWriteResponseFilter :C- WebsocketRoutingFilter : ws 或者 wss,那么该Filter会利用 Spring Web Socket 将 Websocket 要求转发到下贱C- WeightCalculatorWebFilter :可以参考 -> Spring Cloud Gateway 内置的全局过滤器
调用逻辑1 : FilteringWebHandler 管理
该工具中存在一个内部类 DefaultGatewayFilterChain , 该类为 Filter 过滤链
rivate static class DefaultGatewayFilterChain implements GatewayFilterChain { // 当前 Filter 链索引 private final int index; // Filter 凑集 private final List<GatewayFilter> filters; DefaultGatewayFilterChain(List<GatewayFilter> filters) { this.filters = filters; this.index = 0; } private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) { this.filters = parent.getFilters(); this.index = index; } public List<GatewayFilter> getFilters() { return filters; } @Override public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < filters.size()) { // 逐个 Filter 过滤调用 GatewayFilter filter = filters.get(this.index); DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1); return filter.filter(exchange, chain); } else { return Mono.empty(); // complete } }); }}复制代码
调用流程 3 : Filter 过滤
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 常日判断部分条件 , 如果该 Filter 不符合 , 则跳过该 Filter if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); }复制代码
5.2 发送的主体
核心的发送 Filter 是 NettyRoutingFilter, 下面只关注这个 Filter 的干系逻辑 :
C- NettyRoutingFilterpublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 要求 URL : http://httpbin.org:80/get URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); // 协议类型 : http String scheme = requestUrl.getScheme(); // Step 1 : filter 链处理 ,如果不符合 http 协议 , 就通过下一个 Filter 处理 if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } // Step 2 : 标识 Routed 已处理 setAlreadyRouted(exchange); // Step 3 : 获取 Request 要求工具 , 这个是外部要求的工具 ServerHttpRequest request = exchange.getRequest(); // Step 4 : 获取 Method 类型 (get/post...) final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); final String url = requestUrl.toString(); // Step 5 : 对 Header 进行处理 , 须要转发过去 HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); // -> Transfer-Encoding String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING); boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding); // -> preserveHostHeader boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); // 通过 netty httpClient 发起转发要求 , PS !!! 这里是异步的 Flux<HttpClientResponse> responseFlux = this.httpClient .chunkedTransfer(chunkedTransfer).request(method).uri(url) .send((req, nettyOutbound) -> { // Step 6 : 转发 Header req.headers(httpHeaders); // => 是否须要记录之前的 host if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); req.header(HttpHeaders.HOST, host); } // Step 7 : 真正发起要求 return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach) .send(request.getBody() .map(dataBuffer -> ((NettyDataBuffer) dataBuffer) .getNativeBuffer())); }).responseConnection((res, connection) -> { // Step 8 : 要求完成 , 获取 response ServerHttpResponse response = exchange.getResponse(); // Step 9 : 转发headers 和 status 等属性 HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach( entry -> headers.add(entry.getKey(), entry.getValue())); // => String CONTENT_TYPE = "Content-Type" // => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type"; String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } // 转发状态 , 存在往 GatewayResponse 设置状态 HttpStatus status = HttpStatus.resolve(res.status().code()); if (status != null) { response.setStatusCode(status); } else if (response instanceof AbstractServerHttpResponse) { ((AbstractServerHttpResponse) response) .setStatusCodeValue(res.status().code()); } else { throw new IllegalStateException( "Unable to set status code on response: " + res.status().code() + ", " + response.getClass()); } // 确保 Header filter 在设置状态后运行, 校验 header 中 filter 正常 HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( getHeadersFilters(), headers, exchange, Type.RESPONSE); // String TRANSFER_ENCODING = "Transfer-Encoding" // String CONTENT_LENGTH = "Content-Length" if (!filteredResponseHeaders .containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders .containsKey(HttpHeaders.CONTENT_LENGTH)) { // content-length 存在须要去掉 Transfer-Encoding response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); // 延迟提交相应,直到所有路由过滤器都运行 // 将客户端相应作为ServerWebExchange属性,稍后写入相应NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); return Mono.just(res); }); if (properties.getResponseTimeout() != null) { // 超时非常处理 responseFlux = responseFlux.timeout(properties.getResponseTimeout(), Mono.error(new TimeoutException("Response took longer than timeout: " + properties.getResponseTimeout()))) .onErrorMap(TimeoutException.class, // GATEWAY_TIMEOUT(504, "Gateway Timeout") th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } return responseFlux.then(chain.filter(exchange));}复制代码
5.3 返回 Response
C- NettyWriteResponseFilterpublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return chain.filter(exchange).then(Mono.defer(() -> { // Step 1 : 获取 GatewayRequest Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR); // 连接不存在直接返回空 if (connection == null) { return Mono.empty(); } // Step 2 : 获取 GatewayResponse ServerHttpResponse response = exchange.getResponse(); NettyDataBufferFactory factory = (NettyDataBufferFactory) response .bufferFactory(); // 此处紧张包含一个 byteBufflux final Flux<NettyDataBuffer> body = connection.inbound().receive().retain() .map(factory::wrap); // 媒体类型 MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { log.trace("invalid media type", e); } return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); }));}复制代码
总结
由于 netty 的底层理解得还不是很清楚 , 对付一些调用过程没办法输出数据看 , 这篇文章心里也不是很有底 , 后续深入后再来补充细节
作者:AntBlack链接:https://juejin.cn/post/6994822653177495589来源:掘金著作权归作者所有。商业转载请联系作者得到授权,非商业转载请注明出处。