nacos单机服务注册源码解析
目录
1 依赖引入
2 客户端源码
2.1 入口
2.2 监听spring启动事件
2.3 注册流程
2.4 总结
3 服务端源码
3.1 接受http请求
3.2 从阻塞队列取出异步注册
3.3 总结
4 nacos注册高性能优化
1 依赖引入
<properties><java.version>8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.3.2.RELEASE</spring-boot.version><spring-cloud-alibaba.version>2.2.5.RELEASE</spring-cloud-alibaba.version><spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
2 客户端源码
2.1 入口
核心类:NacosServiceRegistryAutoConfiguration 由springboot自动装配导入
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry,AutoServiceRegistrationProperties autoServiceRegistrationProperties,NacosRegistration registration) {//1 核心类 return new NacosAutoServiceRegistration(registry,autoServiceRegistrationProperties, registration);
}
2.2 监听spring启动事件
核心类:NacosAutoServiceRegistration 实现ApplicationListener接口 监听spring的WebServerInitializedEvent事件 当spring启动完成 即可调用监听方法
调用的是NacosAutoServiceRegistration的父类:AbstractAutoServiceRegistration
核心方法:onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {//1 绑定关系 监听到spring启动完成事件bind(event);
}public void bind(WebServerInitializedEvent event) {ApplicationContext context = event.getApplicationContext();if (context instanceof ConfigurableWebServerApplicationContext) {if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {return;}}this.port.compareAndSet(0, event.getWebServer().getPort());//1 开始执行注册流程this.start();
}public void start() {if (!isEnabled()) {if (logger.isDebugEnabled()) {logger.debug("Discovery Lifecycle disabled. Not starting");}return;}// only initialize if nonSecurePort is greater than 0 and it isn't already running// because of containerPortInitializer belowif (!this.running.get()) {this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));//1 注册方法 register();if (shouldRegisterManagement()) {registerManagement();}this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));this.running.compareAndSet(false, true);}}
2.3 注册流程
核心类:NacosServiceRegistry 核心方法:register
public void register(Registration registration) {if (StringUtils.isEmpty(registration.getServiceId())) {log.warn("No service to register for nacos client...");return;}//1 获取服务信息NamingService namingService = namingService();String serviceId = registration.getServiceId();String group = nacosDiscoveryProperties.getGroup();Instance instance = getNacosInstanceFromRegistration(registration);try {//2 注册服务namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,instance.getIp(), instance.getPort());}catch (Exception e) {log.error("nacos registry, {} register failed...{},", serviceId,registration.toString(), e);// rethrow a RuntimeException if the registration is failed.// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132rethrowRuntimeException(e);}
}public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {//1 校验服务参数NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//2 临时实例添加心跳if (instance.isEphemeral()) {BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);beatReactor.addBeatInfo(groupedServiceName, beatInfo);}//3 注册服务serverProxy.registerService(groupedServiceName, groupName, instance);
}public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);//1 组装服务信息final Map<String, String> params = new HashMap<String, String>(16);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put(CommonParams.GROUP_NAME, groupName);params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));//2 通过http调用服务端进行注册reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);}
2.4 总结
-
首先通过springboot的自动装配 引入一个监听器 该监听器会监听spring启动完成的事件
-
spring启动完成后 开始执行注册流程
-
首先组装必要参数 封装成instance对象
-
然后通过http的方式调用服务端完成注册
3 服务端源码
源码版本:1.4.1
3.1 接受http请求
核心类:InstanceController 核心方法:register
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);//1 解析实例数据final Instance instance = parseInstance(request);//2 注册服务serviceManager.registerInstance(namespaceId, serviceName, instance);return "ok";
}public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {//1 创建一个空服务 如果不存在就直接创建 存在就什么都不做createEmptyService(namespaceId, serviceName, instance.isEphemeral());//2 获取内存中存储该服务的相关信息Service service = getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.INVALID_PARAM,"service not found, namespace: " + namespaceId + ", service: " + serviceName);}//3 添加服务addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)throws NacosException {//1 创建服务的唯一标识String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//2 获取服务Service service = getService(namespaceId, serviceName);synchronized (service) {//3 获取该服务的所有实例List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);Instances instances = new Instances();instances.setInstanceList(instanceList);//4 注册服务consistencyService.put(key, instances);}
}//实现类:DelegateConsistencyServiceImpl.java
public void put(String key, Record value) throws NacosException {mapConsistencyService(key).put(key, value);
}//实现类:DistroConsistencyServiceImpl.java(临时实例)
public void put(String key, Record value) throws NacosException {//服务注册onPut(key, value);distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,globalConfig.getTaskDispatchPeriod() / 2);
}public void onPut(String key, Record value) {//1 暂存实例信息if (KeyBuilder.matchEphemeralInstanceListKey(key)) {Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();dataStore.put(key, datum);}if (!listeners.containsKey(key)) {return;}//2 添加任务notifier.addTask(key, DataOperation.CHANGE);
}public void addTask(String datumKey, DataOperation action) {if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {return;}if (action == DataOperation.CHANGE) {services.put(datumKey, StringUtils.EMPTY);}//1 将实例信息存放在阻塞队列tasks.offer(Pair.with(datumKey, action));
}
3.2 从阻塞队列取出异步注册
核心类:DistroConsistencyServiceImpl.java 核心方法:run
public void run() {Loggers.DISTRO.info("distro notifier started");for (; ; ) {try {//1 取出需要注册的实例信息Pair<String, DataOperation> pair = tasks.take();//2 注册handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}
}private void handle(Pair<String, DataOperation> pair) {try {String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {if (action == DataOperation.CHANGE) {//1 执行注册listener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}
}//实现类:service
public void onChange(String key, Instances value) throws Exception {Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);//1 参数校验for (Instance instance : value.getInstanceList()) {if (instance == null) {// Reject this abnormal instance list:throw new RuntimeException("got null instance " + key);}if (instance.getWeight() > 10000.0D) {instance.setWeight(10000.0D);}if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {instance.setWeight(0.01D);}}//2 服务注册updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));recalculateChecksum();
}public void updateIPs(Collection<Instance> instances, boolean ephemeral) {Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());}for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}if (StringUtils.isEmpty(instance.getClusterName())) {instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(), instance.toJson());Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();getClusterMap().put(instance.getClusterName(), cluster);}List<Instance> clusterIPs = ipMap.get(instance.getClusterName());if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}}for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {//make every ip mineList<Instance> entryIPs = entry.getValue();//1 服务注册clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);}setLastModifiedMillis(System.currentTimeMillis());getPushService().serviceChanged(this);StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),stringBuilder.toString());}public void updateIps(List<Instance> ips, boolean ephemeral) {//1 获取注册的引用Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());//2 采用写时复制的思想赋值for (Instance ip : toUpdateInstances) {oldIpMap.put(ip.getDatumKey(), ip);}//3 对比更新List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());if (updatedIPs.size() > 0) {for (Instance ip : updatedIPs) {Instance oldIP = oldIpMap.get(ip.getDatumKey());// do not update the ip validation status of updated ips// because the checker has the most precise result// Only when ip is not marked, don't we update the health status of IP:if (!ip.isMarked()) {ip.setHealthy(oldIP.isHealthy());}if (ip.isHealthy() != oldIP.isHealthy()) {// ip validation status updatedLoggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());}if (ip.getWeight() != oldIP.getWeight()) {// ip validation status updatedLoggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),ip.toString());}}}List<Instance> newIPs = subtract(ips, oldIpMap.values());if (newIPs.size() > 0) {Loggers.EVT_LOG.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),getName(), newIPs.size(), newIPs.toString());for (Instance ip : newIPs) {HealthCheckStatus.reset(ip);}}List<Instance> deadIPs = subtract(oldIpMap.values(), ips);if (deadIPs.size() > 0) {Loggers.EVT_LOG.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),getName(), deadIPs.size(), deadIPs.toString());for (Instance ip : deadIPs) {HealthCheckStatus.remv(ip);}}toUpdateInstances = new HashSet<>(ips);//4 替换引用if (ephemeral) {ephemeralInstances = toUpdateInstances;} else {persistentInstances = toUpdateInstances;}
}
3.3 总结
-
首先是接收到了客户端的http请求 解析实例信息
-
封装实例信息 将实例信息存储到阻塞队列 响应成功
-
单线程从阻塞队列取出任务 执行注册
-
注册采用写时复制的思想 完成注册
4 nacos注册高性能优化
-
首先是注册的异步 这样可以加快注册速度
-
由于服务注册和服务拉取用的是同一个map 为了防止并发冲突 需要加锁 nacos在这里做了优化 采用写时复制的思想 牺牲短时间的一致性 提高并发