Introduction and use of RabbitMQ dead letter queue in SpringBoot

Dead letter queue can realize other processing of messages when they are not normally consumed, so as to ensure that messages will not be discarded.

Before talking about the dead letter queue, let's first introduce why we need to use the dead letter queue.

If you want to directly understand the dead letter docking, just jump into the "dead letter queue" section below.

ack mechanism and request rejected attribute

RabbitMQ integration in SpringBoot

In project springboot-demo We see application The configuration of yaml file is as follows


...

listener:
    type: simple
    simple:
      acknowledge-mode: auto
      concurrency: 5
      default-requeue-rejected: true
      max-concurrency: 100
...

among

acknowledge-mode

This configuration item is used to indicate the message confirmation method. There are three configuration methods: none, manual and auto.

none means that no reply will be sent.

manual means that the listener must call channel Basicack() to inform all messages.

auto means that the container will respond automatically unless the MessageListener throws an exception, which is the default configuration.

default-requeue-rejected

This configuration item determines whether the messages rejected due to the exception thrown by the listener are put back into the queue. The default value is true.

At the beginning, I had a misunderstanding about this attribute. I thought rejected means rejection, so connecting request rejected means rejection and putting it back in the queue. Later, I checked the data to understand the function of this attribute and remembered that rejected is an adjective, which should represent the rejected message

Therefore, if this attribute is configured as true, it means it will be put back to the queue. If it is configured as false, it means it will not be put back to the queue.

Let's take a look at how RabbitMQ handles messages using different combinations of the acknowledge mode parameter and the default request rejected parameter.

The code still uses the RabbitApplicationTests in the springboot demo to send messages, and uses the Receiver class to listen for messages in the demo queue queue.

For the Receiver class, a line of code is added, which simulates throwing an exception


@Component
public class Receiver {

    @RabbitListener(queues = "demo_queue")
    public void created(String message) {
        System.out.println("orignal message: " + message);
        int i = 1/0;
    }
}

acknowledge-mode=none, default-requeue-rejected=false

This configuration does not confirm whether the message is consumed normally, so no exception is thrown on the console. Through the RabbitMQ administration page, you can't see the message of putting back to the queue

acknowledge-mode=none, default-requeue-rejected=true

Similarly, this configuration will not confirm whether the message is consumed normally, so no exception is thrown on the console. Moreover, even if the default request rejected configuration is true, the message of putting back to the queue is not seen because there is no confirmation

acknowledge-mode=manual, default-requeue-rejected=false

This configuration needs to manually confirm whether the message is consumed normally, but there is no manual confirmation in the code. I understand that the message is returned to the queue because the ack is not received.

acknowledge-mode=manual, default-requeue-rejected=true

This configuration needs to manually confirm whether the message is consumed normally, but there is no manual confirmation in the code, so the message is put back into the queue, and an exception is thrown on the console (this is not very clear. The different effects caused by setting default request rejected to true and false. If you have any trouble, please leave a message below).

acknowledge-mode=auto, default-requeue-rejected=false

The configuration adopts automatic confirmation. From the result, it is automatic confirmation.

From the results printed on the console, it can be seen that the Receiver method has been executed three times, including the first two messages put back to the queue and the messages sent this time, so all three messages have been consumed.

At the same time, because default request rejected is set to false, even if the consumer throws an exception, the message is not put back to the queue.

acknowledge-mode=auto, default-requeue-rejected=true

The configuration also adopts automatic confirmation. It can be seen from the results that no exception is thrown (this is not very understood), and because default request rejected is set to true, the message returns to the queue again.

To sum up, this list is only to illustrate that in some cases, if the message consumption is wrong, the message is lost due to configuration problems. This is fatal in many cases. For example, the order number paid by the user is fatal if it is directly lost due to abnormal throwing and other reasons.

Therefore, we need to have a guarantee mechanism to ensure that even failed messages can be saved. At this time, the dead letter queue comes in handy.

Dead letter queue

The whole design idea of dead letter queue is like this

Producer -- > message -- > switch -- > queue -- > becomes dead letter -- > Dlx switch -- > queue -- > consumer

Let's see how to use a dead letter queue on the Internet.


@Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange declaration dead letter switch
        args.put("x-dead-letter-exchange", "DL_EXCHANGE");
