The most complete RocketMQ learning guide: an essential middleware skill for programmers

1, Introduction

RocketMQ is message-oriented middleware developed by Alibaba and donated to Apache. The official website introduces it as follows:

Apache RocketMQ™ is a unified messaging engine, lightweight data processing platform.

  • Low latency: under high load, more than 99.6% of responses complete within 1 millisecond.
  • High availability, with tracing and auditing features.
  • Trillion-level message capacity guaranteed.
  • Since version 4.1, the new open standard for distributed messaging and streaming has been adopted.
  • Batch transfer and versatile integration to improve throughput.
  • Given enough disk space, messages can accumulate without loss of performance.

2, Basic concepts

1. Message Model

RocketMQ is mainly composed of Producers, Brokers and Consumers. A Producer produces messages, a Consumer consumes messages, and a Broker stores messages. In an actual deployment, each Broker corresponds to one server. Each Broker can store messages for multiple Topics, and the messages of each Topic can be sharded across different Brokers. A Message Queue stores the physical addresses of messages, and the message addresses of each Topic are spread over multiple Message Queues. A ConsumerGroup consists of multiple Consumer instances.

2. Message Producer

Responsible for producing messages; this is generally the business system. A message producer sends the messages generated by the business application to the Broker server. RocketMQ provides several sending methods: synchronous, asynchronous, ordered and one-way. Synchronous and asynchronous sending both require the Broker to return an acknowledgement; one-way sending does not.

3. Message Consumer

Responsible for consuming messages; this is generally a background system that consumes asynchronously. A message consumer pulls messages from the Broker server and hands them to the application. From the application's point of view, there are two forms of consumption: pull and push.

4. Topic

Represents a collection of messages. Each topic contains several messages. Each message can only belong to one topic. It is the basic unit for RocketMQ message subscription.

5. Broker Server

The message relay role, responsible for storing and forwarding messages. In RocketMQ, the Broker server receives and stores the messages sent by producers and serves consumers' pull requests. The Broker server also stores message-related metadata, including consumer groups, consumption progress offsets, and topic and queue information.

6. Name Server

The Name Server provides routing information. Producers and consumers look up the list of Broker IP addresses for each topic through the Name Server. Multiple Name Server instances form a cluster, but they are independent of each other and do not exchange information.

7. Pull Consumer

A type of Consumer. The application actively calls the Consumer's pull method to fetch messages from the Broker server, so the application controls when messages are fetched. Once a batch of messages has been obtained, the application starts processing it.

8. Push Consumer

A type of Consumer. In this mode, the Broker actively pushes data to the Consumer after receiving it. This mode generally offers good real-time performance.

9. Producer Group

A collection of Producers of the same type, which send the same type of messages with the same sending logic. If a transactional message is sent and the original Producer crashes afterwards, the Broker server contacts another Producer instance in the same Producer Group to commit or roll back the transaction.

10. Consumer Group

A collection of Consumers of the same type, which usually consume the same type of messages with the same consumption logic. Consumer groups make it easy to achieve load balancing and fault tolerance in message consumption. Note that all Consumer instances in a Consumer Group must subscribe to exactly the same Topics. RocketMQ supports two consumption modes: Clustering and Broadcasting.

11. Clustering

In clustering consumption mode, the messages are shared evenly among the Consumer instances of the same Consumer Group.

12. Broadcasting

In broadcasting consumption mode, every Consumer instance of the same Consumer Group receives all messages.
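
As a rough illustration of the difference between the two modes, here is a minimal sketch. It is a simplified model and not RocketMQ's own code: the class `ConsumptionModes` and its methods are hypothetical, queues are represented by plain integers, and clustering is assumed to hand each consumer one contiguous, nearly equal slice of the queues.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model for illustration only; not part of the RocketMQ API.
class ConsumptionModes {

    // Clustering: split the queues as evenly as possible. The consumer at
    // position `index` (0-based) out of `consumerCount` gets one contiguous slice.
    static List<Integer> clusteringShare(int queueCount, int consumerCount, int index) {
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount;   // the first `extra` consumers get one more queue
        int size = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        List<Integer> share = new ArrayList<>();
        for (int q = start; q < start + size; q++) {
            share.add(q);
        }
        return share;
    }

    // Broadcasting: every consumer reads every queue.
    static List<Integer> broadcastingShare(int queueCount) {
        List<Integer> share = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            share.add(q);
        }
        return share;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("clustering consumer " + i + " -> " + clusteringShare(8, 3, i));
        }
        System.out.println("broadcasting consumer -> " + broadcastingShare(8));
    }
}
```

With 8 queues and 3 consumers, the clustering slices cover every queue exactly once, while in broadcasting mode each consumer sees all 8.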

13. Normal Ordered Message

In normal ordered mode, the messages a consumer receives from the same message queue are in order, while messages received from different message queues may be out of order relative to each other.

14. Strictly Ordered Message

In strictly ordered mode, all messages received by consumers are in order.
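
Because ordering holds within a single message queue, a common way to keep related messages in order is to route everything that shares a business key to the same queue. The sketch below shows only that routing idea; `OrderedRouting` and `selectQueue` are hypothetical names, and the hash-modulo rule is an assumption for illustration, not a description of RocketMQ internals.

```java
// Hypothetical helper for illustration only; not part of the RocketMQ API.
class OrderedRouting {

    // Map a business key (for example an order ID) to a queue index.
    // floorMod keeps the result in [0, queueCount) even for negative hash codes.
    static int selectQueue(String businessKey, int queueCount) {
        return Math.floorMod(businessKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        String[] steps = {"created", "paid", "shipped"};
        for (String orderId : new String[] {"OrderID001", "OrderID002"}) {
            for (String step : steps) {
                // Every step of the same order lands on the same queue.
                System.out.println(orderId + " " + step + " -> queue " + selectQueue(orderId, 4));
            }
        }
    }
}
```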

15. Message

The physical carrier of the information transmitted by the messaging system, and the smallest unit of data produced and consumed. Each message must belong to a topic. Each message in RocketMQ has a unique Message ID and can carry a Key holding a business identifier. The system supports querying messages by Message ID and by Key.

16. Tag

A flag set on a message to distinguish different types of messages under the same topic. Messages from the same business unit can be given different tags under the same topic according to their business purpose. Tags help keep code clear and consistent and make the queries provided by RocketMQ more efficient. Consumers can apply different consumption logic to different subtopics according to the Tag, which improves extensibility.
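
To make tag-based selection concrete, here is a minimal sketch of how a subscription expression such as `TagA || TagB` (or `*` for all tags), the form used in the examples later in this guide, could be matched against a message's tag. `TagMatcher` is a hypothetical class written for this illustration; it does not reproduce how the Broker actually evaluates tags.

```java
// Hypothetical matcher for illustration only; not part of the RocketMQ API.
class TagMatcher {

    // Returns true if `tag` is selected by `expression`.
    // "*" (or a null expression) selects everything; otherwise the expression
    // is a list of tags separated by "||".
    static boolean matches(String expression, String tag) {
        if (expression == null || expression.trim().equals("*")) {
            return true;
        }
        for (String candidate : expression.split("\\|\\|")) {
            if (candidate.trim().equals(tag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA || TagB", "TagB"));
        System.out.println(matches("TagA || TagB", "TagC"));
        System.out.println(matches("*", "TagC"));
    }
}
```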

3, Architecture design

1. Technical architecture

RocketMQ architecture is mainly divided into four parts, as shown in the figure above:

  • Producer: the message publishing role, which supports distributed cluster deployment. The Producer selects a queue in the appropriate Broker cluster for delivery through MQ's load balancing module. Delivery supports fast failure and low latency.

  • Consumer: the message consuming role, which supports distributed cluster deployment. It supports both push and pull consumption, as well as clustering and broadcasting modes. It provides a real-time message subscription mechanism that meets the needs of most users.

  • NameServer: a very simple Topic routing registry whose role is similar to that of ZooKeeper in Dubbo. It supports dynamic registration and discovery of Brokers and has two main functions. The first is Broker management: the NameServer accepts registration information from the Broker cluster, saves it as the basic routing data, and provides a heartbeat mechanism to check whether each Broker is still alive. The second is routing information management: each NameServer stores the complete routing information of the Broker cluster and the queue information used for client queries, so Producers and Consumers can learn the routing of the whole Broker cluster through the NameServer and then deliver and consume messages. NameServers are usually deployed as a cluster whose instances do not communicate with each other. Each Broker registers its routing information with every NameServer, so every NameServer instance holds a complete copy. When a NameServer goes offline for some reason, the Broker can still synchronize its routing information with the other NameServers, and Producers and Consumers can still discover the Broker routing information dynamically.

  • Broker server: the Broker is mainly responsible for storing, delivering and querying messages and for guaranteeing high availability of the service. To do so, it includes the following important submodules.

  1. Remoting Module: the entry point of the whole Broker, responsible for processing requests from clients.
  2. Client Manager: manages the clients (Producer/Consumer) and maintains the Consumers' Topic subscription information.
  3. Store Service: provides a simple, convenient API for storing messages on the physical disk and querying them.
  4. HA Service: the high availability service, which synchronizes data between the Master Broker and the Slave Broker.
  5. Index Service: indexes the messages delivered to the Broker by a specific Message Key to support fast message queries.
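
The NameServer's two jobs described above (accepting Broker registrations with heartbeats, and answering routing queries) can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `RouteRegistry` and its methods are hypothetical names, time is passed in explicitly, and the timeout used in `main` is an arbitrary example value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory model for illustration only; not RocketMQ's NameServer code.
class RouteRegistry {
    private final Map<String, Set<String>> brokersByTopic = new HashMap<>();
    private final Map<String, Long> lastSeen = new HashMap<>();

    // A Broker registers (or re-registers, acting as a heartbeat) with its topics.
    void register(String brokerAddr, Collection<String> topics, long nowMillis) {
        lastSeen.put(brokerAddr, nowMillis);
        for (String topic : topics) {
            brokersByTopic.computeIfAbsent(topic, t -> new TreeSet<>()).add(brokerAddr);
        }
    }

    // A Producer or Consumer asks which Brokers serve a topic.
    List<String> lookup(String topic) {
        return new ArrayList<>(brokersByTopic.getOrDefault(topic, Collections.emptySet()));
    }

    // Drop Brokers whose last heartbeat is older than the timeout.
    void evictExpired(long nowMillis, long timeoutMillis) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                dead.add(entry.getKey());
            }
        }
        for (String addr : dead) {
            lastSeen.remove(addr);
            for (Set<String> brokers : brokersByTopic.values()) {
                brokers.remove(addr);
            }
        }
    }

    public static void main(String[] args) {
        RouteRegistry registry = new RouteRegistry();
        registry.register("192.168.1.2:10911", Arrays.asList("TopicTest"), 0L);
        registry.register("192.168.1.3:10911", Arrays.asList("TopicTest"), 50_000L);
        registry.evictExpired(130_000L, 120_000L); // example timeout of 120 s
        System.out.println(registry.lookup("TopicTest"));
    }
}
```

Because every NameServer instance would hold its own copy of such a table, losing one instance does not lose routing data, which matches the stateless design described above.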

2. Deployment architecture

RocketMQ network deployment features

  • NameServer is an almost stateless node that can be deployed in clusters without any information synchronization between nodes.

  • Broker deployment is relatively complex. Brokers are divided into Masters and Slaves. One Master can have multiple Slaves, but a Slave can belong to only one Master. The Master-Slave relationship is defined by giving them the same BrokerName and different BrokerIds: a BrokerId of 0 indicates a Master, and a non-zero BrokerId indicates a Slave. Multiple Masters can also be deployed. Each Broker establishes long-lived connections with all nodes in the NameServer cluster and periodically registers its Topic information with every NameServer. Note: the current version of RocketMQ supports one Master with multiple Slaves in the deployment architecture, but only the Slave with BrokerId=1 takes part in the message read load.

  • The Producer establishes a long-lived connection with one randomly selected node in the NameServer cluster, periodically fetches Topic routing information from it, establishes long-lived connections with the Masters that serve the Topic, and periodically sends heartbeats to those Masters. Producers are completely stateless and can be deployed as a cluster.

  • The Consumer establishes a long-lived connection with one randomly selected node in the NameServer cluster, periodically fetches Topic routing information from it, establishes long-lived connections with the Masters and Slaves that serve the Topic, and periodically sends heartbeats to them. Consumers can subscribe to messages from either a Master or a Slave. When a consumer pulls messages from the Master, the Master suggests whether the next pull should go to the Master or the Slave, based on the distance between the pull offset and the maximum offset (which indicates whether old messages are being read and would cause read I/O), on whether the Slave is readable, and on other factors.

The cluster workflow, described with reference to the deployment architecture diagram:

  • Start the NameServer. Once it is up, it listens on its port and waits for Brokers, Producers and Consumers to connect. It acts as a routing control center.
  • Start the Broker. It keeps long-lived connections with all NameServers and sends heartbeat packets periodically. Each heartbeat packet contains the current Broker information (IP, port, etc.) and the information of all Topics it stores. After successful registration, the NameServer cluster holds the mapping between Topics and Brokers.
  • Before sending and receiving messages, create a Topic and specify which Brokers it will be stored on. A Topic can also be created automatically when a message is sent.
  • The Producer sends a message. On startup, it first establishes a long-lived connection with one node of the NameServer cluster and obtains from it the Brokers that hold the Topic being sent to. It then selects a queue from the queue list in round-robin fashion and establishes a long-lived connection with the Broker hosting that queue in order to send the message.
  • The Consumer works similarly. It establishes a long-lived connection with one of the NameServers, finds out which Brokers hold the subscribed Topic, and then connects directly to those Brokers to start consuming messages.
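
The queue selection step of the Producer workflow above, where queues are taken from the queue list in turn, can be sketched as a simple round-robin counter. `RoundRobinSelector` is a hypothetical class for illustration; the real client also takes Broker availability into account, which this sketch ignores.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical selector for illustration only; not part of the RocketMQ API.
class RoundRobinSelector {
    private final AtomicInteger counter = new AtomicInteger();

    // Return the index of the next queue, cycling through 0..queueCount-1.
    // floorMod keeps the index valid even after the counter overflows.
    int next(int queueCount) {
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    public static void main(String[] args) {
        RoundRobinSelector selector = new RoundRobinSelector();
        for (int i = 0; i < 6; i++) {
            System.out.println("message " + i + " -> queue " + selector.next(4));
        }
    }
}
```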

4, Examples

1. Basic sample

The basic example covers the following scenarios:

  • Use RocketMQ to send three types of messages: synchronous, asynchronous and one-way. The first two are reliable because the sender receives a response whether or not the send succeeds.
  • Use RocketMQ to consume the received messages.

1.1 Adding dependencies

Maven:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

Gradle:

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

1.2 Message sending

1. Send a synchronous message
Reliable synchronous sending is widely used, for example for important notifications and SMS notifications.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the address of the NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start the Producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message and specify Topic, Tag and message body
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message to a Broker and block until the result is returned
            SendResult sendResult = producer.send(msg);
            // sendResult reports whether the message was delivered successfully
            System.out.printf("%s%n", sendResult);
        }
        // Shut down the Producer instance once no more messages will be sent
        producer.shutdown();
    }
}

2. Send asynchronous message
Asynchronous messages are usually used in business scenarios that are sensitive to response time, that is, the sender cannot tolerate waiting for the Broker's response for a long time.

import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the address of the NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start the Producer instance
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        // Create a countdown latch sized to the number of messages
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Create a message and specify Topic, Tag, Key and message body
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback is invoked asynchronously with the send result
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // Count down so that main() can stop waiting once all callbacks have run
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                        sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Wait up to 5s for all callbacks to complete
        countDownLatch.await(5, TimeUnit.SECONDS);
        // Shut down the Producer instance once no more messages will be sent
        producer.shutdown();
    }
}

3. Send a one-way message
This method is mainly used in scenarios where the send result does not matter much, such as log shipping.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the address of the NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start the Producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message and specify Topic, Tag and message body
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send a one-way message; no result is returned
            producer.sendOneway(msg);
        }
        // Shut down the Producer instance once no more messages will be sent
        producer.shutdown();
    }
}

1.3 Consuming messages

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate the consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Set the address of the NameServer
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe to a Topic; the second argument is a Tag expression that filters the messages to consume
        consumer.subscribe("TopicTest", "*");
        // Register the callback that handles messages pulled back from the Broker
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Mark the messages as successfully consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer instance
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

1.4 Sending delayed messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
   public static void main(String[] args) throws Exception {
      // Instantiate a producer to send delayed messages
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // Set the address of the NameServer
      producer.setNamesrvAddr("localhost:9876");
      // Start producer
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // Set delay level 3, so the message is delivered after 10s (only a fixed set of delay levels is supported; see delayTimeLevel for details)
          message.setDelayTimeLevel(3);
          // send message
          producer.send(message);
      }
       // Shut down producer
      producer.shutdown();
  }
}
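
To show how a delay level maps to an actual delay, here is a minimal sketch. It assumes the Broker uses the level table `1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`, which to my knowledge is the default in the 4.x line; if your Broker is configured differently, the numbers change. `DelayLevels` and `delayMillis` are hypothetical names, not RocketMQ API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the RocketMQ API.
class DelayLevels {
    // Assumed default level table; check your Broker configuration.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Translate a 1-based delay level into milliseconds using the given table.
    static long delayMillis(String levels, int level) {
        String[] parts = levels.trim().split("\\s+");
        if (level < 1 || level > parts.length) {
            throw new IllegalArgumentException("Level must be between 1 and " + parts.length);
        }
        String spec = parts[level - 1];
        long value = Long.parseLong(spec.substring(0, spec.length() - 1));
        switch (spec.charAt(spec.length() - 1)) {
            case 's': return TimeUnit.SECONDS.toMillis(value);
            case 'm': return TimeUnit.MINUTES.toMillis(value);
            case 'h': return TimeUnit.HOURS.toMillis(value);
            case 'd': return TimeUnit.DAYS.toMillis(value);
            default: throw new IllegalArgumentException("Unknown time unit in " + spec);
        }
    }

    public static void main(String[] args) {
        // Level 3 is the value used in the example above.
        System.out.println("level 3 -> " + delayMillis(DEFAULT_LEVELS, 3) + " ms");
    }
}
```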

1.5 Batch messages

Sending messages in bulk can significantly improve the performance of delivering small messages. The limitation is that these batch messages should have the same topic, the same waitStoreMsgOK, and cannot be delayed messages. In addition, the total size of this batch of messages should not exceed 4MB.

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   //Processing error
}

The complexity increases only when you send a large batch, and you may not be sure whether it exceeds the size limit (4MB). At this time, you'd better split your message list:

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }
    @Override public boolean hasNext() {
        return currIndex < messages.size();
    }
    @Override public List<Message> next() {
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(startIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
    private int getStartIndex() {
        // Skip any single message that exceeds the size limit on its own,
        // without running past the end of the list
        while (currIndex < messages.size()
                && calcMessageSize(messages.get(currIndex)) > SIZE_LIMIT) {
            currIndex += 1;
        }
        return currIndex;
    }
    private int calcMessageSize(Message message) {
        // getBody() returns a byte[], so its size is the array length
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        tmpSize = tmpSize + 20; // Add 20 bytes for the log overhead
        return tmpSize;
    }
}
// Split a large batch into several smaller batches
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message> listItem = splitter.next();
      // The last slice can be empty if only oversized messages remained
      if (!listItem.isEmpty()) {
          producer.send(listItem);
      }
  } catch (Exception e) {
      e.printStackTrace();
      // Handle the error
  }
}

1.6 Filtering messages

In most cases, TAG is a simple and useful design that allows you to select the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

The consumer will receive messages tagged TAGA, TAGB or TAGC. The limitation is that a message can have only one tag, which may not be enough in complex scenarios. In that case, you can filter messages with SQL expressions. The SQL feature evaluates an expression against the properties that were set when the message was sent, and simple logic can be implemented in the syntax defined by RocketMQ. Here is an example:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------
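
The two cases in the diagram can be reproduced with a hand-written predicate. This is only a sketch of what the expression `a > 5 AND b = 'abc'` means when applied to message properties; `PropertyFilterSketch` is a hypothetical class, properties are modelled as a `Map<String, String>`, and no SQL parsing is attempted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch for illustration only; not how the Broker evaluates SQL filters.
class PropertyFilterSketch {

    // Hand-written equivalent of: a > 5 AND b = 'abc'
    // A message without property `a` does not match.
    static final Predicate<Map<String, String>> FILTER = props -> {
        String a = props.get("a");
        return a != null && Integer.parseInt(a) > 5 && "abc".equals(props.get("b"));
    };

    // Build the property map of one of the messages in the diagram.
    static Map<String, String> message(int a, String b, boolean c) {
        Map<String, String> props = new HashMap<>();
        props.put("a", String.valueOf(a));
        props.put("b", b);
        props.put("c", String.valueOf(c));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(FILTER.test(message(10, "abc", true)) ? "Gotten" : "Missed");
        System.out.println(FILTER.test(message(1, "abc", true)) ? "Gotten" : "Missed");
    }
}
```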

1. Basic syntax

RocketMQ defines only a basic syntax to support this feature, and it can easily be extended.

  • Numeric comparison, such as >, >=, <, <=, BETWEEN, =
  • Character comparison, such as =, <>, IN
  • IS NULL or IS NOT NULL
  • Logical operators AND, OR, NOT

The supported constant types are:

  • Numeric, such as 123 or 3.1415
  • Character, such as 'abc'; must be wrapped in single quotation marks
  • NULL, a special constant
  • Boolean, TRUE or FALSE

Only consumers in push mode can use SQL92 filter expressions. The interface is as follows:

public void subscribe(final String topic, final MessageSelector messageSelector)

2. Producer sample
When sending a message, you can set its properties through putUserProperty.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String tag = "TagA"; // example tag
for (int i = 0; i < 10; i++) { // example message count
    Message msg = new Message("TopicTest",
       tag,
       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // Set a user property that SQL filters can test
    msg.putUserProperty("a", String.valueOf(i));
    SendResult sendResult = producer.send(msg);
}

producer.shutdown();

3. Consumer sample
Use MessageSelector.bySql to filter messages with an SQL expression.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Subscribe only to messages that have property a with 0 <= a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

5, Cluster deployment

1. Cluster construction

1.1 Single Master mode

This mode is risky: once the Broker restarts or goes down, the whole service becomes unavailable. It is not recommended for production environments but can be used for local testing.

1) Start NameServer

### Start the Name Server first
$ nohup sh mqnamesrv &
 
### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2) Start Broker

### Start Broker
$ nohup sh bin/mqbroker -n localhost:9876 &

### Verify that the Broker started successfully. For example, the IP address of the Broker is 192.168.1.2 and its name is broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log 
The broker[broker-a, 192.168.1.2:10911] boot success...

1.2 Multi-Master mode

The cluster has no Slaves, only Masters, for example two or three Masters. The advantages and disadvantages of this mode are as follows:

  • Advantages: configuration is simple, and the downtime or restart of a single Master has no impact on the application. When the disks are configured as RAID10, messages are not lost even if a machine goes down and cannot be recovered, because RAID10 is very reliable (a small number of messages are lost with asynchronous disk flushing, none with synchronous flushing). Performance is the highest of all modes.

  • Disadvantages: while a machine is down, the messages on it that have not been consumed cannot be subscribed to until the machine recovers, which affects the real-time delivery of messages.

1) Start NameServer

The NameServer must be started before the Broker. In production, to ensure high availability, it is recommended to start three NameServers for a cluster of ordinary size. The startup command is the same on every node:

### Start the Name Server first
$ nohup sh mqnamesrv &
 
### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2) Start the Broker cluster

### On machine A, start the first Master. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
 
### On machine B, start the second Master. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &

...

The startup commands above are for a single NameServer. For a cluster with multiple NameServers, the address list after -n in the Broker startup command is separated by semicolons, for example 192.168.1.1:9876;192.168.1.2:9876.

1.3 Multi-Master multi-Slave mode - asynchronous replication

Each Master is configured with a Slave, giving multiple Master-Slave pairs. HA uses asynchronous replication, so the Slave lags the Master briefly (on the order of milliseconds). The advantages and disadvantages of this mode are as follows:

  • Advantages: even if a disk is damaged, very few messages are lost, and the real-time delivery of messages is not affected. After the Master goes down, consumers can still consume from the Slave; this is transparent to the application and needs no manual intervention. Performance is almost the same as in multi-Master mode.

  • Disadvantages: if the Master goes down and its disk is damaged, a small number of messages are lost.

1) Start NameServer

### Start the Name Server first
$ nohup sh mqnamesrv &
 
### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2) Start the Broker cluster

### On machine A, start the first Master. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
 
### On machine B, start the second Master. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
 
### On machine C, start the first Slave. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
 
### On machine D, start the second Slave. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

1.4 Multi-Master multi-Slave mode - synchronous double write

Each Master is configured with a Slave, giving multiple Master-Slave pairs. HA uses synchronous double write: success is returned to the application only after both the Master and the Slave have written the message. The advantages and disadvantages of this mode are as follows:

  • Advantages: there is no single point of failure for data or service. When the Master goes down, messages are not delayed, and both service availability and data availability are very high.

  • Disadvantages: performance is slightly lower than in asynchronous replication mode (about 10% lower), and the RT (response time) for sending a single message is slightly higher. In the current version, after the Master goes down, the Slave cannot automatically be promoted to Master.

1) Start NameServer

### Start the Name Server first
$ nohup sh mqnamesrv &
 
### Verify that the Name Server started successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2) Start the Broker cluster

### On machine A, start the first Master. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
 
### On machine B, start the second Master. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
 
### On machine C, start the first Slave. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
 
### On machine D, start the second Slave. For example, the IP address of NameServer is 192.168.1.1
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

The Master and Slave Brokers above are paired by specifying the same BrokerName parameter. The BrokerId of the Master must be 0 and the BrokerId of a Slave must be greater than 0. Multiple Slaves can be attached to one Master, and they are distinguished by different BrokerIds. $ROCKETMQ_HOME refers to the RocketMQ installation directory; you need to set this environment variable yourself.
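
The pairing rule can be checked mechanically. The sketch below reads a Broker configuration given as properties text and reports its role from `brokerName` and `brokerId`. The class `BrokerRole` is hypothetical, the sample values are made up for illustration, and both keys are assumed to be present in the configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of RocketMQ.
class BrokerRole {

    // Describe a Broker's role: brokerId 0 means Master, anything greater means Slave.
    static String describe(String config) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String name = props.getProperty("brokerName");
        long id = Long.parseLong(props.getProperty("brokerId").trim());
        return id == 0
            ? name + " is a Master"
            : name + " is a Slave (brokerId=" + id + ")";
    }

    public static void main(String[] args) {
        // Example values only; a Master and its Slave share the same brokerName.
        System.out.println(describe("brokerName=broker-a\nbrokerId=0"));
        System.out.println(describe("brokerName=broker-a\nbrokerId=1"));
    }
}
```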

2. mqadmin management tool

Note:

  • How to run a command: ./mqadmin {command} {args}
  • Almost all commands require -n, which specifies the NameServer address in the form ip:port
  • Almost all commands print help with -h
  • If both the Broker address (-b) and the clusterName (-c) are given, the command is executed against the Broker address. If no Broker address is given, the command is executed against all hosts in the cluster. Only one Broker address is supported. The -b format is ip:port, and the default port is 10911
  • Many commands are visible under tools, but not all of them can be used: only those initialized in MQAdminStartup are available. You can modify this class to add or customize commands
  • Because of version updates, a few commands may not have been updated in time. If you encounter errors, read the source code of the relevant command directly

6, FAQ

1. Where does a new consumer start consuming?

  • If the messages in the topic were all sent within the last three days, the new consumer starts consuming from the first stored message
  • If the topic holds messages older than three days, the new consumer starts with the messages from the third day
  • If the consumer restarts, it continues from its last consumed position

2. How to deal with consumption failure?

If the listener returns RECONSUME_LATER, returns null, or throws an exception, the message will be retried up to 16 times
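The rule above can be modelled in a few lines. This is a simplified simulation under stated assumptions, not RocketMQ's actual implementation: RetrySketch and deliver are hypothetical names, the local Status enum only mirrors the client's CONSUME_SUCCESS and RECONSUME_LATER values, and the real broker waits between retries, which is ignored here.

```java
import java.util.function.Function;

// Hypothetical simulation of the retry rule; not a RocketMQ class.
final class RetrySketch {

    // Local stand-in for the two results a concurrent listener can return.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    static final int MAX_RETRIES = 16;

    // Delivers one message to the listener. The listener receives the number
    // of retries so far (0 on the first delivery).
    // Returns the total number of deliveries needed, or -1 if the message
    // still failed after the first delivery plus MAX_RETRIES retries.
    static int deliver(Function<Integer, Status> listener) {
        for (int retries = 0; retries <= MAX_RETRIES; retries++) {
            Status status;
            try {
                status = listener.apply(retries);
            } catch (RuntimeException e) {
                // An exception is treated like RECONSUME_LATER.
                status = Status.RECONSUME_LATER;
            }
            if (status == Status.CONSUME_SUCCESS) {
                return retries + 1;
            }
            // null or RECONSUME_LATER: fall through and retry.
        }
        return -1;
    }
}
```

A listener that returns null twice and then succeeds is delivered three times in total. A listener that always throws is called 17 times (one delivery plus 16 retries) before the sketch gives up.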

3. How to query failed messages?

  • Query according to topic and time slice
  • Query according to topic and messageId
  • Query according to topic and messageKey

4. Is the message delivered only once?

RocketMQ ensures that every message is delivered at least once. In most cases a message will not be consumed more than once, but this is not guaranteed, so you should make your own consumption logic idempotent.
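One common way to make consumption idempotent is to remember a unique business key per message and skip keys that were already handled. The sketch below assumes each message carries such a key. IdempotentHandler is a hypothetical class, and the in-memory set is only for illustration: a real service would more likely rely on a durable store such as a database unique constraint.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical idempotent handler; not part of the RocketMQ client.
final class IdempotentHandler {

    // Keys of messages whose side effect has already been applied.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    // Counts how many times the business side effect actually ran.
    private final AtomicInteger appliedCount = new AtomicInteger();

    // Returns true if the message was processed, false if it was a duplicate.
    boolean handle(String messageKey, String body) {
        // add() is atomic: only the first caller for a given key gets true.
        if (!processedKeys.add(messageKey)) {
            return false;
        }
        appliedCount.incrementAndGet(); // stand-in for the real business logic
        return true;
    }

    int appliedCount() {
        return appliedCount.get();
    }
}
```

If the same message key arrives twice, the second call returns false and the side effect runs only once.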

5. How long does each message last?

3 days

6. Size limit of message body?

256 KB

7. How to set the number of consumer threads?

consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);

8. Common errors

  • Within one JVM, each producer / consumer group can have only one producer / consumer instance;
  • fastjson version cannot be too low;
  • topic should be created in advance and have permission.

9. Why RocketMQ?

According to our research, as the number of queues and virtual topics grew, the ActiveMQ IO module reached a bottleneck. We tried to solve this problem through throttling, circuit breaking or degradation, but the results were not ideal. So we turned our attention to Kafka, the popular messaging solution at the time. Unfortunately, Kafka could not meet our requirements, especially in terms of low latency and high reliability.

In this case, we decided to invent a new messaging engine to handle a wider set of use cases, from traditional publish / subscribe scenarios to high-volume, real-time transaction systems with zero tolerance for message loss.

The following table compares RocketMQ, ActiveMQ and Kafka (Apache's most popular messaging solutions according to awesome-java):

| Feature | ActiveMQ | Kafka | RocketMQ |
| --- | --- | --- | --- |
| Language support | Java, .NET, C++ etc. | Java, Scala etc. | Java, C++, Go |
| Protocols and specifications | Push model; supports OpenWire, STOMP, AMQP, MQTT, JMS | Pull model; supports TCP | Pull model; supports TCP, JMS, OpenMessaging |
| Message ordering | Exclusive Consumer or Exclusive Queues can ensure ordering | Ensures ordering of messages within a partition | Ensures strict ordering of messages, and can scale out gracefully |
| Scheduled messages | Supported | Not supported | Supported |
| Batch support | Not supported | Supported, with async producer | Supported, with sync mode to avoid message loss |
| Broadcast | Supported | Not supported | Supported |
| Message filtering | Supported | Supported, you can use Kafka Streams to filter messages | Supported, property filter expressions based on SQL92 |
| Server-triggered redelivery | Not supported | Not supported | Supported |
| Message storage | Supports very fast persistence using JDBC along with a high-performance journal, such as levelDB, kahaDB | High-performance file storage | High-performance and low-latency file storage |
| Message retroactive (replay) | Supported | Supported, by offset | Supported, by timestamp or offset |
| Message priority | Supported | Not supported | Not supported |
| High availability and failover | Supported, depending on storage; if using kahaDB it requires a ZooKeeper server | Supported, requires a ZooKeeper server | Supported, Master-Slave model, without another kit |
| Message tracking | Not supported | Not supported | Supported |
| Configuration | The default configuration is low level; users need to optimize the configuration parameters | Uses key-value pairs for configuration, supplied either from a file or programmatically | Works out of the box; users only need to pay attention to a few configurations |
| Management tools | Supported | Supported, use terminal commands to expose core metrics | Supported, rich web and terminal commands to expose core metrics |

Reference website:

http://rocketmq.apache.org/

http://rocketmq.apache.org/docs/quick-start/

Tags: Back-end Middleware MQ

Posted by syamswaroop on Tue, 03 May 2022 03:34:27 +0300