Two ways to implement delayed messages in RabbitMQ, explained in detail: dead letter queue and delay plug-in

1. Dead letter queue

1.1 When does a message become a dead letter

  1. The consumer rejects the message with basic.reject or basic.nack, and the requeue property is set to false.
  2. The message stays in the queue longer than its expiration time (TTL).
  3. The queue has reached its maximum length and another message arrives; by default the oldest message in the queue is then dropped or dead-lettered to make room.
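
Two of these conditions, rejection without requeue and overflow of a length-limited queue, can be illustrated with a toy model. The sketch below is not RabbitMQ code: `BoundedToyQueue` and its methods are hypothetical names, and it assumes that an overflowing queue dead-letters its oldest message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (not RabbitMQ code) of two ways a message becomes a dead letter.
class BoundedToyQueue {

    private final int maxLength;
    private final Deque<String> ready = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    BoundedToyQueue(int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be at least 1");
        }
        this.maxLength = maxLength;
    }

    // Enqueues a message. If the queue is now over its limit, the oldest
    // message is dead-lettered (assumption: head is dropped on overflow).
    void publish(String body) {
        ready.addLast(body);
        while (ready.size() > maxLength) {
            deadLetters.add(ready.pollFirst());
        }
    }

    // Hands the next message to a consumer, or returns null if the queue is empty.
    String deliver() {
        return ready.pollFirst();
    }

    // The consumer rejects a delivered message: requeue=true puts it back at
    // the head of the queue, requeue=false turns it into a dead letter.
    void reject(String body, boolean requeue) {
        if (requeue) {
            ready.addFirst(body);
        } else {
            deadLetters.add(body);
        }
    }

    // Returns a copy of the dead letters collected so far, oldest first.
    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    public static void main(String[] args) {
        BoundedToyQueue q = new BoundedToyQueue(2);
        q.publish("m1");
        q.publish("m2");
        q.publish("m3");                     // queue is full, so m1 is dead-lettered
        q.reject(q.deliver(), false);        // m2 is rejected without requeue
        System.out.println(q.deadLetters()); // prints [m1, m2]
    }
}
```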

1.2 Principle of the dead letter queue

When a message in a queue becomes a dead letter, RabbitMQ republishes it to another exchange, the dead letter exchange (DLX). All we need to do is add the optional "x-dead-letter-exchange" argument when declaring the normal service queue, with the name of the dead letter exchange as its value. RabbitMQ then republishes dead letters to that exchange, and we can consume them from a queue bound to it.
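
As a minimal sketch of that declaration step, the helper below builds the optional-arguments map for the service queue. The class `DeadLetterArgs` and its method `forQueue` are hypothetical names introduced for illustration; only the argument keys `x-dead-letter-exchange` and `x-dead-letter-routing-key` come from RabbitMQ itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper, not part of Spring AMQP or the RabbitMQ client.
class DeadLetterArgs {

    // Builds the optional queue arguments that point a queue at a dead letter exchange.
    // routingKey may be null; the dead letter then keeps its original routing key.
    static Map<String, Object> forQueue(String deadLetterExchange, String routingKey) {
        Objects.requireNonNull(deadLetterExchange, "deadLetterExchange");
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        if (routingKey != null) {
            arguments.put("x-dead-letter-routing-key", routingKey);
        }
        return arguments;
    }

    public static void main(String[] args) {
        // The exchange and routing key names are the ones used in section 1.3.
        System.out.println(forQueue("dlk.exchange", "dlk.routeKey"));
    }
}
```

The returned map could be passed as the last constructor argument when the service queue is declared, in the same way the configuration class in section 1.3 passes its hand-built map.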

1.3 Code implementation

  1. Add the AMQP dependency (for a Spring Boot project this is typically spring-boot-starter-amqp)
  2. Declare the exchanges and queues
package com.lank.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

/**
 * @author lank
 * @since 2020/12/14 10:44
 */
@Configuration
public class RabbitmqConfig {

    //Dead letter exchange, queue and routing key configuration
    public static final String DLK_EXCHANGE = "dlk.exchange";
    public static final String DLK_ROUTEKEY = "dlk.routeKey";
    public static final String DLK_QUEUE = "dlk.queue";

    //Service exchange, queue and routing key configuration
    public static final String DEMO_EXCHANGE = "demo.exchange";
    public static final String DEMO_QUEUE = "demo.queue";
    public static final String DEMO_ROUTEKEY = "demo.routeKey";

    //Exchange, queue and routing key configuration for the delay plug-in (Delayed Message Plugin)
    public static final String DMP_EXCHANGE = "dmp.exchange";
    public static final String DMP_ROUTEKEY = "dmp.routeKey";
    public static final String DMP_QUEUE = "dmp.queue";

    @Bean
    public DirectExchange demoExchange(){
        return new DirectExchange(DEMO_EXCHANGE,true,false);
    }

    @Bean
    public Queue demoQueue(){
        //Just add x-dead-letter-exchange when declaring the service queue; the value is the dead letter exchange
        Map<String,Object> map = new HashMap<>(2);
        map.put("x-dead-letter-exchange",DLK_EXCHANGE);
        //x-dead-letter-routing-key changes the routing key of the dead letter. If it is not set, the routing key of the original message is used
        map.put("x-dead-letter-routing-key",DLK_ROUTEKEY);
        return new Queue(DEMO_QUEUE,true,false,false,map);
    }

    @Bean
    public Binding demoBind(){
        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY);
    }

    @Bean
    public DirectExchange dlkExchange(){
        return new DirectExchange(DLK_EXCHANGE,true,false);
    }

    @Bean
    public Queue dlkQueue(){
        return new Queue(DLK_QUEUE,true,false,false);
    }

    @Bean
    public Binding dlkBind(){
        return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY);
    }


    //Using the delay plug-in
    //1. Declare an exchange of type x-delayed-message
    //2. Add an x-delayed-type argument whose value is the underlying exchange type; it determines how routing keys are matched
    @Bean
    public CustomExchange dmpExchange(){
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    @Bean
    public Queue dmpQueue(){
        return new Queue(DMP_QUEUE,true,false,false);
    }

    @Bean
    public Binding dmpBind(){
        return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs();
    }
    

}
  3. Declare a class that sends messages with an expiration time
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author lank
 * @since 2020/12/14 10:33
 */
@Component
@Slf4j
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //Sends a message that is delayed through the dead letter queue
    public void send(String message,Integer time){
        String ttl = String.valueOf(time*1000);
        //Exchange and routingKey are those of the service queue. Only the expiration time of the message needs to be set
        rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //Set the expiration time of the message in milliseconds
                message.getMessageProperties().setExpiration(ttl);
                return message;
            }
        });
        log.info("Using dead letter queue messages:{}Sent successfully,Expiration time:{}Seconds.",message,time);
    }

    //Sends a message that is delayed by the delay plug-in
    public void send2(String message,Integer time){
        rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY,message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //To use the delay plug-in, just add the x-delay header to the message. The value is the delay in milliseconds
                message.getMessageProperties().setHeader("x-delay",time*1000);
                return message;
            }
        });
        log.info("Sending messages using the delay plug-in:{}Sent successfully,Expiration time:{}Seconds.",message,time);
    }
}
  4. Write a class that consumes the messages
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author lank
 * @since 2020/12/15 15:57
 */

@Component
@Slf4j
public class MessageReceiver {

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE)
    public void onMessage(Message message){
        log.info("Use the dead letter queue to receive messages:{}",new String(message.getBody()));
    }

    @RabbitHandler
    @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE)
    public void onMessage2(Message message){
        log.info("Use the delay plug-in to receive the message:{}",new String(message.getBody()));
    }
}
  5. Write a Controller that calls the send methods, so the result can be tested
package com.lank.demo.controller;
import com.lank.demo.rabbitmq.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author lank
 * @since 2020/12/14 17:05
 */
@RestController
public class MessageController {

    @Autowired
    public MessageSender messageSender;

    //Endpoint that sends through the dead letter queue
    @GetMapping("/send")
    public String send(@RequestParam String msg,Integer time){
        messageSender.send(msg,time);
        return "ok";
    }

    //Endpoint that sends through the delay plug-in
    @GetMapping("/send2")
    public String sendByPlugin(@RequestParam String msg,Integer time){
        messageSender.send2(msg,time);
        return "ok";
    }

}
  6. Configuration file application.properties
server.port=4399
#The default virtual host / works fine. To use /demo, create it in the management console first
spring.rabbitmq.virtual-host=/demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
  7. Start the project and open the RabbitMQ management console. The exchanges and queues have been created.
  8. Request http://localhost:4399/send?msg=hello&time=5 in the browser. The console output shows that the message was received 5 s after it was sent.
2020-12-16 22:47:28.071  INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender   : Using dead letter queue messages:hello Sent successfully,Expiration time:5 Seconds.
2020-12-16 22:47:33.145  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : Use the dead letter queue to receive messages:hello
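
Both send() and send2() above convert seconds to milliseconds with time*1000. In int arithmetic that product silently overflows for delays longer than about 24.8 days (2,147,483 seconds). A minimal sketch of a safer conversion follows; `DelayMillis` and its methods are hypothetical names, not part of Spring AMQP.

```java
// Hypothetical helper for converting a delay in seconds to milliseconds.
class DelayMillis {

    // Converts whole seconds to milliseconds in long arithmetic, so the
    // multiplication cannot overflow for any non-negative int input.
    static long fromSeconds(int seconds) {
        if (seconds < 0) {
            throw new IllegalArgumentException("delay must not be negative: " + seconds);
        }
        return seconds * 1000L;
    }

    // Formats the delay as a decimal string, the form in which the
    // expiration property of a message is set in send().
    static String asExpiration(int seconds) {
        return Long.toString(fromSeconds(seconds));
    }

    public static void main(String[] args) {
        int seconds = 3_000_000;                  // roughly 34.7 days
        System.out.println(seconds * 1000);       // int overflow: prints -1294967296
        System.out.println(fromSeconds(seconds)); // prints 3000000000
        System.out.println(asExpiration(5));      // prints 5000
    }
}
```

In send(), `DelayMillis.asExpiration(time)` could take the place of `String.valueOf(time*1000)`. Whether the plug-in accepts a long-typed x-delay header in send2() is an assumption that should be checked against the plug-in's documentation.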

1.4 A pitfall of the dead letter queue

Suppose two messages with different expiration times are sent to the service queue, and message A, sent first, has a longer expiration time than message B, sent later. Because the queue is processed in order and RabbitMQ only expires messages from the head of the queue, message B is not republished to the dead letter exchange as soon as it expires. It waits behind message A and is dead-lettered together with it once A expires.

Send two requests in sequence: http://localhost:4399/send?msg=Message A&time=30 and then http://localhost:4399/send?msg=Message B&time=10. Message A is sent first with an expiration time of 30 s; message B is sent later with an expiration time of 10 s. We would like to receive message B after 10 s and message A after 30 s, but that is not what happens. The console output is as follows:

2020-12-16 22:54:47.339  INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender   : Using dead letter queue messages:Message A Sent successfully,Expiration time:30 Seconds.
2020-12-16 22:54:54.278  INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender   : Using dead letter queue messages:Message B Sent successfully,Expiration time:10 Seconds.
2020-12-16 22:55:17.356  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : Use the dead letter queue to receive messages:Message A
2020-12-16 22:55:17.357  INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver           : Use the dead letter queue to receive messages:Message B

Message A is consumed after 30 s, immediately followed by message B. So when using the dead letter queue, check whether all messages in the queue share the same expiration time. It works well for a uniform delay, for example changing the status of any order that has not been paid within 10 minutes. If the messages in one queue have different expiration times, the dead letter queue may not deliver them at the intended delay. In that case, the delay plug-in can be used instead.
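
The blocking effect can be reproduced with a toy model. The sketch below is not RabbitMQ code: it assumes a hypothetical `HeadExpiryQueue` that checks only the message at its head for expiry, and it uses a logical clock in seconds with the send times taken from the log above (B is sent 7 s after A).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (not RabbitMQ code): a FIFO queue that expires messages only from its head.
class HeadExpiryQueue {

    // A queued message together with the logical time at which its TTL runs out.
    private static final class Entry {
        final String body;
        final long expiresAt;

        Entry(String body, long expiresAt) {
            this.body = body;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    // Publishes a message at logical time `now` with a TTL in seconds.
    void publish(String body, long now, long ttlSeconds) {
        queue.addLast(new Entry(body, now + ttlSeconds));
    }

    // Dead-letters expired messages at logical time `now`. It stops at the first
    // head message that has not expired yet, even if later ones already have.
    List<String> expire(long now) {
        List<String> deadLetters = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().expiresAt <= now) {
            deadLetters.add(queue.pollFirst().body);
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        HeadExpiryQueue q = new HeadExpiryQueue();
        q.publish("Message A", 0, 30);    // expires at t=30
        q.publish("Message B", 7, 10);    // expires at t=17, but sits behind A
        System.out.println(q.expire(17)); // prints []
        System.out.println(q.expire(30)); // prints [Message A, Message B]
    }
}
```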

2. Delay plug-in

The RabbitMQ Delayed Message Plugin is a RabbitMQ plug-in, so it must be installed and enabled before use. See its GitHub repository: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

2.1 How to implement it

  1. After installing the plug-in, declare an exchange of type "x-delayed-message" and, in its optional arguments, set the key "x-delayed-type" to the underlying exchange type (topic/direct/fanout).
  2. Declare a queue and bind it to the exchange.
  3. When sending a message, add a header whose key is "x-delay" and whose value is the delay in milliseconds.
  4. The code is already shown in section 1.3: the relevant configuration constants and beans start with DMP, and the sending method is send2().
  5. After startup, an exchange of type x-delayed-message is visible in the RabbitMQ management console.
  6. Send two more requests in the browser: http://localhost:4399/send2?msg=Message A&time=30 and then http://localhost:4399/send2?msg=Message B&time=10. The console output is as follows. The ordering problem seen with the dead letter queue does not occur:
2020-12-16 23:31:19.819  INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender   : Sending messages using the delay plug-in:Message A Sent successfully,Expiration time:30 Seconds.
2020-12-16 23:31:27.673  INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender   : Sending messages using the delay plug-in:Message B Sent successfully,Expiration time:10 Seconds.
2020-12-16 23:31:37.833  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : Use the delay plug-in to receive the message:Message B
2020-12-16 23:31:49.917  INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver           : Use the delay plug-in to receive the message:Message A
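
The ordering in this log matches a model in which each message is released at its own due time, regardless of arrival order. The sketch below is a toy model under that assumption, not the plug-in's actual implementation; `DueTimeStore` and its methods are hypothetical names, and the logical clock is in seconds (B is sent about 8 s after A, as in the log).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model (not the plug-in's code): messages are held until their own due time.
class DueTimeStore {

    // A held message, the logical time at which it is due, and its arrival order.
    private static final class Entry {
        final String body;
        final long dueAt;
        final long seq;

        Entry(String body, long dueAt, long seq) {
            this.body = body;
            this.dueAt = dueAt;
            this.seq = seq;
        }
    }

    // Ordered by due time; arrival order breaks ties between equal due times.
    private final PriorityQueue<Entry> pending = new PriorityQueue<>(
            Comparator.comparingLong((Entry e) -> e.dueAt).thenComparingLong(e -> e.seq));
    private long nextSeq = 0;

    // Accepts a message at logical time `now` with a delay in seconds.
    void publish(String body, long now, long delaySeconds) {
        pending.add(new Entry(body, now + delaySeconds, nextSeq++));
    }

    // Releases every message that is due at logical time `now`, earliest first.
    List<String> release(long now) {
        List<String> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().dueAt <= now) {
            released.add(pending.poll().body);
        }
        return released;
    }

    public static void main(String[] args) {
        DueTimeStore store = new DueTimeStore();
        store.publish("Message A", 0, 30);     // due at t=30
        store.publish("Message B", 8, 10);     // due at t=18
        System.out.println(store.release(18)); // prints [Message B]
        System.out.println(store.release(30)); // prints [Message A]
    }
}
```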

Author: javalang

Original link:
https://www.cnblogs.com/javalank/p/14751624.html

 


Posted by aspekt9 on Mon, 16 May 2022 23:53:39 +0300