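Soul Source Code Analysis (8): ZooKeeper Data Synchronization Process
As with the other sync approaches, this one is initialized through the DataSyncConfiguration class together with the data sync strategy set in the configuration file. If anything is unclear, see Soul Source Code Analysis (7): HTTP long-polling data synchronization process.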
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

    /**
     * Config event listener data changed listener.
     *
     * @param zkClient the zk client
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
    public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
        return new ZookeeperDataChangedListener(zkClient);
    }

    /**
     * Zookeeper data init zookeeper data init.
     *
     * @param zkClient the zk client
     * @param syncDataService the sync data service
     * @return the zookeeper data init
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataInit.class)
    public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        return new ZookeeperDataInit(zkClient, syncDataService);
    }
}
During auto-configuration, ZookeeperConfiguration is imported as a dependency, and when that configuration is loaded it also initializes the ZkClient:
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperConfiguration {

    @Bean
    @ConditionalOnMissingBean(ZkClient.class)
    public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
        return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout());
    }
}
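ZookeeperListener registers two beans: ZookeeperDataChangedListener and ZookeeperDataInit.
Like the other listeners, ZookeeperDataChangedListener implements the DataChangedListener interface. It handles the concrete change events and processes the data. When the bean is created it keeps a ZkClient as a member field and performs all data operations through it.
ZookeeperDataInit implements the CommandLineRunner interface, so its run method is executed once the application has started. When the bean is created it is given a SyncDataService, which it uses for the actual data synchronization.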
@Override
public void run(final String... args) {
    String pluginPath = ZkPathConstants.PLUGIN_PARENT;
    String authPath = ZkPathConstants.APP_AUTH_PARENT;
    String metaDataPath = ZkPathConstants.META_DATA;
    if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
        syncDataService.syncAll(DataEventTypeEnum.REFRESH);
    }
}
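The run method takes the three data paths and checks whether they exist in ZooKeeper. Only when none of them exists does it perform a full synchronization by calling syncAll.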
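The startup check can be sketched on its own to make the condition explicit. This is a minimal illustration, not the Soul implementation: the class name InitCheckSketch, the needsFullSync helper, and the three path strings are assumptions made for the example (the real values live in ZkPathConstants), and the ZooKeeper lookup is replaced by a plain predicate.

```java
import java.util.function.Predicate;
import java.util.stream.Stream;

/** Sketch of the startup decision: run a full sync only when no root path exists yet. */
final class InitCheckSketch {

    // Hypothetical stand-ins for the ZkPathConstants values; the real constants may differ.
    static final String PLUGIN_PARENT = "/soul/plugin";
    static final String APP_AUTH_PARENT = "/soul/auth";
    static final String META_DATA = "/soul/metaData";

    private InitCheckSketch() {
    }

    /**
     * Mirrors the chained "!exists(a) && !exists(b) && !exists(c)" test in run():
     * returns true only if none of the three root paths is present.
     */
    static boolean needsFullSync(final Predicate<String> exists) {
        return Stream.of(PLUGIN_PARENT, APP_AUTH_PARENT, META_DATA).noneMatch(exists);
    }
}
```

Because the three checks are joined with a logical AND, a single existing root path is enough to skip the full synchronization.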
@Override
public boolean syncAll(final DataEventTypeEnum type) {
    appAuthService.syncData();
    List<PluginData> pluginDataList = pluginService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
    List<SelectorData> selectorDataList = selectorService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
    List<RuleData> ruleDataList = ruleService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
    metaDataService.syncData();
    return true;
}
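The websocket sync path goes through this method as well. It queries the data from the database and publishes the corresponding events.
Once the onApplicationEvent method in DataChangedEventDispatcher receives a published event, the listener for whichever sync strategy was loaded executes the corresponding method. This article uses ZooKeeper sync, so the work is done in ZookeeperDataChangedListener.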
public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
        switch (event.getGroupKey()) {
            case APP_AUTH:
                listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                break;
            case PLUGIN:
                listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                break;
            case RULE:
                listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                break;
            case SELECTOR:
                listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                break;
            case META_DATA:
                listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
        }
    }
}
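The dispatch step can be pictured with a stripped-down model that runs without Spring. Everything here is a simplification for illustration: DispatchSketch, its Group enum, the Listener interface, and RecordingListener are hypothetical names, payloads are plain strings rather than Soul's data classes, and only three groups are modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of group-based event routing, loosely modeled on the dispatcher above. */
final class DispatchSketch {

    /** Simplified counterpart of the config group enum. */
    enum Group { PLUGIN, SELECTOR, RULE }

    /** Simplified counterpart of DataChangedListener; every callback defaults to a no-op. */
    interface Listener {
        default void onPluginChanged(List<String> changed) { }

        default void onSelectorChanged(List<String> changed) { }

        default void onRuleChanged(List<String> changed) { }
    }

    /** Records the calls it receives so that the routing can be observed. */
    static final class RecordingListener implements Listener {
        final List<String> calls = new ArrayList<>();

        @Override
        public void onPluginChanged(final List<String> changed) {
            calls.add("plugin:" + changed);
        }

        @Override
        public void onRuleChanged(final List<String> changed) {
            calls.add("rule:" + changed);
        }
    }

    private DispatchSketch() {
    }

    /** Hands one event to every registered listener, choosing the callback by group. */
    static void dispatch(final List<Listener> listeners, final Group group, final List<String> payload) {
        for (Listener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(payload);
                    break;
                case SELECTOR:
                    listener.onSelectorChanged(payload);
                    break;
                case RULE:
                    listener.onRuleChanged(payload);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }
}
```

A listener that does not override a callback simply ignores that group, which is roughly how one dispatcher can serve several sync strategies with different needs.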
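The onPluginChanged method in ZookeeperDataChangedListener first checks whether the event is a delete event. If it is, the plugin node and its selector and rule parent nodes are deleted directly. If it is not, the method builds the plugin path: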
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    for (PluginData data : changed) {
        // delete
        if (eventType == DataEventTypeEnum.DELETE) {
            String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
            if (zkClient.exists(pluginPath)) {
                zkClient.deleteRecursive(pluginPath);
            }
            String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getName());
            if (zkClient.exists(selectorParentPath)) {
                zkClient.deleteRecursive(selectorParentPath);
            }
            String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getName());
            if (zkClient.exists(ruleParentPath)) {
                zkClient.deleteRecursive(ruleParentPath);
            }
            continue;
        }
        // update
        String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
        if (!zkClient.exists(pluginPath)) {
            zkClient.createPersistent(pluginPath, true);
        }
        zkClient.writeData(pluginPath, data);
    }
}
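It then checks whether the node exists in ZooKeeper, creates it if it does not, and writes the data to it, repeating this for each item in the list. The handlers for the other events follow much the same flow.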
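The delete-or-upsert branching can be tried out against an in-memory map instead of a real ZooKeeper ensemble. This is a sketch under stated assumptions: PluginNodeSketch, its EventType enum, and the path layout are hypothetical, plugin data is reduced to a string, and the TreeMap only imitates the node tree.

```java
import java.util.TreeMap;

/** Sketch of the delete-or-upsert logic, with a TreeMap standing in for the ZooKeeper tree. */
final class PluginNodeSketch {

    enum EventType { DELETE, UPDATE }

    /** Path to serialized data; an assumption made for the example, not ZkClient state. */
    final TreeMap<String, String> nodes = new TreeMap<>();

    // Hypothetical path layout; the real paths are built by ZkPathConstants.
    static String pluginPath(final String name) {
        return "/soul/plugin/" + name;
    }

    static String selectorParentPath(final String name) {
        return "/soul/selector/" + name;
    }

    static String ruleParentPath(final String name) {
        return "/soul/rule/" + name;
    }

    /** Removes the node at path and everything below it; a missing path is a no-op. */
    void deleteRecursive(final String path) {
        nodes.keySet().removeIf(p -> p.equals(path) || p.startsWith(path + "/"));
    }

    /** DELETE removes the plugin node plus its selector and rule subtrees; anything else upserts. */
    void onPluginChanged(final String name, final String data, final EventType type) {
        if (type == EventType.DELETE) {
            deleteRecursive(pluginPath(name));
            deleteRecursive(selectorParentPath(name));
            deleteRecursive(ruleParentPath(name));
            return;
        }
        // put() creates the entry when absent and overwrites it otherwise
        nodes.put(pluginPath(name), data);
    }
}
```

Deleting a plugin also clears its selector and rule subtrees, so no orphaned child nodes are left behind for the gateway to pick up.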
On the gateway side, the ZookeeperSyncDataConfiguration class is initialized according to the configuration, and it registers two beans: ZookeeperSyncDataService and ZkClient.
@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {

    @Bean
    public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient,
                                           final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                           final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use zookeeper sync soul data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    @Bean
    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
    }
}
When ZookeeperSyncDataService is constructed, it immediately runs the watch methods:
public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
    this.zkClient = zkClient;
    this.pluginDataSubscriber = pluginDataSubscriber;
    this.metaDataSubscribers = metaDataSubscribers;
    this.authDataSubscribers = authDataSubscribers;
    watcherData();
    watchAppAuth();
    watchMetaData();
}
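watcherData checks whether the plugin parent node exists and creates it if it does not. It then calls watcherAll for every child plugin and subscribes to child changes on the parent node, so that plugins added later are watched as well: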
private void watcherData() {
    final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
    if (!zkClient.exists(pluginParent)) {
        zkClient.createPersistent(pluginParent, true);
    }
    List<String> pluginZKs = zkClient.getChildren(ZkPathConstants.buildPluginParentPath());
    for (String pluginName : pluginZKs) {
        watcherAll(pluginName);
    }
    zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            for (String pluginName : currentChildren) {
                watcherAll(pluginName);
            }
        }
    });
}
watcherAll sets up watches for three kinds of data: plugins, selectors, and rules.
private void watcherAll(final String pluginName) {
    watcherPlugin(pluginName);
    watcherSelector(pluginName);
    watcherRule(pluginName);
}
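When a change is detected, it is passed on through the plugin subscriber (a publish-subscribe pattern), which updates the local cache and the in-memory data.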
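The watch-then-cache step can be modeled in a few lines. This is only a sketch of the idea: WatchCacheSketch and FakeStore are hypothetical classes that imitate a watched store in memory, they do not use the ZkClient API, and the path prefix and string payloads are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Sketch of watch-driven cache updates, using an in-memory store in place of ZooKeeper. */
final class WatchCacheSketch {

    /** Tiny stand-in for a watched store; an assumption for the example, not the ZkClient API. */
    static final class FakeStore {
        private final Map<String, List<BiConsumer<String, String>>> watchers = new ConcurrentHashMap<>();

        /** Registers a callback that receives (path, data) whenever the path is written. */
        void subscribe(final String path, final BiConsumer<String, String> onChange) {
            watchers.computeIfAbsent(path, key -> new ArrayList<>()).add(onChange);
        }

        /** Notifies the watchers of a path; null data stands for a deleted node. */
        void write(final String path, final String data) {
            List<BiConsumer<String, String>> registered = watchers.getOrDefault(path, Collections.emptyList());
            for (BiConsumer<String, String> watcher : registered) {
                watcher.accept(path, data);
            }
        }
    }

    /** Local cache on the gateway side: plugin name to its latest data. */
    final Map<String, String> pluginCache = new ConcurrentHashMap<>();

    /** Watches one plugin node and mirrors every change into the local cache. */
    void watchPlugin(final FakeStore store, final String pluginName) {
        store.subscribe("/soul/plugin/" + pluginName, (path, data) -> {
            if (data == null) {
                pluginCache.remove(pluginName);
            } else {
                pluginCache.put(pluginName, data);
            }
        });
    }
}
```

The gateway never polls in this model: the cache changes only when the store pushes a notification, which is the property the watch mechanism provides.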
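Summary:
The overall flow is simple and relies mainly on ZooKeeper's watch mechanism. When soul-admin starts, it writes the full data set to ZooKeeper if the root nodes do not exist yet, and when data changes afterwards it incrementally updates the corresponding nodes. Meanwhile, soul-web watches those configuration nodes and updates its local cache as soon as a change occurs.
Copyright notice: this article is an original work by 星尘阁. Please credit the source when reposting.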