preface
After reading the source code of nacobas in spring, I thought of inserting the source code of nacobas in the subsequent update of Alibaba cloud
Introduction to Apollo
fork source address apollo source code
reference resources Design of apollo Architecture Center
It is mainly divided into four parts: Config Service, Admin Service, Portal and Client
The Release of ReleaseMessage object is introduced above. The first thing of portal Release configuration is to add a Release object, the second thing is to Release ReleaseMessage, and the third thing is the ConfigPublishEvent event event to be discussed in this article
New ConfigPublishEvent event event
@EventListener public void onConfigPublish(ConfigPublishEvent event) { executorService.submit(new ConfigPublishNotifyTask(event.getConfigPublishInfo())); } private class ConfigPublishNotifyTask implements Runnable { private ConfigPublishEvent.ConfigPublishInfo publishInfo; ConfigPublishNotifyTask(ConfigPublishEvent.ConfigPublishInfo publishInfo) { this.publishInfo = publishInfo; } @Override public void run() { ReleaseHistoryBO releaseHistory = getReleaseHistory(); if (releaseHistory == null) { Tracer.logError("Load release history failed", null); return; } sendPublishEmail(releaseHistory); sendPublishMsg(releaseHistory); }
The tracking code did not find the interaction with configservice. The event listener is nothing more than creating a thread pool and executing thread tasks. The task is to send release object mail and call an interface of remote hermes.
Configure publishing and notify clients
Read official documents: Server design The document describes in detail how the configservice pulls ReleaseMessage
The implementation method is as follows:
1. After configuration publishing, the admin service will insert a message record into the ReleaseMessage table. The message content is the AppId+Cluster+Namespace of configuration publishing. See DatabaseMessageSender
2. A thread of config service will scan the ReleaseMessage table every second to see if there are new message records. See ReleaseMessageScanner
3. If the config service finds a new message record, it will notify all message listeners( ReleaseMessageListener ), such as NotificationControllerV2 For the registration process of message listener, see ConfigServiceAutoConfiguration
4.NotificationControllerV2 will notify the corresponding client after getting the AppId+Cluster+Namespace of configuration publishing
As shown in the figure:
Then, let's follow the official documents to learn about the ReleaseMessageScanner class, which implements the InitializingBean interface and calls the afterPropertiesSet method after the container is started and the bean is initialized
@Override public void afterPropertiesSet() throws Exception { //The Default scan interval is 1s databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli(); //Query the largest Release Record maxIdScanned = loadLargestMessageId(); //The timed task is executed after a delay of 1s. Affected by the task, the timing needs to be started after the task is completed executorService.scheduleWithFixedDelay((Runnable) () -> { Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage"); try { //Scan message scanMessages(); transaction.setStatus(Transaction.SUCCESS); } catch (Throwable ex) { transaction.setStatus(ex); logger.error("Scan and send message failed", ex); } finally { transaction.complete(); } }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS); }
private boolean scanAndSendMessages() { //current batch is 500 List<ReleaseMessage> releaseMessages = releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned); if (CollectionUtils.isEmpty(releaseMessages)) { return false; } //If a new message is published, notify configservice fireMessageScanned(releaseMessages); int messageScanned = releaseMessages.size(); //Take the id of the last piece of data and assign the maximum id maxIdScanned = releaseMessages.get(messageScanned - 1).getId(); //Is there any data return messageScanned == 500; }
private void fireMessageScanned(List<ReleaseMessage> messages) { for (ReleaseMessage message : messages) { for (ReleaseMessageListener listener : listeners) { try { //Notify all message listeners and trigger the handleMessage method listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC); } catch (Throwable ex) { Tracer.logError(ex); logger.error("Failed to invoke message listener {}", listener.getClass(), ex); } } } }
In fireMessageScanned, all listeners will be notified. Let's take a look at the class diagram of ReleaseMessageListener
Refer to configserviceautoconfiguration for the listener registration process. The handlemessage method is to get the published configuration and process it. According to the instructions of the official document: if the Config Service finds a new message record, it will notify all message listeners, and the client will locate the NotificationControllerV2#handleMessage
@Override public void handleMessage(ReleaseMessage message, String channel) { logger.info("message received - channel: {}, message: {}", channel, message); //The content is: aapId + cluster + namespace String content = message.getMessage(); Tracer.logEvent("Apollo.LongPoll.Messages", content); //If the channel is not Apollo release, it will not be processed if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) { return; } //retrieveNamespaceFromReleaseMessage implements the lambda expression of Function, and apply returns the namespace through this expression String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content); if (Strings.isNullOrEmpty(changedNamespace)) { logger.error("message format invalid - {}", content); return; } if (!deferredResults.containsKey(content)) { return; } //create a new list to avoid ConcurrentModificationException List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content)); ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId()); //key : appId+cluster+namespace, value: messageId configNotification.addMessage(content, message.getId()); //do async notification if too many clients //Client long polling connections > 100 if (results.size() > bizConfig.releaseMessageNotificationBatch()) { largeNotificationBatchExecutorService.submit(() -> { logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content, bizConfig.releaseMessageNotificationBatch()); for (int i = 0; i < results.size(); i++) { //100 a batch, sleep for 100ms if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) { try { //Sleep for 100ms TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli()); } catch (InterruptedException e) { //ignore } } logger.debug("Async notify {}", results.get(i)); //Notify the client that the message is: namespace,messageId results.get(i).setResult(configNotification); } }); return; } logger.debug("Notify {} clients for key {}", results.size(), content); //Synchronization notification for (DeferredResultWrapper result : results) { result.setResult(configNotification); } logger.debug("Notification completed"); }
- Pull the message, notify all ReleaseMessageListener implementation classes ReleaseMessageScanner#fireMessageScanned, and call the handleMessage method.
- Take the namespace from content:appId+cluster+namespace.
- Assemble the map notification client of the Apollo confignotification object messageId, namespaceName, key: appid + cluster + namespace, value: messageId.
- The client uses DeferredResult long polling technology.