IV. Source code analysis of the Nacos registry: health detection

I. Core principle flow chart

II. The client listens to the server

2.1 Implementing UDP listening


The PushReceiver constructor opens a UDP socket and starts the listener by submitting the receiver itself (it implements Runnable) to a single-threaded executor:

public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            this.udpSocket = new DatagramSocket();
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
            // Start the listener: run() is executed on the daemon thread
            this.executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }
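The startup pattern in this constructor (open a socket on an OS-assigned port, then hand `this` to a single daemon thread) can be sketched in isolation. The class below is a hypothetical stand-in, not Nacos code: the names `MiniReceiver`, `awaitStarted` and the thread name are assumptions made for illustration.

```java
import java.net.DatagramSocket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for PushReceiver: the constructor opens a UDP socket
// and immediately schedules run() on a single daemon thread.
final class MiniReceiver implements Runnable {
    private final DatagramSocket udpSocket;
    private final ExecutorService executor;
    private final CountDownLatch started = new CountDownLatch(1);
    volatile boolean ranOnDaemon;

    MiniReceiver() {
        try {
            // No port given: the OS picks a free one, as in the article's code.
            this.udpSocket = new DatagramSocket();
        } catch (SocketException e) {
            throw new IllegalStateException("init udp socket failed", e);
        }
        this.executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // a daemon thread does not keep the JVM alive
            t.setName("mini.push.receiver");
            return t;
        });
        // Start the listener: run() executes on the daemon thread.
        this.executor.execute(this);
    }

    int port() {
        return udpSocket.getLocalPort();
    }

    // Waits until run() has started; returns false on timeout or interrupt.
    boolean awaitStarted(long timeoutMillis) {
        try {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void run() {
        ranOnDaemon = Thread.currentThread().isDaemon();
        started.countDown();
        // A real receiver would loop on udpSocket.receive(...) here.
    }

    void close() {
        executor.shutdownNow();
        udpSocket.close();
    }
}
```

Because the socket is opened without a port, the server can only learn the client's UDP port if the client reports it, which is why `port()` is exposed in this sketch.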

2.2 Listening for server instance changes

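In its run method, PushReceiver loops on the UDP socket, waiting for pushes from the server. For each push of type dom or service it calls hostReactor.processServiceJson to parse the pushed data and update the local cache, then replies to the server with an ack: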

@Override
    public void run() {
        while (!closed) {
            try {
                
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                
                // Block until the server pushes a data change
                udpSocket.receive(packet);
                
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
                
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    // A change was pushed: update the local cached service list
                    hostReactor.processServiceJson(pushPacket.data);
                    
                    // send ack to server
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // reply with a dump of the local service info map
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"
                            + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                            + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                            + "\", \"data\":" + "\"\"}";
                }
                // Send the ack back to the address the push came from
                udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                        packet.getSocketAddress()));
            } catch (Exception e) {
                if (closed) {
                    return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }
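The receive-then-ack exchange can be tried out over the loopback interface. The following is a minimal sketch, not the Nacos implementation: the class `UdpAckDemo` and its methods are hypothetical, JSON parsing and decompression are left out, and `lastRefTime` is passed in as a parameter instead of being read from the pushed packet.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical demo of the push/ack exchange; not part of Nacos.
final class UdpAckDemo {

    // Builds an ack in the same shape PushReceiver uses for "dom"/"service" pushes.
    static String pushAck(long lastRefTime) {
        return "{\"type\": \"push-ack\", \"lastRefTime\":\"" + lastRefTime + "\", \"data\":\"\"}";
    }

    // Receives one datagram and replies to its sender with a push-ack.
    static String receiveOnceAndAck(DatagramSocket socket, long lastRefTime) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        socket.receive(packet); // blocks until data arrives or the timeout fires
        // Decode only the bytes actually received, so no trim() is needed.
        String payload = new String(packet.getData(), packet.getOffset(), packet.getLength(),
                StandardCharsets.UTF_8);
        byte[] ack = pushAck(lastRefTime).getBytes(StandardCharsets.UTF_8);
        socket.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));
        return payload;
    }

    // Plays both roles on loopback: returns {payload seen by client, ack seen by server}.
    static String[] roundTrip(String push, long lastRefTime) {
        try (DatagramSocket client = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket server = new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
            client.setSoTimeout(2000);
            server.setSoTimeout(2000);
            byte[] out = push.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(out, out.length, client.getLocalSocketAddress()));
            String received = receiveOnceAndAck(client, lastRefTime);
            byte[] buf = new byte[1024];
            DatagramPacket ackPacket = new DatagramPacket(buf, buf.length);
            server.receive(ackPacket);
            String ack = new String(buf, 0, ackPacket.getLength(), StandardCharsets.UTF_8);
            return new String[] {received, ack};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `UdpAckDemo.roundTrip("{\"type\":\"service\"}", 42L)` sends one push and collects the ack. The ack goes to `packet.getSocketAddress()`, so the client needs no prior knowledge of the server's address.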

III. Establishing health checks on the server side

Recall that when a service provider registers an instance, the createEmptyService method creates an empty service.

During creation, putServiceAndInit is called. Besides storing the empty service, this method calls the service's init method to initialize it.

The init method sets up heartbeat checking for the service's provider instances. The check runs every 5 seconds.

public void init() {
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
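The article does not show what HealthCheckReactor.scheduleCheck does internally. Assuming it boils down to a task scheduled at a fixed interval, the mechanism can be sketched with the JDK's ScheduledExecutorService. `PeriodicCheckSketch` and its methods are hypothetical names, and the period is a parameter so the sketch can be run with a short interval instead of 5 seconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a periodic health check; not the Nacos HealthCheckReactor.
final class PeriodicCheckSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "health-check-sketch");
                t.setDaemon(true);
                return t;
            });

    // Runs the task repeatedly, waiting periodMillis before the first run
    // and between the end of one run and the start of the next.
    ScheduledFuture<?> scheduleCheck(Runnable task, long periodMillis) {
        return scheduler.scheduleWithFixedDelay(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    // Schedules a counting task, waits, and reports how often it ran.
    static int countRuns(long periodMillis, long waitMillis) {
        PeriodicCheckSketch sketch = new PeriodicCheckSketch();
        AtomicInteger runs = new AtomicInteger();
        sketch.scheduleCheck(runs::incrementAndGet, periodMillis);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sketch.shutdown();
        }
        return runs.get();
    }
}
```

With a period of 5000 ms, `scheduleCheck` would reproduce the five-second cadence described above. Note that with scheduleWithFixedDelay, an exception escaping the task cancels all further runs, which may be why ClientBeatCheckTask.run() wraps its whole body in try/catch.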

Let's take a look at how ClientBeatCheckTask is implemented:

@Override
    public void run() {
        try {
            // If upgrade to 2.0.X stop health check with v1
            if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
                return;
            }
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) { // Traverse the service's instances for heartbeat detection
                // If the time since the instance's last heartbeat exceeds its heartbeat timeout, the instance is considered unhealthy (it is not deleted yet).
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                           //Push service change events.
                            getPushService().serviceChanged(service);
                        }
                    }
                }
            }
            
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            
            // then remove obsolete instances:
            for (Instance instance : instances) {
                
                if (instance.isMarked()) {
                    continue;
                }
                
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(instance));
                     //Delete expired service instances
                    deleteIp(instance);
                }
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
        
    }
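The two comparisons in run() amount to a two-stage timeout: a shorter one that marks an instance unhealthy, and a longer one that deletes it. The sketch below reduces this to a pure function for a single instance. `BeatCheckSketch` and `Action` are hypothetical names; the isMarked and isExpireInstance checks are left out, and where the real task would both mark and delete in one pass, the function reports only the more severe outcome.

```java
// Hypothetical reduction of ClientBeatCheckTask's timeout logic; not Nacos code.
final class BeatCheckSketch {

    enum Action { NONE, MARK_UNHEALTHY, DELETE }

    // Decides what a single check would do to one instance.
    // silence = time elapsed since the instance's last heartbeat.
    static Action evaluate(long nowMillis, long lastBeatMillis,
                           long heartBeatTimeoutMillis, long ipDeleteTimeoutMillis,
                           boolean healthy) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > ipDeleteTimeoutMillis) {
            return Action.DELETE;           // second loop: remove the expired instance
        }
        if (silence > heartBeatTimeoutMillis && healthy) {
            return Action.MARK_UNHEALTHY;   // first loop: flip the health flag and push the change
        }
        return Action.NONE;                 // still within timeout, or already unhealthy
    }
}
```

For example, with an assumed heartbeat timeout of 15 s and delete timeout of 30 s (illustrative values, not taken from this article), `evaluate(20_000, 0, 15_000, 30_000, true)` yields `MARK_UNHEALTHY`, and the same instance after 31 s of silence yields `DELETE`.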

UdpPushService.onApplicationEvent listens for the service change event, traverses all clients, and pushes the change to each of them over UDP:

@Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        // ......
                    
                    udpPush(ackEntry);
         }     

For this to work, the service consumer must already have a UDP listener running, otherwise the server has nowhere to push the data. That listener is the PushReceiver from section II, which is initialized in the HostReactor constructor.

IV. Summary

  • When Nacos registers an instance, the call chain createEmptyService() -> createServiceIfAbsent() -> putServiceAndInit() -> init() schedules the health check, which then runs every five seconds (run() in ClientBeatCheckTask)
  • run() in ClientBeatCheckTask traverses all instances of the service and compares the time since each instance's last heartbeat with two timeouts: past the heartbeat timeout, a healthy instance is marked unhealthy and a service change (ServiceChangeEvent) is published; past the IP delete timeout, the instance is deleted from the registry
  • UdpPushService listens for ServiceChangeEvent and pushes messages to clients over UDP
  • On the client, initializing HostReactor creates the PushReceiver, which starts the listener for UDP messages
  • When the client receives a service instance change message from the server, it updates its local cache


Posted by spitfire_esquive on Thu, 30 Jun 2022 16:41:17 +0300