当前位置: 首页 > news >正文

深入解析:Nacos AP 模式的实现原理与应用场景

优质博文:IT-BLOG-CN

一、CAP 基础

‌Nacos作为注册中心同时支持CPAP模式。‌ Nacos通过不同的协议和机制来实现这两种模式,以满足不同的需求场景。

Nacos中,默认情况下使用的是AP模式,通过Distro协议来实现。AP模式主要关注高可用性,在网络分区时仍然保持服务,但可能会允许短暂的数据不一致。

此外,Nacos也支持CP模式,通过Raft协议来实现。CP模式在网络分区时牺牲可用性以保证数据一致性,适用于对数据准确性要求高的场景,如金融行业。

具体实现上,Nacos通过客户端设置spring.cloud.nacos.discovery.ephemeral的值为false来启用CP模式,默认为true则为AP模式。此外,NacosgRPC通信端口和集群节点之间的通信端口也有所不同,分别用于APCP模式的实现‌。

二、Nacos AP 实现原理

Distro协议Distro是阿里巴巴开源的一个动态服务发现、配置管理和服务管理平台。目前流行的Nacos服务管理框架就采用了Distro协议。Distro协议被定位为临时数据的一致性协议 :该类型协议, 不需要把数据存储到磁盘或者数据库,因为临时数据通常和服务器保持一个session会话,该会话只要存在,数据就不会丢失。

Distro协议保证写必须永远是成功的,即使可能会发生网络分区。当网络恢复时,把各数据分片的数据进行合并。

Distro协议具有以下特点:
【1】数据分片Sharding
   ☑️ 将数据根据某种规则(如哈希)分片,每个节点负责一部分数据的存储和管理。
   ☑️ 这种分片策略可以有效地分散负载,避免单点瓶颈。

【2】数据复制Replication
   ☑️ 在每个节点上维护一份完整的数据副本,确保数据在节点之间的一致性。
   ☑️ 使用异步复制的方式,在数据更新时将更新信息广播给其他节点。

【3】一致性管理:
   ☑️ 采用最终一致性模型,确保数据在一定时间内达到一致。
   ☑️ 使用版本号或时间戳来管理数据的更新,确保数据的最新状态能够传播到所有节点。

【4】心跳检测和故障转移:
   ☑️ 定期进行心跳检测,确保节点的健康状态。
   ☑️ 在节点故障时,其他节点能够快速接管其数据和职责,确保系统的高可用性。

Distro协议服务端节点发现使用寻址机制来实现服务端节点的管理。在Nacos中,寻址模式有三种:

Nacos服务启动的时候ServerMemberManager这个类专门对集群节点进行管理的,这个类在init方法中就会对集群进行初始化

protected void init() throws NacosException {Loggers.CORE.info("Nacos-related cluster resource initialization");// 得到当前nacos服务的端口号this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);// 得到当前nacos服务的地址this.localAddress = InetUtils.getSelfIP() + ":" + port;// 解析地址得到当前nacos服务所对应的集群节点对象this.self = MemberUtil.singleParse(this.localAddress);// 给当前nacos服务设置一个版本号this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);// 把自己放到serverList中serverList.put(self.getAddress(), self);// 该方法做了两件事// 1.注册了一个集群节点信息变更事件// 2.注册了订阅IPChangeEvent事件的事件订阅者registerClusterEvent();// 初始化节点地址寻址模式// 在这里就可以通过配置的节点地址去初始化整个nacos集群节点集合了initAndStartLookup();if (serverList.isEmpty()) {throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");}Loggers.CORE.info("The cluster resource is initialized");
}

注册了订阅IPChangeEvent事件的事件订阅者registerClusterEvent:该方法做了两件事:
【1】注册了一个集群节点信息变更事件。
【2】注册了订阅IPChangeEvent事件的事件订阅者。

private void registerClusterEvent() {// 注册一个集群节点信息变更事件NotifyCenter.registerToPublisher(MembersChangeEvent.class,EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));// 注册一个事件订阅者,订阅的事件类型是IPChangeEventNotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {@Overridepublic void onEvent(InetUtils.IPChangeEvent event) {String newAddress = event.getNewIP() + ":" + port;ServerMemberManager.this.localAddress = newAddress;EnvUtil.setLocalAddress(localAddress);Member self = ServerMemberManager.this.self;self.setIp(event.getNewIP());String oldAddress = event.getOldIP() + ":" + port;ServerMemberManager.this.serverList.remove(oldAddress);ServerMemberManager.this.serverList.put(newAddress, self);ServerMemberManager.this.memberAddressInfos.remove(oldAddress);ServerMemberManager.this.memberAddressInfos.add(newAddress);}@Overridepublic Class<? extends Event> subscribeType() {return InetUtils.IPChangeEvent.class;}});
}