//       x-dead-letter-routing-key declare dead letter routing key
        args.put("x-dead-letter-routing-key", "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * Dead letter routing through DL_ The key binding key is bound to the dead letter queue
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * Dead letter routing through key_ The R binding key is bound to the dead letter queue
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

be careful

  • Declared a direct mode exchange.

  • A dead letter queue deadLetterQueue is declared. The queue is configured with some properties. x-dead-letter-exchange indicates dead letter switch, and x-dead-letter-routing-key indicates dead letter routing key. Because it is in direct mode, this routing key needs to be set.

  • A substitute queue, redirectQueue, is declared, and the messages that become dead letters are finally stored in this queue.

  • Declare the binding relationship, which is the binding of dead letter queue, substitute queue and switch.

So how to simulate the generation of a dead letter message, which can be sent to dl_ The queue message will expire after 10 seconds and then be forwarded to the substitute queue. The code is implemented as follows


public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
//            Set code
            messageProperties.setContentEncoding("utf-8");
//            Set expiration time 10 * 1000 ms
            messageProperties.setExpiration("5000");
            return message;
        };
        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor);
    }

The implementation results are as follows

The message first enters DL_QUEUE fails after 5 seconds and is forwarded to redirect_ In queue.

------------------------------------------------------------------------------------------------------------------

 

#Concept:

  • Scenario where the message will become a dead letter message:

    1. The message is (basic. Reject() or basic Nack()) and request = false, that is, the message is rejected by the consumer and re queued as false.
      1.1 there is a scenario to note: the consumer has set automatic ACK. When the number of repeated deliveries reaches the set maximum number of retries, the message will also be delivered to the dead letter queue, but the internal principle still calls nack/reject.
    2. The message has expired and the TTL lifetime has passed.
    3. The queue has set the maximum number of x-max-length messages, and the current number of messages in the queue has reached this number. If it is delivered again, the messages will be squeezed out, and the messages that are squeezed out are the messages closest to the end being consumed.
  • The coding process is:

    1. There is one (n) Exchange with normal business, such as user Exchange.
    2. There is one (n) Queue for normal business, such as user Queue. (because the Queue needs to be bound to the dead letter switch, two parameters need to be added: dead letter switch: x-dead-letter-exchange, dead letter message routing key: x-dead-letter-routing-key)
    3. Switch and queue binding for normal service.
    4. Define a dead letter switch, such as common dead letter exchange.
    5. Bind the queue of normal business to the dead letter switch (the queue will be bound automatically when x-dead-letter-exchange is set).
    6. Define the dead letter queue user dead letter queue, which is used to receive dead letter messages and bind the dead letter switch.
  • The business process is:

    1. The normal business message is delivered to the normal business Exchange, which routes the message to the bound normal queue according to the routing key.
    2. After the message in the normal service queue becomes a dead letter message, it will be automatically delivered to the dead letter switch bound to the queue (with the configured routing key. If the routing key of the dead letter message is not specified, it will inherit the routing key set by the message in the normal service by default).
    3. After receiving the message, the dead letter switch routes the message to the specified dead letter queue according to the routing rules.
    4. After the message reaches the dead letter queue, you can listen to the dead letter queue and process the dead letter message.
  • Dead letter switch and dead letter queue are also ordinary switches and queues, but we artificially use a certain switch and queue to process dead letter messages.

  • flow chart

Picture png

#Code implementation

  1. to configure

 

spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: deadletter-vh
    connection-timeout: 15000
    # Send confirmation
    publisher-confirms: true
    # Route failure callback
    publisher-returns: true
    template:
      # It must be set to true to notify the listener of message routing failure instead of discarding the message
      mandatory: true
    listener:
      simple:
        # Number of messages obtained from RabbitMQ each time
        prefetch: 1
        default-requeue-rejected: false
        # Number of consumers started per queue
        concurrency: 1
        # Maximum number of consumers per queue
        max-concurrency: 1
        # The sign in mode is manual sign in - then you need to manually ACK in the code
        acknowledge-mode: manual

app:
  rabbitmq:
    # Queue definition
    queue:
      # Normal service queue
      user: user-queue
      # Dead letter queue
      user-dead-letter: user-dead-letter-queue
    # Switch definition
    exchange:
      # Normal service switch
      user: user-exchange
      # Dead letter switch
      common-dead-letter: common-dead-letter-exchange
  1. Queue, switch definition and binding.

 

