网站首页 文章专栏 soul源码解析(七) - http 长轮询数据同步过程
soul源码解析(七) - http 长轮询数据同步过程

一. admin端的初始化

1). 跟websocket方式一样,也是通过DataSyncConfiguration配置类,以及配置文件中数据同步策略,初始化

@Configuration
@ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
@EnableConfigurationProperties(HttpSyncProperties.class)
static class HttpLongPollingListener {

    @Bean
    @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
    public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        return new HttpLongPollingDataChangedListener(httpSyncProperties);
    }

}

    注意一点是,websocket的配置中加了matchIfMissing 配置,也就是就算没有相关的配置,也会初始化websocket的初始化server

@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)

2). 初始化的HttpLongPollingDataChangedListener类继承自AbstractDataChangedListener抽象类,该抽象类实现了DataChangedListener接口以及InitializingBean接口

    new该类的时候会初始化,一个用于存放连接的ArrayBlockingQueue,大小为1024,以及一个定时线程池

public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
    this.clients = new ArrayBlockingQueue<>(1024);
    this.scheduler = new ScheduledThreadPoolExecutor(1,
            SoulThreadFactory.create("long-polling", true));
    this.httpSyncProperties = httpSyncProperties;
}

3). 由于AbstractDataChangedListener抽象类实现了InitializingBean接口,则会在加载的时候执行afterPropertiesSet方法,方法中对数据库存放的数据进行缓存到本地

@Override
public final void afterPropertiesSet() {
    updateAppAuthCache();
    updatePluginCache();
    updateRuleCache();
    updateSelectorCache();
    updateMetaDataCache();
    afterInitialize();
}

// 将appAuth的信息从mysql查出来,并放到内存
protected void updateAppAuthCache() {
    this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
}

// 数据转为json格式,并加密
protected  void updateCache(final ConfigGroupEnum group, final List data) {
    String json = GsonUtils.getInstance().toJson(data);
    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
    LOGGER.info("update config cache[{}], old:{}, updated:{}", group, oldVal, newVal);
}

最后调用 afterInitialize() 方法,在该方法中构建一个延迟任务,每隔5分钟刷新下本地缓存

@Override
protected void afterInitialize() {
    // 默认5分钟
    long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
    // Periodically check the data for changes and update the cache
    scheduler.scheduleWithFixedDelay(() -> {
        LOGGER.info("http sync strategy refresh config start.");
        try {
            this.refreshLocalCache();
            LOGGER.info("http sync strategy refresh config success.");
        } catch (Exception e) {
            LOGGER.error("http sync strategy refresh config error!", e);
        }
    }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
    LOGGER.info("http sync strategy refresh interval: {}ms", syncInterval);
}


二. bootstrap端的初始化

1). 根据配置,初始化HttpSyncDataConfiguration配置类

@Configuration
@ConditionalOnClass(HttpSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.http", name = "url")
@Slf4j
public class HttpSyncDataConfiguration {
    。。。
}

在配置类中,加载一个 HttpSyncDataService bean,该类实现了SyncDataService接口

public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                           final List metaDataSubscribers, final List authDataSubscribers) {
    factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
    this.httpConfig = httpConfig;
    this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
    this.start(httpConfig);
}

初始化时,创建了DataRefreshFactory,并初始化了各个数据类型的刷新类,PluginDataRefresh,SelectorDataRefresh等等,这些类用于具体数据缓存实现

public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,
                          final List metaDataSubscribers,
                          final List authDataSubscribers) {
    ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataRefresh(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataRefresh(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataRefresh(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AppAuthDataRefresh(authDataSubscribers));
    ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataRefresh(metaDataSubscribers));
}

2). 在new HttpSyncDataService 时,最后调用了 start方法,在该方法中创建了一个http连接对象,readTime设置为90s,连接时间为10s

private void start(final HttpConfig httpConfig) {
    // init RestTemplate
    OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
    factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
    factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
    this.httpClient = new RestTemplate(factory);
    // It could be initialized multiple times, so you need to control that.
    if (RUNNING.compareAndSet(false, true)) {
        // fetch all group configs.
        this.fetchGroupConfig(ConfigGroupEnum.values());
        int threadSize = serverList.size();
        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                SoulThreadFactory.create("http-long-polling", true));
        // start long polling, each server creates a thread to listen for changes.
        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
    } else {
        log.info("soul http long polling was started, executor=[{}]", executor);
    }
}

