1. Dead letter queue
1.1 When does a message become a dead letter
- The consumer rejects the message with basic.reject or basic.nack, and the requeue parameter is set to false.
- The message stays in the queue longer than the expiration time (TTL) set for it.
- The queue has reached its maximum length and another message arrives.
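The three conditions above can be summarized in a small decision function. This is only a simplified model in plain Java, not a RabbitMQ API: the class `DeadLetterRules` and its parameters are hypothetical names chosen for illustration. The enum constants mirror the reason values ("rejected", "expired", "maxlen") that RabbitMQ records in the x-death header of a dead-lettered message.

```java
import java.util.Optional;

/** Simplified model of the dead-letter conditions; illustrative only, not a RabbitMQ API. */
class DeadLetterRules {

    /** Mirrors the "reason" values RabbitMQ writes into the x-death header. */
    enum Reason { REJECTED, EXPIRED, MAXLEN }

    /**
     * @param rejected           the consumer called basic.reject or basic.nack
     * @param requeue            the requeue flag passed with that reject/nack
     * @param ageMillis          how long the message has been in the queue
     * @param ttlMillis          the message TTL in milliseconds, or -1 if none is set
     * @param displacedByMaxLen  the queue was full and a newly arrived message displaced this one
     * @return the dead-letter reason, or empty if the message stays in the queue
     */
    static Optional<Reason> classify(boolean rejected, boolean requeue,
                                     long ageMillis, long ttlMillis,
                                     boolean displacedByMaxLen) {
        if (rejected && !requeue) {
            return Optional.of(Reason.REJECTED);
        }
        if (ttlMillis >= 0 && ageMillis >= ttlMillis) {
            return Optional.of(Reason.EXPIRED);
        }
        if (displacedByMaxLen) {
            return Optional.of(Reason.MAXLEN);
        }
        return Optional.empty();
    }
}
```

Note that a rejected message with requeue set to true goes back to the queue and is not dead-lettered, which is why the first check looks at both flags.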
1.2 How the dead letter queue works
When a message in a queue becomes a dead letter, RabbitMQ republishes it to another exchange, the dead letter exchange (DLX). So all we need to do is add the optional "x-dead-letter-exchange" argument when declaring the normal business queue, with the name of the dead letter exchange as its value. RabbitMQ then republishes dead letters to that exchange, and we can consume them from a queue bound to it.
1.3 Code implementation
- Add the AMQP dependency (in a Spring Boot project this is typically spring-boot-starter-amqp)
- Declare the exchanges and queues
package com.lank.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author lank
 * @since 2020/12/14 10:44
 */
@Configuration
public class RabbitmqConfig {

    // Dead letter exchange, queue and routing key
    public static final String DLK_EXCHANGE = "dlk.exchange";
    public static final String DLK_ROUTEKEY = "dlk.routeKey";
    public static final String DLK_QUEUE = "dlk.queue";

    // Business exchange, queue and routing key
    public static final String DEMO_EXCHANGE = "demo.exchange";
    public static final String DEMO_QUEUE = "demo.queue";
    public static final String DEMO_ROUTEKEY = "demo.routeKey";

    // Exchange, queue and routing key for the delay plug-in (Delayed Message Plugin)
    public static final String DMP_EXCHANGE = "dmp.exchange";
    public static final String DMP_ROUTEKEY = "dmp.routeKey";
    public static final String DMP_QUEUE = "dmp.queue";

    @Bean
    public DirectExchange demoExchange() {
        return new DirectExchange(DEMO_EXCHANGE, true, false);
    }

    @Bean
    public Queue demoQueue() {
        // Only the business queue needs x-dead-letter-exchange; its value is the dead letter exchange
        Map<String, Object> map = new HashMap<>(2);
        map.put("x-dead-letter-exchange", DLK_EXCHANGE);
        // x-dead-letter-routing-key overrides the routing key of the dead letter.
        // If it is not set, the routing key of the original message is used.
        map.put("x-dead-letter-routing-key", DLK_ROUTEKEY);
        return new Queue(DEMO_QUEUE, true, false, false, map);
    }

    @Bean
    public Binding demoBind() {
        return BindingBuilder.bind(demoQueue()).to(demoExchange()).with(DEMO_ROUTEKEY);
    }

    @Bean
    public DirectExchange dlkExchange() {
        return new DirectExchange(DLK_EXCHANGE, true, false);
    }

    @Bean
    public Queue dlkQueue() {
        return new Queue(DLK_QUEUE, true, false, false);
    }

    @Bean
    public Binding dlkBind() {
        return BindingBuilder.bind(dlkQueue()).to(dlkExchange()).with(DLK_ROUTEKEY);
    }

    // Delay plug-in usage:
    // 1. Declare an exchange of type x-delayed-message
    // 2. Add x-delayed-type to its arguments; the value is the underlying exchange type,
    //    which decides how routing keys are matched
    @Bean
    public CustomExchange dmpExchange() {
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DMP_EXCHANGE, "x-delayed-message", true, false, arguments);
    }

    @Bean
    public Queue dmpQueue() {
        return new Queue(DMP_QUEUE, true, false, false);
    }

    @Bean
    public Binding dmpBind() {
        return BindingBuilder.bind(dmpQueue()).to(dmpExchange()).with(DMP_ROUTEKEY).noargs();
    }
}
- Write a class that sends messages with an expiration time
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author lank
 * @since 2020/12/14 10:33
 */
@Component
@Slf4j
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Sends a delayed message via the dead letter queue
    public void send(String message, Integer time) {
        String ttl = String.valueOf(time * 1000);
        // The exchange and routing key are the business ones; only the message expiration needs to be set
        rabbitTemplate.convertAndSend(RabbitmqConfig.DEMO_EXCHANGE, RabbitmqConfig.DEMO_ROUTEKEY, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Set the expiration time of the message, in milliseconds
                message.getMessageProperties().setExpiration(ttl);
                return message;
            }
        });
        log.info("Using dead letter queue messages:{}Sent successfully,Expiration time:{}Seconds.", message, time);
    }

    // Sends a delayed message via the delay plug-in
    public void send2(String message, Integer time) {
        rabbitTemplate.convertAndSend(RabbitmqConfig.DMP_EXCHANGE, RabbitmqConfig.DMP_ROUTEKEY, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // With the delay plug-in, only the x-delay header is needed; its value is the delay in milliseconds
                message.getMessageProperties().setHeader("x-delay", time * 1000);
                return message;
            }
        });
        log.info("Sending messages using the delay plug-in:{}Sent successfully,Expiration time:{}Seconds.", message, time);
    }
}
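One detail in the sender is worth guarding against: `time * 1000` is evaluated in `int` arithmetic, so a delay of more than about 24 days in seconds overflows to a negative number. The helper below is a minimal sketch of a safer conversion. `DelayMillis` and its methods are hypothetical names, not part of Spring AMQP. The upper bound of 2^32 - 1 milliseconds is the limit that, as far as I know, the delay plug-in's README documents for x-delay, so treat it as an assumption to verify.

```java
import java.time.Duration;

/** Hypothetical helper for converting a delay in seconds to milliseconds without int overflow. */
class DelayMillis {

    /** Assumed upper bound for the plug-in's x-delay header: 2^32 - 1 ms (about 49 days). */
    static final long MAX_PLUGIN_DELAY_MS = (1L << 32) - 1;

    /** Converts seconds to milliseconds using long arithmetic. */
    static long toMillis(int seconds) {
        if (seconds < 0) {
            throw new IllegalArgumentException("delay must not be negative: " + seconds);
        }
        return Duration.ofSeconds(seconds).toMillis();
    }

    /** Value suitable for MessageProperties.setExpiration, which expects a String. */
    static String toExpiration(int seconds) {
        return Long.toString(toMillis(seconds));
    }

    /** Value for the x-delay header, rejected if it exceeds the assumed plug-in limit. */
    static long toPluginDelay(int seconds) {
        long millis = toMillis(seconds);
        if (millis > MAX_PLUGIN_DELAY_MS) {
            throw new IllegalArgumentException("delay exceeds plug-in limit: " + millis + " ms");
        }
        return millis;
    }
}
```

In the sender, `String.valueOf(time*1000)` could then become `DelayMillis.toExpiration(time)`, and the x-delay header value could become `DelayMillis.toPluginDelay(time)`.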
- Write a class for consuming messages
package com.lank.demo.rabbitmq;

import com.lank.demo.config.RabbitmqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @author lank
 * @since 2020/12/15 15:57
 */
@Component
@Slf4j
public class MessageReceiver {

    // A method-level @RabbitListener is sufficient; @RabbitHandler is only needed
    // when @RabbitListener is placed on the class.
    @RabbitListener(queues = RabbitmqConfig.DLK_QUEUE)
    public void onMessage(Message message) {
        // Decode the body explicitly as UTF-8 instead of relying on the platform default charset
        log.info("Use the dead letter queue to receive messages:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queues = RabbitmqConfig.DMP_QUEUE)
    public void onMessage2(Message message) {
        log.info("Use the delay plug-in to receive the message:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}
- Write a Controller that calls the send methods so the result can be tested
package com.lank.demo.controller;

import com.lank.demo.rabbitmq.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author lank
 * @since 2020/12/14 17:05
 */
@RestController
public class MessageController {

    @Autowired
    private MessageSender messageSender;

    // Dead letter queue endpoint; time is the delay in seconds.
    // @RequestParam makes time mandatory, so a missing value is rejected
    // instead of causing a NullPointerException in the sender.
    @GetMapping("/send")
    public String send(@RequestParam String msg, @RequestParam Integer time) {
        messageSender.send(msg, time);
        return "ok";
    }

    // Delay plug-in endpoint; time is the delay in seconds
    @GetMapping("/send2")
    public String sendByPlugin(@RequestParam String msg, @RequestParam Integer time) {
        messageSender.send2(msg, time);
        return "ok";
    }
}
- Configuration file application.properties
server.port=4399
# The default virtual host / works fine. If you want /demo, create it in the RabbitMQ console first
spring.rabbitmq.virtual-host=/demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
- Start the project and open the RabbitMQ console. You can see that the exchanges and queues have been created.
- Request http://localhost:4399/send?msg=hello&time=5 in the browser. The console output shows that the message was received 5 s later.
2020-12-16 22:47:28.071 INFO 13304 --- [nio-4399-exec-1] c.l.rabbitmqdlk.rabbitmq.MessageSender : Using dead letter queue messages:hello Sent successfully,Expiration time:5 Seconds.
2020-12-16 22:47:33.145 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : Use the dead letter queue to receive messages:hello
1.4 A small pitfall of the dead letter queue
Suppose two messages with different expiration times are sent to the same business queue, and message A, sent first, has a longer expiration time than message B, sent later. Because the queue is first-in-first-out and an expired message is only dead-lettered once it reaches the head of the queue, message B is not republished to the dead letter exchange as soon as it expires. It waits behind message A and is dead-lettered, and consumed, only after message A expires.
Send two requests in sequence, http://localhost:4399/send?msg=Message%20A&time=30 and then http://localhost:4399/send?msg=Message%20B&time=10. Message A is sent first with an expiration time of 30 s; message B is sent later with an expiration time of 10 s. We would like to receive message B after 10 s and message A after 30 s, but that is not what happens. The console output is as follows:
2020-12-16 22:54:47.339 INFO 13304 --- [nio-4399-exec-5] c.l.rabbitmqdlk.rabbitmq.MessageSender : Using dead letter queue messages:Message A Sent successfully,Expiration time:30 Seconds.
2020-12-16 22:54:54.278 INFO 13304 --- [nio-4399-exec-6] c.l.rabbitmqdlk.rabbitmq.MessageSender : Using dead letter queue messages:Message B Sent successfully,Expiration time:10 Seconds.
2020-12-16 22:55:17.356 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : Use the dead letter queue to receive messages:Message A
2020-12-16 22:55:17.357 INFO 13304 --- [ntContainer#0-1] c.l.r.rabbitmq.MessageReceiver : Use the dead letter queue to receive messages:Message B
Message A is consumed 30 s after it was sent, and message B follows immediately. So when using the dead letter queue for delays, check whether all messages in the queue share the same expiration time. It works well when they do, for example when every unpaid order must have its status changed after 10 minutes. If the expiration times of messages in one queue differ, the dead letter queue may not deliver each message after its own delay. In that case we can use the delay plug-in instead.
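The blocking effect can be reproduced with a toy model in plain Java. It is a sketch of the head-of-queue behavior described above, not RabbitMQ's implementation: the class `HeadOfQueueExpiry` and the one-second clock are assumptions made for illustration. The send times 0 s and 7 s roughly match the timestamps in the log above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of per-message TTL in a FIFO queue where only the head is checked for expiry. */
class HeadOfQueueExpiry {

    static final class Msg {
        final String body;
        final long sentAt;      // second at which the message was published
        final long ttlSeconds;  // per-message expiration time

        Msg(String body, long sentAt, long ttlSeconds) {
            this.body = body;
            this.sentAt = sentAt;
            this.ttlSeconds = ttlSeconds;
        }

        long expiresAt() {
            return sentAt + ttlSeconds;
        }
    }

    /** Returns "body@second" entries in dead-letter order; messages must be given in send order. */
    static List<String> simulate(List<Msg> inSendOrder, long untilSecond) {
        Deque<Msg> queue = new ArrayDeque<>();
        List<String> deadLettered = new ArrayList<>();
        int next = 0;
        for (long now = 0; now <= untilSecond; now++) {
            // Enqueue everything published up to this second
            while (next < inSendOrder.size() && inSendOrder.get(next).sentAt <= now) {
                queue.addLast(inSendOrder.get(next++));
            }
            // Only the head is examined; expired messages behind it have to wait
            while (!queue.isEmpty() && queue.peekFirst().expiresAt() <= now) {
                deadLettered.add(queue.pollFirst().body + "@" + now);
            }
        }
        return deadLettered;
    }

    public static void main(String[] args) {
        List<Msg> messages = List.of(new Msg("A", 0, 30), new Msg("B", 7, 10));
        System.out.println(simulate(messages, 40)); // prints [A@30, B@30]
    }
}
```

Message B expired at second 17, but in this model it only leaves the queue at second 30, right after message A, which matches the log.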
2. Delay plug-in
The RabbitMQ Delayed Message Plugin is a RabbitMQ plug-in, so it must be installed before use. See its GitHub repository for instructions: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
2.1 How to implement it
- After installing the plug-in, declare an exchange of type "x-delayed-message" and, in its optional arguments, set the key "x-delayed-type" to the exchange type to use for routing (topic/direct/fanout).
- Declare a queue and bind it to the exchange.
- When sending a message, add a header whose key is "x-delay" and whose value is the delay in milliseconds.
- The code is shown above: the constants and beans prefixed with DMP/dmp in the configuration class, and the send2() method for sending messages.
- After startup, you can see an exchange of type x-delayed-message in the RabbitMQ console.
- Send two more requests in the browser, http://localhost:4399/send2?msg=Message%20A&time=30 and then http://localhost:4399/send2?msg=Message%20B&time=10. The console output is as follows, and the ordering problem seen with the dead letter queue does not occur:
2020-12-16 23:31:19.819 INFO 13304 --- [nio-4399-exec-9] c.l.rabbitmqdlk.rabbitmq.MessageSender : Sending messages using the delay plug-in:Message A Sent successfully,Expiration time:30 Seconds.
2020-12-16 23:31:27.673 INFO 13304 --- [io-4399-exec-10] c.l.rabbitmqdlk.rabbitmq.MessageSender : Sending messages using the delay plug-in:Message B Sent successfully,Expiration time:10 Seconds.
2020-12-16 23:31:37.833 INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver : Use the delay plug-in to receive the message:Message B
2020-12-16 23:31:49.917 INFO 13304 --- [ntContainer#1-1] c.l.r.rabbitmq.MessageReceiver : Use the delay plug-in to receive the message:Message A
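For contrast with the dead letter queue, the observable behavior here can be modeled as each message being released when its own delay elapses, regardless of send order. The sketch below uses a priority queue ordered by due time. It is only a conceptual model in plain Java and makes no claim about how the plug-in is implemented internally; `PerMessageDelay` is a hypothetical name, and the send times 0 s and 8 s approximate the log above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Conceptual model of per-message delays: delivery order follows each message's own due time. */
class PerMessageDelay {

    static final class Msg {
        final String body;
        final long sentAt;        // second at which the message was published
        final long delaySeconds;  // value of x-delay, expressed in seconds for readability

        Msg(String body, long sentAt, long delaySeconds) {
            this.body = body;
            this.sentAt = sentAt;
            this.delaySeconds = delaySeconds;
        }

        long dueAt() {
            return sentAt + delaySeconds;
        }
    }

    /** Returns "body@second" entries in delivery order. */
    static List<String> simulate(List<Msg> messages) {
        // Ordering by due time means an earlier message with a longer delay cannot block a later one
        PriorityQueue<Msg> byDueTime = new PriorityQueue<>(Comparator.comparingLong(Msg::dueAt));
        byDueTime.addAll(messages);
        List<String> delivered = new ArrayList<>();
        while (!byDueTime.isEmpty()) {
            Msg m = byDueTime.poll();
            delivered.add(m.body + "@" + m.dueAt());
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Msg> messages = List.of(new Msg("A", 0, 30), new Msg("B", 8, 10));
        System.out.println(simulate(messages)); // prints [B@18, A@30]
    }
}
```

Message B arrives at second 18, ten seconds after it was sent, and message A at second 30, which is the order shown in the log.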
Author: javalang
Original link: https://www.cnblogs.com/javalank/p/14751624.html