网站首页 文章专栏 soul源码解析(八) - zookeeper 长轮询数据同步过程
soul源码解析(八) - zookeeper 长轮询数据同步过程

一. admin端初始化

跟其他流程的方式一样,也是通过DataSyncConfiguration配置类,以及配置文件中数据同步策略,初始化,如有不懂参见:soul源码解析(七) - http 长轮询数据同步过程

@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

   /**
    * Config event listener data changed listener.
    *
    * @param zkClient the zk client
    * @return the data changed listener
    */
   @Bean
   @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
   public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
       return new ZookeeperDataChangedListener(zkClient);
   }

   /**
    * Zookeeper data init zookeeper data init.
    *
    * @param zkClient        the zk client
    * @param syncDataService the sync data service
    * @return the zookeeper data init
    */
   @Bean
   @ConditionalOnMissingBean(ZookeeperDataInit.class)
   public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
       return new ZookeeperDataInit(zkClient, syncDataService);
   }
}

在自动装配的过程中,依赖加载了ZookeeperConfiguration,而在改配置装载的时候,同时初始化了ZkClient

@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperConfiguration {
   
   @Bean
   @ConditionalOnMissingBean(ZkClient.class)
   public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
       return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout());
   }
}

ZookeeperListener 初始化加载了两个bean,分别是 ZookeeperDataChangedListener ,和 ZookeeperDataInit。

ZookeeperDataChangedListener 跟其他的listener一样,同样实现了 DataChangedListener 接口,用于具体的事件监听,并处理数据,加载该bean的时候,维护一个zkclient成员变量,通过zkclient进行操作数据

ZookeeperDataInit 类实现了 CommandLineRunner 接口,其作用就是项目启动后执行run方法,在其加载的时候,初始化一个 syncDataService,用于具体的数据增删改查

@Override
public void run(final String... args) {
    String pluginPath = ZkPathConstants.PLUGIN_PARENT;
    String authPath = ZkPathConstants.APP_AUTH_PARENT;
    String metaDataPath = ZkPathConstants.META_DATA;
    if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
        syncDataService.syncAll(DataEventTypeEnum.REFRESH);
    }
}

拿到 数据的 path路径,判断zk中是否存在,不存在则进行全量缓存。走syncAll方法

@Override
public boolean syncAll(final DataEventTypeEnum type) {
    appAuthService.syncData();
    List pluginDataList = pluginService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
    List selectorDataList = selectorService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
    List ruleDataList = ruleService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
    metaDataService.syncData();
    return true;
}

该方法websocket也会走,会去数据库查询数据,并发布相应的事件。

DataChangedEventDispatcher 中的方法 onApplicationEvent 感知到发布的事件后,有具体加载的数据同步方式执行相应的方法,本文中用zk同步,也就是在 ZookeeperDataChangedListener  中执行。

public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
        switch (event.getGroupKey()) {
            case APP_AUTH:
                listener.onAppAuthChanged((List) event.getSource(), event.getEventType());
                break;
            case PLUGIN:
                listener.onPluginChanged((List) event.getSource(), event.getEventType());
                break;
            case RULE:
                listener.onRuleChanged((List) event.getSource(), event.getEventType());
                break;
            case SELECTOR:
                listener.onSelectorChanged((List) event.getSource(), event.getEventType());
                break;
            case META_DATA:
                listener.onMetaDataChanged((List) event.getSource(), event.getEventType());
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
        }
    }
}

ZookeeperDataChangedListener 中onPluginChanged方法,先去判断是否是删除时间,如果是删除事件,则直接删除,如果不是,则构建根路径

public void onPluginChanged(final List changed, final DataEventTypeEnum eventType) {
    for (PluginData data : changed) {
        // delete
        if (eventType == DataEventTypeEnum.DELETE) {
            String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
            if (zkClient.exists(pluginPath)) {
                zkClient.deleteRecursive(pluginPath);
            }
            String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getName());
            if (zkClient.exists(selectorParentPath)) {
                zkClient.deleteRecursive(selectorParentPath);
            }
            String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getName());
            if (zkClient.exists(ruleParentPath)) {
                zkClient.deleteRecursive(ruleParentPath);
            }
            continue;
        }

        // update
        String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
        if (!zkClient.exists(pluginPath)) {
            zkClient.createPersistent(pluginPath, true);
        }
        zkClient.writeData(pluginPath, data);
    }
}

然后判断zk是否存在,不存在,则创建节点,并循环向里面写数据。其他事件大同小异,流程差不多。


二. bootstrap端的启动

根据配置,初始化 ZookeeperSyncDataConfiguration 配置类,并加载 ZookeeperSyncDataService 和 ZkClient两个bean

@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {
    
    
    @Bean
    public SyncDataService syncDataService(final ObjectProvider zkClient, final ObjectProvider pluginSubscriber,
                                           final ObjectProvider metaSubscribers, final ObjectProvider authSubscribers) {
        log.info("you use zookeeper sync soul data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }
    
    
    @Bean
    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
    }
    
}

初始化ZookeeperSyncDataService 时,就会执行watch方法

public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
                                final List metaDataSubscribers, final List authDataSubscribers) {
    this.zkClient = zkClient;
    this.pluginDataSubscriber = pluginDataSubscriber;
    this.metaDataSubscribers = metaDataSubscribers;
    this.authDataSubscribers = authDataSubscribers;
    watcherData();
    watchAppAuth();
    watchMetaData();
}

查询指定节点的数据,如不存在,则创建,存在则watchAll

private void watcherData() {
    final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
    if (!zkClient.exists(pluginParent)) {
        zkClient.createPersistent(pluginParent, true);
    }
    List pluginZKs = zkClient.getChildren(ZkPathConstants.buildPluginParentPath());
    for (String pluginName : pluginZKs) {
        watcherAll(pluginName);
    }
    zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            for (String pluginName : currentChildren) {
                watcherAll(pluginName);
            }
        }
    });
}

watchAll会去监听插件,选择器,规则三种事件类型

private void watcherAll(final String pluginName) {
    watcherPlugin(pluginName);
    watcherSelector(pluginName);
    watcherRule(pluginName);
}

监听到后,根据插件发布订阅的模式,去本地缓存,处理内存数据


总结:

整体流程很简单,主要是依赖 zookeeper 的 watch 机制,soul-web 会监听配置的节点,soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新 zookeeper 的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存

zookeeper节点设计




版权声明:本文由星尘阁原创出品,转载请注明出处!

本文链接:http://www.52xingchen.cn/detail/64




赞助本站,网站的发展离不开你们的支持!
来说两句吧
大侠留个名吧,或者可以使用QQ登录。
: 您已登陆!可以继续留言。
最新评论