网站首页 文章专栏 soul源码解析(五)
可以根据是否使用该插件,动态进行选择插件的种类,不用的插件不进行加载,很好的使得网关具备可拓展性,根据项目动态选择插件,而且可以自定义插件进行使用。
这是整体的代码结构,按照不同插件分为各个子项目,同时包含以下顶层的接口信息,比如:api模块提供了各种顶层接口,base提供了各种基础数据,包括策略的一些代码等等,其余的即为各个插件实现,如:divide插件,ratelimiter插件,rewrite插件,sentinel插件,hystrix插件等。
上面的即是divide插件的代码结构,spi下为流量转发的几个策略,hash,轮询,随机(加权),cache里面是对缓存的处理
启动bootstrap时,会去加载SoulConfiguration类,初始化 webHandler bean
@Bean("webHandler") public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) { List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList); final List<SoulPlugin> soulPlugins = pluginList.stream() .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList()); soulPlugins.forEach(soulPlugin -> log.info("loader plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName())); return new SoulWebHandler(soulPlugins); }
如图,soulPluginList中为所如图,soulPluginList中为所有的被加载的插件信息:
加载完成后日志会打印:
2021-01-19 22:00:25.580 INFO 13552 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 20ms. Found 0 Redis repository interfaces. 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
1. divide插件 DividePlugin 继承自 AbstractPlugin 抽象类,该抽象类又实现了顶层 SoulPlugin 接口,整体通过过滤器链完成各个插件的组装
流量请求来了之后,先进入GlobalPlugin,根据路径build soulcontext参数,并将其放入exchange参数内,进入下一过滤器。
@Override public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) { final ServerHttpRequest request = exchange.getRequest(); final HttpHeaders headers = request.getHeaders(); final String upgrade = headers.getFirst("Upgrade"); SoulContext soulContext; if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) { soulContext = builder.build(exchange); } else { final MultiValueMap queryParams = request.getQueryParams(); soulContext = transformMap(queryParams); } exchange.getAttributes().put(Constants.CONTEXT, soulContext); return chain.execute(exchange); }
再进入AbstractPlugin的 excute 方法,根据该请求的 plugin名称判断是否需要处理,不匹配就放行。
String pluginName = named(); final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); if (pluginData != null && pluginData.getEnabled()) { ... } return chain.execute(exchange);
然后进入 dividePlugin 插件的 excute 方法,获取 前面放进去的 soulContext 参数,并获取相应的 处理规则rule,再获取目标机器的信息,如有多个则进行负载策略,放入 upstreamList
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext != null; final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class); final List upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
再通过LoadBalanceUtil进行负载均衡:
根据规则的名称初始化LoadBalance,进行实际的负载策略
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip); public static DivideUpstream selector(final List upstreamList, final String algorithm, final String ip) { LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm); return loadBalance.select(upstreamList, ip); }
如果,upstreamList只有一个元素,也就是单机,那么直接就执行了,因为不存在负载均衡
如果多台机器,则进行策略。
@Override public DivideUpstream select(final List upstreamList, final String ip) { if (CollectionUtils.isEmpty(upstreamList)) { return null; } if (upstreamList.size() == 1) { return upstreamList.get(0); } return doSelect(upstreamList, ip); }
这里看下随机的负载均衡实现,先算出总的实例个数,以及权重,如果权重相同,或者为0则均等随机,如果不相同且大于0,则按照总权重随机
@Override public DivideUpstream doSelect(final List upstreamList, final String ip) { // 总个数 int length = upstreamList.size(); // 总权重 int totalWeight = 0; // 权重是否都一样 boolean sameWeight = true; for (int i = 0; i < length; i++) { int weight = upstreamList.get(i).getWeight(); // 累计总权重 totalWeight += weight; if (sameWeight && i > 0 && weight != upstreamList.get(i - 1).getWeight()) { // 计算所有权重是否一样 sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // 如果权重不相同且权重大于0则按总权重数随机 int offset = RANDOM.nextInt(totalWeight); // 并确定随机值落在哪个片断上 for (DivideUpstream divideUpstream : upstreamList) { offset -= divideUpstream.getWeight(); if (offset < 0) { return divideUpstream; } } } // 如果权重相同或权重为0则均等随机 return upstreamList.get(RANDOM.nextInt(length)); }
最终选择出目标执行机器信息,构建url以及参数,设置超时时间等,并进入下一插件。
//设置一下 http url String domain = buildDomain(divideUpstream); String realURL = buildRealURL(domain, soulContext, exchange); exchange.getAttributes().put(Constants.HTTP_URL, realURL); //设置下超时时间 exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout()); return chain.execute(exchange);
进入webClientPlugin插件,获取具体执行的urlPath,进行实际的调用;
@Override public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) { final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext != null; String urlPath = exchange.getAttribute(Constants.HTTP_URL); if (StringUtils.isEmpty(urlPath)) { Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L); log.info("you request,The resulting urlPath is :{}", urlPath); HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue()); WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath); return handleRequestBody(requestBodySpec, exchange, timeout, chain); }
至此,所有的流程都走完了。
版权声明:本文由星尘阁原创出品,转载请注明出处!