/**
 * Definition and binding of queue and switch
 *
 * @author futao
 * @date 2020/4/7.
 */
@Configuration
public class Declare {

        /**
     * User queue
     *
     * @param userQueueName User queue name
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //Declare the switch to which the dead letter message of the queue is sent (after adding this parameter to the queue, it will automatically bind to the switch and set the routing key, which does not need to be set manually by the developer)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //Declare the routing key of the dead letter message of the queue in the switch
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                .build();
    }

    /**
     * PBX
     *
     * @param userExchangeName PBX name
     * @return
     */
    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    /**
     * User queue and switch binding
     *
     * @param userQueue    User queue name
     * @param userExchange PBX name
     * @return
     */
    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*")
                .noargs();
    }

    /**
     * Dead letter switch
     *
     * @param commonDeadLetterExchange Universal dead letter switch name
     * @return
     */
    @Bean
    public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return ExchangeBuilder
                .topicExchange(commonDeadLetterExchange)
                .durable(true)
                .build();
    }


   /**
     * Dead letter message routing queue of user queue
     * The dead letter from the user queue is delivered to the dead letter switch ` common dead letter exchange 'and then delivered to the queue
     * This queue is used to receive dead letter messages from user queue
     *
     * @return
     */
    @Bean
    public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) {
        return QueueBuilder
                .durable(userDeadLetterQueue)
                .build();
    }

    /**
     * Dead letter queue binding dead letter switch
     *
     * @param userDeadLetterQueue      user-queue Corresponding dead letter queue
     * @param commonDeadLetterExchange Universal dead letter switch
     * @return
     */
    @Bean
    public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) {
        return BindingBuilder
                .bind(userDeadLetterQueue)
                .to(commonDeadLetterExchange)
                .with("user-dead-letter-routing-key")
                .noargs();
    }

}
  • After definition, start the program, and springboot will read the bean s of Queue and Exchange in the Spring container to initialize and bind the Queue and switch. Of course, you can also create and bind manually in the management background of RabbitMQ.
  • View management background

     

    Switch

     

    queue

     

    Queue and dead letter switch

#Testing

  • Message producer

 

/**
 * @author futao
 * @date 2020/4/7.
 */
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send() {
        User user = User.builder()
                .userName("Astronomy")
                .address("Hangzhou, Zhejiang")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user);
    }
}

1. Scenario 1.1

The message is (basic. Reject() or basic nack ()) and request = false, that is, the message is rejected or nack by the consumer, and re joining the queue is false.

The difference between nack() and reject() is that reject() does not support batch rejection, while nack() can

  • Consumer code

 

