网站首页 文章专栏 soul源码解析(五)
soul源码解析(五)

一. 插件的作用

    可以根据是否使用该插件,动态进行选择插件的种类,不用的插件不进行加载,很好的使得网关具备可拓展性,根据项目动态选择插件,而且可以自定义插件进行使用。

二. 插件模块代码结构

    

image.png

    这是整体的代码结构,按照不同插件分为各个子项目,同时包含以下顶层的接口信息,比如:api模块提供了各种顶层接口,base提供了各种基础数据,包括策略的一些代码等等,其余的即为各个插件实现,如:divide插件,ratelimiter插件,rewrite插件,sentinel插件,hystrix插件等。

    

image.png

    上面的即是divide插件的代码结构,spi下为流量转发的几个策略,hash,轮询,随机(加权),cache里面是对缓存的处理


三. divide初始化过程

    启动bootstrap时,会去加载SoulConfiguration类,初始化 webHandler bean

@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
    List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
    final List<SoulPlugin> soulPlugins = pluginList.stream()
            .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
    soulPlugins.forEach(soulPlugin -> log.info("loader plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
    return new SoulWebHandler(soulPlugins);
}

    如图,soulPluginList中为所如图,soulPluginList中为所有的被加载的插件信息:

    

image.png

    加载完成后日志会打印:

2021-01-19 22:00:25.580  INFO 13552 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 20ms. Found 0 Redis repository interfaces.
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin]
2021-01-19 22:06:06.695  INFO 13552 --- [           main] o.d.s.w.configuration.SoulConfiguration  : loader plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]

    

    1. divide插件 DividePlugin 继承自 AbstractPlugin 抽象类,该抽象类又实现了顶层 SoulPlugin 接口,整体通过过滤器链完成各个插件的组装

    流量请求来了之后,先进入GlobalPlugin,根据路径build soulcontext参数,并将其放入exchange参数内,进入下一过滤器。

@Override
public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final ServerHttpRequest request = exchange.getRequest();
    final HttpHeaders headers = request.getHeaders();
    final String upgrade = headers.getFirst("Upgrade");
    SoulContext soulContext;
    if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
        soulContext = builder.build(exchange);
    } else {
        final MultiValueMap queryParams = request.getQueryParams();
        soulContext = transformMap(queryParams);
    }
    exchange.getAttributes().put(Constants.CONTEXT, soulContext);
    return chain.execute(exchange);
}


     再进入AbstractPlugin的 excute 方法,根据该请求的 plugin名称判断是否需要处理,不匹配就放行。

String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
    ...
}
return chain.execute(exchange);

    

    然后进入 dividePlugin 插件的 excute 方法,获取 前面放进去的 soulContext 参数,并获取相应的 处理规则rule,再获取目标机器的信息,如有多个则进行负载策略,放入 upstreamList 

final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
final List upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());

    

    再通过LoadBalanceUtil进行负载均衡:

    根据规则的名称初始化LoadBalance,进行实际的负载策略

final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);

public static DivideUpstream selector(final List upstreamList, final String algorithm, final String ip) {
    LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
    return loadBalance.select(upstreamList, ip);
}

    如果,upstreamList只有一个元素,也就是单机,那么直接就执行了,因为不存在负载均衡

    如果多台机器,则进行策略。

@Override
public DivideUpstream select(final List upstreamList, final String ip) {
    if (CollectionUtils.isEmpty(upstreamList)) {
        return null;
    }
    if (upstreamList.size() == 1) {
        return upstreamList.get(0);
    }
    return doSelect(upstreamList, ip);
}

    这里看下随机的负载均衡实现,先算出总的实例个数,以及权重,如果权重相同,或者为0则均等随机,如果不相同且大于0,则按照总权重随机

@Override
public DivideUpstream doSelect(final List upstreamList, final String ip) {
    // 总个数
    int length = upstreamList.size();
    // 总权重
    int totalWeight = 0;
    // 权重是否都一样
    boolean sameWeight = true;
    for (int i = 0; i < length; i++) {
        int weight = upstreamList.get(i).getWeight();
        // 累计总权重
        totalWeight += weight;
        if (sameWeight && i > 0
                && weight != upstreamList.get(i - 1).getWeight()) {
            // 计算所有权重是否一样
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        // 如果权重不相同且权重大于0则按总权重数随机
        int offset = RANDOM.nextInt(totalWeight);
        // 并确定随机值落在哪个片断上
        for (DivideUpstream divideUpstream : upstreamList) {
            offset -= divideUpstream.getWeight();
            if (offset < 0) {
                return divideUpstream;
            }
        }
    }
    // 如果权重相同或权重为0则均等随机
    return upstreamList.get(RANDOM.nextInt(length));
}

    最终选择出目标执行机器信息,构建url以及参数,设置超时时间等,并进入下一插件。

//设置一下 http url
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
//设置下超时时间
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
return chain.execute(exchange);

    进入webClientPlugin插件,获取具体执行的urlPath,进行实际的调用;

@Override
public Mono execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    String urlPath = exchange.getAttribute(Constants.HTTP_URL);
    if (StringUtils.isEmpty(urlPath)) {
        Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
    log.info("you request,The resulting urlPath is :{}", urlPath);
    HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
    WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
    return handleRequestBody(requestBodySpec, exchange, timeout, chain);
}

    

    至此,所有的流程都走完了。




版权声明:本文由星尘阁原创出品,转载请注明出处!

本文链接:http://www.52xingchen.cn/detail/61




赞助本站,网站的发展离不开你们的支持!
来说两句吧
大侠留个名吧,或者可以使用QQ登录。
: 您已登陆!可以继续留言。
最新评论