网站首页 文章专栏 soul源码解析(五)
    可以根据是否使用该插件,动态进行选择插件的种类,不用的插件不进行加载,很好的使得网关具备可拓展性,根据项目动态选择插件,而且可以自定义插件进行使用。

    这是整体的代码结构,按照不同插件分为各个子项目,同时包含以下顶层的接口信息,比如:api模块提供了各种顶层接口,base提供了各种基础数据,包括策略的一些代码等等,其余的即为各个插件实现,如:divide插件,ratelimiter插件,rewrite插件,sentinel插件,hystrix插件等。

    上面的即是divide插件的代码结构,spi下为流量转发的几个策略,hash,轮询,随机(加权),cache里面是对缓存的处理
    启动bootstrap时,会去加载SoulConfiguration类,初始化 webHandler bean
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
    List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
    final List<SoulPlugin> soulPlugins = pluginList.stream()
            .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
    soulPlugins.forEach(soulPlugin -> log.info("loader plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
    return new SoulWebHandler(soulPlugins);
}如图,soulPluginList中为所如图,soulPluginList中为所有的被加载的插件信息:

    加载完成后日志会打印:
2021-01-19 22:00:25.580 INFO 13552 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 20ms. Found 0 Redis repository interfaces. 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin] 2021-01-19 22:06:06.695 INFO 13552 --- [ main] o.d.s.w.configuration.SoulConfiguration : loader plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
    
1. divide插件 DividePlugin 继承自 AbstractPlugin 抽象类,该抽象类又实现了顶层 SoulPlugin 接口,整体通过过滤器链完成各个插件的组装
流量请求来了之后,先进入GlobalPlugin,根据路径build soulcontext参数,并将其放入exchange参数内,进入下一过滤器。
@Override
public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final ServerHttpRequest request = exchange.getRequest();
    final HttpHeaders headers = request.getHeaders();
    final String upgrade = headers.getFirst("Upgrade");
    SoulContext soulContext;
    if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
        soulContext = builder.build(exchange);
    } else {
        final MultiValueMap queryParams = request.getQueryParams();
        soulContext = transformMap(queryParams);
    }
    exchange.getAttributes().put(Constants.CONTEXT, soulContext);
    return chain.execute(exchange);
}再进入AbstractPlugin的 excute 方法,根据该请求的 plugin名称判断是否需要处理,不匹配就放行。
String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
    ...
}
return chain.execute(exchange);
    然后进入 dividePlugin 插件的 excute 方法,获取 前面放进去的 soulContext 参数,并获取相应的 处理规则rule,再获取目标机器的信息,如有多个则进行负载策略,放入 upstreamList 
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext != null; final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class); final List upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
    再通过LoadBalanceUtil进行负载均衡:
根据规则的名称初始化LoadBalance,进行实际的负载策略
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
public static DivideUpstream selector(final List upstreamList, final String algorithm, final String ip) {
    LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
    return loadBalance.select(upstreamList, ip);
}    如果,upstreamList只有一个元素,也就是单机,那么直接就执行了,因为不存在负载均衡
如果多台机器,则进行策略。
@Override
public DivideUpstream select(final List upstreamList, final String ip) {
    if (CollectionUtils.isEmpty(upstreamList)) {
        return null;
    }
    if (upstreamList.size() == 1) {
        return upstreamList.get(0);
    }
    return doSelect(upstreamList, ip);
}    这里看下随机的负载均衡实现,先算出总的实例个数,以及权重,如果权重相同,或者为0则均等随机,如果不相同且大于0,则按照总权重随机
@Override
public DivideUpstream doSelect(final List upstreamList, final String ip) {
    // 总个数
    int length = upstreamList.size();
    // 总权重
    int totalWeight = 0;
    // 权重是否都一样
    boolean sameWeight = true;
    for (int i = 0; i < length; i++) {
        int weight = upstreamList.get(i).getWeight();
        // 累计总权重
        totalWeight += weight;
        if (sameWeight && i > 0
                && weight != upstreamList.get(i - 1).getWeight()) {
            // 计算所有权重是否一样
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        // 如果权重不相同且权重大于0则按总权重数随机
        int offset = RANDOM.nextInt(totalWeight);
        // 并确定随机值落在哪个片断上
        for (DivideUpstream divideUpstream : upstreamList) {
            offset -= divideUpstream.getWeight();
            if (offset < 0) {
                return divideUpstream;
            }
        }
    }
    // 如果权重相同或权重为0则均等随机
    return upstreamList.get(RANDOM.nextInt(length));
}    最终选择出目标执行机器信息,构建url以及参数,设置超时时间等,并进入下一插件。
//设置一下 http url String domain = buildDomain(divideUpstream); String realURL = buildRealURL(domain, soulContext, exchange); exchange.getAttributes().put(Constants.HTTP_URL, realURL); //设置下超时时间 exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout()); return chain.execute(exchange);
    进入webClientPlugin插件,获取具体执行的urlPath,进行实际的调用;
@Override
public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    String urlPath = exchange.getAttribute(Constants.HTTP_URL);
    if (StringUtils.isEmpty(urlPath)) {
        Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
    log.info("you request,The resulting urlPath is :{}", urlPath);
    HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
    WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
    return handleRequestBody(requestBodySpec, exchange, timeout, chain);
}    
    至此,所有的流程都走完了。
版权声明:本文由星尘阁原创出品,转载请注明出处!