Yesterday I analyzed the source code of Nacos dynamic configuration. Today, let's look at the source code behind Nacos service registration.
1, How a service registry works
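Most service registries follow the same basic principle. When a provider starts, it registers its address (IP and port) with the registry and typically keeps that entry alive with heartbeats. Consumers then query the registry for the list of available provider instances before calling them.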
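To make the principle concrete, here is a minimal in-memory sketch. It is not Nacos code: `SimpleRegistry`, its method names and its TTL rule are all hypothetical. Providers register or refresh an address with a timestamp, and a heartbeat is simply a repeated registration. Consumers only see addresses whose last heartbeat falls within the TTL. Time is passed in explicitly so the behavior is easy to check.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical minimal registry: service name -> (address -> last heartbeat time in ms).
class SimpleRegistry {
    private final Map<String, Map<String, Long>> services = new ConcurrentHashMap<>();
    private final long ttlMillis;

    SimpleRegistry(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Provider side: register (or refresh) an address. A heartbeat is just a re-register.
    void register(String service, String address, long nowMillis) {
        services.computeIfAbsent(service, k -> new ConcurrentHashMap<>()).put(address, nowMillis);
    }

    // Consumer side: return the addresses whose last heartbeat is within the TTL, sorted.
    List<String> lookup(String service, long nowMillis) {
        Map<String, Long> instances = services.getOrDefault(service, Map.of());
        return instances.entrySet().stream()
                .filter(e -> nowMillis - e.getValue() <= ttlMillis)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SimpleRegistry registry = new SimpleRegistry(15_000);
        registry.register("order-service", "10.0.0.1:8080", 0);
        registry.register("order-service", "10.0.0.2:8080", 10_000);
        // At t=20s only the second instance has a heartbeat within the 15s TTL.
        System.out.println(registry.lookup("order-service", 20_000)); // [10.0.0.2:8080]
    }
}
```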
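The Nacos registration flow traced in this article works as follows. On startup, the client calls NamingService#registerInstance, which sends an HTTP POST to /nacos/v1/ns/instance. The server queues the change and then asynchronously writes the instance into its in-memory registry.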
2, Client-side registration source code
To use Nacos service registration, first add the corresponding dependency:
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
```
By convention, we start from the spring.factories file to find the auto-configuration entry point.
In com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration, find the method that creates the NacosAutoServiceRegistration bean:
```java
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry,
        AutoServiceRegistrationProperties autoServiceRegistrationProperties,
        NacosRegistration registration) {
    return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
```
NacosAutoServiceRegistration extends AbstractAutoServiceRegistration. Looking at the full inheritance hierarchy, you can see that AbstractAutoServiceRegistration implements the ApplicationListener interface.
That makes things easy. We look for the event-handling methods and land on onApplicationEvent(WebServerInitializedEvent event) in AbstractAutoServiceRegistration. The code is as follows:
```java
public void onApplicationEvent(WebServerInitializedEvent event) {
    this.bind(event);
}

/** @deprecated */
@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (!(context instanceof ConfigurableWebServerApplicationContext)
            || !"management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // Call start()
        this.start();
    }
}
```
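The listener pattern above can be sketched without Spring. In this sketch, `WebServerReadyEvent`, `EventBus` and `AutoRegistrar` are hypothetical stand-ins for the event, the application context and AbstractAutoServiceRegistration. The sketch mirrors two details of bind(). First, events from the separate "management" server are ignored. Second, the port is recorded only once through `compareAndSet(0, port)` before start() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for WebServerInitializedEvent: just a port and a server namespace.
class WebServerReadyEvent {
    final int port;
    final String serverNamespace;

    WebServerReadyEvent(int port, String serverNamespace) {
        this.port = port;
        this.serverNamespace = serverNamespace;
    }
}

// Hypothetical synchronous event bus playing the role of the Spring context.
class EventBus {
    private final List<Consumer<WebServerReadyEvent>> listeners = new ArrayList<>();

    void subscribe(Consumer<WebServerReadyEvent> listener) {
        listeners.add(listener);
    }

    void publish(WebServerReadyEvent event) {
        listeners.forEach(listener -> listener.accept(event));
    }
}

class AutoRegistrar {
    final AtomicInteger port = new AtomicInteger(0);
    int startCalls = 0;

    void onWebServerReady(WebServerReadyEvent event) {
        // Ignore the separate management server, as bind() does.
        if ("management".equals(event.serverNamespace)) {
            return;
        }
        // Keep the first port seen (0 means "not set yet"), then start registration.
        port.compareAndSet(0, event.port);
        start();
    }

    void start() {
        startCalls++;
    }

    public static void main(String[] args) {
        EventBus bus = new EventBus();
        AutoRegistrar registrar = new AutoRegistrar();
        bus.subscribe(registrar::onWebServerReady);
        bus.publish(new WebServerReadyEvent(8081, "management"));
        bus.publish(new WebServerReadyEvent(8080, null));
        System.out.println(registrar.port.get() + " " + registrar.startCalls); // 8080 1
    }
}
```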
The contents of the start() method are as follows:
```java
public void start() {
    // Check whether registration is enabled
    if (!this.isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
    } else {
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
            // Call the register() method
            this.register();
            if (this.shouldRegisterManagement()) {
                this.registerManagement();
            }
            this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }
}
```
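The ordering in start() can be sketched as a tiny lifecycle. `RegistrationLifecycle` is hypothetical, and it records steps in a list instead of publishing real events. Like the original, it checks `running.get()` first and sets the flag only after registering. If register() throws, the exception propagates and the flag stays false, so a later start() can try again. The trade-off is that the check and the flag update are not a single atomic step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical lifecycle mirroring the order of steps in start(); events and calls are just recorded.
class RegistrationLifecycle {
    private final boolean enabled;
    private final AtomicBoolean running = new AtomicBoolean(false);
    final List<String> steps = new ArrayList<>();

    RegistrationLifecycle(boolean enabled) {
        this.enabled = enabled;
    }

    void start() {
        if (!enabled) {
            steps.add("disabled"); // the original only logs "Discovery Lifecycle disabled. Not starting"
            return;
        }
        if (!running.get()) {
            steps.add("InstancePreRegisteredEvent");
            steps.add("register"); // an exception here would skip the flag update below
            steps.add("InstanceRegisteredEvent");
            running.compareAndSet(false, true);
        }
    }

    boolean isRunning() {
        return running.get();
    }

    public static void main(String[] args) {
        RegistrationLifecycle lifecycle = new RegistrationLifecycle(true);
        lifecycle.start();
        lifecycle.start(); // second call is a no-op because running is already true
        System.out.println(lifecycle.steps); // [InstancePreRegisteredEvent, register, InstanceRegisteredEvent]
    }
}
```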
register() eventually calls com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register, which looks like this:
```java
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
    } else {
        // ... other code omitted
        try {
            // Call the registerInstance() method
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished",
                    new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
        } catch (Exception var7) {
            log.error("nacos registry, {} register failed...{},",
                    new Object[]{serviceId, registration.toString(), var7});
            ReflectionUtils.rethrowRuntimeException(var7);
        }
    }
}
```
That call eventually reaches com.alibaba.nacos.client.naming.net.NamingProxy#registerService:
```java
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    // Assemble the request parameters
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // POST to /nacos/v1/ns/instance on the server
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
```
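To picture what goes over the wire, here is a sketch that builds a subset of these parameters and encodes them as an `application/x-www-form-urlencoded` string. `RegisterParams` is hypothetical. The literal names `namespaceId`, `serviceName` and `groupName` are assumed values of the `CommonParams` constants. The sketch does not claim that reqApi sends the parameters in exactly this form, since it may place them in the query string or the body. It needs Java 10+ for the `URLEncoder.encode(String, Charset)` overload.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class RegisterParams {
    // A subset of the parameters registerService() builds. The names "namespaceId", "serviceName"
    // and "groupName" are assumed values of the CommonParams constants.
    static Map<String, String> build(String namespaceId, String serviceName, String groupName,
                                     String ip, int port, double weight) {
        Map<String, String> params = new LinkedHashMap<>(); // insertion order gives a stable encoding
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("weight", String.valueOf(weight));
        return params;
    }

    // Encode the map as an application/x-www-form-urlencoded string.
    static String encode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8) + "="
                        + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        String body = encode(build("public", "order-service", "DEFAULT_GROUP", "10.0.0.1", 8080, 1.0));
        // namespaceId=public&serviceName=order-service&groupName=DEFAULT_GROUP&ip=10.0.0.1&port=8080&weight=1.0
        System.out.println(body);
    }
}
```

Encoding matters for values such as the JSON `metadata` string, whose braces, quotes and colons must be escaped.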
That completes the client side of registration at startup. Next, let's see how the server handles the request.
3, Server-side handling of service registration
Using the /nacos/v1/ns/instance path called above, we find the server-side handler: com.alibaba.nacos.naming.controllers.InstanceController#register
```java
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // Get namespaceId
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Get serviceName
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

    // registerInstance method called
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
```
The implementation of getInstanceOperator() is as follows:
```java
private InstanceOperator getInstanceOperator() {
    // Choose the gRPC-based (v2) or the v1 implementation
    return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}
```
Since we are not using gRPC, the call ends up in com.alibaba.nacos.naming.core.InstanceOperatorServiceImpl#registerInstance:
```java
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    com.alibaba.nacos.naming.core.Instance coreInstance = parseInstance(instance);
    // Call the registerInstance method
    serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
}
```
ServiceManager#registerInstance is implemented as follows:
```java
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create an empty service
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Get service
    Service service = getService(namespaceId, serviceName);

    checkServiceIsNull(service, namespaceId, serviceName);
    // Specific service registration method
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
```
Tracing createEmptyService(), we see that it first tries to get the Service from a serviceMap object. If none exists, it creates one and puts it into serviceMap. The data structure of serviceMap is:
```java
/**
 * Map(namespace, Map(group::serviceName, Service)).
 * In essence, service registration means storing the instance in this serviceMap object.
 */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
```
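The get-or-create behavior on this two-level map can be sketched with `computeIfAbsent`, which creates each level atomically when it is missing. `ServiceMapSketch` and its nested `Service` are hypothetical stand-ins. The real createEmptyService() has its own lookup and initialization logic, and this sketch only captures the shape of the data structure.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceMapSketch {
    // Hypothetical stand-in for com.alibaba.nacos.naming.core.Service.
    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    // Map(namespace, Map(serviceName, Service)): the same two-level shape as serviceMap.
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Get-or-create in the spirit of createEmptyService(); computeIfAbsent makes each level atomic.
    Service getOrCreate(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> new Service(namespaceId, name));
    }

    // Like getService(): returns null when the namespace or the service is unknown.
    Service get(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    public static void main(String[] args) {
        ServiceMapSketch registry = new ServiceMapSketch();
        Service first = registry.getOrCreate("public", "order-service");
        Service second = registry.getOrCreate("public", "order-service");
        System.out.println(first == second); // true: the second call reuses the existing Service
    }
}
```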
OK, back to the addInstance method called above:
```java
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // namespaceId, serviceName and ephemeral are combined into a key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Not sure why getService is called again here...
    Service service = getService(namespaceId, serviceName);
    // Lock the service object so that only one thread can modify this service at a time
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // The key call is here
        consistencyService.put(key, instances);
    }
}
```
Keep this key in mind, because it comes up again later.
put() calls com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put:
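For reference, here is a sketch of how such a key could be assembled and recognized. The three constants are assumed to mirror Nacos 1.x KeyBuilder (`com.alibaba.nacos.naming.iplist.` as the prefix, `ephemeral.` for ephemeral instances, `##` between namespace and service). `InstanceListKeys` is hypothetical, so verify the constants against the Nacos version you are reading.

```java
class InstanceListKeys {
    // Assumed to mirror Nacos 1.x KeyBuilder constants; verify against your Nacos version.
    static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
    static final String NAMESPACE_KEY_CONNECTOR = "##";

    // Counterpart of buildInstanceListKey(namespaceId, serviceName, ephemeral).
    static String build(String namespaceId, String serviceName, boolean ephemeral) {
        String prefix = ephemeral ? INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX : INSTANCE_LIST_KEY_PREFIX;
        return prefix + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
    }

    // Counterpart of matchEphemeralInstanceListKey(key): a simple prefix test.
    static boolean isEphemeral(String key) {
        return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
    }

    public static void main(String[] args) {
        String key = build("public", "order-service", true);
        System.out.println(key); // com.alibaba.nacos.naming.iplist.ephemeral.public##order-service
        System.out.println(isEphemeral(key)); // true
    }
}
```

The "ephemeral" marker inside the key is what lets later code decide, from the key alone, which consistency service and which instance set to use.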
```java
@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}
```
Instances implements the Record interface.
For an ephemeral instance (the default), the put() call here resolves to com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put:
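The job of `mapConsistencyService(key)` is routing. As I understand Nacos 1.x, ephemeral keys go to the Distro-based service (AP) and all other keys go to the persistent, Raft-based service (CP). The sketch below shows that delegate pattern with hypothetical types (`ConsistencyStore`, `MapStore`, `DelegatingStore`) and plain string values in place of Record. The ephemeral prefix is an assumed KeyBuilder value.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for ConsistencyService: values are plain strings instead of Record.
interface ConsistencyStore {
    void put(String key, String value);
}

class MapStore implements ConsistencyStore {
    final Map<String, String> data = new HashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }
}

class DelegatingStore implements ConsistencyStore {
    // Assumed ephemeral key prefix (see KeyBuilder); verify against your Nacos version.
    static final String EPHEMERAL_PREFIX = "com.alibaba.nacos.naming.iplist.ephemeral.";

    private final ConsistencyStore ephemeralStore;  // plays the role of the Distro (AP) service
    private final ConsistencyStore persistentStore; // plays the role of the persistent (Raft-based) service

    DelegatingStore(ConsistencyStore ephemeralStore, ConsistencyStore persistentStore) {
        this.ephemeralStore = ephemeralStore;
        this.persistentStore = persistentStore;
    }

    @Override
    public void put(String key, String value) {
        route(key).put(key, value);
    }

    // Counterpart of mapConsistencyService(key): pick the implementation from the key alone.
    ConsistencyStore route(String key) {
        return key.startsWith(EPHEMERAL_PREFIX) ? ephemeralStore : persistentStore;
    }

    public static void main(String[] args) {
        MapStore distro = new MapStore();
        MapStore raft = new MapStore();
        DelegatingStore delegate = new DelegatingStore(distro, raft);
        delegate.put("com.alibaba.nacos.naming.iplist.ephemeral.public##order-service", "instances");
        System.out.println(distro.data.size() + " " + raft.data.size()); // 1 0
    }
}
```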
```java
@Override
public void put(String key, Record value) throws NacosException {
    // Focus on this method
    onPut(key, value);
    // If upgrade to 2.0.X, do not sync for v1.
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return;
    }
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            DistroConfig.getInstance().getSyncDelayMillis());
}
```
The onPut() method is as follows:
```java
public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }
    // Add a task to the notifier object
    notifier.addTask(key, DataOperation.CHANGE);
}
```
notifier is an instance of Notifier, an inner class that implements the Runnable interface. Its addTask method is as follows:
```java
public void addTask(String datumKey, DataOperation action) {
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    // Finally, enqueue the task
    tasks.offer(Pair.with(datumKey, action));
}
```
Here, tasks is a blocking queue:
```java
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
```
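The `services` map in addTask acts as a "CHANGE already queued" marker, so repeated changes to the same key collapse into one task. This works because the consumer later reads the latest value from `dataStore`, not a value carried in the task. The sketch below is hypothetical (`DedupTaskQueue`, `Op`, `Task`) and makes two choices of its own. It uses `putIfAbsent` so the check and the mark form one atomic step, and it clears the marker when a CHANGE task is taken so later changes can be queued again. Like the original, it ignores the return value of `offer()` when the queue is full.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

class DedupTaskQueue {
    enum Op { CHANGE, DELETE }

    static final class Task {
        final String key;
        final Op op;

        Task(String key, Op op) {
            this.key = key;
            this.op = op;
        }
    }

    // Keys that already have a CHANGE task waiting (the role of the `services` map).
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new ArrayBlockingQueue<>(1024);

    void addTask(String key, Op op) {
        // putIfAbsent makes "check, then mark" a single atomic step.
        if (op == Op.CHANGE && pending.putIfAbsent(key, "") != null) {
            return; // a CHANGE for this key is already queued
        }
        tasks.offer(new Task(key, op));
    }

    // Non-blocking take; clears the marker so a later CHANGE for the key can be queued again.
    Task poll() {
        Task task = tasks.poll();
        if (task != null && task.op == Op.CHANGE) {
            pending.remove(task.key);
        }
        return task;
    }

    int size() {
        return tasks.size();
    }

    public static void main(String[] args) {
        DedupTaskQueue queue = new DedupTaskQueue();
        queue.addTask("svc", Op.CHANGE);
        queue.addTask("svc", Op.CHANGE); // collapsed into the first task
        System.out.println(queue.size()); // 1
    }
}
```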
At this point you will notice that once the key built from namespaceId and serviceName has been dropped into the blocking queue, the registration request is essentially finished. So where does the real service registration happen? The answer lies in the Notifier and DistroConsistencyServiceImpl objects.
Looking at the source of DistroConsistencyServiceImpl, we find an init() method:
```java
@PostConstruct
public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}
```
In other words, the notifier object mentioned above is handed to a thread pool to run. Next, let's look at the notifier's run() method:
```java
@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");

    // Infinite loop
    for (; ; ) {
        try {
            // Take a Pair from the blocking queue and call handle()
            Pair<String, DataOperation> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
```
Next, look at the handle() method:
```java
private void handle(Pair<String, DataOperation> pair) {
    try {
        // ... some code omitted
        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                if (action == DataOperation.CHANGE) {
                    // This calls the onChange method of the Service class
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                // ... some code omitted
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        // ... some code omitted
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
```
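Put together, onPut, the task queue, run() and handle() form a producer/consumer pipeline. The request thread only stores data and enqueues a key. A single background thread takes keys and calls each listener with the latest stored value. `MiniNotifier` below is a hypothetical, simplified sketch of that pipeline. One difference is deliberate: its loop exits when the thread is interrupted, whereas the original loops forever and catches Throwable. A failing listener is isolated so it cannot stop the loop.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class MiniNotifier implements Runnable {
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final Map<String, List<BiConsumer<String, String>>> listeners = new ConcurrentHashMap<>();
    private final Map<String, String> dataStore = new ConcurrentHashMap<>();

    void listen(String key, BiConsumer<String, String> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Producer side, like onPut(): store the data, then enqueue the key only if someone listens.
    void put(String key, String value) {
        dataStore.put(key, value);
        if (listeners.containsKey(key)) {
            tasks.offer(key);
        }
    }

    // Consumer side, like run() + handle(): take a key and notify every listener with the latest value.
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String key = tasks.take();
                for (BiConsumer<String, String> listener : listeners.getOrDefault(key, List.of())) {
                    try {
                        listener.accept(key, dataStore.get(key));
                    } catch (RuntimeException e) {
                        // One failing listener must not stop the loop.
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
    }

    public static void main(String[] args) throws Exception {
        MiniNotifier notifier = new MiniNotifier();
        CountDownLatch done = new CountDownLatch(1);
        notifier.listen("svc", (k, v) -> {
            System.out.println(k + " -> " + v);
            done.countDown();
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(notifier);
        notifier.put("svc", "10.0.0.1:8080");
        done.await(1, TimeUnit.SECONDS);
        executor.shutdownNow(); // interrupts the worker thread
    }
}
```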
This calls com.alibaba.nacos.naming.core.Service#onChange:
```java
@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        // Clamp weights to the allowed range
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }

    // The key call
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}
```
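The weight handling in onChange() is a clamp. Weights above 10000 are capped at 10000, and positive weights below 0.01 are raised to 0.01. Zero and negative weights are left unchanged. `WeightNormalizer` is a hypothetical helper that isolates exactly those two rules.

```java
class WeightNormalizer {
    // Same bounds as the loop in onChange().
    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    static double normalize(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        if (weight < MIN_POSITIVE_WEIGHT && weight > 0.0D) {
            return MIN_POSITIVE_WEIGHT;
        }
        return weight; // 0 and negative values pass through unchanged, as in the original
    }

    public static void main(String[] args) {
        System.out.println(normalize(20000.0)); // 10000.0
        System.out.println(normalize(0.001));   // 0.01
        System.out.println(normalize(0.0));     // 0.0
    }
}
```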
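The updateIPs method is straightforward. It groups the registered instances by cluster name, creating any cluster that does not exist yet in the service's clusterMap. It then hands each group to that cluster and finally calls getPushService().serviceChanged(this).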
```java
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }

    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            // If the cluster does not exist yet, create it
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // Call Cluster#updateIps for each cluster
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
}
```
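The grouping step of updateIPs() can be sketched on its own. Every existing cluster starts with an empty list, so a cluster that receives no instances in this update is handed an empty list. Cluster#updateIps then treats all of its old instances as dead. Blank cluster names fall back to a default, and unknown clusters are added on the fly. `ClusterGrouping` and `Inst` are hypothetical, and `"DEFAULT"` is the assumed value of `UtilsAndCommons.DEFAULT_CLUSTER_NAME`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class ClusterGrouping {
    // Assumed value of UtilsAndCommons.DEFAULT_CLUSTER_NAME.
    static final String DEFAULT_CLUSTER = "DEFAULT";

    // Hypothetical, trimmed-down instance: an "ip:port" address plus a cluster name.
    static final class Inst {
        final String addr;
        final String cluster;

        Inst(String addr, String cluster) {
            this.addr = addr;
            this.cluster = cluster;
        }
    }

    static Map<String, List<String>> group(Set<String> knownClusters, List<Inst> instances) {
        Map<String, List<String>> ipMap = new TreeMap<>(); // sorted only to make output stable
        // Every existing cluster starts with an empty list.
        for (String cluster : knownClusters) {
            ipMap.put(cluster, new ArrayList<>());
        }
        for (Inst inst : instances) {
            if (inst == null) {
                continue; // skip malformed entries, like the null check in updateIPs()
            }
            String cluster = (inst.cluster == null || inst.cluster.isEmpty()) ? DEFAULT_CLUSTER : inst.cluster;
            ipMap.computeIfAbsent(cluster, c -> new ArrayList<>()).add(inst.addr);
        }
        return ipMap;
    }

    public static void main(String[] args) {
        List<Inst> instances = List.of(new Inst("10.0.0.1:8080", ""), new Inst("10.0.0.2:8080", "B"));
        // {A=[], B=[10.0.0.2:8080], DEFAULT=[10.0.0.1:8080]}
        System.out.println(group(Set.of("A"), instances));
    }
}
```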
Now take a look at com.alibaba.nacos.naming.core.Cluster#updateIps:
```java
public void updateIps(List<Instance> ips, boolean ephemeral) {

    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());

    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }

    // Instances that already existed but whose data changed
    List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
    if (updatedIps.size() > 0) {
        for (Instance ip : updatedIps) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());

            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only if the IP is not marked, keep its previous health status:
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy());
            }

            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }

            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP, ip);
            }
        }
    }

    // Newly added instances
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                        getName(), newIPs.size(), newIPs);

        for (Instance ip : newIPs) {
            // Reset the health-check status of each new instance
            HealthCheckStatus.reset(ip);
        }
    }

    // Instances that are gone (expired)
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                        getName(), deadIPs.size(), deadIPs);

        for (Instance ip : deadIPs) {
            // Remove the health-check status of expired instances
            HealthCheckStatus.remv(ip);
        }
    }

    toUpdateInstances = new HashSet<>(ips);

    // Replace the existing set with the final result. The idea is similar to copy-on-write (COW):
    // readers keep using the old set while the new one is built, so reads and writes don't interfere.
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
```
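The copy-on-write idea at the end of Cluster#updateIps can be shown in isolation. Readers only dereference a `volatile` field and never see a half-built set. A writer computes new and dead entries against the old snapshot, builds a brand-new set and swaps the reference. `CowInstanceSet` is a hypothetical sketch that identifies instances by an "ip:port" string instead of getDatumKey(). It also skips the "updated instance" branch and serializes writers with `synchronized`, which is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CowInstanceSet {
    // Readers just read this reference; they never see a half-updated set.
    private volatile Set<String> instances = Collections.emptySet();

    static final class Diff {
        final List<String> added;
        final List<String> removed;

        Diff(List<String> added, List<String> removed) {
            this.added = added;
            this.removed = removed;
        }
    }

    // Writers build a brand-new set and swap the reference, like the end of Cluster#updateIps.
    synchronized Diff update(Collection<String> latest) {
        Set<String> old = instances;
        Set<String> next = new HashSet<>(latest);

        List<String> added = new ArrayList<>();   // roughly newIPs = subtract(ips, old)
        for (String ip : next) {
            if (!old.contains(ip)) {
                added.add(ip);
            }
        }
        List<String> removed = new ArrayList<>(); // roughly deadIPs = subtract(old, ips)
        for (String ip : old) {
            if (!next.contains(ip)) {
                removed.add(ip);
            }
        }
        Collections.sort(added);   // sorted only for stable output
        Collections.sort(removed);

        instances = Collections.unmodifiableSet(next); // publish the new snapshot
        return new Diff(added, removed);
    }

    Set<String> snapshot() {
        return instances;
    }

    public static void main(String[] args) {
        CowInstanceSet set = new CowInstanceSet();
        set.update(List.of("10.0.0.1:8080", "10.0.0.2:8080"));
        Diff diff = set.update(List.of("10.0.0.2:8080", "10.0.0.3:8080"));
        // added=[10.0.0.3:8080] removed=[10.0.0.1:8080]
        System.out.println("added=" + diff.added + " removed=" + diff.removed);
    }
}
```

A reader that grabbed `snapshot()` before the second update still holds the old, unchanged set. That is the read/write separation the comment in updateIps describes.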
4, Summary
The service registration source code is fairly easy to follow. My write-up is far from perfect, so if you spot any problems, corrections are welcome. Thank you!