RocketMQ learning notes -- message reading, writing and storage structure

In RocketMQ, messages are stored on the hard disk. Conceptually, the commitLog and each consumeQueue are a single queue, but in the implementation each is stored as a sequence of fixed-size files

The default size of a commitLog file is 1 GB, and the default size of a consumeQueue file is 6,000,000 bytes, that is, it can hold 300,000 entries of 20 bytes each

    public int getMappedFileSizeConsumeQueue() {
        // mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE
        // ConsumeQueue.CQ_STORE_UNIT_SIZE = 20, i.e. one entry is 20 bytes
        int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / (ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
        return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
    }
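The rounding above can be checked with a small standalone sketch; the constant mirrors `ConsumeQueue.CQ_STORE_UNIT_SIZE` from the source, but this is not the actual RocketMQ class:

```java
public class ConsumeQueueSizing {
    // Mirrors ConsumeQueue.CQ_STORE_UNIT_SIZE: each consumeQueue entry is 20 bytes
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Round a configured file size up to a whole number of 20-byte entries,
    // the same arithmetic as getMappedFileSizeConsumeQueue
    static int alignToUnit(int configuredSize) {
        int factor = (int) Math.ceil(configuredSize / (CQ_STORE_UNIT_SIZE * 1.0));
        return factor * CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        // Default: 300000 entries * 20 bytes = 6,000,000 bytes, already aligned
        System.out.println(alignToUnit(300000 * CQ_STORE_UNIT_SIZE)); // 6000000
        // A misconfigured size gets rounded up to the next multiple of 20
        System.out.println(alignToUnit(6000001)); // 6000020
    }
}
```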

commitLog

First, take a look at the file storage structure of commitLog

The CommitLog class has a load method, which loads the mappedFileQueue, i.e. RocketMQ's logical queue of mapped files

    public boolean load() {
        boolean result = this.mappedFileQueue.load();
        log.info("load commit log " + (result ? "OK" : "Failed"));
        return result;
    }

In the load method of mappedFileQueue:

  1. Read the root folder
  2. Sort the files found by traversal
  3. Use the MappedFile class to read each file in the folder and create its fileChannel

    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";
    // CommitLog file size, default is 1G
    private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;

A mappedFile object is the handle through which commitLog data on the hard disk is read and written

During consumption, the consumer subscribes to a topic, obtains an offset from a consumption queue under that topic, and then fetches the message from the commitLog according to that offset

    public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
        // The mapped file size: 1 GB by default
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        // Find the concrete mappedFile for this offset via the mappedFileQueue
        // The logic is offset / size: work out which file holds the message
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
        if (mappedFile != null) {
            // Calculate the read position inside that file
            int pos = (int) (offset % mappedFileSize);
            // selectMappedBuffer returns a buffer covering everything from pos onward
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
            return result;
        }

        return null;
    }
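The offset-to-file arithmetic in getData can be illustrated in isolation. Assuming 1 GB fixed-size files, a global offset maps to a file index and an in-file position like this (a sketch, not the real findMappedFileByOffset):

```java
public class OffsetMapping {
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB, the commitLog default

    // Which fixed-size file holds this global offset (files are named by their start offset)
    static long fileIndex(long offset) {
        return offset / MAPPED_FILE_SIZE;
    }

    // The read position inside that file
    static long positionInFile(long offset) {
        return offset % MAPPED_FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = MAPPED_FILE_SIZE + 4096; // 4 KB into the second file
        System.out.println(fileIndex(offset));      // 1
        System.out.println(positionInFile(offset)); // 4096
    }
}
```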

After obtaining the buffer, read the first int to get the total length of the message, then read the message body

Let's first look at the structure of a message

            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC, checksum of the message body
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

When putMessage runs, the 17 parts above are serialized and appended to the commitLog. So once we have the starting offset of a message,

the first int read there is the length of the entire message

As can be seen, the getData method only returns a buffer spanning from the given offset to the end of the file (or to the last write position)

The logic to extract each message body is left to the caller
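Given the fixed layout, a caller walks the buffer message by message: read the leading int (TOTALSIZE), consume that many bytes, repeat. A minimal sketch with ByteBuffer, with the record simplified to length plus payload for illustration (the real commitLog record carries all 17 fields):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class MessageWalker {
    // Walk a buffer of [int totalSize][totalSize - 4 bytes of payload] records
    static List<byte[]> readAll(ByteBuffer buffer) {
        List<byte[]> bodies = new ArrayList<>();
        while (buffer.remaining() >= 4) {
            int totalSize = buffer.getInt();       // first int is the whole record length
            if (totalSize <= 4 || totalSize - 4 > buffer.remaining()) {
                break;                             // blank or partially written tail: stop
            }
            byte[] body = new byte[totalSize - 4];
            buffer.get(body);
            bodies.add(body);
        }
        return bodies;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        for (String s : new String[]{"hello", "world"}) {
            byte[] b = s.getBytes();
            buf.putInt(4 + b.length).put(b);       // write two records back to back
        }
        buf.flip();
        System.out.println(readAll(buf).size());   // 2
    }
}
```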

ReputMessageService.doReput

When no message can be pulled, the consumer waits for the next pull. The server also supports a long-polling mode: if a pull request finds no message, the Broker is allowed to hold the request for up to 30s, and as long as a new message arrives within that window it is returned to the consumer directly. Concretely, RocketMQ uses a background service thread on the Broker side, ReputMessageService, to continuously dispatch requests and asynchronously build the ConsumeQueue (logical consumption queue) and IndexFile (index file) data.

From the official documentation on GitHub, we know that the long-polling notification is sent when the consumeQueue entry is built, and building the consumption queue is done by the ReputMessageService background thread

The doReput method is the core method. It does several things:

  1. reputFromOffset is set when the messageStore starts. It is recovered from the maximum offset of the consumeQueue on the hard disk; if no such offset exists, it falls back to the minimum offset of the commitLog
  2. As long as reputFromOffset is less than the maximum offset in the commitLog, consumeQueue building keeps going
  3. After those checks and settings, call the getData method of the commitLog described above
  4. Read the buffer returned in step 3. Since the buffer holds more than one message, messages are read in a loop up to the buffer size, and each message's offset is stored in the consumeQueue
  5. After saving, if the master has long polling enabled, it notifies the hold service so waiting pull requests can be answered
  6. Add the length of the message just processed to reputFromOffset to obtain the new reputFromOffset
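The six steps above amount to a dispatch loop. A non-compilable sketch of its shape in Java-style pseudocode; the helper names are illustrative, not the exact DefaultMessageStore API:

```java
// Simplified shape of ReputMessageService.doReput; names are illustrative
void doReput() {
    // Step 2: keep building while there is un-dispatched commitLog data
    while (reputFromOffset < commitLog.getMaxOffset()) {
        // Step 3: get everything from reputFromOffset to the end of that file
        SelectMappedBufferResult result = commitLog.getData(reputFromOffset);
        if (result == null) break;
        try {
            // Step 4: the buffer holds many messages; decode them one by one
            for (int readSize = 0; readSize < result.getSize(); ) {
                DispatchRequest request = checkMessageAndReturnSize(result, readSize);
                if (request.getMsgSize() <= 0) break;
                doDispatch(request);        // append the 20-byte entry to the consumeQueue
                notifyLongPolling(request); // step 5: wake up held pull requests
                readSize += request.getMsgSize();
                reputFromOffset += request.getMsgSize(); // step 6
            }
        } finally {
            result.release();
        }
    }
}
```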

Asynchronously constructing the consumeQueue is almost simple: following the agreed message layout, take the commitLog offset, the message size and the tag, pack them into a 20-byte entry, and flush it to the hard disk
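Per the RocketMQ storage design, each 20-byte consumeQueue entry is an 8-byte commitLog offset, a 4-byte message size, and an 8-byte tag hashcode. A sketch of encoding one entry with ByteBuffer (the class name is ours, not RocketMQ's):

```java
import java.nio.ByteBuffer;

public class ConsumeQueueEntry {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tag hashcode)

    // Pack one consumeQueue entry: where the message lives in the commitLog,
    // how long it is, and the hashcode of its tag for server-side filtering
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(size).putLong(tagsCode);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1073741824L, 256, "TagA".hashCode());
        System.out.println(entry.remaining()); // 20
        System.out.println(entry.getLong());   // 1073741824
        System.out.println(entry.getInt());    // 256
    }
}
```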

consumeQueue

The file storage structure of the consumeQueue is the same as the commitLog's, but consumeQueues are grouped by topic. A topic can have multiple consumeQueues, distinguished by using the queueId as the folder name. As with the commitLog, the multiple files under each queueId folder form one logical consumeQueue

The consumer requests the consumption offset from the broker by queueId and topic

    public long queryOffset(final String group, final String topic, final int queueId) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        // Get map through spliced key
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null != map) {
            // The latest offset is stored in the map
            Long offset = map.get(queueId);
            if (offset != null)
                return offset;
        }

        return -1;
    }

If the key does not exist in the offsetTable, -1 is returned

After ConsumerManageProcessor receives -1, it concludes that this queue is being consumed for the first time, so it obtains the queue's minimum offset and reads from the first message in the queue

Here is why a missing key in the offsetTable proves the queue is being consumed for the first time

        if (storeOffsetEnable) {
            // Update consumption offset after consumption
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
        }

At the end of the consumption logic, once consumption completes, the consumed offset is updated by calling the commitOffset method

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        // Get the map to which the offset belongs
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

This method shows that if the key does not exist in the offsetTable, it is created. The offset passed in here is the starting offset of the next message

At this point the consumeQueue's mission is complete: the consumeQueue provides the offset to the consumer, the consumer sends the offset to the broker, and the message is fetched from the commitLog by that offset and returned to the consumer

summary

RocketMQ keeps writes to the commitLog sequential, and the commitLog is used to build the consumeQueues. Consumers read the consumeQueue sequentially, so its read speed is close to that of memory

Reads of the commitLog are indeed random, but since the offsets in the consumeQueue are ordered, successive commitLog reads tend to land close together, which limits the performance impact of the random access

PageCache is the OS cache for files, used to speed up file reads and writes. Generally, sequential file I/O from a program runs at close to memory speed, mainly because the OS uses the PageCache mechanism to optimize read and write access, dedicating part of memory to PageCache. For writes, the OS writes to the cache first, and the pdflush kernel thread asynchronously flushes the cached data to the physical disk. For reads, if a read misses the PageCache, the OS fetches the requested file from the physical disk and at the same time reads ahead the data of adjacent blocks.

In RocketMQ, the ConsumeQueue (logical consumption queue) stores little data and is read sequentially. With the page cache's read-ahead, reading ConsumeQueue files is almost as fast as reading memory, and performance holds up even when messages accumulate. For the CommitLog, which stores the actual message data, reads of message content are more random, and this can seriously hurt performance. Choosing an appropriate system I/O scheduling algorithm, such as setting the scheduler to Deadline (when the block storage is an SSD), also improves random-read performance.

In addition, RocketMQ reads and writes files mainly through MappedByteBuffer: NIO's FileChannel maps the physical file on disk directly into user-space memory addresses. This mmap approach avoids the traditional I/O cost of copying file data back and forth between kernel-space and user-space buffers, turning file operations into direct memory operations and greatly improving read and write efficiency. Precisely because the memory-mapping mechanism is used, RocketMQ stores files with a fixed-length structure, which makes it convenient to map an entire file into memory at once.
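The mmap-based access described above is exposed in Java through FileChannel.map. A minimal standalone example of writing and reading through a MappedByteBuffer (a tiny temp file for illustration, not RocketMQ's 1 GB fixed-length files):

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class MmapDemo {
    // Map a temp file into memory, write an int through the mapping, read it back
    static int demo() throws Exception {
        Path path = Files.createTempFile("mmap-demo", ".dat");
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            // Map 4 KB of the file directly into the process address space
            MappedByteBuffer mbb = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            mbb.putInt(42);      // writes go straight to the mapped pages
            mbb.force();         // flush dirty pages to disk explicitly
            mbb.position(0);
            return mbb.getInt();
        } finally {
            Files.deleteIfExists(path);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // 42
    }
}
```

Note that force() plays the role the article ascribes to pdflush, except triggered explicitly; RocketMQ similarly offers synchronous and asynchronous flush strategies on top of the mapping.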

Summarize the file structure

    commitlog
        00000000000000000000    (file names are 20 digits: the file's starting offset)
        00000000001073741824
    consumequeue
        topic1
            queue1
                00000000000000000000
                00000000000005997854
            queue2
        topic2

Tags: Java

Posted by scarhand on Sun, 08 May 2022 21:21:03 +0300