通过上述方法分析可知:注册一个MembersChangeEvent事件,而对应的事件订阅者是ServerListManager。同时该方法还会注册一个IPChangeEvent事件的事件订阅者,IPChangeEvent这个事件就是当前节点IP发生变更之后发布的,该事件发布之后会被这个注册的订阅者所捕获,该订阅者做的事情也很简单,就是对集群节点集合中对应当前节点的ip进行更新就行了。

这一行代码就是初始化集群的关键: 创建nacos集群节点寻址器

private void initAndStartLookup() throws NacosException {this.lookup = LookupFactory.createLookUp(this);this.lookup.start();
}

先是通过LookupFactory创建一个节点寻址器,然后调用start方法启动这个节点寻址器。

通过上图可知Nacos配置集群节点地址的时候有两种方式:读取本地配置文件通过配置服务器

/*** 创建nacos集群节点寻址器** @param memberManager {@link ServerMemberManager}* @return {@link MemberLookup}* @throws NacosException NacosException*/
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立:当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type = chooseLookup(lookupType);// 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP = find(type);currentLookupType = type;}// 条件成立:当前nacos节点是单机模式else {LOOK_UP = new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());return LOOK_UP;
}

创建nacos集群节点寻址器(参考流程图)

/*** 创建nacos集群节点寻址器** @param memberManager {@link ServerMemberManager}* @return {@link MemberLookup}* @throws NacosException NacosException*/
public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {// 条件成立:当前nacos节点是集群模式if (!EnvUtil.getStandaloneMode()) {// 从配置环境中获取nacos集群节点的寻址方式String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);LookupType type = chooseLookup(lookupType);// 得到对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookupLOOK_UP = find(type);currentLookupType = type;}// 条件成立:当前nacos节点是单机模式else {LOOK_UP = new StandaloneMemberLookup();}LOOK_UP.injectMemberManager(memberManager);Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());return LOOK_UP;
}

怎么确定是使用FileConfigMemberLookup还是AddressServerMemberLookup,主要看是否配置了lookupType,如果没有配置lookupType就按照默认的先看是否配置了集群配置文件。

private static LookupType chooseLookup(String lookupType) {if (StringUtils.isNotBlank(lookupType)) {LookupType type = LookupType.sourceOf(lookupType);if (Objects.nonNull(type)) {return type;}}// 代码来到这里说明没有配置lookupType,此时会默认去寻找user.home/nacos/conf/cluster.conf文件File file = new File(EnvUtil.getClusterConfFilePath());// 条件成立:集群配置文件存在,或者环境变量配置了集群节点地址if (file.exists() || StringUtils.isNotBlank(EnvUtil.getMemberList())) {// 返回文件寻址模式return LookupType.FILE_CONFIG;}// 返回服务器寻址模式return LookupType.ADDRESS_SERVER;
}

确定了对应的节点寻址器,FileConfigMemberLookup / AddressServerMemberLookup后获取对应的节点地址。如果当前节点是集群模式,那么会去判断${user.home}/nacos/conf/cluster.conf这个文件是否存在或者环境变量中是否配置了集群节点地址,如果两者有一个成立就是文件寻址模式,反之是服务器寻址模式。

private static MemberLookup find(LookupType type) {// 条件成立:集群配置方式是文件配置的方式if (LookupType.FILE_CONFIG.equals(type)) {// 创建一个FileConfigMemberLookup对象并返回LOOK_UP = new FileConfigMemberLookup();return LOOK_UP;}// 条件成立:集群配置方式是通过服务器获取节点地址的方式if (LookupType.ADDRESS_SERVER.equals(type)) {LOOK_UP = new AddressServerMemberLookup();return LOOK_UP;}// unpossible to run herethrow new IllegalArgumentException();
}

如何初始化集群:
【1】单机模式: StandaloneMemberLookup
【2】文件模式: FileConfigMemberLookup利用监控cluster.conf文件的变动实现节点的管理。核心代码如下:

