Apollo source code: notifying clients after a configuration is published

Preface

After reading the Spring source code, I planned to work the Nacos source code into later updates on Alibaba Cloud.

Introduction to Apollo

Forked source: apollo source code
Reference: Design of apollo Architecture Center
Apollo is mainly divided into four parts: Config Service, Admin Service, Portal and Client.
The publishing of the ReleaseMessage object was introduced earlier. When the Portal publishes a configuration, it first adds a Release object, then sends a ReleaseMessage, and finally publishes the ConfigPublishEvent event, which is the subject of this article.

The ConfigPublishEvent event

@EventListener
public void onConfigPublish(ConfigPublishEvent event) {
  // Hand the work to a thread pool so the publisher is not blocked
  executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo()));
}


private class ConfigPublishNotifyTask implements Runnable {

  private ConfigPublishEvent.ConfigPublishInfo publishInfo;

  ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) {
    this.publishInfo = publishInfo;
  }

  @Override
  public void run() {
    ReleaseHistoryBO releaseHistory = getReleaseHistory();
    if (releaseHistory == null) {
      Tracer.logError("Load release history failed", null);
      return;
    }

    sendPublishEmail(releaseHistory);

    sendPublishMsg(releaseHistory);
  }

  // getReleaseHistory(), sendPublishEmail() and sendPublishMsg() are not shown here
}

Tracing this code shows no interaction with the Config Service. The event listener only submits a task to a thread pool; the task sends an email about the release and calls an interface of the remote Hermes service.
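To make the hand-off concrete, here is a minimal sketch of the same pattern without Spring: a listener method that only submits a task to a thread pool and returns. The names PublishNotifyDemo, PublishInfo and the recorded strings are hypothetical stand-ins, not Apollo classes; the real task sends an email and calls Hermes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PublishNotifyDemo {

    // Hypothetical stand-in for ConfigPublishEvent.ConfigPublishInfo.
    static final class PublishInfo {
        final String appId;
        final String namespace;

        PublishInfo(String appId, String namespace) {
            this.appId = appId;
            this.namespace = namespace;
        }
    }

    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    // Records what the background task did, so the effect can be inspected.
    final List<String> sent = new CopyOnWriteArrayList<>();

    // Plays the role of the @EventListener method: it only enqueues work.
    void onConfigPublish(PublishInfo info) {
        executorService.submit(() -> {
            sent.add("email:" + info.appId + "/" + info.namespace);
            sent.add("msg:" + info.appId + "/" + info.namespace);
        });
    }

    // Stops accepting tasks and waits for queued ones; true if all finished in time.
    boolean shutdownAndWait() {
        executorService.shutdown();
        try {
            return executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        PublishNotifyDemo demo = new PublishNotifyDemo();
        demo.onConfigPublish(new PublishInfo("demo-app", "application"));
        demo.shutdownAndWait();
        System.out.println(demo.sent);
    }
}
```

Because the listener returns as soon as the task is queued, a slow mail server or remote call would delay only the pool thread, not the code that published the event.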

Notifying clients after a configuration is published

The official documentation (Server design) describes in detail how the Config Service pulls ReleaseMessage records.

The implementation is as follows:
1. After a configuration is published, the Admin Service inserts a message record into the ReleaseMessage table. The message content is the AppId+Cluster+Namespace of the published configuration. See DatabaseMessageSender.
2. A Config Service thread scans the ReleaseMessage table every second for new message records. See ReleaseMessageScanner.
3. If the Config Service finds a new message record, it notifies all message listeners (ReleaseMessageListener), such as NotificationControllerV2. For how listeners are registered, see ConfigServiceAutoConfiguration.
4. NotificationControllerV2 notifies the corresponding clients once it has the AppId+Cluster+Namespace of the published configuration.
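The four steps can be sketched with an in-memory list in place of the ReleaseMessage table. This is only an illustration under stated assumptions: ReleaseFlowSketch, Row and Listener are hypothetical names, and joining the key with "+" is inferred from the way the message content is written above.

```java
import java.util.ArrayList;
import java.util.List;

class ReleaseFlowSketch {

    // One row of the hypothetical in-memory ReleaseMessage table.
    static final class Row {
        final long id;
        final String message;

        Row(long id, String message) {
            this.id = id;
            this.message = message;
        }
    }

    // Hypothetical counterpart of ReleaseMessageListener.
    interface Listener {
        void handleMessage(Row row);
    }

    private final List<Row> table = new ArrayList<>();
    private final List<Listener> listeners = new ArrayList<>();
    private long nextId = 1;
    private long maxIdScanned = 0;

    // Step 1: the admin side inserts a row whose content is appId+cluster+namespace.
    void publish(String appId, String cluster, String namespace) {
        table.add(new Row(nextId++, String.join("+", appId, cluster, namespace)));
    }

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Steps 2 and 3: one scan pass over rows newer than maxIdScanned.
    // Returns how many new rows were handed to the listeners.
    int scanOnce() {
        int found = 0;
        for (Row row : table) {
            if (row.id > maxIdScanned) {
                for (Listener listener : listeners) {
                    listener.handleMessage(row);
                }
                maxIdScanned = row.id;
                found++;
            }
        }
        return found;
    }

    public static void main(String[] args) {
        ReleaseFlowSketch flow = new ReleaseFlowSketch();
        // Step 4 would notify clients; here the listener just prints the key.
        flow.addListener(row -> System.out.println(row.id + " -> " + row.message));
        flow.publish("app1", "default", "application");
        flow.scanOnce();
    }
}
```

The point of the design is that the publisher and the notifier share nothing but a table and a monotonically increasing id, so the scanner can resume from the last id it saw.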

As shown in the figure:

Following the official documentation, let's look at the ReleaseMessageScanner class. It implements the InitializingBean interface, so Spring calls its afterPropertiesSet method once the bean's properties have been set during container startup.

@Override
public void afterPropertiesSet() throws Exception {
  // Scan interval; the default is 1s
  databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
  // Load the largest ReleaseMessage id currently stored
  maxIdScanned = loadLargestMessageId();
  // The first run starts after one interval. Each later delay is counted from
  // the end of the previous run, so scans never overlap
  executorService.scheduleWithFixedDelay((Runnable) () -> {
    Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
    try {
      // Scan for new messages (scanMessages itself is not shown in this article)
      scanMessages();
      transaction.setStatus(Transaction.SUCCESS);
    } catch (Throwable ex) {
      transaction.setStatus(ex);
      logger.error("Scan and send message failed", ex);
    } finally {
      transaction.complete();
    }
  }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

}
private boolean scanAndSendMessages() {
  // Fetch at most 500 messages newer than maxIdScanned
  List<ReleaseMessage> releaseMessages =
      releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
  if (CollectionUtils.isEmpty(releaseMessages)) {
    return false;
  }
  // New messages were found: notify the registered listeners
  fireMessageScanned(releaseMessages);
  int messageScanned = releaseMessages.size();
  // Remember the id of the last message as the new maximum scanned id
  maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
  // A full batch of 500 means there may be more messages to fetch
  return messageScanned == 500;
}
private void fireMessageScanned(List<ReleaseMessage> messages) {
  for (ReleaseMessage message : messages) {
    for (ReleaseMessageListener listener : listeners) {
      try {
        // Notify every message listener by calling its handleMessage method
        listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
      } catch (Throwable ex) {
        Tracer.logError(ex);
        logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
      }
    }
  }
}
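The listing calls scanMessages() but does not show it. A plausible reading, sketched below as an assumption rather than as Apollo's code, is a loop that keeps calling the batch method while it reports a full batch. BatchScanSketch and its in-memory id list are hypothetical; the batch size of 500 and the per-listener try/catch mirror the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class BatchScanSketch {

    static final int BATCH_SIZE = 500; // same size as the findFirst500... query

    private final List<Long> ids; // hypothetical table: message ids in ascending order
    private final List<LongConsumer> listeners = new ArrayList<>();
    long maxIdScanned;
    int batches = 0;

    BatchScanSketch(List<Long> ids, long startId) {
        this.ids = ids;
        this.maxIdScanned = startId;
    }

    void addListener(LongConsumer listener) {
        listeners.add(listener);
    }

    // Stand-in for findFirst500ByIdGreaterThanOrderByIdAsc.
    private List<Long> findFirstBatchAfter(long id) {
        List<Long> batch = new ArrayList<>();
        for (long candidate : ids) {
            if (candidate > id) {
                batch.add(candidate);
                if (batch.size() == BATCH_SIZE) {
                    break;
                }
            }
        }
        return batch;
    }

    // Processes one batch; returns true when the batch was full.
    private boolean scanAndSend() {
        List<Long> batch = findFirstBatchAfter(maxIdScanned);
        if (batch.isEmpty()) {
            return false;
        }
        batches++;
        for (long id : batch) {
            for (LongConsumer listener : listeners) {
                try {
                    listener.accept(id);
                } catch (RuntimeException ex) {
                    // A failing listener must not stop the remaining listeners.
                }
            }
        }
        maxIdScanned = batch.get(batch.size() - 1);
        return batch.size() == BATCH_SIZE;
    }

    // Assumed shape of scanMessages(): drain the backlog batch by batch.
    void scanMessages() {
        boolean hasMore = true;
        while (hasMore) {
            hasMore = scanAndSend();
        }
    }

    public static void main(String[] args) {
        List<Long> ids = new ArrayList<>();
        for (long i = 1; i <= 1200; i++) {
            ids.add(i);
        }
        BatchScanSketch scanner = new BatchScanSketch(ids, 0);
        scanner.scanMessages();
        System.out.println(scanner.batches + " batches, maxIdScanned=" + scanner.maxIdScanned);
    }
}
```

Returning "was the batch full" lets a single scheduled run catch up on a large backlog without waiting another interval between batches.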

In fireMessageScanned, all listeners will be notified. Let's take a look at the class diagram of ReleaseMessageListener

For the listener registration process, see ConfigServiceAutoConfiguration. The handleMessage method receives the published message and processes it. According to the official documentation, when the Config Service finds a new message record it notifies all message listeners; the listener that notifies clients is NotificationControllerV2, so we look at NotificationControllerV2#handleMessage.

@Override
public void handleMessage(ReleaseMessage message, String channel) {
  logger.info("message received - channel: {}, message: {}", channel, message);
  // The content is: appId + cluster + namespace
  String content = message.getMessage();
  Tracer.logEvent("Apollo.LongPoll.Messages", content);
  // Ignore messages from other channels and messages with empty content
  if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
    return;
  }
  // retrieveNamespaceFromReleaseMessage is a Function; apply() extracts the namespace from the content
  String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);

  if (Strings.isNullOrEmpty(changedNamespace)) {
    logger.error("message format invalid - {}", content);
    return;
  }

  // No client is long polling on this key, so there is nothing to notify
  if (!deferredResults.containsKey(content)) {
    return;
  }

  //create a new list to avoid ConcurrentModificationException
  List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

  ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
  // key: appId+cluster+namespace, value: messageId
  configNotification.addMessage(content, message.getId());

  //do async notification if too many clients
  // i.e. more pending long-polling clients than one batch (100)
  if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
    largeNotificationBatchExecutorService.submit(() -> {
      logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
          bizConfig.releaseMessageNotificationBatch());
      for (int i = 0; i < results.size(); i++) {
        // After each batch of 100 clients, pause before continuing
        if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
          try {
            // Sleep for 100ms
            TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
          } catch (InterruptedException e) {
            //ignore
          }
        }
        logger.debug("Async notify {}", results.get(i));
        // Complete the client's pending request with: namespace, messageId
        results.get(i).setResult(configNotification);
      }
    });
    return;
  }

  logger.debug("Notify {} clients for key {}", results.size(), content);
  // Few clients: notify them synchronously
  for (DeferredResultWrapper result : results) {
    result.setResult(configNotification);
  }
  logger.debug("Notification completed");
}
  • ReleaseMessageScanner pulls the new messages and, in fireMessageScanned, calls the handleMessage method of every ReleaseMessageListener implementation.
  • handleMessage extracts the namespace from the content (appId+cluster+namespace).
  • It builds an ApolloConfigNotification object (namespaceName, messageId) carrying a map with key appId+cluster+namespace and value messageId, and uses it to notify the client.
  • Client long polling is held open on the server side with Spring's DeferredResult.
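The long-polling idea in this summary can be sketched without Spring by using CompletableFuture in place of DeferredResult. Everything here is a hypothetical simplification: LongPollSketch, Notification and namespaceOf are illustrative names, the "+" separator is assumed from the key format above, and pending requests are removed inside handleMessage, which is not necessarily how Apollo cleans them up. Batching and sleeping between batches are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class LongPollSketch {

    // Hypothetical payload: the namespace plus the id of the triggering message.
    static final class Notification {
        final String namespaceName;
        final long notificationId;

        Notification(String namespaceName, long notificationId) {
            this.namespaceName = namespaceName;
            this.notificationId = notificationId;
        }
    }

    // Watched key (appId+cluster+namespace) -> requests still waiting for a change.
    private final Map<String, List<CompletableFuture<Notification>>> pending =
        new ConcurrentHashMap<>();

    // A client's long poll: the request is parked until the key changes.
    CompletableFuture<Notification> poll(String watchedKey) {
        CompletableFuture<Notification> future = new CompletableFuture<>();
        pending.computeIfAbsent(watchedKey, k -> new CopyOnWriteArrayList<>()).add(future);
        return future;
    }

    // Assumes exactly three "+"-separated parts; returns null for anything else.
    static String namespaceOf(String content) {
        String[] parts = content.split("\\+");
        return parts.length == 3 ? parts[2] : null;
    }

    // Completes every request parked under the key; returns how many were notified.
    int handleMessage(long messageId, String content) {
        String namespace = namespaceOf(content);
        if (namespace == null) {
            return 0;
        }
        List<CompletableFuture<Notification>> waiting = pending.remove(content);
        if (waiting == null) {
            return 0;
        }
        Notification notification = new Notification(namespace, messageId);
        int notified = 0;
        // Iterate over a copy, as the original does, to stay clear of concurrent changes.
        for (CompletableFuture<Notification> future : new ArrayList<>(waiting)) {
            if (future.complete(notification)) {
                notified++;
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        LongPollSketch server = new LongPollSketch();
        CompletableFuture<Notification> request = server.poll("app1+default+application");
        server.handleMessage(42L, "app1+default+application");
        System.out.println(request.join().namespaceName);
    }
}
```

A parked request costs no thread while it waits, which is why this style suits many idle clients that each watch a few keys.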

Posted by marco839 on Wed, 04 May 2022 20:58:47 +0300