/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * Normal user queue message listening consumer
     *
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user, Message message, Channel channel) {
        log.info("Normal user service monitoring: message received:[{}]", JSON.toJSONString(user));
        try {
            //The parameters are: DeliveryTag of the message, whether to reject in batch, and whether to rejoin the queue
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.info("Sign off...The routing key of the message is:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("Message reject sign in failed", e);
        }
    }

    /**
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user, Message message, Channel channel) {
        log.info("Dead letter message received:[{}]", JSON.toJSONString(user));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("Dead letter queue sign in message....Message routing key is:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("Dead letter queue message signing failed", e);
        }
    }
}
  • It can be seen that after the normal message is NACK, it finally reaches the dead letter queue, and the routing key has changed.

    Dead letter message


    1. Scenario 1.2

The consumer has set automatic sign in. When the number of repeated deliveries reaches the set maximum number of retries, the message will also be delivered to the dead letter queue, but the internal principle still calls nack/reject.

  • application. Some configuration changes are needed in YML

 

spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    listener:
      simple:
        # Number of messages obtained from RabbitMQ each time
        prefetch: 1
        default-requeue-rejected: false
        # Number of consumers started per queue
        concurrency: 1
        # Maximum number of consumers per queue
        max-concurrency: 1
        # Automatic sign in
        acknowledge-mode: auto
        retry:
          enabled: true
          # First attempt interval
          initial-interval: 10S
          # Maximum duration between attempts.
          max-interval: 10S
          # Maximum number of retries (= first normal delivery 1 + number of retries 4)
          max-attempts: 5
          # Multiplier of last retry time
          multiplier: 1.0
  • Consumer code

 

/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Configuration
public class AutoAckConsumer {

    /**
     * Normal user queue message listening consumer
     *
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user) {
        log.info("Normal user service monitoring: message received:[{}]", JSON.toJSONString(user));
        throw new RuntimeException("Abnormal simulation");
    }

    /**
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user) {
        log.info("Receive the dead letter message and sign it automatically:[{}]", JSON.toJSONString(user));
    }
}
  • Test results:

     

    image.png

     

    image.png

  • It can be seen from the test results that if the message is not normally consumed, it will be retried. If it is not normally consumed in the end, it will be delivered to the dead letter queue.

I don't know the function of the initial interval and Max interval parameters. Now the test result is that the shortest time will always be taken as the next delivery time

2. Test scenario 2

The message has expired and the TTL lifetime has passed.

  • You need to modify the queue definition and set the expiration time of queue messages x-message-ttl

 

    /**
     * User queue
     *
     * @param userQueueName User queue name
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //Declare the switch to which the dead letter message of the queue is sent (after adding this parameter to the queue, it will automatically bind to the switch and set the routing key, which does not need to be set manually by the developer)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //Declare the routing key of the dead letter message of the queue in the switch
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //Expiration time of messages in this queue - if the messages are not consumed after this time, they will be routed to the dead letter queue
                .withArgument("x-message-ttl", 5000)
                .build();
    }
  • Annotate the consumer of the user queue so that the message cannot be consumed until the time of the message in the queue reaches the set survival time.

    ttl

     

  • According to the log, the message will be delivered to the dead letter queue after 5S.

     

    image.png

  • Note: the message expiration time can be set for the queue, so all messages delivered to the queue will automatically have this attribute. You can also set a specified expiration time for each message before it is delivered. (when both are set, the shorter value is taken by default)

The following test sets the specified expiration time for each message:

  • Producer modify message:

 

/**
 * @author futao
 * @date 2020/4/7.
 */
@Slf4j
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send(String exp) {
        User user = User.builder()
                .userName("Astronomy")
                .address("Hangzhou, Zhejiang")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        log.info("Message delivery...The specified lifetime is:[{}]ms", exp);
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties messageProperties = message.getMessageProperties();
                //Set expiration time for each message
                messageProperties.setExpiration(exp);
                return message;
            }
        });
    }
}

image.png

  • It can be seen from the test results that each message was delivered to the dead letter queue at the specified time.

[pit] key points!!!: RabbitMQ detection of message Expiration: it will only detect whether the message to be consumed recently has reached the expiration time, and will not detect whether the non end message has expired. The problem is that the non end message has expired, but because the end message has not expired and the non end message is blocked, the non end message will not be detected as expired. Make the business produce results that are seriously inconsistent with expectations.

  • Test the above problems: (set the expiration time of the first message to 10S and the second message to 5S)

     

    image.png

  • It can be seen from the test results that the survival time of the message with ID 1 is 10S and that of the message with ID 2 is 5S. However, only when the first message (id=1) expires and the message with id=2 reaches the end of the queue will it be detected that it has expired.

3. Test scenario 3

The queue has set the maximum number of x-max-length messages, and the current number of messages in the queue has reached this number. If it is delivered again, the messages will be squeezed out, and the messages that are squeezed out are the messages closest to the end being consumed.

  • Modify queue definition

 

  /**
     * User queue
     *
     * @param userQueueName User queue name
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //Declare the switch to which the dead letter message of the queue is sent (after adding this parameter to the queue, it will automatically bind to the switch and set the routing key, which does not need to be set manually by the developer)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //Declare the routing key of the dead letter message of the queue in the switch
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //Maximum number of messages queued
                .withArgument("x-max-length", 2)
                .build();
    }

image.png

  • Post message to queue

     

    image.png

  • It can be seen from the results that when the third message is delivered, RabbitMQ will remove the message at the most consumed end from the queue and deliver it to the dead letter queue.

    image.png

     

    A maximum of two messages will always be held in the queue.

#Others:

#Related:

SpringBoot RabbitMQ realizes reliable message delivery

# TODO:

  • Consumer end current limiting protection
  • Delay queue

Tags: Java RabbitMQ

Posted by jmut on Sun, 22 May 2022 21:04:35 +0300