上篇分享了SCG的启动原理,主要剖析了SCG如何在weblfux整体流程里切入自己的启动和初始化。本文我们将聚焦webflux DispatcherHandler 三大组件,更加深入的聊聊SCG的 Predicate 的实现原理,其中 RoutePredicateHandlerMapping 作为 DispatcherHandler 一个组件实现,从一开始就接管了请求。
作为前端控制器设计模式的实现,webflux中的 DispatcherHandler 和 SpringMVC 中 DispatcherServlet 如出一辙,是请求的集中访问点。对于 SCG,或者说所有基于 webflux 构建的系统,所有请求都是由 DispatcherHandler 调度的。如上图所示,DispatcherHandler(下文将简写为DH)有三大组件,
一组HandlerMapping: 缓存了系统所有请求映射规则(即路由规则) DH 处理请求的第一步便是从中找出当前请求最合适的Handler。
一组HandlerAdapter: Handler适配器组件,针对第一步找到的handler,DH 会从中匹配对应的一个 HandlerAdapter 来进一步处理请求,请求结果可能会返回HandlerResult。
一组 HandlerResultHandler: 针对第二步处理返回的 HandlerResult (也有可能没有,则不必处理),会从中匹配一个对应的 HandlerResultHandler 进一步处理。
上面activity图清晰表达了处理逻辑,如果不够过瘾,在来看下核心代码:
@Overridepublic Mono<Void> handle(ServerWebExchange exchange) { if (this.handlerMappings == null) { return createNotFoundError(); } if (CorsUtils.isPreFlightRequest(exchange.getRequest())) { return handlePreFlight(exchange); } return Flux.fromIterable(this.handlerMappings) .concatMap(mapping -> mapping.getHandler(exchange)) .next() .switchIfEmpty(createNotFoundError()) .flatMap(handler -> invokeHandler(exchange, handler)) .flatMap(result -> handleResult(exchange, result));}
全部逻辑都在最后一行Flux代码里,对Reactive代码不熟悉的同学,我稍微解释一下这段代码的意思:
从handlerMappings列表构造一个flux对象:Flux.fromIterable(this.handlerMappings)
依次对flux中HandlerMapping对象执行getHandler,找到则结束:concatMap(mapping -> mapping.getHandler(exchange)).next()
遍历完都没找到handler,则返回notFoundError: switchIfEmpty(createNotFoundError()) 对找到的handler执行invokeHandler方法: flatMap(handler -> invokeHandler(exchange, handler))
对上一步返回的结果进行处理(若有结果): flatMap(result -> handleResult(exchange, result))
这里有个类似逻辑但是更简单的例子可以run一下看看:FluxHandlerTest.java
了解整体思路之后,我们来看 HandlerMapping 实现:
webflux提供了三个HM, 对应图中蓝色的类:
RouterFunctionMapping
SimpleUrlHandlerMapping
RequestMappingHandlerMapping
SCG实现了一个自己的HandlerMapping: RoutePredicateHanderMapping(RPHM) 。SCG从这里开始接管 webflux 请求整个处理逻辑,其后的 Route 规则匹配、netty client对upstream的调用都在此刻埋下伏笔。
先看 webflux 中提供的三个 HandlerMapping,WebFluxConfigurationSupport在启动的时候 会初始化好这三个 bean,然后 springboot 会将他们自动注入到 DispatcherHandler 中。
RouterFunctionMapping
用于函数式断点的路由, 如果他匹配,则会将请求交给一个RouterFunction处理,比较抽象,举个例子:
//手动一个RouterFunction Bean加到 RouterFunctionMapping 中@Beanpublic RouterFunction<ServerResponse> routerHandlerConfig(){ return RouterFunctions.route(GET("/helloflux"), this::hello);}
那么请求 /helloflux 时,在 DH 第一阶段就会匹配到 RouterFunctionMapping 得到这个 RouteFunction 对象处理,有兴趣可以 debug 一下,代码链接:RouterConfig.java
SimpleUrlHandlerMapping
用于显示注册URL模式匹配,这里不举例了,早期写springMVC代码的时候经常有类似的配置。
RequestMappingHandlerMapping
基于注解的路由控制,项目中用的最多,比如:
@RestControllerpublic class HelloController { @GetMapping("/hello") public Mono<String> hello(){ return Mono.just("hello"); }}
我们来详细分析一下,在启动的时候会把所有类似上面代码解析成一个 RequestMappingInfo 对象并注册在 MappingRegistration 对象里:
一个 RequestMappingInfo 对象表达了详细的请求 mapping 规则,或者说路由规则,从上面类图很容易发现他包含请求path、参数、header、method等条件。所以,我们可以看一个稍微复杂一点的例子,不仅需要请求路劲匹配 /hello, 另外必须是get请求、带有请求参数foo=bar、带一个token=wls的header、并且 Content-Type 为 text/plain
@RequestMapping(path = "/hello", method = RequestMethod.GET, params = "foo=bar", headers = "token=wls", consumes = "text/plain")public Mono<String> helloWithMoreRouteConditions(){ return Mono.just("helloWithMoreRouteConditions");}
于是我们根据这个HelloController.java会得到如下mapping规则的注册信息( MappingRegistration ):
首先, pathLookup 有一个 /hello 的 mapping,映射到两个 RequestMappingInfo 对象,一个就是简单的 hello,另一个有多个条件。
其次, mappingRegistry 中对两个 RequestMappingInfo 对象分别映射到 handlerMethod (即 helloController#hello, helloController#helloWithMoreRouteConditions)
当执行简单的 /hello 请求时很显然会匹配并执行helloController#hello方法
curl http://localhost:10000/hello
而当带上更加精确的条件时的helloController里的两个方法都匹配,但是最佳匹配是helloWithMoreRouteConditions:
curl -X GET -H 'token:wls' -H 'Content-Type:text/plain' http://localhost:10000/hello?foo=bar
这里有一个最佳匹配规则,对应代码332、333行。
最后,我们来研究 SCG 实现的 RoutePredicateHanderMapping
RoutePredicateHanderMapping 集成了一个 RouteLocator 对象,根据上一篇”SCG的启动原理“分享,我们知道SCG的路由配置都会初始化好,并可以被 routeLocator 对象查到。此外,他还集成了一个 FilteringWebHandler 对象最为 webHandler,这些 Bean 都是在 GatewayAutoConfiguration 中自动创建和初始化的。所以,这里重点看一下当请求进来被 SCG 接管之后如何执行。 整体的执行逻辑还是再 DispatchHandler 框架内,第一步执行到 RoutePredicateHanderMapping#getHandlerInternal
@Overrideprotected Mono<?> getHandlerInternal(ServerWebExchange exchange) { if (this.managementPortType == DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) { return Mono.empty(); } exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName()); return lookupRoute(exchange) .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isDebugEnabled()) { logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r); } exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); return Mono.just(webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } })));}protected Mono<Route> lookupRoute(ServerWebExchange exchange) { return this.routeLocator.getRoutes() .concatMap(route -> Mono.just(route).filterWhen(r -> { exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); return r.getPredicate().apply(exchange); }) .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e)) .onErrorResume(e -> Mono.empty())) .next() .map(route -> { if (logger.isDebugEnabled()) { logger.debug("Route matched: " + route.getId()); } validateRoute(route, exchange); return route; });}
代码还是很清晰的,第一步lookupRoute,找到第一个所有 predicate#apply 都为 true 的 route。第二步把该 route 缓存在 ServerWebExchange 属性里,然后返回 webHandler(这里是 SCG 的 FilteringWebHandler 对象),他的 handle 方法会被 DispatcherHandler 调用。我们进入到 FilteringWebHandler :
@Overridepublic Mono<Void> handle(ServerWebExchange exchange) { Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); List<GatewayFilter> gatewayFilters = route.getFilters(); List<GatewayFilter> combined = new ArrayList<>(this.globalFilters); combined.addAll(gatewayFilters); // TODO: needed or cached? AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } return new DefaultGatewayFilterChain(combined).filter(exchange);}
他是执行 SCG 的 Filter 链,这将是另一趴逻辑,具体展开得放到后续文章里,本文先可以明确的是当执行到其中一个叫 NettyRoutingFilter 时,netty HttpClient 会向 upstream 发起真正的请求,拿到 upstream 的结果之后返回给 downstream.
以上便是本文分享的全部内容,最后附全文引用的完整类图:
原文:https://juejin.cn/post/7097977648554442760