[Design from Source Code] Message Bus Asynchronous Processing in Ant Financial's SOFARegistry

0x00 Summary

SOFARegistry is a production-grade, low-latency, highly available service registry open-sourced by Ant Financial.

This series focuses on design and architecture analysis: across several articles, we reverse-engineer the implementation mechanisms and architectural ideas of the DataServer and of SOFARegistry from multiple perspectives, so that you can see how Alibaba designed it.

This is the fifth article, covering the asynchronous processing done by SOFARegistry's message bus.

0x01 Why Separate It

Previously we covered SOFARegistry's message bus; in this article we look at a variant of it: the DataChangeEventCenter.

The DataChangeEventCenter is a stand-alone component that specializes in processing messages related to data changes.

Why should we separate? Because:

  • Architecturally, the DataChangeEventCenter is dedicated to handling data change messages, which keeps that concern decoupled;
  • Technically, the DataChangeEventCenter also differs from the EventCenter in its implementation techniques, so it needs to be handled separately;
  • But the deeper reason is that the business scenarios are different; as the analysis below shows, the DataChangeEventCenter is fairly tightly coupled to the business;

0x02 Business Area

2.1 Application Scenarios

The business scenario unique to the DataChangeEventCenter is as follows:

  • Merging is required: there may be many notifications for the same data within a short period, and instead of handling them one by one, only the last one needs to be processed;
  • Messages must be processed asynchronously;
  • Message order must be guaranteed;
  • Delayed operations are needed;
  • Throughput must be improved through parallel processing;

As a result, the DataChangeEventCenter code is closely tied to the business, and the EventCenter discussed earlier is no longer a good fit.

2.2 Delay and Merge

We will describe the delay and merge operations separately.

2.2.1 Business Features

One characteristic of Ant Financial's business is that service downtime is detected within seconds, thanks to connection sensitivity.

SOFARegistry's health-check design therefore stipulates that "service data is bound to the physical connection of the service publisher, and the data is cleared immediately after disconnection". This characteristic is called connection sensitivity for short. Connection sensitivity means that in SOFARegistry all Clients maintain long-lived connections to the SessionServer, and each connection carries a SOFABolt-based heartbeat. If the connection breaks, the Client immediately initiates a reconnection, so a reliable connection between Client and SessionServer is maintained at all times.

2.2.2 The Problem

The problem is that network issues may cause many reconnections within a short period. For example, if the connection is broken only by a network glitch and the service process itself is not down, the client immediately reconnects to the SessionServer and re-registers all of its service data.

If the interruption is short enough (say, disconnection and reconnection within 500ms), service subscribers should not perceive the service as having gone offline. SOFARegistry therefore has to absorb this internally.

2.2.3 Solution

Merge and delay operations are performed inside SOFARegistry so that users are not affected. For example, the DataServer defers merging the changed Publisher service information via mergeDatum, and version holds the latest version number after the merge.

For the DataChangeEventCenter, this functionality is provided by delaying and merging messages.
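
To make the idea concrete, here is a toy sketch (hypothetical names, not SOFARegistry code) of how a DelayQueue plus a keyed map can turn several rapid changes to the same key into a single delayed notification carrying only the final state:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Toy example: changes to the same key arriving within the delay window
// are merged into a single delayed notification.
public class DelayMergeDemo {

    // One delayed "slot" per key; later changes only overwrite its payload.
    static class Slot implements Delayed {
        final String key;
        final long fireAtMs;
        volatile String payload;

        Slot(String key, long delayMs, String payload) {
            this.key = key;
            this.fireAtMs = System.currentTimeMillis() + delayMs;
            this.payload = payload;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(fireAtMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final Map<String, Slot> pending = new ConcurrentHashMap<>();
    private final DelayQueue<Slot> queue = new DelayQueue<>();

    // Called for every change; only the first change within the window enqueues a slot.
    void onChange(String key, String payload) {
        Slot fresh = new Slot(key, 500, payload);
        Slot existing = pending.putIfAbsent(key, fresh);
        if (existing == null) {
            queue.put(fresh);           // first change: schedule one delayed notification
        } else {
            existing.payload = payload; // later changes within 500ms: just merge in
        }
    }

    void runOnce() throws InterruptedException {
        Slot slot = queue.take();       // blocks until the delay expires
        pending.remove(slot.key);
        System.out.println("notify " + slot.key + " -> " + slot.payload);
    }

    public static void main(String[] args) throws Exception {
        DelayMergeDemo demo = new DelayMergeDemo();
        demo.onChange("serviceA", "offline");  // e.g. connection dropped
        demo.onChange("serviceA", "online");   // reconnected within 500ms
        demo.runOnce();                        // prints only the merged, final state
    }
}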

2.3 Ant Financial's Implementation

The following is a description of the overall functionality of the DataChangeEventCenter:

  • When a data publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered respectively.
  • The Handler first checks the state of the current node:
    • If the node is not in the working state, the request fails and is returned;
    • If it is working, the Handler adds a data change event to the dataChangeEventCenter, triggering its onChange method, which asynchronously notifies the event change center of the data change;
  • When the event change center receives the event, it adds it to a queue; the dataChangeEventCenter then processes the online and offline data asynchronously according to the event type.
  • Meanwhile, the DataChangeHandler publishes the change information through a ChangeNotifier, notifying other nodes to synchronize the data;

0x03 DataChangeEventCenter

3.1 Overview

The DataChangeEventCenter is divided into four parts:

  • Event Center: organizes the queues into a message center;
  • Event Queue: multiple queues processed separately to increase throughput;
  • Event Task: each Queue starts an internal thread for asynchronous processing, further increasing throughput;
  • Event Handler: processes the internal ChangeData;

Because the DataChangeEventCenter is closely integrated with the business, we will now walk through it in detail.

3.2 DataChangeEventCenter

3.2.1 Definition

A DataChangeEventQueue array, the core of the class, is maintained in the DataChangeEventCenter; each element of the array is an event queue. It is defined as follows:

public class DataChangeEventCenter {

    /**
     * count of DataChangeEventQueue
     */
    private int                    queueCount;

    /**
     * queues of DataChangeEvent
     */
    private DataChangeEventQueue[] dataChangeEventQueues;

    @Autowired
    private DataServerConfig       dataServerConfig;

    @Autowired
    private DatumCache             datumCache;
}

3.2.2 Message Types

The DataChangeEventCenter handles IDataChangeEvent messages, which come in three concrete implementations:

  • public class ClientChangeEvent implements IDataChangeEvent
  • public class DataChangeEvent implements IDataChangeEvent
  • public class DatumSnapshotEvent implements IDataChangeEvent

Different types of messages can be placed in the same queue; which queue an event goes into is decided by a routing rule, such as hashing the Publisher's DataInfoId.

That is, when the onChange method of the corresponding handler is triggered, the hash of the changed service's dataInfoId is computed, which determines the queue that the service's registration data belongs to; the change data is then wrapped into a data change object and passed into that queue.

3.2.3 Initialization

In the initialization function, the EventQueues are built, and each Queue starts a thread to process messages.

@PostConstruct
public void init() {
    if (isInited.compareAndSet(false, true)) {
        queueCount = dataServerConfig.getQueueCount();
        dataChangeEventQueues = new DataChangeEventQueue[queueCount];
        for (int idx = 0; idx < queueCount; idx++) {
            dataChangeEventQueues[idx] = new DataChangeEventQueue(idx, dataServerConfig, this,datumCache);
            dataChangeEventQueues[idx].start();
        }
    }
}

3.2.4 Put Messages

Putting a message is simple; which Queue an Event goes into is decided by the same routing rule, such as hashing the Publisher's DataInfoId:

int idx = hash(publisher.getDataInfoId());
Datum datum = new Datum(publisher, dataCenter);
dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
                DataSourceTypeEnum.PUB, datum));

3.2.5 How to process messages

Different message types are handled by the onChange overloads below; each locates the appropriate queue (or all queues) and calls its onChange:

public void onChange(Publisher publisher, String dataCenter) {
    int idx = hash(publisher.getDataInfoId());
    Datum datum = new Datum(publisher, dataCenter);
    if (publisher instanceof UnPublisher) {
        datum.setContainsUnPub(true);
    }
    if (publisher.getPublishType() != PublishType.TEMPORARY) {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB, datum));
    } else {
        dataChangeEventQueues[idx].onChange(new DataChangeEvent(DataChangeTypeEnum.MERGE,
            DataSourceTypeEnum.PUB_TEMP, datum));
    }
}

public void onChange(ClientChangeEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void onChange(DatumSnapshotEvent event) {
    for (DataChangeEventQueue dataChangeEventQueue : dataChangeEventQueues) {
        dataChangeEventQueue.onChange(event);
    }
}

public void sync(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType, Datum datum) {
    int idx = hash(datum.getDataInfoId());
    DataChangeEvent event = new DataChangeEvent(changeType, sourceType, datum);
    dataChangeEventQueues[idx].onChange(event);
}
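
The hash method used above is not shown in the excerpts; a minimal sketch of what such routing could look like, assuming a simple modulo over queueCount (the exact implementation may differ), is:

// Sketch of the queue-selection hash (assumed implementation, not verbatim source):
// events for the same dataInfoId always map to the same queue index,
// which is what preserves per-dataInfoId ordering.
private int hash(String key) {
    if (queueCount > 1) {
        // Math.abs guards against negative hashCode values
        return Math.abs(key.hashCode() % queueCount);
    }
    return 0;
}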

3.3 DataChangeEvent

Because DataChangeEvent is the most common type, we present it separately.

The DataChangeEvent is distinguished by the DataChangeTypeEnum and DataSourceTypeEnum, which are the processing type and the source of the message.

DataChangeTypeEnum is divided into:

  • MERGE: updates the Datum that needs updating in the cache and updates the version number;
  • COVER: overwrites the original cache;

DataSourceTypeEnum is divided into:

  • PUB: published by a client;
  • PUB_TEMP: published temporary data;
  • SYNC: synced from data servers in another data center;
  • BACKUP: from data servers in the same data center;
  • CLEAN: local dataInfo check; data that no longer belongs to this node is scheduled for removal;
  • SNAPSHOT: snapshot data, used after renew finds the data inconsistent;

Specifically defined as follows:

public class DataChangeEvent implements IDataChangeEvent {

    /**
     * type of changed data, MERGE or COVER
     */
    private DataChangeTypeEnum changeType;

    private DataSourceTypeEnum sourceType;

    /**
     * data changed
     */
    private Datum              datum;
}
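
For reference, based purely on the values listed above, the two enums presumably boil down to something like this (a sketch, not the verbatim source):

// Sketch of the two enums, reconstructed from the lists above.
public enum DataChangeTypeEnum { MERGE, COVER }

public enum DataSourceTypeEnum { PUB, PUB_TEMP, SYNC, BACKUP, CLEAN, SNAPSHOT }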

3.4 DataChangeEventQueue

The DataChangeEventQueue is the core of this sub-module: multiple queues are processed separately to increase throughput, and each Queue also starts an internal thread for asynchronous processing, increasing throughput further.

3.4.1 Core Variables

The core here is:

  • BlockingQueue eventQueue;

  • Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

  • DelayQueue CHANGE_QUEUE = new DelayQueue();

Explain as follows:

  • The data type operated on here is ChangeData; converting Datum to ChangeData unifies how messages of different types and origins are processed.
  • eventQueue stores the delivered messages; all messages block on this queue, which guarantees sequential processing;
  • CHANGE_DATA_MAP_FOR_MERGE, as the name implies, handles message merging. It stores ChangeData keyed by dataCenter and dataInfoId, so it can be thought of as a matrix Map. Key-value pairs are added with putIfAbsent: if the map has no value for the key, the new value is added and null is returned; if a value already exists, the existing value is kept. In this way, if multiple messages for the same key arrive within a short time, the redundant ones are merged.
  • CHANGE_QUEUE processes the delivered ChangeData uniformly, regardless of which data center the data comes from. Note that it is a DelayQueue, which provides the delayed operation mentioned in the business section above.

Specifically defined as follows:

public class DataChangeEventQueue {

    private final String                               name;

    /**
     * a block queue that stores all data change events
     */
    private final BlockingQueue<IDataChangeEvent>      eventQueue;

    private final Map<String, Map<String, ChangeData>> CHANGE_DATA_MAP_FOR_MERGE = new ConcurrentHashMap<>();

    private final DelayQueue<ChangeData>               CHANGE_QUEUE              = new DelayQueue();

    private final int                                  notifyIntervalMs;

    private final int                                  notifyTempDataIntervalMs;

    private final ReentrantLock                        lock                      = new ReentrantLock();

    private final int                                  queueIdx;

    private DataServerConfig                           dataServerConfig;

    private DataChangeEventCenter                      dataChangeEventCenter;

    private DatumCache                                 datumCache;
}

3.4.2 Startup and Engine

The DataChangeEventQueue#start method is called on a new thread when the DataChangeEventCenter initializes; it continuously takes new events from the queue and dispatches them, and the new data is added to the corresponding node, implementing sharding. Because eventQueue is a BlockingQueue, a while (true) loop can drive it.

When an event is taken off the queue, it is dispatched according to its DataChangeScopeEnum; different scopes are handled differently.

  • If the scope is DataChangeScopeEnum.DATUM, check dataChangeEvent.getSourceType():
    • If it is DataSourceTypeEnum.PUB_TEMP, call addTempChangeData, which adds a ChangeData to CHANGE_QUEUE;
    • Otherwise, call handleDatum;
  • If the scope is DataChangeScopeEnum.CLIENT, call handleClientOff((ClientChangeEvent) event);
  • If the scope is DataChangeScopeEnum.SNAPSHOT, call handleSnapshot((DatumSnapshotEvent) event);

The code is as follows:

public void start() {
    Executor executor = ExecutorFactory
            .newSingleThreadExecutor(String.format("%s_%s", DataChangeEventQueue.class.getSimpleName(), getName()));
    executor.execute(() -> {
        while (true) {
            try {
                IDataChangeEvent event = eventQueue.take();
                DataChangeScopeEnum scope = event.getScope();
                if (scope == DataChangeScopeEnum.DATUM) {
                    DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
                    //Temporary push data will be notify as soon as,and not merge to normal pub data;
                    if (dataChangeEvent.getSourceType() == DataSourceTypeEnum.PUB_TEMP) {
                        addTempChangeData(dataChangeEvent.getDatum(), dataChangeEvent.getChangeType(),
                                dataChangeEvent.getSourceType());
                    } else {
                        handleDatum(dataChangeEvent.getChangeType(), dataChangeEvent.getSourceType(),
                                dataChangeEvent.getDatum());
                    }
                } else if (scope == DataChangeScopeEnum.CLIENT) {
                    handleClientOff((ClientChangeEvent) event);
                } else if (scope == DataChangeScopeEnum.SNAPSHOT) {
                    handleSnapshot((DatumSnapshotEvent) event);
                }
            } catch (Throwable e) {
                // exception handling (logging) omitted in this excerpt
            }
        }
    });
}

Specifically as follows:

      +----------------------------+
      |   DataChangeEventCenter    |
      |                            |
      | +-----------------------+  |
      | | DataChangeEventQueue[]|  |
      | +-----------------------+  |
      +----------------------------+
                   |
                   |
                   v
+------------------+------------------------+
|          DataChangeEventQueue             |
|                                           |
| +---------------------------------------+ |
| |                                       | |
| |    BlockingQueue<IDataChangeEvent> +-------------+
| |                                       | |        |
| |                                       | |      +-v---------+
| | Map<String, Map<String, ChangeData<>  | | <--> |           |
| |                                       | |      | Executor  |
| |                                       | |      |           |
| |         start +------------------------------> |           |
| |                                       | |      +-+---------+
| |                                       | |        |
| |      DelayQueue<ChangeData>  <-------------------+
| |                                       | |
| +---------------------------------------+ |
+-------------------------------------------+

3.4.3 ChangeData

handleDatum does its work by converting the Datum into ChangeData.

Why convert to ChangeData for storage?

Because messages differ both in type and in origin. For example, in NotifyFetchDatumHandler.fetchDatum, a Datum is fetched from another data server, and a message based on that Datum is then dropped onto the dataChangeEventCenter to tell this DataServer to perform a BACKUP operation with change type COVER.

Converting to ChangeData unifies the handling of messages regardless of their type or source.

The caller enqueues a message containing the datum:

dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);

The DataChangeEventQueue takes the Datum from the DataChangeEvent and converts it to ChangeData for storage.

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
    //get changed datum
    ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
        targetDatum.getDataInfoId(), sourceType, changeType);
    Datum cacheDatum = changeData.getDatum();
    if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
        changeData.setDatum(targetDatum);
    }
}

ChangeData is defined as follows:

public class ChangeData implements Delayed {

    /** data changed */
    private Datum              datum;

    /** change time */
    private Long               gmtCreate;

    /** timeout */
    private long               timeout;

    private DataSourceTypeEnum sourceType;

    private DataChangeTypeEnum changeType;
}
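
Because ChangeData implements Delayed, CHANGE_QUEUE can hold it back until its timeout expires. The two Delayed methods are not shown above; assuming gmtCreate is the enqueue time and timeout the delay in milliseconds, they would look roughly like this (a sketch, not the verbatim source):

// Sketch of the Delayed contract for ChangeData (assumed, not verbatim source).
@Override
public long getDelay(TimeUnit unit) {
    // remaining delay = (creation time + timeout) - now
    long remainingMs = gmtCreate + timeout - System.currentTimeMillis();
    return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed other) {
    // the entry that expires earlier is released by the DelayQueue first
    return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
}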

3.4.4 Processing Datum

3.4.4.1 Adding a Datum

This is where the cached ChangeData and the newly arrived Datum are actually merged.

  • First, the previously stored ChangeData is retrieved from CHANGE_DATA_MAP_FOR_MERGE; if none exists, a new one is created and added, preparing for possible later merges;
  • After the ChangeData is obtained:
    • If the change type is COVER, the original cache is overwritten: changeData.setDatum(targetDatum);
    • Otherwise (MERGE), the Datum that needs updating is updated in the cache, along with the version number.

Specifically as follows:

private void handleDatum(DataChangeTypeEnum changeType, DataSourceTypeEnum sourceType,
                         Datum targetDatum) {
    lock.lock();
    try {
        //get changed datum
        ChangeData changeData = getChangeData(targetDatum.getDataCenter(),
            targetDatum.getDataInfoId(), sourceType, changeType);
        Datum cacheDatum = changeData.getDatum();
        if (changeType == DataChangeTypeEnum.COVER || cacheDatum == null) {
            changeData.setDatum(targetDatum);
        } else {
            Map<String, Publisher> targetPubMap = targetDatum.getPubMap();
            Map<String, Publisher> cachePubMap = cacheDatum.getPubMap();
            for (Publisher pub : targetPubMap.values()) {
                String registerId = pub.getRegisterId();
                Publisher cachePub = cachePubMap.get(registerId);
                if (cachePub != null) {
                    // if the registerTimestamp of cachePub is greater than the registerTimestamp of pub, it means
                    // that pub is not the newest data, should be ignored
                    if (pub.getRegisterTimestamp() < cachePub.getRegisterTimestamp()) {
                        continue;
                    }
                    // if pub and cachePub both are publisher, and sourceAddress of both are equal,
                    // and version of cachePub is greater than version of pub, should be ignored
                    if (!(pub instanceof UnPublisher) && !(cachePub instanceof UnPublisher)
                        && pub.getSourceAddress().equals(cachePub.getSourceAddress())
                        && cachePub.getVersion() > pub.getVersion()) {
                        continue;
                    }
                }
                cachePubMap.put(registerId, pub);
                cacheDatum.setVersion(targetDatum.getVersion());
            }
        }
    } finally {
        lock.unlock();
    }
}
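
getChangeData, called at the top of handleDatum, is where the putIfAbsent-based merging from section 3.4.1 actually happens: the first change for a dataCenter/dataInfoId within the window creates the ChangeData and schedules exactly one delayed entry in CHANGE_QUEUE, and later changes reuse it. A sketch reconstructed from the fields above (constructor signature assumed, not the verbatim source):

private ChangeData getChangeData(String dataCenter, String dataInfoId,
                                 DataSourceTypeEnum sourceType, DataChangeTypeEnum changeType) {
    // one inner map per dataCenter
    Map<String, ChangeData> map = CHANGE_DATA_MAP_FOR_MERGE
        .computeIfAbsent(dataCenter, k -> new ConcurrentHashMap<>());

    ChangeData changeData = map.get(dataInfoId);
    if (changeData == null) {
        // assumed constructor: (datum, timeout, sourceType, changeType)
        ChangeData newChangeData = new ChangeData(null, notifyIntervalMs, sourceType, changeType);
        changeData = map.putIfAbsent(dataInfoId, newChangeData);
        if (changeData == null) {
            // first change for this dataInfoId within the window:
            // schedule exactly one delayed entry
            changeData = newChangeData;
            CHANGE_QUEUE.put(changeData);
        }
    }
    // later changes within the window reuse the same ChangeData,
    // so handleDatum merges into it instead of enqueuing again
    return changeData;
}
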
3.4.4.2 Taking a Datum

When extracting, the take function takes a ChangeData from CHANGE_QUEUE and removes it from CHANGE_DATA_MAP_FOR_MERGE.

public ChangeData take() throws InterruptedException {
    ChangeData changeData = CHANGE_QUEUE.take();
    lock.lock();
    try {
        removeMapForMerge(changeData);
        return changeData;
    } finally {
        lock.unlock();
    }
}
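
removeMapForMerge presumably just drops the entry from CHANGE_DATA_MAP_FOR_MERGE, so that the next change for the same dataInfoId starts a fresh merge window; a sketch (assumed, not verbatim):

private void removeMapForMerge(ChangeData changeData) {
    Datum datum = changeData.getDatum();
    if (datum != null) {
        Map<String, ChangeData> dataCenterMap = CHANGE_DATA_MAP_FOR_MERGE.get(datum.getDataCenter());
        if (dataCenterMap != null) {
            // the next change for this dataInfoId will create a new ChangeData
            // and schedule a new delayed entry in CHANGE_QUEUE
            dataCenterMap.remove(datum.getDataInfoId());
        }
    }
}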

The actual extraction of the Datum happens in the DataChangeHandler.

3.5 DataChangeHandler

The DataChangeHandler continuously takes messages from the DataChangeEventCenter and processes them. Its main job is to run a ChangeNotifier that tells the relevant modules: hey, new data has arrived, everyone get to work.

3.5.1 Class Definition

public class DataChangeHandler {

    @Autowired
    private DataServerConfig          dataServerConfig;

    @Autowired
    private DataChangeEventCenter     dataChangeEventCenter;

    @Autowired
    private DatumCache                datumCache;

    @Resource
    private List<IDataChangeNotifier> dataChangeNotifiers;
}

3.5.2 Execution Engine ChangeNotifier

The DataChangeHandler iterates over all DataChangeEventQueues in the DataChangeEventCenter, takes ChangeData out of each DataChangeEventQueue, and generates a ChangeNotifier for each ChangeData.

Each ChangeNotifier is a processing task submitted to a notify thread pool.

That pool is sized at five threads per dataChangeEventQueue.

@PostConstruct
public void start() {
    DataChangeEventQueue[] queues = dataChangeEventCenter.getQueues();
    int queueCount = queues.length;
    Executor executor = ExecutorFactory.newFixedThreadPool(queueCount, DataChangeHandler.class.getSimpleName());
    Executor notifyExecutor = ExecutorFactory
            .newFixedThreadPool(dataServerConfig.getQueueCount() * 5, this.getClass().getSimpleName());
  
    for (int idx = 0; idx < queueCount; idx++) {
        final DataChangeEventQueue dataChangeEventQueue = queues[idx];
        final String name = dataChangeEventQueue.getName();
        executor.execute(() -> {
            while (true) {
                try {
                    final ChangeData changeData = dataChangeEventQueue.take();
                    notifyExecutor.execute(new ChangeNotifier(changeData, name));
                } catch (Throwable e) {
                    // exception handling (logging) omitted in this excerpt
                }
            }
        });
    }
}

3.5.3 Notify

Let's review our business:

When a data publisher goes online or offline, publishDataProcessor or unPublishDataHandler is triggered respectively, and the Handler adds a data change event to the dataChangeEventCenter to asynchronously notify the event change center of the change. When the event change center receives the event, it adds it to the queue. At this point, the dataChangeEventCenter asynchronously processes the online and offline data according to the event type.

For ChangeData, a ChangeNotifier is generated for processing. This event change information will be published through ChangeNotifier to notify other nodes of data synchronization.

private class ChangeNotifier implements Runnable {

    private ChangeData changeData;
    private String     name;

    @Override
    public void run() {
        if (changeData instanceof SnapshotData) {
           ......
        } else {
            Datum datum = changeData.getDatum();

            String dataCenter = datum.getDataCenter();
            String dataInfoId = datum.getDataInfoId();
            DataSourceTypeEnum sourceType = changeData.getSourceType();
            DataChangeTypeEnum changeType = changeData.getChangeType();

            if (changeType == DataChangeTypeEnum.MERGE
                && sourceType != DataSourceTypeEnum.BACKUP
                && sourceType != DataSourceTypeEnum.SYNC) {
                //update version for pub or unPub merge to cache
                //if the version product before merge to cache,it may be cause small version override big one
                datum.updateVersion();
            }

            long version = datum.getVersion();

            try {
                if (sourceType == DataSourceTypeEnum.CLEAN) {
                    if (datumCache.cleanDatum(dataCenter, dataInfoId)) {
                      ......
                    }

                } else if (sourceType == DataSourceTypeEnum.PUB_TEMP) {
                    notifyTempPub(datum, sourceType, changeType);
                } else {
                    MergeResult mergeResult = datumCache.putDatum(changeType, datum);
                    Long lastVersion = mergeResult.getLastVersion();

                    if (lastVersion != null
                        && lastVersion.longValue() == LocalDatumStorage.ERROR_DATUM_VERSION) {
                        return;
                    }

                    //lastVersion null means first add datum
                    if (lastVersion == null || version != lastVersion) {
                        if (mergeResult.isChangeFlag()) {
                            notify(datum, sourceType, lastVersion);
                        }
                    }
                }
            } catch (Exception e) {
                ......
            }
        }

    }
}

The notify function traverses dataChangeNotifiers:

private void notify(Datum datum, DataSourceTypeEnum sourceType, Long lastVersion) {
    for (IDataChangeNotifier notifier : dataChangeNotifiers) {
        if (notifier.getSuitableSource().contains(sourceType)) {
            notifier.notify(datum, lastVersion);
        }
    }
}
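
Judging from how it is called above, the IDataChangeNotifier contract presumably looks roughly like this (a sketch, not the verbatim interface):

public interface IDataChangeNotifier {

    /**
     * the data source types this notifier is interested in;
     * notify() is only called when the change's source is in this set
     */
    Set<DataSourceTypeEnum> getSuitableSource();

    /**
     * push the changed datum, together with the version it replaced,
     * to the interested party
     */
    void notify(Datum datum, Long lastVersion);
}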

The corresponding beans are:

@Bean(name = "dataChangeNotifiers")
public List<IDataChangeNotifier> dataChangeNotifiers() {
    List<IDataChangeNotifier> list = new ArrayList<>();
    list.add(sessionServerNotifier());
    list.add(tempPublisherNotifier());
    list.add(backUpNotifier());
    return list;
}

As for how the notifications themselves are handled, we will cover that in a subsequent article.

At this point, the overall logic of the DataChangeEventCenter is shown in the following figure:

                +----------------------------+
                |   DataChangeEventCenter    |
                |                            |
                | +-----------------------+  |
                | | DataChangeEventQueue[]|  |
                | +-----------------------+  |
                +----------------------------+
                             |
                             |
                             v
          +------------------+------------------------+
          |          DataChangeEventQueue             |
          |                                           |
          | +---------------------------------------+ |
          | |                                       | |
          | |    BlockingQueue<IDataChangeEvent> +-------------+
          | |                                       | |        |
          | |                                       | |      +-v---------+
          | | Map<String, Map<String, ChangeData<>  | | <--> |           |
          | |                                       | |      | Executor  |
          | |                                       | |      |           |
          | |         start +------------------------------> |           |
          | |                                       | |      +-+---------+
          | |                                       | |        |
+----------------+ DelayQueue<ChangeData>  <-------------------+
|         | |                                       | |
|         | +---------------------------------------+ |
|         +-------------------------------------------+
|
|
|         +--------------------------+
|  take   |                          |    notify   +-------------------+
+-------> |    DataChangeHandler     | +---------> |dataChangeNotifiers|
          |                          |             +-------------------+
          +--------------------------+

0x04 Conclusion

Because of its unique business scenario, Alibaba made the DataChangeEventCenter a separate component to meet the following business needs. If you have similar needs in your own work, you can use this design as a reference; the specific techniques are:

  • Throughput needs to be improved via parallel processing;
    • A queue array is used, and each Queue processes messages independently, increasing throughput.
  • Messages are processed asynchronously;
    • Each Queue starts an internal thread for asynchronous processing;
  • Message order needs to be guaranteed;
    • eventQueue stores the delivered messages; all messages block on this queue, which guarantees sequential processing;
  • Delayed operations;
    • A DelayQueue is used for the delay;
  • Merging is required: there may be multiple notifications within a short period, and only the last one needs to be processed rather than each one;
    • Key-value pairs are added with putIfAbsent: if the map has no value for the key, the new value is added and null is returned; if a value already exists, the existing one is kept. In this way, multiple messages for the same key within a short time are merged.