public class FileConfigMemberLookup extends AbstractMemberLookup {/*** 文件监听回调*/private FileWatcher watcher = new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* @param event {@link FileChangeEvent}*/@Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}@Overridepublic boolean interest(String context) {return StringUtils.contains(context, "cluster.conf");}};@Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {// 从文件中读取集群节点地址readClusterConfFromDisk();try {// 使用notify机制监控文件的变化,并自动触发读取cluster.confWatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);} catch (Throwable e) {Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());}}}@Overridepublic void destroy() throws NacosException {WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);}/*** 从cluster.conf文件中读取集群节点地址*/private void readClusterConfFromDisk() {Collection<Member> tmpMembers = new ArrayList<>();try {// 获取到cluster.conf文件中配置的节点地址列表List<String> tmp = EnvUtil.readClusterConf();// 把这些节点地址分别转换成对应的集群节点对象tmpMembers = MemberUtil.readServerConf(tmp);} catch (Throwable e) {Loggers.CLUSTER.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());}afterLookup(tmpMembers);}
}

start方法中先调用readClusterConfFromDisk方法,这个方法会读取${user.home}/nacos/conf/cluster.conf这个文件中配置的节点地址,读取到之后把这些节点地址转化为对应的Member对象,一个Member对象就代表一个节点,接着会调用父类AbstractMemberLookupafterLookup方法。

public void afterLookup(Collection<Member> members) {this.memberManager.memberChange(members);
}

调用的是集群节点管理器的menberChange方法,同时把上面从cluster.conf文件中读取到的Member节点集合作为方法参数:

/*** 当nacos节点启动 或者 每次配置的集群节点地址发生改变的时候就会调用到该方法* @param members   当前最新的集群节点地址* @return  返回true表示集群节点数量发生改变了,反之表示没改变*/
synchronized boolean memberChange(Collection<Member> members) {if (members == null || members.isEmpty()) {return false;}// 配置的集群节点地址是否包含当前nacos节点boolean isContainSelfIp = members.stream().anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));if (isContainSelfIp) {isInIpList = true;} else {isInIpList = false;members.add(this.self);Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);}// If the number of old and new clusters is different, the cluster information// must have changed; if the number of clusters is the same, then compare whether// there is a difference; if there is a difference, then the cluster node changes// are involved and all recipients need to be notified of the node change eventboolean hasChange = members.size() != serverList.size();ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();Set<String> tmpAddressInfo = new ConcurrentHashSet<>();for (Member member : members) {final String address = member.getAddress();if (!serverList.containsKey(address)) {hasChange = true;// 如果cluster.conf或address-server中的cluster信息被更改,而对应的nacos-server还没有启动,则成员的状态应该设置为DOWN,// 如果相应的nacos-server已经启动,则在几秒钟后检测到该成员的状态将被设置为UPmember.setState(NodeState.DOWN);} else {//fix issue # 4925member.setState(serverList.get(address).getState());}// Ensure that the node is created only oncetmpMap.put(address, member);if (NodeState.UP.equals(member.getState())) {tmpAddressInfo.add(address);}}// 更新serverList为最新的集群节点集合serverList = tmpMap;// 更新memberAddressInfos为最新的集群节点地址memberAddressInfos = tmpAddressInfo;// 获取更新之后集群所有的节点对象Collection<Member> finalMembers = allMembers();Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);// Persist the current cluster node information to cluster.conf// <important> need to put the event publication into a synchronized block to ensure// that the event publication is sequential// 条件成立:1.集群中有节点增加了//          2.集群中有节点下线了//          3.手动增加或者删除了节点配置地址信息if (hasChange) {// 把最新的节点写入到配置中(cluster.conf或者address-server)MemberUtil.syncToFile(finalMembers);// 发布一个MembersChangeEvent事件Event event = MembersChangeEvent.builder().members(finalMembers).build();NotifyCenter.publishEvent(event);}return hasChange;
}

如果是集群节点的数量发生改变的话,就会发布一个MembersChangeEvent事件,而这个事件对应的订阅者是ServerListManager这个类,在这个类中也保存了整个nacos集群所有的节点集合,在回调它的订阅方法时很简单就是把这个集合属性重新赋值,代码如下:

/*** 当集群中的其他节点发生变化的时候会当前nacos节点就会发布一个MembersChangeEvent事件,然后会调用该方法更新最新的集群信息集合* @param event MembersChangeEvent*/
@Override
public void onEvent(MembersChangeEvent event) {// 把最新的集群节点集合重新赋值到serversthis.servers = new ArrayList<>(event.getMembers());
}

