Source code analysis of nacos service registration

Yesterday I walked through the source code of nacos dynamic configuration. Today, let's look at the source code behind nacos service registration.

1, Principle of registry

The principles of common service registries are generally the same, as follows:

Service registration process of nacos:

2, Client-side service registration source code

To use the service registration function of nacos, you need to first introduce the corresponding dependencies:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

By convention, we look in the spring.factories file for the auto-configuration entry point.

In com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration, find the method that creates the NacosAutoServiceRegistration object:

@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
    return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}

The NacosAutoServiceRegistration class extends AbstractAutoServiceRegistration. Looking at the full inheritance hierarchy, you can see that AbstractAutoServiceRegistration implements the ApplicationListener interface.

From here it is straightforward. Looking for the event-related methods, we land on onApplicationEvent(WebServerInitializedEvent event) in the AbstractAutoServiceRegistration class. The code is as follows:

public void onApplicationEvent(WebServerInitializedEvent event) {
    this.bind(event);
}

/** @deprecated */
@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // Call the start() method
        this.start();
    }
}

The contents of the start() method are as follows:

public void start() {
    // Check whether registration is enabled
    if (!this.isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
    } else {
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
            // Call register() method
            this.register();
            if (this.shouldRegisterManagement()) {
                this.registerManagement();
            }

            this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
            this.running.compareAndSet(false, true);
        }

    }
}
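
The running flag in start() is what keeps the instance from being registered again if start() is invoked a second time. A minimal sketch of that guard follows. OnceStarter and registerCount are hypothetical names used only for illustration and are not part of Spring Cloud; the counter simply stands in for the real register() call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractAutoServiceRegistration; not the Spring Cloud class.
class OnceStarter {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger registerCount = new AtomicInteger();
    private final boolean enabled;

    OnceStarter(boolean enabled) {
        this.enabled = enabled;
    }

    // Same shape as start(): skip when disabled, register only if not yet running.
    void start() {
        if (!enabled) {
            return;
        }
        if (!running.get()) {
            registerCount.incrementAndGet(); // stands in for register()
            running.compareAndSet(false, true);
        }
    }

    int registerCount() {
        return registerCount.get();
    }

    public static void main(String[] args) {
        OnceStarter starter = new OnceStarter(true);
        starter.start();
        starter.start(); // second call does nothing
        System.out.println(starter.registerCount());
    }
}
```

As in the original, the check and the set are two separate steps, so the flag protects against repeated calls rather than against truly simultaneous ones.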

This register() method eventually calls com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register, which looks like this:

public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
    } else {
        // ... other code omitted

        try {
            // Call the registerInstance() method
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
        } catch (Exception var7) {
            log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
            ReflectionUtils.rethrowRuntimeException(var7);
        }
    }
}

This eventually calls the com.alibaba.nacos.client.naming.net.NamingProxy#registerService method:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    // Assemble the request parameters
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    
    // Call /nacos/v1/ns/instance on the server
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    
}
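
To get a feel for what the server receives, here is a small sketch that turns a parameter map like the one above into a form-encoded string. How reqApi actually encodes and sends the map is not shown in the excerpt, so ordinary URL form-encoding is an assumption here, and RegisterParamsSketch is a hypothetical helper, not a nacos class.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper; the real request building lives inside the nacos client.
class RegisterParamsSketch {

    // Joins the entries as key=value pairs separated by '&', URL-encoding both sides.
    static String encode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> enc(e.getKey()) + "=" + enc(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        // Sample values are made up for the demo.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "demo-service");
        params.put("ip", "10.0.0.5");
        params.put("port", "8080");
        params.put("ephemeral", "true");
        System.out.println(encode(params));
    }
}
```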

That covers the code that registers the service when the client starts. Next, let's see how the server handles the request.

3, Server-side source code for handling service registration

Based on the /nacos/v1/ns/instance path that was called, we can find the corresponding handler on the server: com.alibaba.nacos.naming.controllers.InstanceController#register

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // Get namespaceId 
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Get serviceName
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
    
    // Call the registerInstance method
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

The implementation of getInstanceOperator() is as follows:

private InstanceOperator getInstanceOperator() {
    // Choose the implementation depending on whether gRPC features are in use
    return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}

We are not using gRPC here, so we end up in the com.alibaba.nacos.naming.core.InstanceOperatorServiceImpl#registerInstance method:

@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    com.alibaba.nacos.naming.core.Instance coreInstance = parseInstance(instance);
    // Call the registerInstance method
    serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
}

The implementation of serviceManager.registerInstance is as follows:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create an empty service
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Get service 
    Service service = getService(namespaceId, serviceName);
    
    checkServiceIsNull(service, namespaceId, serviceName);
    // Specific service registration method
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

Tracing the createEmptyService() method, we can see that it first tries to get the service from a serviceMap object; if the service does not exist yet, it creates one and puts it into serviceMap. The data structure of serviceMap is:

    /**
     * Map(namespace, Map(group::serviceName, Service)).
     * In essence, registering a service means storing it in this serviceMap object
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
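
The "look it up, create it if missing" behavior on a two-level map can be sketched as below. This is only an illustration of the data structure: ServiceRegistrySketch and Svc are hypothetical names, computeIfAbsent is used for brevity, and the real createEmptyService does more work (initializing the service, for instance) and may synchronize differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceRegistrySketch {
    // Hypothetical simplified service: just a name holder.
    static final class Svc {
        final String name;

        Svc(String name) {
            this.name = name;
        }
    }

    // Map(namespace, Map(group::serviceName, Svc)), the same shape as serviceMap.
    private final Map<String, Map<String, Svc>> serviceMap = new ConcurrentHashMap<>();

    // Returns the existing service, or creates and stores a new one.
    Svc getOrCreate(String namespaceId, String groupedName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(groupedName, Svc::new);
    }

    // Returns null when either the namespace or the service is unknown.
    Svc get(String namespaceId, String groupedName) {
        Map<String, Svc> byName = serviceMap.get(namespaceId);
        return byName == null ? null : byName.get(groupedName);
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        Svc first = registry.getOrCreate("public", "DEFAULT_GROUP::demo-service");
        Svc second = registry.getOrCreate("public", "DEFAULT_GROUP::demo-service");
        System.out.println(first == second);
    }
}
```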

OK, back to the addInstance method called above:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    
    // The namespaceId, serviceName and other parameters form a key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Not sure why getService is called again here...
    Service service = getService(namespaceId, serviceName);
    
    // Lock the service object so that only one thread at a time can modify its instance list
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        
        // The important method is here
        consistencyService.put(key, instances);
    }
}

Keep this key in mind; it will be used again later.
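
The key matters because later steps inspect it (mapConsistencyService(key), KeyBuilder.matchEphemeralInstanceListKey(key)) to decide how to treat the data. A rough sketch of the idea, assuming the ephemeral flag is encoded as part of a key prefix. The prefix and separator strings below are illustrative assumptions and are not copied from the real KeyBuilder.

```java
// Hypothetical re-creation of the key scheme; constants are made up for illustration.
class InstanceKeySketch {
    static final String LIST_PREFIX = "naming.iplist.";
    static final String EPHEMERAL_MARK = "ephemeral.";
    static final String SEPARATOR = "##";

    // Builds one key per (namespace, service, ephemeral) combination.
    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return LIST_PREFIX + (ephemeral ? EPHEMERAL_MARK : "") + namespaceId + SEPARATOR + serviceName;
    }

    // The ephemeral flag can be recovered from the key alone.
    static boolean matchEphemeralInstanceListKey(String key) {
        return key.startsWith(LIST_PREFIX + EPHEMERAL_MARK);
    }

    public static void main(String[] args) {
        String key = buildInstanceListKey("public", "DEFAULT_GROUP@@demo-service", true);
        System.out.println(key + " ephemeral=" + matchEphemeralInstanceListKey(key));
    }
}
```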

This put() method calls com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put:

@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}

Instances is an implementation of the Record interface.

For an ephemeral instance, the put method here calls com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put:

@Override
public void put(String key, Record value) throws NacosException {
    // Focus on this method
    onPut(key, value);
    // If upgrade to 2.0.X, do not sync for v1.
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return;
    }
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            DistroConfig.getInstance().getSyncDelayMillis());
}

The content of onPut method is as follows:

public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    
    if (!listeners.containsKey(key)) {
        return;
    }
    
    // Add a task to the notifier object
    notifier.addTask(key, DataOperation.CHANGE);
}

The notifier object is an instance of an inner class that implements the Runnable interface. Its addTask method is as follows:

public void addTask(String datumKey, DataOperation action) {
         
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    // Finally, put the task into the queue
    tasks.offer(Pair.with(datumKey, action));
}

Here, tasks is a blocking queue:

private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
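
The services map in addTask acts as a "change already pending" marker, so repeated CHANGE requests for the same key collapse into a single queued task. A small sketch of that coalescing follows. CoalescingTaskQueue, Op and Task are hypothetical names, and the poll() method that clears the marker is an assumption about the consumer side, which the excerpt elides.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

class CoalescingTaskQueue {
    enum Op { CHANGE, DELETE }

    // Hypothetical replacement for Pair<String, DataOperation>.
    static final class Task {
        final String key;
        final Op op;

        Task(String key, Op op) {
            this.key = key;
            this.op = op;
        }
    }

    private final Map<String, String> pendingChanges = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new ArrayBlockingQueue<>(1024);

    // Returns true if a task was queued, false if a CHANGE for this key is already pending.
    boolean addTask(String key, Op op) {
        if (op == Op.CHANGE && pendingChanges.containsKey(key)) {
            return false;
        }
        if (op == Op.CHANGE) {
            pendingChanges.put(key, "");
        }
        return tasks.offer(new Task(key, op));
    }

    // Assumption: the consumer clears the marker once it picks the task up.
    Task poll() {
        Task task = tasks.poll();
        if (task != null) {
            pendingChanges.remove(task.key);
        }
        return task;
    }

    int size() {
        return tasks.size();
    }

    public static void main(String[] args) {
        CoalescingTaskQueue queue = new CoalescingTaskQueue();
        queue.addTask("key-1", Op.CHANGE);
        queue.addTask("key-1", Op.CHANGE); // coalesced, not queued again
        System.out.println(queue.size());
    }
}
```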

At this point you will notice that once the key built from namespaceId and serviceName has been dropped into the blocking queue, the registration request is essentially finished. So where does the actual registration happen? The answer lies in the Notifier object and the DistroConsistencyServiceImpl object.

Looking through the source code of DistroConsistencyServiceImpl, we find an init() method:

@PostConstruct
public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

The notifier object mentioned above is handed to a thread pool for execution, so next we only need to look at the notifier's run() method:

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    
    // Infinite loop
    for (; ; ) {
        try {
            // Get a Pair object from the blocking queue and call the handle method
            Pair<String, DataOperation> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
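
The pattern here is a single consumer thread blocking on take() while request threads only enqueue. A self-contained sketch of that loop is below. NotifierLoopSketch and drain are hypothetical names. Unlike the original, which catches Throwable and keeps looping, this sketch stops when the worker is interrupted so that the demo can terminate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class NotifierLoopSketch {

    // Starts a worker that takes keys off a queue, feeds it the given keys,
    // waits until all are handled (or 5 seconds pass) and returns them in handling order.
    static List<String> drain(List<String> keys) {
        BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(keys.size());

        Thread worker = new Thread(() -> {
            for (;;) {
                try {
                    String key = tasks.take(); // blocks until a task arrives
                    handled.add(key);          // stands in for handle(pair)
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // stop the loop when interrupted
                }
            }
        });
        worker.setDaemon(true);
        worker.start();

        keys.forEach(tasks::offer); // the "request thread" only enqueues

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("key-a", "key-b", "key-c")));
    }
}
```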

Next, look at the handle() method:

private void handle(Pair<String, DataOperation> pair) {
    try {
        // ... some code omitted

        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                if (action == DataOperation.CHANGE) {
                    // The onChange method of the Service class is called here
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }

                // ... some code omitted
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        // ... some code omitted
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

This calls the com.alibaba.nacos.naming.core.Service#onChange method:

@Override
public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        // Process weights
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // Important methods
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}
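
The weight handling in onChange boils down to a clamp: anything above 10000 is capped, and a tiny positive weight is raised to 0.01. The sketch below isolates that rule. WeightClamp and the constant names are hypothetical, while the two thresholds come from the code above.

```java
// Hypothetical helper that isolates the weight rule from onChange.
class WeightClamp {
    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_POSITIVE_WEIGHT = 0.01D;

    // Caps large weights and raises very small positive ones; zero and
    // negative values pass through unchanged, as in the original checks.
    static double clamp(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        if (weight > 0.0D && weight < MIN_POSITIVE_WEIGHT) {
            return MIN_POSITIVE_WEIGHT;
        }
        return weight;
    }

    public static void main(String[] args) {
        System.out.println(clamp(20000.0D));
        System.out.println(clamp(0.001D));
        System.out.println(clamp(0.0D));
    }
}
```

Note that a weight of exactly 0 is left alone, so an instance can still be given zero weight.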

The updateIPs method is easy to understand. Put simply, the registered instances are grouped by cluster and stored through the clusterMap object:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            // If the previous cluster does not exist, create a cluster
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // Here, the Cluster's updateIps method is called
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    
    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    ApplicationUtils.getBean(DoubleWriteEventListener.class).doubleWriteToV2(this, ephemeral);
    StringBuilder stringBuilder = new StringBuilder();
    
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append('_').append(instance.isHealthy()).append(',');
    }
    
    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
    
}
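
The first half of updateIPs is a group-by: every known cluster starts with an empty list, and each incoming instance is appended to its cluster's list, falling back to a default cluster name. Pre-seeding the empty lists matters because a cluster that receives no instances in this round still gets updated with an empty list. A sketch under simplified types follows. ClusterGrouping and Inst are hypothetical, and the "DEFAULT" value is an assumption standing in for UtilsAndCommons.DEFAULT_CLUSTER_NAME.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ClusterGrouping {
    // Assumed value; stands in for UtilsAndCommons.DEFAULT_CLUSTER_NAME.
    static final String DEFAULT_CLUSTER = "DEFAULT";

    // Hypothetical minimal instance: an address plus an optional cluster name.
    static final class Inst {
        final String addr;
        final String cluster;

        Inst(String addr, String cluster) {
            this.addr = addr;
            this.cluster = cluster;
        }
    }

    // Known clusters always get an entry (possibly empty); unknown ones are added on the fly.
    static Map<String, List<String>> group(List<String> knownClusters, List<Inst> instances) {
        Map<String, List<String>> byCluster = new LinkedHashMap<>();
        for (String cluster : knownClusters) {
            byCluster.put(cluster, new ArrayList<>());
        }
        for (Inst inst : instances) {
            if (inst == null) {
                continue; // skip malformed entries, as the original does
            }
            String cluster = (inst.cluster == null || inst.cluster.isEmpty())
                    ? DEFAULT_CLUSTER : inst.cluster;
            byCluster.computeIfAbsent(cluster, k -> new ArrayList<>()).add(inst.addr);
        }
        return byCluster;
    }

    public static void main(String[] args) {
        List<Inst> instances = Arrays.asList(
                new Inst("10.0.0.1:8080", null),
                new Inst("10.0.0.2:8080", "new"));
        System.out.println(group(Arrays.asList("DEFAULT", "old"), instances));
    }
}
```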

Take a look at the com.alibaba.nacos.naming.core.Cluster#updateIps method:

public void updateIps(List<Instance> ips, boolean ephemeral) {
    
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    
    // Get the instances that were updated
    List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
    if (updatedIps.size() > 0) {
        for (Instance ip : updatedIps) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());
            
            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy());
            }
            
            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }
            
            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP, ip);
            }
        }
    }
    // Get the newly added instances
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                        getName(), newIPs.size(), newIPs);
        
        for (Instance ip : newIPs) {
            // Reset the health check status for each new instance
            HealthCheckStatus.reset(ip);
        }
    }
    // Get the instances that are no longer present
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
    
    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                        getName(), deadIPs.size(), deadIPs);
        
        for (Instance ip : deadIPs) {
            // Remove the health check status of instances that are gone
            HealthCheckStatus.remv(ip);
        }
    }
    
    toUpdateInstances = new HashSet<>(ips);
    
    // Replace the existing set with the final result. The idea is similar to copy-on-write (COW): reads and writes are separated and do not interfere with each other
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
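
Stripped of logging and health-check bookkeeping, Cluster#updateIps does two things: it diffs the incoming list against the current set to find new and dead entries, and it swaps in a freshly built set instead of mutating the old one. The sketch below shows both steps with plain strings. InstanceSetSwap and Diff are hypothetical names, string equality replaces the comparison the real subtract performs, and marking the field volatile is this sketch's own choice.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class InstanceSetSwap {
    // Result of comparing the old snapshot with the incoming list.
    static final class Diff {
        final Set<String> added;
        final Set<String> removed;

        Diff(Set<String> added, Set<String> removed) {
            this.added = added;
            this.removed = removed;
        }
    }

    // Readers see either the old set or the new one, never a half-updated set.
    private volatile Set<String> instances = Collections.emptySet();

    // Computes what is new and what is gone, then swaps in a fresh immutable set.
    Diff update(Collection<String> latest) {
        Set<String> old = instances;

        Set<String> added = new HashSet<>(latest);
        added.removeAll(old);        // latest minus old

        Set<String> removed = new HashSet<>(old);
        removed.removeAll(latest);   // old minus latest

        instances = Collections.unmodifiableSet(new HashSet<>(latest));
        return new Diff(added, removed);
    }

    Set<String> snapshot() {
        return instances;
    }

    public static void main(String[] args) {
        InstanceSetSwap cluster = new InstanceSetSwap();
        cluster.update(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        Diff diff = cluster.update(Arrays.asList("10.0.0.2:8080", "10.0.0.3:8080"));
        System.out.println("added=" + diff.added + " removed=" + diff.removed);
    }
}
```

Because the old set is never modified, a reader that grabbed a reference before the swap keeps iterating over a consistent snapshot.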

4, Summary

The source code for service registration is fairly easy to follow. This write-up is far from perfect, so please point out any mistakes. Thank you!

Tags: Java

Posted by primefalcon on Tue, 24 May 2022 18:08:25 +0300