How does RabbitMQ ensure reliable delivery of messages?

Integrating RabbitMQ with Spring Boot

Spring can be configured in three ways

  1. XML based
  2. Based on JavaConfig
  3. Annotation based

XML is rarely used for configuration nowadays, so this article only covers the JavaConfig and annotation approaches.

To integrate RabbitMQ with Spring Boot, we only need to add the corresponding starter:

 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

Annotation based

application.yaml is configured as follows:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

log:
  exchange: log.exchange
  info:
    queue: info.log.queue
    binding-key: info.log.key
  error:
    queue: error.log.queue
    binding-key: error.log.key
  all:
    queue: all.log.queue
    binding-key: '*.log.key'

The consumer code is as follows:

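import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LogReceiverListener {

    /**
     * Receive info level logs
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.info.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.info.binding-key}"
            )
    )
    public void infoLog(Message message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("infoLogQueue The message received is: {}", msg);
    }

    /**
     * Receive all logs
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.all.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.all.binding-key}"
            )
    )
    public void allLog(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("allLogQueue The message received is: {}", msg);
    }
}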
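
The allLog listener above binds its queue with the key *.log.key from application.yaml. On a topic exchange, binding keys are patterns over dot-separated words: * stands for exactly one word and # stands for zero or more words. The following is a minimal, broker-free sketch of that matching rule, written only to illustrate why a message sent with info.log.key reaches both queues. TopicKeyMatcher is a hypothetical helper introduced here; it is not part of RabbitMQ or Spring AMQP.

```java
/** Hypothetical helper that mimics topic-exchange key matching (illustration only). */
final class TopicKeyMatcher {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        // A limit of -1 keeps empty trailing words, so "a." and "a" stay different.
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible length.
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // the pattern still expects a word but the key has none left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && match(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        String[] keys = {"info.log.key", "error.log.key", "test"};
        for (String key : keys) {
            System.out.println(key
                    + " -> info queue: " + matches("info.log.key", key)
                    + ", all queue: " + matches("*.log.key", key));
        }
    }
}
```

With the keys from application.yaml, info.log.key satisfies both info.log.key and *.log.key, while the routing key test (used later in the failure notification test) satisfies neither.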

The producer is as follows:

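import lombok.SneakyThrows;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class MsgProducerTest {

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Value("${log.exchange}")
    private String exchange;
    @Value("${log.info.binding-key}")
    private String routingKey;

    @SneakyThrows
    @Test
    public void sendMsg() {
        for (int i = 0; i < 5; i++) {
            String message = "this is info message " + i;
            amqpTemplate.convertAndSend(exchange, routingKey, message);
        }

        // Block so the test JVM stays alive long enough for the listeners to receive the messages
        System.in.read();
    }
}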

Acknowledging messages in Spring Boot works a little differently from acknowledging them with the native API.

Native API message ack modes

There are two ways to confirm messages:

  1. Automatic confirmation (autoAck=true)
  2. Manual confirmation (autoAck=false)

Consumers specify the autoAck parameter when they start consuming messages:

String basicConsume(String queue, boolean autoAck, Consumer callback)

autoAck=false: RabbitMQ waits for the consumer to explicitly acknowledge a message before removing it from memory (or disk)

autoAck=true: RabbitMQ treats a message as confirmed as soon as it has been sent and then deletes it from memory (or disk), regardless of whether the consumer actually processed it

Manual confirmation uses the following method, which takes 2 parameters:

basicAck(long deliveryTag, boolean multiple)

deliveryTag: identifies a message delivered on a channel. When RabbitMQ pushes a message to a consumer, it attaches a deliveryTag so that the consumer can tell RabbitMQ which message it is confirming.
RabbitMQ guarantees that within each channel the deliveryTag starts at 1 and increases with every delivery

multiple=true: all unconfirmed messages on the channel whose deliveryTag is <= the given deliveryTag are confirmed

multiple=false: only the message whose deliveryTag equals the given deliveryTag is confirmed
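
The effect of the multiple flag can be illustrated without a broker. The sketch below is a toy model of the unconfirmed deliveries on one channel; it is not RabbitMQ client code, and ChannelAckModel and its methods are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of the unconfirmed deliveries on one channel (illustration only). */
final class ChannelAckModel {

    // deliveryTag -> message body, ordered by tag
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1; // tags start at 1 on each channel

    /** Simulates the broker pushing a message; returns the deliveryTag it was given. */
    long deliver(String body) {
        long tag = nextTag++;
        unacked.put(tag, body);
        return tag;
    }

    /** Mirrors basicAck(deliveryTag, multiple) and returns the bodies that were confirmed. */
    List<String> basicAck(long deliveryTag, boolean multiple) {
        List<String> confirmed = new ArrayList<>();
        if (multiple) {
            // Everything still unconfirmed with tag <= deliveryTag is confirmed at once.
            NavigableMap<Long, String> upToTag = unacked.headMap(deliveryTag, true);
            confirmed.addAll(upToTag.values());
            upToTag.clear(); // clearing the view removes the entries from 'unacked'
        } else {
            // Only the single message with exactly this tag is confirmed.
            String body = unacked.remove(deliveryTag);
            if (body != null) {
                confirmed.add(body);
            }
        }
        return confirmed;
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ChannelAckModel channel = new ChannelAckModel();
        for (int i = 1; i <= 5; i++) {
            channel.deliver("msg-" + i); // receives tags 1..5
        }
        System.out.println(channel.basicAck(3, true));  // [msg-1, msg-2, msg-3]
        System.out.println(channel.basicAck(5, false)); // [msg-5]
        System.out.println(channel.unackedCount());     // 1 (msg-4 is still unconfirmed)
    }
}
```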

What happens if a message is never confirmed?

If a message in the queue is delivered to a consumer and the consumer does not confirm it, the message stays on the broker in an unconfirmed state until it is confirmed.
If a message delivered to consumer A is not confirmed, RabbitMQ will not redeliver it to another consumer until consumer A's channel or connection to RabbitMQ is closed
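
The redelivery rule can be pictured with a small in-memory model. This is only a sketch of the behaviour described here under simplifying assumptions (a single queue, messages handed out one at a time, and returned messages placed back at the head of the queue). QueueModel and its methods are hypothetical and are not RabbitMQ API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of one queue with ready and unconfirmed messages (illustration only). */
final class QueueModel {

    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<String, Deque<String>> unackedByConsumer = new HashMap<>();

    void publish(String body) {
        ready.addLast(body);
    }

    /** Hands the next ready message to the consumer, or returns null if none is ready. */
    String deliverTo(String consumer) {
        String body = ready.pollFirst();
        if (body != null) {
            unackedByConsumer.computeIfAbsent(consumer, c -> new ArrayDeque<>()).addLast(body);
        }
        return body;
    }

    /** The consumer confirms a message, so it is removed for good. */
    boolean ack(String consumer, String body) {
        Deque<String> unacked = unackedByConsumer.get(consumer);
        return unacked != null && unacked.remove(body);
    }

    /** The consumer's connection is closed: its unconfirmed messages become ready again. */
    void disconnect(String consumer) {
        Deque<String> unacked = unackedByConsumer.remove(consumer);
        if (unacked == null) {
            return;
        }
        // Re-insert from newest to oldest so the original order is kept at the head.
        while (!unacked.isEmpty()) {
            ready.addFirst(unacked.pollLast());
        }
    }

    public static void main(String[] args) {
        QueueModel queue = new QueueModel();
        queue.publish("m1");
        queue.publish("m2");
        queue.deliverTo("A");                        // A holds m1 and never confirms it
        queue.ack("B", queue.deliverTo("B"));        // B receives and confirms m2
        System.out.println(queue.deliverTo("B"));    // null: m1 is still held by A
        queue.disconnect("A");
        System.out.println(queue.deliverTo("B"));    // m1: redelivered after A is gone
    }
}
```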

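Message ack modes in Spring Boot

There are three modes, which are defined in the AcknowledgeMode enumeration class:

  1. NONE: no acks are sent; the broker treats every message as confirmed once it is delivered (like autoAck=true in the native API)
  2. MANUAL: the listener must confirm each message itself, for example with Channel.basicAck()
  3. AUTO: the listener container confirms the message when the listener returns normally and rejects it when the listener throws an exception

The default ack mode of Spring Boot for messages is AUTO.

In real-world scenarios, we usually use manual ack.

The configuration in application.yaml is changed as follows: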

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # Manual ack, auto by default

The corresponding consumer code is changed to:

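import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LogListenerManual {

    /**
     * Receive info level logs
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.info.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.info.binding-key}"
            )
    )
    public void infoLog(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("infoLogQueue The message received is: {}", msg);
        try {
            // Write all kinds of business logic here
            // multiple=false: confirm only this message
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("Failed to process message: {}", msg, e);
            // multiple=false, requeue=false: reject only this message and do not requeue it
            channel.basicNack(deliveryTag, false, false);
        }
    }
}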

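The annotations used above have the following functions:

  1. @RabbitListener: marks a method as a message listener for the given queues or bindings
  2. @QueueBinding: binds a queue to an exchange with a binding key
  3. @Queue: declares a queue
  4. @Exchange: declares an exchange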

Based on JavaConfig

Since annotations are so convenient, why do we still need JavaConfig?
JavaConfig makes it easy to customize all kinds of properties, such as configuring multiple virtual hosts at the same time

How RabbitMQ ensures reliable delivery of messages

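A message usually goes through three stages: production (from the producer to the broker), storage (inside the broker), and consumption (from the broker to the consumer).

Therefore, to ensure reliable delivery of messages, we only need to ensure reliable delivery in each of these three stages: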

Production stage

Reliable delivery at this stage mainly depends on ConfirmListener (publisher confirmation) and ReturnListener (failure notification).
As mentioned earlier, the path of a message through RabbitMQ is
producer -> rabbitmq broker cluster -> exchange -> queue -> consumer

ConfirmListener reports whether a message was delivered from the producer to the broker
ReturnListener receives messages that the exchange could not route to any queue

I use the API of the Spring Boot starter to demonstrate the effect

application.yaml

spring:
  rabbitmq:
    host: myhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # Manual ack, auto by default

log:
  exchange: log.exchange
  info:
    queue: info.log.queue
    binding-key: info.log.key

Publisher confirmation callback

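import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private MessageSender messageSender;

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null for messages that were sent without CorrelationData
        if (correlationData == null) {
            return;
        }
        String msgId = correlationData.getId();
        String msg = messageSender.dequeueUnAckMsg(msgId);
        if (ack) {
            System.out.println(String.format("news {%s} Successfully sent to mq", msg));
        } else {
            // You can add some retry logic
            System.out.println(String.format("news {%s} send out mq fail", msg));
        }
    }
}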

Failure notification callback

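import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class ReturnCallback implements RabbitTemplate.ReturnCallback {

    // Called when the exchange cannot route a message published with mandatory=true
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println(String.format("news {%s} Cannot be routed correctly, routingKey by {%s}", msg, routingKey));
    }
}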

RabbitMQ configuration class:

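import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    @Bean
    public ConnectionFactory connectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password,
            @Value("${spring.rabbitmq.virtual-host}") String vhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        // Enable publisher confirmation. Spring AMQP 2.2 and later deprecate this setter
        // in favor of setPublisherConfirmType(ConfirmType.CORRELATED).
        connectionFactory.setPublisherConfirms(true);
        // Enable failure notification for unroutable messages
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         ReturnCallback returnCallback, ConfirmCallback confirmCallback) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // For returnCallback to take effect, it must be set to true
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}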

Here I wrap RabbitTemplate, mainly to attach a message id when sending and to record the mapping between the message id and the message. RabbitTemplate.ConfirmCallback only receives the message id, not the message content, so we have to keep this mapping ourselves. In systems with high reliability requirements, the mapping can be saved to a database: delete the entry once the message has been sent successfully, and keep resending the message if sending fails

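import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // message id -> message content, for messages that have not been confirmed yet
    private final Map<String, String> unAckMsgQueue = new ConcurrentHashMap<>();

    public void convertAndSend(String exchange, String routingKey, String message) {
        String msgId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(msgId);
        // Record the message before sending so that a fast confirm callback can always find it
        unAckMsgQueue.put(msgId, message);
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }

    public String dequeueUnAckMsg(String msgId) {
        return unAckMsgQueue.remove(msgId);
    }

}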
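
The bookkeeping described before the MessageSender class (record each message under its id, drop the record on a positive confirmation, resend on a negative one) can be sketched independently of RabbitMQ. The version below is an in-memory sketch with two assumptions that are not in the original: a cap on the number of send attempts, and a publisher passed in as a plain function instead of RabbitTemplate. PendingConfirmStore and all of its members are hypothetical names; a system with high reliability requirements would keep the records in a database instead of a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical in-memory tracker for messages that are awaiting publisher confirmation. */
final class PendingConfirmStore {

    private static final class Pending {
        final String body;
        int attempts = 1; // the first send counts as attempt 1

        Pending(String body) {
            this.body = body;
        }
    }

    private final Map<String, Pending> pending = new HashMap<>();
    private final BiConsumer<String, String> publisher; // (msgId, body) -> publish to the broker
    private final int maxAttempts;                      // assumption: retries are capped

    PendingConfirmStore(BiConsumer<String, String> publisher, int maxAttempts) {
        this.publisher = publisher;
        this.maxAttempts = maxAttempts;
    }

    /** Records the message first, then publishes it. */
    synchronized void send(String msgId, String body) {
        pending.put(msgId, new Pending(body));
        publisher.accept(msgId, body);
    }

    /**
     * Intended to be called from the confirm callback.
     * Returns true if the message was resent and is still awaiting confirmation.
     */
    synchronized boolean onConfirm(String msgId, boolean ack) {
        Pending p = pending.get(msgId);
        if (p == null) {
            return false; // unknown id, or already settled
        }
        if (ack || p.attempts >= maxAttempts) {
            pending.remove(msgId); // confirmed, or given up after maxAttempts sends
            return false;
        }
        p.attempts++;
        publisher.accept(msgId, p.body); // negative confirmation: send again
        return true;
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirmStore store = new PendingConfirmStore(
                (id, body) -> System.out.println("publish " + id + ": " + body), 3);
        store.send("id-1", "this is info message 0");
        store.onConfirm("id-1", false); // negative confirmation triggers a resend
        store.onConfirm("id-1", true);  // positive confirmation drops the record
        System.out.println("pending: " + store.pendingCount());
    }
}
```

Capping the attempts is a design choice made for this sketch; retrying without a limit, as the text suggests, corresponds to a very large maxAttempts.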

The test code is

@RunWith(SpringRunner.class)
@SpringBootTest
public class MsgProducerTest {

    @Autowired
    private MessageSender messageSender;
    @Value("${log.exchange}")
    private String exchange;
    @Value("${log.info.binding-key}")
    private String routingKey;

    /**
     * Test failure notification
     */
    @SneakyThrows
    @Test
    public void sendErrorMsg() {
        for (int i = 0; i < 3; i++) {
            String message = "this is error message " + i;
            messageSender.convertAndSend(exchange, "test", message);
        }
        System.in.read();
    }

    /**
     * Test publisher confirmation
     */
    @SneakyThrows
    @Test
    public void sendInfoMsg() {
        for (int i = 0; i < 3; i++) {
            String message = "this is info message " + i;
            messageSender.convertAndSend(exchange, routingKey, message);
        }
        System.in.read();
    }
}

Let's test the failure notification first

The output is

news {this is error message 0} Cannot be routed correctly, routingKey by {test}
news {this is error message 0} Successfully sent to mq
news {this is error message 2} Cannot be routed correctly, routingKey by {test}
news {this is error message 2} Successfully sent to mq
news {this is error message 1} Cannot be routed correctly, routingKey by {test}
news {this is error message 1} Successfully sent to mq

The messages were successfully sent to the broker (so the publisher confirmation is still positive), but they could not be routed to any queue

Then test the publisher confirmation

The output is

news {this is info message 0} Successfully sent to mq
infoLogQueue The message received is: {this is info message 0}
infoLogQueue The message received is: {this is info message 1}
news {this is info message 1} Successfully sent to mq
infoLogQueue The message received is: {this is info message 2}
news {this is info message 2} Successfully sent to mq

Messages are successfully sent to the broker and routed to the queue

Storage stage

I have not studied high availability at this stage; after all, clusters are built by the operations team. I will add this part later if I find the time

Consumption stage

Reliable delivery in the consumption stage mainly depends on ack.
The ack mechanisms of the native API and of the Spring Boot framework were introduced earlier

In short, in production we usually ack messages manually, one at a time. When consumption fails, we do not requeue the message (it would very likely fail again) but route it to a dead letter queue so that the problem can be investigated later

To summarize

  1. After ack, the message is deleted from the broker
  2. After nack or reject, there are two cases
    (1) requeue=true: the message is put back into the queue
    (2) requeue=false: the message is discarded, or delivered to the dead letter queue if one is configured
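
The cases in this summary can be written down as a small decision function. This is only a sketch of the rules listed here; SettlementModel, Outcome and settle are hypothetical names and do not correspond to any RabbitMQ API.

```java
/** Hypothetical model of what happens to a message after the consumer settles it. */
final class SettlementModel {

    enum Outcome { DELETED, REQUEUED, DEAD_LETTERED, DISCARDED }

    /**
     * @param ack                  true for ack, false for nack or reject
     * @param requeue              the requeue flag of nack/reject (ignored for ack)
     * @param deadLetterConfigured whether a dead letter queue is configured for the queue
     */
    static Outcome settle(boolean ack, boolean requeue, boolean deadLetterConfigured) {
        if (ack) {
            return Outcome.DELETED; // confirmed: removed from the broker
        }
        if (requeue) {
            return Outcome.REQUEUED; // put back into the queue
        }
        // Not requeued: dead-lettered if possible, otherwise dropped
        return deadLetterConfigured ? Outcome.DEAD_LETTERED : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(settle(true, false, false));  // DELETED
        System.out.println(settle(false, true, true));   // REQUEUED
        System.out.println(settle(false, false, true));  // DEAD_LETTERED
        System.out.println(settle(false, false, false)); // DISCARDED
    }
}
```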

Posted by ddoc on Sun, 08 May 2022 00:20:39 +0300