读取配置文件的过程已经分析完后,对cluster.conf文件注册一个监听器,当cluster.conf文件发生变更时就会触发FileWatcher中的onChange回调方法,onChange回调方法会再次调用上面说的readClusterConfFromDisk方法重新读取一遍cluster.conf文件中的节点地址。

// 使用notify机制监控文件的变化,并自动触发读取cluster.conf
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);public static synchronized boolean registerWatcher(final String paths, FileWatcher watcher) throws NacosException {checkState();if (NOW_WATCH_JOB_CNT == MAX_WATCH_FILE_JOB) {return false;}WatchDirJob job = MANAGER.get(paths);if (job == null) {job = new WatchDirJob(paths);job.start();MANAGER.put(paths, job);NOW_WATCH_JOB_CNT++;}job.addSubscribe(watcher);return true;
}/*** 文件监听回调*/
private FileWatcher watcher = new FileWatcher() {/*** 当对应的目录或者文件发生变更的时候会回调该方法* @param event {@link FileChangeEvent}*/@Overridepublic void onChange(FileChangeEvent event) {readClusterConfFromDisk();}@Overridepublic boolean interest(String context) {return StringUtils.contains(context, "cluster.conf");}
};

【3】服务器模式: AddressServerMemberLookup使用地址服务器存储节点信息,服务端节点定时拉取信息进行管理

核心代码: 使用地址服务器存储节点信息,会创建AddressServerMemberLookup,服务端定时拉取信息进行管理;

public class AddressServerMemberLookup extends AbstractMemberLookup {private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {};public String domainName;public String addressPort;public String addressUrl;public String envIdUrl;public String addressServerUrl;private volatile boolean isAddressServerHealth = true;private int addressServerFailCount = 0;private int maxFailCount = 12;private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);private volatile boolean shutdown = false;@Overridepublic void start() throws NacosException {if (start.compareAndSet(false, true)) {this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));initAddressSys();run();}}/**** 获取服务器地址*/private void initAddressSys() {String envDomainName = System.getenv("address_server_domain");if (StringUtils.isBlank(envDomainName)) {domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");} else {domainName = envDomainName;}String envAddressPort = System.getenv("address_server_port");if (StringUtils.isBlank(envAddressPort)) {addressPort = EnvUtil.getProperty("address.server.port", "8080");} else {addressPort = envAddressPort;}String envAddressUrl = System.getenv("address_server_url");if (StringUtils.isBlank(envAddressUrl)) {addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");} else {addressUrl = envAddressUrl;}addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;envIdUrl = "http://" + domainName + ":" + addressPort + "/env";Loggers.CORE.info("ServerListService address-server port:" + addressPort);Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);}@SuppressWarnings("PMD.UndefineMagicConstantRule")private void run() throws NacosException {// With the address server, you need to perform a synchronous member node pull at startup// Repeat three times, successfully jump outboolean success = false;Throwable ex = null;int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);for (int i = 0; i < maxRetry; i++) {try {//拉取集群节点信息syncFromAddressUrl();success = true;break;} catch (Throwable e) {ex = e;Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));}}if (!success) {throw new NacosException(NacosException.SERVER_ERROR, ex);}//创建定时任务GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);}@Overridepublic void destroy() throws NacosException {shutdown = true;}@Overridepublic Map<String, Object> info() {Map<String, Object> info = new HashMap<>(4);info.put("addressServerHealth", isAddressServerHealth);info.put("addressServerUrl", addressServerUrl);info.put("envIdUrl", envIdUrl);info.put("addressServerFailCount", addressServerFailCount);return info;}private void syncFromAddressUrl() throws Exception {RestResult<String> result = restTemplate.get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());if (result.ok()) {isAddressServerHealth = true;Reader reader = new StringReader(result.getData());try {afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));} catch (Throwable e) {Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",ExceptionUtil.getAllExceptionMsg(e));}addressServerFailCount = 0;} else {addressServerFailCount++;if (addressServerFailCount >= maxFailCount) {isAddressServerHealth = false;}Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());}}// 定时任务class AddressServerSyncTask implements Runnable {@Overridepublic void run() {if (shutdown) {return;}try {//拉取服务列表syncFromAddressUrl();} catch (Throwable ex) {addressServerFailCount++;if (addressServerFailCount >= maxFailCount) {isAddressServerHealth = false;}Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));} finally {GlobalExecutor.scheduleByCommon(this, 5_000L);}}}
}

