RocketMQ Getting Started Tutorial

RocketMQ role

Cluster Features

  • NameServer is an almost stateless node. It can be deployed as a cluster, and the nodes do not synchronize any information with each other.
  • Broker deployment is more complex. Brokers are divided into Masters and Slaves. One Master can have multiple Slaves, but a Slave can belong to only one Master. A Master and its Slaves are associated by sharing the same brokerName with different brokerId values: brokerId 0 denotes the Master, and any non-zero value denotes a Slave. Multiple Masters can be deployed. Each Broker establishes a long-lived connection with every node in the NameServer cluster, periodically registers its topic information with all of them, and in doing so reports that it is alive.
  • The Producer establishes a long-lived connection with one randomly selected node in the NameServer cluster, periodically fetches topic routing information from it, establishes a long-lived connection to the Master that serves the topic, and sends heartbeats to that Master at regular intervals. The Producer is completely stateless and can be deployed as a cluster.
  • The Consumer establishes a long-lived connection with one randomly selected node in the NameServer cluster, periodically fetches topic routing information from it, establishes long-lived connections to the Master and Slave that serve the topic, and sends heartbeats to both at regular intervals. A Consumer can subscribe to messages from either the Master or the Slave; which one is used is determined by the Broker configuration.
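
The brokerName/brokerId rule described above can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions: the `BrokerGroups` and `Broker` classes and the addresses are hypothetical and are not part of RocketMQ. The sketch groups brokers by brokerName and picks the member with brokerId 0 as the Master.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration; these classes are not part of RocketMQ.
class BrokerGroups {
    static final class Broker {
        final String brokerName;
        final long brokerId;
        final String address;

        Broker(String brokerName, long brokerId, String address) {
            this.brokerName = brokerName;
            this.brokerId = brokerId;
            this.address = address;
        }

        // brokerId 0 marks the Master; any other value marks a Slave.
        boolean isMaster() {
            return brokerId == 0;
        }
    }

    // Brokers that share a brokerName form one Master-Slave group.
    static Map<String, List<Broker>> groupByName(List<Broker> brokers) {
        Map<String, List<Broker>> groups = new TreeMap<>();
        for (Broker broker : brokers) {
            groups.computeIfAbsent(broker.brokerName, name -> new ArrayList<Broker>()).add(broker);
        }
        return groups;
    }

    // Returns the Master of a group, or null if the group has none.
    static Broker masterOf(List<Broker> group) {
        for (Broker broker : group) {
            if (broker.isMaster()) {
                return broker;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Made-up addresses, used only for the demonstration.
        List<Broker> brokers = Arrays.asList(
                new Broker("broker-a", 0, "10.0.0.1:10911"),
                new Broker("broker-a", 1, "10.0.0.2:10911"),
                new Broker("broker-b", 0, "10.0.0.3:10911"));
        for (Map.Entry<String, List<Broker>> entry : groupByName(brokers).entrySet()) {
            Broker master = masterOf(entry.getValue());
            System.out.println(entry.getKey()
                    + " master=" + (master == null ? "none" : master.address)
                    + " members=" + entry.getValue().size());
        }
    }
}
```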

Cluster modes

1) Single Master mode

This mode is risky: once the Broker restarts or goes down, the entire service becomes unavailable. It is not recommended for production environments, but it can be used for local testing.

2) Multi-Master mode

The cluster has no Slaves, only Masters (for example, 2 or 3 Masters). The advantages and disadvantages of this mode are as follows:

  • Advantages: simple configuration. The downtime or maintenance restart of a single Master has no impact on the application. When the disks are configured as RAID10, messages are not lost even if the machine goes down and cannot be recovered, because RAID10 is very reliable (asynchronous disk flushing loses a small number of messages; synchronous flushing loses none). Performance is the highest of all modes.
  • Disadvantages: While a machine is down, the unconsumed messages stored on it cannot be consumed until the machine is restored, so message timeliness is affected.

3) Multi-Master multi-Slave mode (asynchronous)

Each Master is configured with one Slave, forming multiple Master-Slave pairs. HA uses asynchronous replication, so the Slave lags behind the Master by a short delay (milliseconds). The advantages and disadvantages of this mode are as follows:

  • Advantages: Even if a disk is damaged, very few messages are lost, and message timeliness is not affected. After the Master goes down, consumers can still consume from the Slave. This failover is transparent to the application and requires no manual intervention. Performance is almost the same as in multi-Master mode.
  • Disadvantages: If the Master goes down and its disk is damaged, a small number of messages are lost.

4) Multi-Master multi-Slave mode (synchronous)

Each Master is configured with one Slave, forming multiple Master-Slave pairs. HA uses synchronous double write: success is returned to the application only after both the Master and the Slave have written the message successfully. The advantages and disadvantages of this mode are as follows:

  • Advantages: Neither data nor service has a single point of failure. When the Master goes down, messages are not delayed, and both service availability and data availability are very high.
  • Disadvantages: Performance is slightly lower than in asynchronous replication mode (about 10% lower), the RT for sending a single message is slightly higher, and in the current version the Slave cannot be promoted to Master automatically after the Master node goes down.

Setting up the environment (Windows)

1. System environment variable configuration

Variable name: ROCKETMQ_HOME
Variable value: MQ decompression path\MQ folder name

2. Start

  • First start the NameServer: open a Cmd window, change to 'MQ folder\bin', and run 'start mqnamesrv.cmd'
  • Then start the Broker: open a Cmd window, change to 'MQ folder\bin', and run 'start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true'
  • More details https://blog.csdn.net/kingtok/article/details/104212625

Message sending

1. Synchronous message sending

The producer sends a message to the broker and blocks inside the API call until the broker returns the send result.

        //1. Create a message producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2. Specify the Nameserver address
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3. Start the producer
        producer.start();

        for (int i = 0;i<10;i++){
            //4. Create a message object, specifying the topic, tag and message body
            /**
             * Parameter 1: message topic
             * Parameter 2: message tag
             * Parameter 3: message body
             */
            Message msg = new Message("base","Tag1",("Hello World"+i).getBytes());
            //5. Send a message
            SendResult result = producer.send(msg);
            System.out.println("Message result:"+result);
            Thread.sleep(1000);
        }
        producer.shutdown();

2. Asynchronous message sending

When the producer sends a message to the broker, it registers a callback for send success and send failure, and the API call returns immediately. The sending thread is not blocked; the success or failure callback runs on a separate thread.

        //1. Create a message producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2. Specify the Nameserver address
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3. Start the producer
        producer.start();

        for (int i = 0;i<10;i++){
            //4. Create a message object, specifying the topic, tag and message body
            /**
             * Parameter 1: message topic
             * Parameter 2: message tag
             * Parameter 3: message body
             */
            Message msg = new Message("base","Tag2",("Hello World"+i).getBytes());
            //5. Send asynchronous messages
            producer.send(msg, new SendCallback() {
                //Callback invoked when sending succeeds
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("Send result:"+sendResult);
                }
                //Callback invoked when sending fails
                @Override
                public void onException(Throwable throwable) {
                    System.out.println("Send failed: "+throwable);
                }
            });

            Thread.sleep(1000);
        }
        producer.shutdown();

3. One-way message sending

The producer sends a message to the broker and the API call returns immediately, without waiting for any result from the broker. This mode uses sendOneway().

        //1. Create a message producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2. Specify the Nameserver address
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3. Start the producer
        producer.start();

        for (int i = 0;i<10;i++){
            //4. Create a message object, specifying the topic, tag and message body
            /**
             * Parameter 1: message topic
             * Parameter 2: message tag
             * Parameter 3: message body
             */
            Message msg = new Message("base","Tag1",("Hello World one-way message"+i).getBytes());
            //5. Send a message
            producer.sendOneway(msg);

            Thread.sleep(1000);
        }
        producer.shutdown();

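Consumption modes

  • Load balancing mode: the default consumption mode; each message is consumed by only one consumer in the group
  • Broadcast mode: every consumer in the group receives every message; set with consumer.setMessageModel(MessageModel.BROADCASTING);

        //1. Create a consumer and specify the consumer group name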
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2. Specify the Nameserver address
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //3. Subscribe to topics and tags
        consumer.subscribe("base","Tag1");
        //Set consumption mode: load balancing || broadcast mode, default load balancing
        consumer.setMessageModel(MessageModel.BROADCASTING);//broadcast
        //4. Set the callback function to process the message
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg:list) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5. Start the consumer
        consumer.start();
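
The difference between the two modes can be illustrated without a broker. The following is a minimal simulation, not RocketMQ code: `ConsumptionModes` and its methods are hypothetical names, and the round-robin distribution is an assumption made for simplicity (RocketMQ itself assigns whole queues to the consumers of a group).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; it does not use the RocketMQ client.
class ConsumptionModes {
    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<String>());
        }
        return inboxes;
    }

    // Load balancing: each message reaches exactly one consumer of the group.
    // Round-robin is an assumption of this sketch, chosen for simplicity.
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            inboxes.get(consumer).add(messages.get(i));
        }
        return inboxes;
    }

    // Broadcast: every consumer of the group receives every message.
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> inboxes = emptyInboxes(consumers);
        for (String consumer : consumers) {
            inboxes.get(consumer).addAll(messages);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");
        List<String> messages = Arrays.asList("Hello World0", "Hello World1", "Hello World2");
        System.out.println("load balancing: " + clustering(consumers, messages));
        System.out.println("broadcast:      " + broadcasting(consumers, messages));
    }
}
```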

Ordered messages

  • Producer: choose the queue from a business ID (here the order ID) so that all messages of one order go to the same queue
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        //1. Create a message producer and specify the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2. Specify the Nameserver address
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3. Start the producer
        producer.start();
        //Build a message collection
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        //Send a message
        int i = 0;
        for (OrderStep order:orderSteps) {
            Message message = new Message("OrderTopic","Order","i"+i,order.toString().getBytes());
            i = i+1;
            message.setDelayTimeLevel(2);//Delay
            /**
             * Parameter 1: message object
             * Parameter 2: message queue selector
             * Parameter 3: business ID used to select the queue (order ID)
             */
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 * @param list  the queues of the topic
                 * @param message  message object
                 * @param o   the business ID passed as parameter 3
                 * @return the queue the message is sent to
                 */
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    long orderId = (long) o;
                    long index = orderId%list.size();
                    System.out.println(index+"~~~"+orderId);
                    return list.get((int) index);
                }
            },order.getOrderId());
            System.out.println("Send result:"+sendResult);
//            Thread.sleep(1000);
        }
        producer.shutdown();
    }
  • Consumer: ordered consumption requires MessageListenerOrderly
    public static void main(String[] args) throws MQClientException {
        //1. Create a consumer and specify the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2. Specify the Nameserver address
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //3. Subscribe to topics and tags
        consumer.subscribe("OrderTopic","*");
        //4. Set the callback function to process the message
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg:list) {
                    System.out.println("thread name:"+Thread.currentThread().getName()+"Receive message:"+
                            new String(msg.getBody())+"queue:"+msg.getQueueId()
                            +", Delay:"+(System.currentTimeMillis()-msg.getBornTimestamp()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //5. Start the consumer
        consumer.start();
        System.out.println("consumer start");
    }
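
The queue selection rule used by the selector above (order ID modulo the number of queues) can be checked in isolation. This is a minimal sketch: `OrderQueueSelection` is a hypothetical helper, the order IDs are made up, and `Math.floorMod` is used instead of `%` as an assumption of the sketch so that a negative ID cannot produce a negative index.

```java
// Hypothetical helper for illustration; it does not use the RocketMQ client.
class OrderQueueSelection {
    // Maps an order ID to a queue index. The same order ID always yields the
    // same index, which is what keeps the steps of one order in one queue.
    static int selectQueueIndex(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        long[] orderIds = {1039L, 1065L, 1039L, 7235L, 1065L}; // made-up order IDs
        for (long orderId : orderIds) {
            System.out.println("order " + orderId + " -> queue " + selectQueueIndex(orderId, queueCount));
        }
    }
}
```

Because each queue is consumed in order by MessageListenerOrderly, ordering is guaranteed per order ID, not across the whole topic.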

Delay

Set a delay on the message before sending it. Note that the parameter is a delay level, not a delay in seconds. The levels are defined by the following setting:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
For example, level 1 means a delay of 1s and level 2 means a delay of 5s.

 message.setDelayTimeLevel(2);//Delay
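
To make the level-to-duration mapping concrete, here is a small sketch that parses the messageDelayLevel string shown above. `DelayLevels` and `delayFor` are hypothetical names, not RocketMQ APIs, and rejecting out-of-range levels is a choice of this sketch rather than a statement about broker behavior.

```java
import java.time.Duration;

// Hypothetical helper for illustration; it does not use the RocketMQ client.
class DelayLevels {
    // The level table quoted in the text above.
    static final String LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Returns the delay for a 1-based level, e.g. level 2 is the second entry.
    static Duration delayFor(int level) {
        String[] entries = LEVELS.split(" ");
        if (level < 1 || level > entries.length) {
            throw new IllegalArgumentException("level must be between 1 and " + entries.length);
        }
        String entry = entries[level - 1];
        long amount = Long.parseLong(entry.substring(0, entry.length() - 1));
        char unit = entry.charAt(entry.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 18; level++) {
            System.out.println("level " + level + " -> " + delayFor(level));
        }
    }
}
```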

Batch messages

  • Put the messages into a collection and send the whole collection in one call
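
When many messages are collected, it can help to cut the collection into several smaller batches before sending. The sketch below shows one way to do that with plain byte arrays standing in for message bodies. `BatchSplitter` is a hypothetical helper, and the size limit is a parameter whose value is an assumption; check the limit that applies to your client and broker versions. With the RocketMQ client, each resulting batch would be built as a list of Message objects and sent in one call.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; it does not use the RocketMQ client.
class BatchSplitter {
    // Splits message bodies into batches whose total body size stays within maxBytes.
    static List<List<byte[]>> split(List<byte[]> bodies, int maxBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (byte[] body : bodies) {
            if (body.length > maxBytes) {
                throw new IllegalArgumentException("a single message exceeds the batch limit");
            }
            // Close the current batch when the next body would not fit.
            if (currentSize + body.length > maxBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(body);
            currentSize += body.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = Arrays.asList(
                "Hello World 1".getBytes(StandardCharsets.UTF_8),
                "Hello World 2".getBytes(StandardCharsets.UTF_8),
                "Hello World 3".getBytes(StandardCharsets.UTF_8));
        int maxBytes = 30; // deliberately tiny limit so the demo produces more than one batch
        List<List<byte[]>> batches = split(bodies, maxBytes);
        for (int i = 0; i < batches.size(); i++) {
            System.out.println("batch " + i + " holds " + batches.get(i).size() + " message(s)");
        }
    }
}
```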

Filtering messages

Filtering is configured on the consumer:

  • tags
     consumer.subscribe("base","Tag1 || Tag2");//Subscribe to several tags by joining them with ||
    
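  • sql (the expression is evaluated against message properties, for example one set with putUserProperty("i","5"))
    	consumer.subscribe("base", MessageSelector.bySql("i>4"));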
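
The meaning of a tag expression such as "Tag1 || Tag2" can be sketched in plain Java. This only illustrates the matching semantics; it is not how RocketMQ implements filtering, and `TagFilterSketch` with its methods is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration; it does not use the RocketMQ client.
class TagFilterSketch {
    // Splits an expression like "Tag1 || Tag2" into the set {Tag1, Tag2}.
    static Set<String> parse(String expression) {
        Set<String> tags = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // A message matches if the expression is "*" or lists the message's tag.
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression.trim())) {
            return true;
        }
        return parse(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        String expression = "Tag1 || Tag2";
        for (String tag : new String[] {"Tag1", "Tag2", "Tag3"}) {
            System.out.println(tag + " matches: " + matches(expression, tag));
        }
    }
}
```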
    

Transactional messages

The consumer is unchanged.
Producer: set a transaction listener. Note that producer.shutdown() must not be called right after sending; otherwise the broker cannot check back the transaction status of messages that are still undecided.

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        //1. Create a message producer producer, and specify the producer group name
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        //2. Specify the Nameserver address
        producer.setNamesrvAddr("127.0.0.1:9876");

        //transaction listener
        producer.setTransactionListener(new TransactionListener() {
            //perform local transactions in this method
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                if (message.getTags().equals("Tag1")){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else if (message.getTags().equals("Tag2")){
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }else if (message.getTags().equals("Tag3")){
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            //The broker calls this method to check back the transaction status of a message
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("message check-back");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        //3. Start the producer
        producer.start();
        //Build a message collection
        List<Message> list = new ArrayList<>();

        Message msg1 = new Message("base","Tag1",("Hello World 1").getBytes());
        msg1.putUserProperty("i","3");
        Message msg2 = new Message("base","Tag2",("Hello World 2").getBytes());
        msg2.putUserProperty("i","4");
        Message msg3 = new Message("base","Tag3",("Hello World 3").getBytes());
        msg3.putUserProperty("i","5");

        list.add(msg1);
        list.add(msg2);
        list.add(msg3);
        //Send a message
        producer.sendMessageInTransaction(msg1,null);
        producer.sendMessageInTransaction(msg2,null);
        producer.sendMessageInTransaction(msg3,null);

//            Thread.sleep(1000);
//        producer.shutdown();
    }
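
The three states returned by the listener above lead to three different outcomes. The sketch below summarizes them; it is a simplified model, not RocketMQ code. `TransactionOutcome` and its `Action` enum are hypothetical, while the state names mirror the LocalTransactionState constants used in the example.

```java
// Hypothetical model for illustration; it does not use the RocketMQ client.
class TransactionOutcome {
    // Mirrors the constants of LocalTransactionState used in the example above.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // What happens to the message for each state (names invented for this sketch).
    enum Action { DELIVER_TO_CONSUMERS, DISCARD, CHECK_BACK_LATER }

    static Action brokerAction(State state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return Action.DELIVER_TO_CONSUMERS; // the message becomes visible to consumers
            case ROLLBACK_MESSAGE:
                return Action.DISCARD;              // the message is never delivered
            default:
                return Action.CHECK_BACK_LATER;     // checkLocalTransaction is called later
        }
    }

    public static void main(String[] args) {
        for (State state : State.values()) {
            System.out.println(state + " -> " + brokerAction(state));
        }
    }
}
```

In the example above, the Tag1 message is committed at once, the Tag2 message is rolled back, and the Tag3 message stays undecided until checkLocalTransaction returns COMMIT_MESSAGE, which is why the producer has to stay running.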

Tags: Distribution MQ

Posted by dirty_n4ppy on Fri, 13 May 2022 14:46:32 +0300