Integrating RabbitMQ with Spring Boot
Spring can be configured in three ways
- XML based
- Based on JavaConfig
- Annotation based
XML is rarely used for configuration nowadays, so this article only covers configuration with JavaConfig and annotations.
To integrate RabbitMQ with Spring Boot, we only need to add the corresponding starter:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Annotation based
application.yaml is configured as follows:
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

log:
  exchange: log.exchange
  info:
    queue: info.log.queue
    binding-key: info.log.key
  error:
    queue: error.log.queue
    binding-key: error.log.key
  all:
    queue: all.log.queue
    binding-key: '*.log.key'
The consumer code is as follows:
@Slf4j
@Component
public class LogReceiverListener {

    /**
     * Receive info level logs
     */
    @RabbitListener(
        bindings = @QueueBinding(
            value = @Queue(value = "${log.info.queue}", durable = "true"),
            exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
            key = "${log.info.binding-key}"
        )
    )
    public void infoLog(Message message) {
        String msg = new String(message.getBody());
        log.info("infoLogQueue The message received is: {}", msg);
    }

    /**
     * Receive all logs
     */
    @RabbitListener(
        bindings = @QueueBinding(
            value = @Queue(value = "${log.all.queue}", durable = "true"),
            exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
            key = "${log.all.binding-key}"
        )
    )
    public void allLog(Message message) {
        String msg = new String(message.getBody());
        log.info("allLogQueue The message received is: {}", msg);
    }
}
The producer code is as follows:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MsgProducerTest {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${log.exchange}")
    private String exchange;

    @Value("${log.info.binding-key}")
    private String routingKey;

    @SneakyThrows
    @Test
    public void sendMsg() {
        for (int i = 0; i < 5; i++) {
            String message = "this is info message " + i;
            amqpTemplate.convertAndSend(exchange, routingKey, message);
        }
        // Block so the listener has time to consume the messages
        System.in.read();
    }
}
Spring Boot handles message acknowledgement (ack) slightly differently from the native API.
Message ack with the native API
There are two ways to acknowledge messages:
- Automatic acknowledgement (autoAck=true)
- Manual acknowledgement (autoAck=false)
A consumer specifies the autoAck parameter when it subscribes to a queue:
String basicConsume(String queue, boolean autoAck, Consumer callback)
autoAck=false: RabbitMQ waits for the consumer to explicitly acknowledge the message before removing it from memory (or disk)
autoAck=true: RabbitMQ treats a message as acknowledged as soon as it is sent and then deletes it from memory (or disk), regardless of whether the consumer actually processed it
Manual acknowledgement uses the following method, which takes 2 parameters:
basicAck(long deliveryTag, boolean multiple)
deliveryTag: identifies a message delivered on a channel. When RabbitMQ pushes a message to a consumer, it attaches a deliveryTag so that the consumer can tell RabbitMQ which message it is acknowledging.
RabbitMQ guarantees that within each channel the deliveryTag starts at 1 and increases with every delivered message
multiple=true: all unacknowledged messages with a delivery tag <= deliveryTag are acknowledged
multiple=false: only the message whose delivery tag equals deliveryTag is acknowledged
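The effect of the multiple flag can be illustrated with a toy model of one channel's unacknowledged deliveries. This is only a sketch using the JDK, not RabbitMQ code; the class name UnackedDeliveries and its methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of one channel's unacknowledged deliveries (not RabbitMQ code). */
public class UnackedDeliveries {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1; // delivery tags start at 1 on each channel

    /** Simulates the broker pushing one message; returns its delivery tag. */
    public long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Mirrors basicAck(deliveryTag, multiple); returns the tags that were acknowledged. */
    public List<Long> ack(long deliveryTag, boolean multiple) {
        List<Long> confirmed = new ArrayList<>();
        if (multiple) {
            // Every still-unacknowledged tag <= deliveryTag is acknowledged at once
            confirmed.addAll(unacked.headSet(deliveryTag, true));
            unacked.removeAll(confirmed);
        } else if (unacked.remove(deliveryTag)) {
            // Only the single message with this exact tag is acknowledged
            confirmed.add(deliveryTag);
        }
        return confirmed;
    }

    /** Tags that are still waiting for an ack, in ascending order. */
    public List<Long> pending() {
        return new ArrayList<>(unacked);
    }
}
```

With five deliveries (tags 1 to 5), ack(2, false) acknowledges only tag 2, a following ack(4, true) acknowledges tags 1, 3 and 4, and tag 5 stays pending.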
What happens if a message is never acknowledged?
If a message has been delivered to a consumer and the consumer does not acknowledge it, the message stays in the queue (marked as unacknowledged) until it is acknowledged.
RabbitMQ will not redeliver consumer A's unacknowledged messages to another consumer until the connection (or channel) between consumer A and RabbitMQ is closed
Message ack in Spring Boot
There are three modes, defined in the AcknowledgeMode enum: NONE (no acks, equivalent to autoAck=true), MANUAL (the listener must ack explicitly), and AUTO (the container acks when the listener returns normally and rejects the message when it throws).
Spring Boot's default ack mode is AUTO.
In real projects we usually use manual ack.
The configuration in application.yaml is changed as follows:
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # Manual ack, auto by default
The corresponding consumer code is changed to:
@Slf4j
@Component
public class LogListenerManual {

    /**
     * Receive info level logs
     */
    @RabbitListener(
        bindings = @QueueBinding(
            value = @Queue(value = "${log.info.queue}", durable = "true"),
            exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
            key = "${log.info.binding-key}"
        )
    )
    // Channel here is com.rabbitmq.client.Channel
    public void infoLog(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("infoLogQueue The message received is: {}", msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Business logic goes here
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // multiple=false, requeue=false: reject only this message and do not requeue it
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
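The annotations used above have the following functions:
- @RabbitListener: marks a method as a message listener
- @QueueBinding: declares a queue, an exchange, and the binding between them
- @Queue: declares the queue (name, durability, and so on)
- @Exchange: declares the exchange (name and type)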
Based on JavaConfig
Since annotations are so convenient, why do we still need JavaConfig?
JavaConfig makes it easy to customize all kinds of properties, for example connecting to multiple virtual hosts at the same time
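As a rough illustration of the multiple-virtual-host case, the sketch below defines one connection factory per virtual host. The virtual host names (/log, /order), the bean names, and the hard-coded host and credentials are assumptions made for this example. It also assumes that the queues on the second virtual host already exist, since the auto-configured admin only declares queues through the primary connection factory.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class MultiVhostConfig {

    // Hypothetical helper: builds a connection factory for one virtual host
    private static CachingConnectionFactory factoryFor(String vhost) {
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost", 5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost(vhost);
        return factory;
    }

    // @Primary keeps Spring Boot's auto-configured beans pointed at this virtual host
    @Bean
    @Primary
    public ConnectionFactory logConnectionFactory() {
        return factoryFor("/log");
    }

    @Bean
    public ConnectionFactory orderConnectionFactory() {
        return factoryFor("/order");
    }

    @Bean
    @Primary
    public RabbitTemplate logRabbitTemplate(
            @Qualifier("logConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public RabbitTemplate orderRabbitTemplate(
            @Qualifier("orderConnectionFactory") ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    // Listener containers for the second virtual host, with manual ack
    @Bean
    public SimpleRabbitListenerContainerFactory orderListenerFactory(
            @Qualifier("orderConnectionFactory") ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}
```

A listener would then pick its virtual host through the containerFactory attribute, for example @RabbitListener(queues = "order.queue", containerFactory = "orderListenerFactory"), where order.queue is again a made-up name.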
How RabbitMQ ensures reliable delivery of messages
A message typically passes through three stages: production (producer to broker), storage (inside the broker), and consumption (broker to consumer).
Therefore, to ensure reliable delivery of a message, we need to ensure reliable delivery in each of these three stages:
Production stage
Reliable delivery at this stage mainly depends on ConfirmListener (publisher confirmation) and ReturnListener (failure notification)
As mentioned earlier, the flow process of a message in RabbitMQ is
producer -> rabbitmq broker cluster -> exchange -> queue -> consumer
ConfirmListener tells the producer whether a message made it from the producer to the broker
ReturnListener receives the messages that the exchange could not route to any queue
I use the Spring Boot Starter API to demonstrate the effect
application.yaml
spring:
  rabbitmq:
    host: myhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # Manual ack, auto by default

log:
  exchange: log.exchange
  info:
    queue: info.log.queue
    binding-key: info.log.key
Publisher confirmation callback
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private MessageSender messageSender;

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null for messages that were sent without one
        if (correlationData == null) {
            return;
        }
        String msgId = correlationData.getId();
        String msg = messageSender.dequeueUnAckMsg(msgId);
        if (ack) {
            System.out.println(String.format("news {%s} Successfully sent to mq", msg));
        } else {
            // You can add some retry logic here
            System.out.println(String.format("news {%s} send out mq fail", msg));
        }
    }
}
Failure notification callback
@Component
public class ReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        String msg = new String(message.getBody());
        System.out.println(String.format(
            "news {%s} Cannot be routed correctly, routingKey by {%s}", msg, routingKey));
    }
}
RabbitMQ configuration class:
@Configuration
public class RabbitMqConfig {

    @Bean
    public ConnectionFactory connectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password,
            @Value("${spring.rabbitmq.virtual-host}") String vhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        // Enable publisher confirms and returns.
        // Newer Spring AMQP versions (2.2+) use setPublisherConfirmType(ConfirmType.CORRELATED) instead.
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         ReturnCallback returnCallback,
                                         ConfirmCallback confirmCallback) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // For returnCallback to take effect, mandatory must be set to true
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}
Here I wrap the RabbitTemplate, mainly to attach a message id when sending and to record the mapping between the message id and the message. RabbitTemplate.ConfirmCallback only receives the message id, not the message content, so we have to keep this mapping ourselves. In systems with high reliability requirements, you can store the mapping in a database, delete it once the send is confirmed, and keep resending the message for as long as the send fails
@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public final Map<String, String> unAckMsgQueue = new ConcurrentHashMap<>();

    public void convertAndSend(String exchange, String routingKey, String message) {
        String msgId = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(msgId);
        // Record the message before sending, so a fast confirm callback always finds it
        unAckMsgQueue.put(msgId, message);
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
    }

    public String dequeueUnAckMsg(String msgId) {
        return unAckMsgQueue.remove(msgId);
    }
}
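The id-to-message bookkeeping described above can be sketched without a broker, using only the JDK. The names PendingConfirmStore, register and onConfirm are hypothetical. The sketch also differs from MessageSender in one respect: a negatively confirmed message stays registered, so the caller can resend it.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Broker-free sketch of tracking messages until their publisher confirm arrives. */
public class PendingConfirmStore {
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call BEFORE publishing, so a fast confirm always finds the entry. */
    public String register(String message) {
        String msgId = UUID.randomUUID().toString();
        pending.put(msgId, message);
        return msgId;
    }

    /**
     * Call from the confirm callback. On ack the entry is dropped and an empty
     * Optional is returned. On nack the entry is kept and the message is
     * returned so the caller can resend it.
     */
    public Optional<String> onConfirm(String msgId, boolean ack) {
        if (ack) {
            pending.remove(msgId);
            return Optional.empty();
        }
        return Optional.ofNullable(pending.get(msgId));
    }

    /** Number of messages still waiting for a positive confirm. */
    public int size() {
        return pending.size();
    }
}
```

In a real system the map would be replaced by a database table, as suggested above, and a retry limit would be needed so that a permanently failing message is not resent forever.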
The test code is
@RunWith(SpringRunner.class)
@SpringBootTest
public class MsgProducerTest {

    @Autowired
    private MessageSender messageSender;

    @Value("${log.exchange}")
    private String exchange;

    @Value("${log.info.binding-key}")
    private String routingKey;

    /**
     * Test failure notification
     */
    @SneakyThrows
    @Test
    public void sendErrorMsg() {
        for (int i = 0; i < 3; i++) {
            String message = "this is error message " + i;
            // "test" matches no binding key, so the message cannot be routed
            messageSender.convertAndSend(exchange, "test", message);
        }
        System.in.read();
    }

    /**
     * Test publisher confirmation
     */
    @SneakyThrows
    @Test
    public void sendInfoMsg() {
        for (int i = 0; i < 3; i++) {
            String message = "this is info message " + i;
            messageSender.convertAndSend(exchange, routingKey, message);
        }
        System.in.read();
    }
}
Let's test the failure notification first
The output is:
news {this is error message 0} Cannot be routed correctly, routingKey by {test}
news {this is error message 0} Successfully sent to mq
news {this is error message 2} Cannot be routed correctly, routingKey by {test}
news {this is error message 2} Successfully sent to mq
news {this is error message 1} Cannot be routed correctly, routingKey by {test}
news {this is error message 1} Successfully sent to mq
Messages were successfully sent to the broker, but were not routed to the queue
Then test the publisher confirmation
The output is:
news {this is info message 0} Successfully sent to mq
infoLogQueue The message received is: {this is info message 0}
infoLogQueue The message received is: {this is info message 1}
news {this is info message 1} Successfully sent to mq
infoLogQueue The message received is: {this is info message 2}
news {this is info message 2} Successfully sent to mq
Messages are successfully sent to the broker and routed to the queue
Storage phase
I have not studied high availability at this stage in depth; after all, clusters are usually built by the operations team. If I have time later, I will add this part
Consumption stage
Reliable delivery in the consumption stage mainly depends on ack.
Both the native API ack and the Spring Boot ack were introduced earlier
In short, in production we usually acknowledge messages manually, one at a time. When consumption fails, we do not requeue the message (it would most likely fail again); instead we route it to a dead letter queue so that the failure can be investigated later
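A dead letter queue has to be attached to the working queue when that queue is declared. The following JavaConfig sketch shows one way to do this with Spring AMQP. All exchange, queue, and routing key names are made up for this example. A new queue name is used because redeclaring an existing queue such as info.log.queue with different arguments is rejected by the broker.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

    // Hypothetical names, chosen only for this sketch
    static final String DEAD_LETTER_EXCHANGE = "log.dlx.exchange";
    static final String DEAD_LETTER_QUEUE = "log.dlx.queue";
    static final String DEAD_LETTER_KEY = "log.dlx.key";

    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_KEY);
    }

    // Messages nacked or rejected with requeue=false on this queue
    // are republished to the dead letter exchange declared above
    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable("work.log.queue")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_KEY)
                .build();
    }
}
```

With this in place, a listener on work.log.queue that calls basicNack with requeue=false sends the failed message to log.dlx.queue, where it can be inspected later.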
To summarize:
- After ack, the message is deleted from the broker
- After nack or reject, there are two cases
(1) If requeue=true, the message is put back on the queue
(2) If requeue=false, the message is discarded, unless a dead letter exchange is configured for the queue, in which case it is routed to the dead letter queue