之后会去获取所有组配置,根据配置的ip端口信息列表去发起http请求

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
    for (int index = 0; index < this.serverList.size(); index++) {
        String server = serverList.get(index);
        try {
            this.doFetchGroupConfig(server, groups);
            break;
        } catch (SoulException e) {
            // no available server, throw exception.
            if (index >= serverList.size() - 1) {
                throw e;
            }
            log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
        }
    }
}
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    StringBuilder params = new StringBuilder();
    for (ConfigGroupEnum groupKey : groups) {
        params.append("groupKeys").append("=").append(groupKey.name()).append("&");
    }
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    log.info("request configs: [{}]", url);
    String json = null;
    try {
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
        log.warn(message);
        throw new SoulException(message, e);
    }
    // update local cache
    boolean updated = this.updateCacheWithJson(json);
    if (updated) {
        log.info("get latest configs: [{}]", json);
        return;
    }
    // not updated. it is likely that the current config server has not been updated yet. wait a moment.
    log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
    ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}

拼装url以及参数信息

http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA

发起请求,拿到数据后,执行本地缓存

boolean updated = this.updateCacheWithJson(json);

根据初始化的配置组信息,遍历并刷新内存数据

public boolean executor(final JsonObject data) {
    final boolean[] success = {false};
    ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
    return success[0];
}

@Override
public Boolean refresh(final JsonObject data) {
    boolean updated = false;
    JsonObject jsonObject = convert(data);
    if (null != jsonObject) {
        ConfigData result = fromJson(jsonObject);
        if (this.updateCacheIfNeed(result)) {
            updated = true;
            refresh(result.getData());
        }
    }
    return updated;
}

里面的 convent方法,fromJson方法,refresh方法都是具体ENUM_MAP中最早放进去的具体PluginDataRefresh,SelectorDataRefresh等实现类实现的,感觉设计的挺巧妙的

同时,刷新前,会去判断是否需要刷新,需要才刷新


3). 在全量获取数据后,根据admin数量启动相应的HttpLongPollingTask线程,不停的执行doLongPulling方法

while (RUNNING.get()) {
    for (int time = 1; time <= retryTimes; time++) {
        try {
            doLongPolling(server);
            
            ...

doLongPulling方法中,首先拼接url以及参数,发送 http://localhost:9095/configs/listener 请求,服务端接受到请求后阻塞住请求,如果配置数据更改,则会立即响应更改的组信息。否则,客户端的请求线程将被阻塞,直到数据发生变化或达到指定的超时时间。

4). http 请求到达 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到 BlocingQueue 中,并且开启调度任务,60s 后执行,这样做的目的是 60s 后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {    // 因为soul-web可能未收到某个配置变更的通知,因此MD5值可能不一致,则立即响应
    List changedGroup = compareMD5(request);
    String clientIp = getRemoteIp(request);    if (CollectionUtils.isNotEmpty(changedGroup)) {        this.generateResponse(response, changedGroup);        return;
    }    // Servlet3.0异步响应http请求
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, 60));
}class LongPollingClient implements Runnable {
    LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {        // 省略......
    }    @Override
    public void run() {        // 加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求
        this.asyncTimeoutFuture = scheduler.schedule(() -> {            // clients是阻塞队列,保存了来自soul-web的请求信息
            clients.remove(LongPollingClient.this);
            List changedGroups = HttpLongPollingDataChangedListener.compareMD5((HttpServletRequest) asyncContext.getRequest());
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS);        //
        clients.add(this);
    }
}

5).  如果这段时间内,管理员变更了配置数据,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group 的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。有人会问,为什么不是直接将变更的数据写出?因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,只告知某个 Group 信息发生了变更。

// soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应class DataChangeTask implements Runnable {
    DataChangeTask(final ConfigGroupEnum groupKey) {        this.groupKey = groupKey;
    }    @Override
    public void run() {        try {            for (Iterator iter = clients.iterator(); iter.hasNext(); ) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
            }
        } catch (Throwable e) {
            LOGGER.error("data change error.", e);
        }
    }
}

6).  网关层接收到 http 响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin 的配置服务

if (groupJson != null) {
    // fetch group configuration async.
    ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
    if (ArrayUtils.isNotEmpty(changedGroups)) {
        log.info("Group config changed: {}", Arrays.toString(changedGroups));
        this.doFetchGroupConfig(server, changedGroups);
    }
}

调用 doFetchGroupConfig 方法拉去数据,并同步到本地。






版权声明:本文由星尘阁原创出品,转载请注明出处!

本文链接:http://www.52xingchen.cn/detail/63




赞助本站,网站的发展离不开你们的支持!
来说两句吧
大侠留个名吧,或者可以使用QQ登录。
: 您已登陆!可以继续留言。
最新评论