Soul Source Code Analysis (8): ZooKeeper Data Synchronization Process
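As with the other synchronization strategies, initialization goes through the DataSyncConfiguration class together with the data sync strategy set in the configuration file. If anything is unclear, see: Soul Source Code Analysis (7): HTTP Long-Polling Data Synchronization Process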
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

    /**
     * Config event listener data changed listener.
     *
     * @param zkClient the zk client
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
    public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
        return new ZookeeperDataChangedListener(zkClient);
    }

    /**
     * Zookeeper data init zookeeper data init.
     *
     * @param zkClient the zk client
     * @param syncDataService the sync data service
     * @return the zookeeper data init
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataInit.class)
    public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        return new ZookeeperDataInit(zkClient, syncDataService);
    }
}
During auto-configuration, ZookeeperConfiguration is imported as a dependency, and loading that configuration also initializes the ZkClient:
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperConfiguration {

    @Bean
    @ConditionalOnMissingBean(ZkClient.class)
    public ZkClient zkClient(final ZookeeperProperties zookeeperProp) {
        return new ZkClient(zookeeperProp.getUrl(), zookeeperProp.getSessionTimeout(), zookeeperProp.getConnectionTimeout());
    }
}
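ZookeeperListener loads two beans on initialization: ZookeeperDataChangedListener and ZookeeperDataInit.
ZookeeperDataChangedListener, like the other listeners, implements the DataChangedListener interface. It does the actual event handling and data processing. When the bean is created it keeps a zkClient member variable and performs all of its data operations through it.
ZookeeperDataInit implements the CommandLineRunner interface, so its run method is executed once the application has started. When the bean is created it is given a syncDataService, which it uses for the actual data operations.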
@Override
public void run(final String... args) {
    String pluginPath = ZkPathConstants.PLUGIN_PARENT;
    String authPath = ZkPathConstants.APP_AUTH_PARENT;
    String metaDataPath = ZkPathConstants.META_DATA;
    if (!zkClient.exists(pluginPath) && !zkClient.exists(authPath) && !zkClient.exists(metaDataPath)) {
        syncDataService.syncAll(DataEventTypeEnum.REFRESH);
    }
}
The run method takes the three data paths and checks whether they exist in ZooKeeper. Only when none of them exists does it perform a full sync by calling syncAll:
@Override
public boolean syncAll(final DataEventTypeEnum type) {
    appAuthService.syncData();
    List<PluginData> pluginDataList = pluginService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
    List<SelectorData> selectorDataList = selectorService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
    List<RuleData> ruleDataList = ruleService.listAll();
    eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
    metaDataService.syncData();
    return true;
}
The websocket strategy goes through this method as well. It queries the database and publishes the corresponding events.
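The startup decision can be illustrated with a minimal, self-contained sketch. This is not Soul code: `StartupSyncSketch`, its `existingPaths` set (an in-memory stand-in for ZooKeeper), and the three path strings are hypothetical and only assumed for illustration. The sketch mirrors the `&&` condition in `run` and the order in which `syncAll` publishes the plugin, selector, and rule groups.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the three groups that syncAll publishes directly.
enum Group { PLUGIN, SELECTOR, RULE }

class StartupSyncSketch {
    // Assumed path values, for illustration only.
    static final String PLUGIN_PARENT = "/soul/plugin";
    static final String APP_AUTH_PARENT = "/soul/auth";
    static final String META_DATA = "/soul/metaData";

    // In-memory replacement for ZooKeeper: the set of paths that already exist.
    final Set<String> existingPaths = new HashSet<>();
    // Records which groups were published, in order.
    final List<Group> published = new ArrayList<>();

    // Same condition as run(): a full sync happens only when none of the paths exists.
    boolean runOnStartup() {
        if (!existingPaths.contains(PLUGIN_PARENT)
                && !existingPaths.contains(APP_AUTH_PARENT)
                && !existingPaths.contains(META_DATA)) {
            syncAll();
            return true;
        }
        return false;
    }

    // Same publishing order as syncAll: plugin, then selector, then rule.
    void syncAll() {
        published.add(Group.PLUGIN);
        published.add(Group.SELECTOR);
        published.add(Group.RULE);
    }

    public static void main(String[] args) {
        StartupSyncSketch empty = new StartupSyncSketch();
        System.out.println(empty.runOnStartup() + " " + empty.published);

        StartupSyncSketch partial = new StartupSyncSketch();
        partial.existingPaths.add(META_DATA);
        System.out.println(partial.runOnStartup() + " " + partial.published);
    }
}
```

One consequence worth noting: because the three checks are joined with `&&`, a single existing path is enough to skip the full sync on startup.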
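The onApplicationEvent method of DataChangedEventDispatcher picks up the published events and hands them to whichever data sync listeners have been loaded. This article uses ZooKeeper sync, so the work is done in ZookeeperDataChangedListener.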
public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
        switch (event.getGroupKey()) {
            case APP_AUTH:
                listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                break;
            case PLUGIN:
                listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                break;
            case RULE:
                listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                break;
            case SELECTOR:
                listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                break;
            case META_DATA:
                listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
        }
    }
}
In ZookeeperDataChangedListener, onPluginChanged first checks whether the event is a delete event. If it is, the plugin node is deleted together with that plugin's selector and rule parent nodes. If it is not, the plugin path is built:
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    for (PluginData data : changed) {
        // delete
        if (eventType == DataEventTypeEnum.DELETE) {
            String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
            if (zkClient.exists(pluginPath)) {
                zkClient.deleteRecursive(pluginPath);
            }
            String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getName());
            if (zkClient.exists(selectorParentPath)) {
                zkClient.deleteRecursive(selectorParentPath);
            }
            String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getName());
            if (zkClient.exists(ruleParentPath)) {
                zkClient.deleteRecursive(ruleParentPath);
            }
            continue;
        }
        // update
        String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
        if (!zkClient.exists(pluginPath)) {
            zkClient.createPersistent(pluginPath, true);
        }
        zkClient.writeData(pluginPath, data);
    }
}
It then checks whether the node exists in ZooKeeper, creates it if it does not, and writes the data to it. This repeats for every item in the list. The handlers for the other event types follow much the same flow.
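The delete and update branches can be tried out without a ZooKeeper server using the sketch below. Everything in it is hypothetical: `FakeZk` is an in-memory map standing in for ZkClient, plugin data is reduced to a plain string, and the `/soul/...` path layout is an assumption rather than something read from ZkPathConstants.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

class AdminWriteSketch {
    enum EventType { DELETE, UPDATE }

    // Hypothetical in-memory stand-in for the ZooKeeper tree: path -> node data.
    static final class FakeZk {
        final TreeMap<String, String> nodes = new TreeMap<>();

        boolean exists(String path) {
            return nodes.containsKey(path);
        }

        // Creates the node and any missing ancestors, leaving existing data untouched.
        void createPersistent(String path) {
            int idx = 0;
            while ((idx = path.indexOf('/', idx + 1)) != -1) {
                nodes.putIfAbsent(path.substring(0, idx), "");
            }
            nodes.putIfAbsent(path, "");
        }

        void writeData(String path, String data) {
            nodes.put(path, data);
        }

        // Removes the node and every descendant below it.
        void deleteRecursive(String path) {
            nodes.keySet().removeIf(p -> p.equals(path) || p.startsWith(path + "/"));
        }
    }

    // Assumed path layout, for illustration only.
    static String pluginPath(String name) { return "/soul/plugin/" + name; }
    static String selectorParentPath(String name) { return "/soul/selector/" + name; }
    static String ruleParentPath(String name) { return "/soul/rule/" + name; }

    // Same branches as onPluginChanged: DELETE removes three subtrees,
    // any other event creates the plugin node if needed and overwrites its data.
    static void onPluginChanged(FakeZk zk, Map<String, String> changed, EventType type) {
        for (Map.Entry<String, String> plugin : changed.entrySet()) {
            String name = plugin.getKey();
            if (type == EventType.DELETE) {
                for (String path : new String[] {
                        pluginPath(name), selectorParentPath(name), ruleParentPath(name)}) {
                    if (zk.exists(path)) {
                        zk.deleteRecursive(path);
                    }
                }
                continue;
            }
            String path = pluginPath(name);
            if (!zk.exists(path)) {
                zk.createPersistent(path);
            }
            zk.writeData(path, plugin.getValue());
        }
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        zk.createPersistent(selectorParentPath("divide") + "/selector-1");
        Map<String, String> plugins = new LinkedHashMap<>();
        plugins.put("divide", "{\"enabled\":true}");

        onPluginChanged(zk, plugins, EventType.UPDATE);
        System.out.println("after update: " + zk.nodes.keySet());

        onPluginChanged(zk, plugins, EventType.DELETE);
        System.out.println("after delete: " + zk.nodes.keySet());
    }
}
```

Running `main` shows that an update only touches the plugin node, while a delete also clears the selector subtree that belongs to the same plugin and leaves the shared parent nodes in place.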
On the gateway (soul-web) side, the configuration triggers initialization of the ZookeeperSyncDataConfiguration class, which loads two beans, ZookeeperSyncDataService and ZkClient:
@Configuration
@ConditionalOnClass(ZookeeperSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@EnableConfigurationProperties(ZookeeperConfig.class)
@Slf4j
public class ZookeeperSyncDataConfiguration {

    @Bean
    public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use zookeeper sync soul data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

    @Bean
    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(), zookeeperConfig.getConnectionTimeout());
    }
}
As soon as ZookeeperSyncDataService is constructed, it runs its watch methods:
public ZookeeperSyncDataService(final ZkClient zkClient, final PluginDataSubscriber pluginDataSubscriber,
                                final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
    this.zkClient = zkClient;
    this.pluginDataSubscriber = pluginDataSubscriber;
    this.metaDataSubscribers = metaDataSubscribers;
    this.authDataSubscribers = authDataSubscribers;
    watcherData();
    watchAppAuth();
    watchMetaData();
}
watcherData checks the plugin parent node and creates it if it does not exist. It then calls watcherAll for every child plugin, both right away and again whenever the list of children changes:
private void watcherData() {
    final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
    if (!zkClient.exists(pluginParent)) {
        zkClient.createPersistent(pluginParent, true);
    }
    List<String> pluginZKs = zkClient.getChildren(ZkPathConstants.buildPluginParentPath());
    for (String pluginName : pluginZKs) {
        watcherAll(pluginName);
    }
    zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            for (String pluginName : currentChildren) {
                watcherAll(pluginName);
            }
        }
    });
}
watcherAll sets up watches for three kinds of data: plugins, selectors, and rules:
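private void watcherAll(final String pluginName) {
    watcherPlugin(pluginName);
    watcherSelector(pluginName);
    watcherRule(pluginName);
}
When a change is observed, it is passed on in publish-subscribe fashion to the plugin data subscriber, which updates the local in-memory cache.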
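The gateway-side behaviour, watching a node and keeping a local cache in step with it, can be sketched as follows. This is only an illustration under stated assumptions: `FakeZk` is a hypothetical in-memory stand-in that pushes changes to registered listeners, a `null` value is used here to represent node deletion, the `/soul/plugin/` prefix is assumed, and `pluginCache` is a simplified stand-in for what the subscriber would maintain.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class GatewayWatchSketch {
    // Hypothetical in-memory stand-in for ZooKeeper that pushes changes to listeners.
    static final class FakeZk {
        final Map<String, String> data = new HashMap<>();
        final Map<String, List<BiConsumer<String, String>>> listeners = new HashMap<>();

        void subscribeDataChanges(String path, BiConsumer<String, String> listener) {
            listeners.computeIfAbsent(path, k -> new ArrayList<>()).add(listener);
        }

        // Stores the value (null means "node deleted") and notifies listeners on that path.
        void write(String path, String value) {
            if (value == null) {
                data.remove(path);
            } else {
                data.put(path, value);
            }
            for (BiConsumer<String, String> listener
                    : listeners.getOrDefault(path, Collections.emptyList())) {
                listener.accept(path, value);
            }
        }
    }

    // Simplified local cache on the gateway: plugin name -> plugin data.
    final Map<String, String> pluginCache = new HashMap<>();

    // Loads the current value once, then keeps the cache in step with later changes.
    void watcherPlugin(FakeZk zk, String pluginName) {
        String path = "/soul/plugin/" + pluginName; // assumed path layout
        String current = zk.data.get(path);
        if (current != null) {
            pluginCache.put(pluginName, current);
        }
        zk.subscribeDataChanges(path, (changedPath, value) -> {
            if (value == null) {
                pluginCache.remove(pluginName);
            } else {
                pluginCache.put(pluginName, value);
            }
        });
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        zk.write("/soul/plugin/divide", "v1");

        GatewayWatchSketch gateway = new GatewayWatchSketch();
        gateway.watcherPlugin(zk, "divide");
        System.out.println(gateway.pluginCache);

        zk.write("/soul/plugin/divide", "v2"); // simulated write from the admin side
        System.out.println(gateway.pluginCache);
    }
}
```

The gateway never polls in this model: it reads the node once when the watch is set up, and every later update arrives through the listener callback.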
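Summary:
The overall flow is simple and relies mainly on ZooKeeper's watch mechanism. When soul-admin starts, it writes the full data set into ZooKeeper if the data paths do not exist yet, and when data changes later it updates the affected ZooKeeper nodes incrementally. soul-web watches the nodes that hold the configuration and updates its local cache as soon as anything changes.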

Copyright notice: This article is an original work by 星尘阁. Please credit the source when reposting.