初始全量同步: Distro协议节点启动时会从其他节点全量同步数据。在Nacos中,整体流程如下:
【1】启动一个定时任务线程DistroLoadDataTask加载数据,调用load()方法加载数据。

/**** 数据加载过程*/
@Override
public void run() {try {//加载数据load();if (!checkCompleted()) {GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());} else {loadCallback.onSuccess();Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");}} catch (Exception e) {loadCallback.onFailed(e);Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);}
}/**** 加载数据,并同步* @throws Exception*/
private void load() throws Exception {while (memberManager.allMembersWithoutSelf().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");TimeUnit.SECONDS.sleep(1);}while (distroComponentHolder.getDataStorageTypes().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");TimeUnit.SECONDS.sleep(1);}//同步数据for (String each : distroComponentHolder.getDataStorageTypes()) {if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {//从远程机器上同步所有数据loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));}}
}

【2】调用loadAllDataSnapshotFromRemote()方法从远程机器同步所有的数据。

/**** 从远程机器上同步所有数据*/
private boolean loadAllDataSnapshotFromRemote(String resourceType) {DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == transportAgent || null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",resourceType, transportAgent, dataProcessor);return false;}//遍历集群成员节点,不包括自己for (Member each : memberManager.allMembersWithoutSelf()) {try {Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());//从远程节点加载数据,调用http请求接口: distro/datums;DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());//处理数据boolean result = dataProcessor.processSnapshot(distroData);Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),result);if (result) {return true;}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);}}return false;
}

【3】从namingProxy代理获取所有的数据data
 ● 构造http请求,调用httpGet方法从指定的server获取数据。
 ● 从获取的结果result中获取数据bytes

/**** 从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;* @param targetServer target server.* @return*/
@Override
public DistroData getDatumSnapshot(String targetServer) {try {//从namingProxy代理获取所有的数据data,从获取的结果result中获取数据bytes;byte[] allDatum = NamingProxy.getAllData(targetServer);//将数据封装成DistroDatareturn new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);} catch (Exception e) {throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);}
}/*** Get all datum from target server.* NamingProxy.getAllData* 执行HttpGet请求,并获取返回数据* @param server target server address* @return all datum byte array* @throws Exception exception*/
public static byte[] getAllData(String server) throws Exception {//参数封装Map<String, String> params = new HashMap<>(8);//组装URL,并执行HttpGet请求,获取结果集RestResult<String> result = HttpClient.httpGet("http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,new ArrayList<>(), params);//返回数据if (result.ok()) {return result.getData().getBytes();}throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "+ result.getMessage());
}

【4】处理数据同步到本地processData
 ● 从data反序列化出datumMap
 ● 把数据存储到dataStore,也就是本地缓存dataMap
 ● 监听器不包括key,就创建一个空的service,并且绑定监听器。

/*** 数据处理并更新本地缓存* @param data* @return* @throws Exception*/
private boolean processData(byte[] data) throws Exception {if (data.length > 0) {//从data反序列化出datumMapMap<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);// 把数据存储到dataStore,也就是本地缓存dataMapfor (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {dataStore.put(entry.getKey(), entry.getValue());//监听器不包括key,就创建一个空的service,并且绑定监听器if (!listeners.containsKey(entry.getKey())) {// pretty sure the service not exist:if (switchDomain.isDefaultInstanceEphemeral()) {// create empty service//创建一个空的serviceLoggers.DISTRO.info("creating service {}", entry.getKey());Service service = new Service();String serviceName = KeyBuilder.getServiceName(entry.getKey());String namespaceId = KeyBuilder.getNamespace(entry.getKey());service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(Constants.DEFAULT_GROUP);// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();// The Listener corresponding to the key value must not be empty// 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManagerRecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();if (Objects.isNull(listener)) {return false;}//为空的绑定监听器listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);}}}//循环所有datumMapfor (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {if (!listeners.containsKey(entry.getKey())) {// Should not happen:Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());continue;}try {//执行监听器的onChange监听方法for (RecordListener listener : listeners.get(entry.getKey())) {listener.onChange(entry.getKey(), entry.getValue().value);}} catch (Exception e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);continue;}// Update data store if listener executed successfully:// 监听器listener执行成功后,就更新dataStoredataStore.put(entry.getKey(), entry.getValue());}}return true;
}

【5】监听器listener执行成功后,就更新data store

增量同步: 新增数据使用异步广播同步:服务注册的InstanceController.register()就是数据入口,它会调用ServiceManager.registerInstance(),执行数据同步的时候,调用addInstance(),在该方法中会执行DistroConsistencyServiceImpl.put(),该方法是增量同步的入口,会调用distroProtocol.sync()方法,代码如下:

/**** 数据保存* @param key   key of data, this key should be globally unique* @param value value of data* @throws NacosException*/
@Override
public void put(String key, Record value) throws NacosException {//将数据存入到dataStore中onPut(key, value);//使用distroProtocol同步数据distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),DataOperation.CHANGE,globalConfig.getTaskDispatchPeriod() / 2);
}

【1】DistroProtocol使用sync()方法接收增量数据

public void sync(DistroKey distroKey, DataOperation action, long delay) {//向除了自己外的所有节点广播for (Member each : memberManager.allMembersWithoutSelf()) {DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),each.getAddress());DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask任务添加进来//执行延时任务发布distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());}}
}

【2】向其他节点发布广播任务:调用distroTaskEngineHolder发布延迟任务

/**** 构造函数指定任务处理器* @param distroComponentHolder*/
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);//指定任务处理器defaultDelayTaskProcessordelayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}

【3】调用DistroDelayTaskProcessor.process()方法进行任务投递:将延迟任务转换为异步变更任务

/**** 任务处理过程* @param task     task.* @return*/
@Override
public boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {//将延迟任务变更成异步任务,异步任务对象是一个线程DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//将任务添加到NacosExecuteTaskExecuteEngine中,并执行distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;}return false;
}

【4】执行变更任务DistroSyncChangeTask.run()方法:向指定节点发送消息

/**** 执行数据同步*/
@Override
public void run() {Loggers.DISTRO.info("[DISTRO-START] {}", toString());try {//获取本地缓存数据String type = getDistroKey().getResourceType();DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());distroData.setType(DataOperation.CHANGE);//向其他节点同步数据boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());if (!result) {handleFailedTask();}Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);} catch (Exception e) {Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);handleFailedTask();}
}

 ● 调用DistroHttpAgent.syncData()方法发送数据
 ● 调用NamingProxy.syncData()方法发送数据

/**** 向其他节点同步数据* @param data         data* @param targetServer target server* @return*/
@Override
public boolean syncData(DistroData data, String targetServer) {if (!memberManager.hasMember(targetServer)) {return true;}//获取数据字节数组byte[] dataContent = data.getContent();//通过Http协议同步数据return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}

【5】异常任务调用handleFailedTask()方法进行处理
 ● 调用DistroFailedTaskHandler处理失败任务
 ● 调用DistroHttpCombinedKeyTaskFailedHandler将失败任务重新投递成延迟任务

private void handleFailedTask() {String type = getDistroKey().getResourceType();DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);if (null == failedTaskHandler) {return;}// 重试失败任务failedTaskHandler.retry(getDistroKey(), ApplyAction.CHANGE);
}

http://www.mrgr.cn/news/55054.html

相关文章:

  • 【MyBatis-Plus系列】QueryWrapper中or的使用
  • linux使用df与du命令查看磁盘大小不一致问题
  • 把其他.ui文件拿到我的工程中使用
  • JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
  • 计算机毕业设计 | SSM 校园线上订餐系统(附源码)
  • 数组的初始化,参数传递,和求和
  • ACM与蓝桥杯竞赛指南 基本输入输出格式一
  • Linux命令进阶·vi\vim编辑器详细命令介绍
  • 【Java知识】java基础-开发一个自定义注解
  • Lesson10---list
  • 江恩理论(Gann Theory)
  • SQL Injection | SQL 注入 —— 报错盲注
  • 【AIGC】智能创作的革新与未来展望
  • 基于模型设计的智能平衡移动机器人-基础实验SCI
  • Win11右键默认显示更多选项
  • 从零开始搭建一个高可用的HBase集群
  • C++ -string -常见用法5
  • DS快速排序和归并排序的非递归实现(16)
  • 【Javaee】网络编程-TCP Socket
  • Linux常用命令详细解析(含完整命令演示过程)
  • windows C++ 有效利用异步代理库(二)
  • 上海市货运资格证二寸照片要求及手机拍照方法
  • C++编程语言:抽象机制:运算符重载(Bjarne Stroustrup)
  • PostgreSQL模板数据库template0和template1的异同点
  • 033 商品搜索
  • 音视频入门基础:FLV专题(17)——FFmpeg源码中,提取Video Tag的VIDEODATA的实现