Here comes the RabbitMQ dry goods you want most!!!

preface

  • At the beginning of the article, we first lead to several questions:
    Are you still worried about the need to synchronize some data between two (more) systems through scheduled tasks?
    Are you struggling with the problem of calling and communicating between different processes in heterogeneous systems?
    If so, congratulations. Messaging makes it easy for you to solve these problems.
    Message service is good at solving the problem of data exchange (message notification / communication) between multiple systems and heterogeneous systems. You can also use it for the mutual call (RPC) of services between systems.
  • RabbitMQ, which will be introduced in this article, is one of the most mainstream message oriented middleware. Mainly through the combination of concept elaboration and examples to briefly introduce RabbitMQ, hoping to be helpful.

Basic concepts

  • About RabbitMQ
    AMQP, namely Advanced Message Queuing Protocol, is an open standard of application layer protocol, which is designed for message oriented middleware. Message middleware is mainly used for decoupling between components. The sender of the message does not need to know the existence of the message consumer, and vice versa.
    The main features of AMQP are message oriented, queue oriented, routing (including point-to-point and publish / subscribe), reliability and security.
    RabbitMQ is an open source AMQP implementation. The server side is written in Erlang language and supports a variety of clients, such as Python, Ruby NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP, etc. AJAX is supported. It is used to store and forward messages in distributed systems, and performs well in ease of use, scalability, high availability and so on.
  • ConnectionFactory,Connection,Channel
    ConnectionFactory, connection and channel are the most basic objects in the API provided by RabbitMQ. Connection is the socket link of RabbitMQ, which encapsulates some logic related to the socket protocol. ConnectionFactory is the manufacturing factory of connection. Channel is the most important interface for us to deal with RabbitMQ. Most of our business operations are completed in channel, including defining Queue, defining Exchange, binding Queue to Exchange, publishing messages, etc.
  • Queue
    Queue is the internal object of RabbitMQ, which is used to store messages. Messages in RabbitMQ can only be stored in the queue. The producer (P in the figure below) produces messages and finally delivers them to the queue. Consumers (C1 and C2 in the figure below) can get messages from the queue and consume them. And multiple consumers can subscribe to the same queue. At this time, the messages in the queue will be evenly allocated to multiple consumers for processing, instead of each consumer receiving and processing all messages.
  • Exchange
    In the above Queue, you can see that the producer delivers messages to the Queue. In fact, this will never happen in RabbitMQ. The actual situation is that the producer sends the message to Exchange (Exchange in the figure below), and Exchange routes the message to one or more queues (or discards it).

    What logic does Exchange follow to route messages to queues? This will be introduced in Binding.
    There are four types of Exchange in RabbitMQ. Different types have different routing policies, which will be introduced in Exchange Types.
  • routing key
    When sending a message to Exchange, the producer usually specifies a routing key to specify the routing rules of the message, and the routing key needs to be used in combination with Exchange Type and binding key to take effect.
    When the Exchange Type and binding key are fixed, our producer can determine where the message flows by specifying the routing key when sending the message to Exchange.
  • Binding
    RabbitMQ associates Exchange with Queue through Binding, so that RabbitMQ knows how to correctly route messages to the specified Queue
  • Binding key
    When Binding Exchange and Queue, a binding key is usually specified; When consumers send messages to Exchange, they usually specify a routing key; When the binding key matches the routing key, the message will be routed to the corresponding Queue. When binding multiple queues to the same Exchange, these bindings are allowed to use the same binding key. The binding key does not take effect in all cases. It depends on the Exchange Type. For example, Exchange of fanout type will ignore the binding key and route messages to all queues bound to the Exchange.
  • Exchange Types
    RabbitMQ commonly used exchange types include Fanout, Direct, Topic and Headers. In this article, we mainly summarize the first three examples.
    (1)direct
    The direct connected switch delivers the message to the corresponding queue according to the routing key carried by the message. A queue is bound to a direct connected switch and given a routing key. Then, when a message carries a route value of X and the message is sent to the switch through the producer, the switch will find the queue with the binding value of X according to the route value X.

    In the figure above, we can see that the two queues Q1 and Q2 are directly bound to switch E. The first queue is bound with binding key (firstKey), and the second queue is bound with binding key (secondKey). In this switch, the messages published to the switch through the routing key firstKey will be routed to queue Q1, the messages published to the switch by the routing key secondKey will be routed to queue Q2, and all other messages will be discarded.
    (2)Topic
    The routing rules of this type of switch support the fuzzy matching of binding key and routing key, and will route the message to the Queue that meets the conditions. There can be two special characters in the binding key and #, which are used for fuzzy matching. Among them, * is used to match a word, # is used to match 0 or more words, and the words are marked with "." Is a delimiter.

    In the above figure, routingKey = "one firstKey. The message of "two" will be routed to Q1, routingKey = "secondkey The message "one" will be routed to Q2. In short, (asterisk) is used to represent a word (must appear), # (pound sign) is used to represent any number of words (zero or more).
    (3)Fanout
    The routing rule of this type of switch is very simple. It will route all messages sent to the switch to all queues bound to it. At this time, the Routing key does not work.
    (4)Headers
    This type of switch does not rely on the matching rules of routing key and binding key to route messages, but matches according to the headers attribute in the sent message content.
  • Message ack nowledgement
    In practical applications, it may happen that consumers receive messages in the Queue but fail to complete processing (or other accidents), which may lead to message loss. To avoid this situation, we can ask consumers to send a receipt to RabbitMQ after consuming the message, and RabbitMQ will remove the message from the Queue after receiving the message receipt (ack); If RabbitMQ does not receive a receipt and detects that the consumer's RabbitMQ connection is disconnected, RabbitMQ will send the message to other consumers (if there are multiple consumers) for processing. There is no timeout concept here. No matter how long a consumer processes a message, it will not cause the message to be sent to other consumers unless its RabbitMQ connection is disconnected.
    Note: another problem will arise here. If our developers forget to send a receipt to RabbitMQ after processing the business logic, it will lead to serious bug s - more and more messages will accumulate in the Queue; After restarting, consumers will consume these messages and execute business logic repeatedly.
  • Message durability
    If we hope that messages will not be lost even when the RabbitMQ service is restarted, we can set both Queue and Message to durable, which can ensure that our RabbitMQ messages will not be lost in most cases. However, it still can not solve the occurrence of small probability loss events (for example, the RabbitMQ server has received the producer's Message, but the RabbitMQ server is powered off before it can persist the Message). If we need to manage such small probability events, we need to use transactions.
  • Prefetch count (total number of messages sent to consumers each time)
    As mentioned earlier, if multiple consumers subscribe to messages in the same Queue at the same time, the messages in the Queue will be shared equally among multiple consumers. At this time, if the processing time of each message is different, it may lead to the situation that some consumers are busy all the time, while others quickly finish their work and are idle all the time. We can set prefetchCount to limit the number of messages that the Queue sends to each consumer at a time. For example, if we set prefetchCount=1, the Queue sends one message to each consumer at a time; After the consumer has processed this message, the Queue will send another message to the consumer.

Install and run RabbitMQ using docker

  • To install and run commands in Linux:
    //Installing RabbitMQ with administration page
    docker pull rabbitmq:3-management
    //Run rabbitMQ
    docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 3658aa401173
    //-d background running -p exposed port
    //5672 is the port where the client communicates with RabbitMQ
    //15672 is the port for the management interface to access the web page
    //3658aa401173 mirror id
    //Before execution, we have to go to the server and add a security group of two ports
    
  • Visit the management page of rabbitmq: IP:15672 account password is guest

actual combat

  • In this module, I will write code to demonstrate the instance of provider message push, consumer message consumption, the use of Direct, Topic and Fanout, message callback, manual confirmation, etc.
    -At this time, we need to create a project. First, show the overall project structure:

Direct Exchange

  • Create the SpringBoot project direct provider. We can choose the dependence of RabbitMQ when creating the project. The dependence of creating the project will not be described one by one below.
  • Write the configuration file application yml:
    server:
      port: 8001
    spring:
      #Project name
      application:
        name: direct-provider
      #Configuring rabbitMQ
      rabbitmq:
        host: 116.62.155.206
        port: 5672
        username: guest
        password: guest
    
  • Create directrabbit mqconfig Java (for queue and switch persistence and connection usage settings, they are described in the notes, and the configuration of different switches will not be described in the same way):
    package com.twy.directprovider.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Self connected switch: Manufacturer
     * @Author twy
     * @CreateTime 2020/11/02
     */
    @Configuration
    public class DirectRabbitMqConfig {
    
        /**
         * Test self connected switch: TestDirectExchange
         * @return
         */
        @Bean
        public DirectExchange TestDirectExchange(){
            return new DirectExchange("TestDirectExchange",true,false);
        }
    
        /**
         * Test queue: TestDirectQueue
         * @return
         */
        @Bean
        public Queue TestDirectQueue(){
            // Total 4 parameters
            // durable: whether to persist. The default value is false. Persistent queue: it will be stored on the disk. It still exists when the message agent restarts. Temporary queue: the current connection is valid
            // exclusive: it is also false by default. It can only be used by the currently created connection, and the queue will be deleted after the connection is closed. This reference takes precedence over durable
            // autoDelete: whether to delete automatically. When no producer or consumer uses this queue, the queue will be deleted automatically.
            // return new Queue("TestDirectQueue",true,true,false);
    
            //Generally, we can set the persistence of the queue, and the other two are false by default
            return new Queue("TestDirectQueue",true);
        }
    
        /**
         * Bind the queue to the switch and set the key for matching: TestDirectBinding
         * @return
         */
        @Bean
        public Binding TestDirectBinding(){
            return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectBinding");
        }
    }
    
  • Then write an interface in the controller layer to push messages to directsendmessagecontroller java:
    /**
     * Simple test: message push
     */
    @RestController
    public class DirectSendMessageController {
        /**
         * Using the RabbitTemplate, it provides methods such as receiving / sending
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/sendDirectMessage")
        public String sendDirectMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "send direct message !!!!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("messageId",messageId);
            map.put("messageData",messageData);
            map.put("createTime",createTime);
            //Push message to server
            //Send the message with the binding key value: TestDirectBinding to the switch TestDirectExchange
            rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectBinding", map);
            return "ok";
        }
    }
    
  • Run the project and test the interface through postman:
  • Because we haven't created a consumer yet, we can see that the message has not been consumed. You can see it on the RabbitMQ management page in the figure below:
  • Next, we start to create a direct consumer and configure the application YML profile:
    server:
      port: 8002
    spring:
      #Project name
      application:
        name: direct-consumer
      #Configuring rabbitMQ
      rabbitmq:
        host: 116.62.155.206
        port: 5672
        username: guest
        password: guest
    
  • Then, like the producer, create directrabbit mqconfig Java (consumers can simply use it without adding this configuration. Just build the monitoring in the back. Use the annotation to let the listener listen to the corresponding queue. If it is configured, in fact, the consumer is also the producer and can push the message):
    /**
     * Self connected switches: Consumers
     * Consumers can simply use it without adding this configuration. Just build the monitoring in the back. Use annotations to let the listener monitor the corresponding queue.
     * However, if this configuration is configured, in fact, the consumer is also the generator and can push the message.
     * @Author twy
     * @CreateTime 2020/11/02
     */
    @Configuration
    public class DirectRabbitMqConfig {
    
        /**
         * Test self connected switch: TestDirectExchange
         * @return
         */
        @Bean
        public DirectExchange TestDirectExchange(){
            return new DirectExchange("TestDirectExchange",true,false);
        }
    
        /**
         * Test queue: TestDirectQueue
         * @return
         */
        @Bean
        public Queue TestDirectQueue(){
            //Generally, we can set the persistence of the queue, and the other two are false by default
            return new Queue("TestDirectQueue",true);
        }
    
        /**
         * Bind the queue to the switch and set the key for matching: TestDirectBinding
         * @return
         */
        @Bean
        public Binding TestDirectBinding(){
            return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectBinding");
        }
    
    }
    
  • Create a message receiving and listening class, directconsumerlistener java:
    /**
     * Consumer message receiving and listening class
     */
    @Component
    @RabbitListener(queues = "TestDirectQueue") //The name of the listening queue, previously set in the configuration file
    public class DirectConsumerListener {
    
        @RabbitHandler
        public void receive(Map message){
            System.out.println("Direct Consumer Monitored message:" + message.toString());
        }
    }
    
  • Finally, run the project, and you can see that the message pushed before has been consumed:

Topic Exchange

  • Create a project topic provider and write a configuration file:
    server:
      port: 8011
    spring:
      #Project name
      application:
        name: topic-provider
      #Configuring rabbitMQ
      rabbitmq:
        host: 116.62.155.206
        port: 5672
        username: guest
        password: guest
    
  • Create topicrabbitmqconfig java:
    /**
     * Subject switch configuration class: producer
     */
    
    @Configuration
    public class TopicRabbitMqConfig {
        /**
         * Define binding keys
         */
        public final static String FirstRounting = "topic.first";
        public final static String SecondRounting = "topic.second";
    
        /**
         * Test theme switch
         * @return
         */
        @Bean
        public TopicExchange TestTopicExchange(){
            return new TopicExchange("TestTopicExchange");
        }
    
        /**
         * Topic test queue I
         * @return
         */
        @Bean
        public Queue TestFirstQueue(){
            return new Queue("TestFirstQueue");
        }
    
        /**
         * Topic test queue II
         * @return
         */
        @Bean
        public Queue TestSecondQueue(){
            return new Queue("TestSecondQueue");
        }
    
        /**
         * Test queue binding theme switch
         * Bind TestFirstQueue and TestTopicExchange, and the binding key value is topic first
         * In this way, as long as the routing key carried by the message is topic First, will be distributed to the queue
         * @return
         */
        @Bean
        public Binding firstQueueBindingTopicExchange(){
            return BindingBuilder.bind(TestFirstQueue()).to(TestTopicExchange()).with(FirstRounting);
        }
    
        /**
         * Test queue two binding subject switch
         * Bind TestSecondQueue and TestTopicExchange, and the key value of the binding is to use the generic routing key rule topic#
         * In this way, as long as the routing key carried by the message is topic At the beginning, it will be distributed to the queue
         * @return
         */
        @Bean
        public Binding secondQueueBindingTopicExchange(){
            return BindingBuilder.bind(TestSecondQueue()).to(TestTopicExchange()).with("topic.#");
        }
    }
    
  • Create interface topicsendmessagecontroller Java is used to push messages:
    /**
     * Message push test of subject switch
     */
    @RestController
    public class TopicSendMessageController {
        /**
         * Using the RabbitTemplate, it provides methods such as receiving / sending
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * Test interface of theme switch
         * @param routingKey Bind key value topic first
         * @param message Test of the message to be sent to the topic switch: bind topic first
         * @return
         */
        @RequestMapping("/sendTopicMessage")
        public String sendTopicMessage(@RequestParam("routingKey") String routingKey,
                                       @RequestParam("message") String message) {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = message;
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("messageId",messageId);
            map.put("messageData",messageData);
            map.put("createTime",createTime);
            //Push message to server
            //Send the message with the binding key value: routingKey to the switch TestTopicExchange
            rabbitTemplate.convertAndSend("TestTopicExchange", routingKey, map);
            return "ok";
        }
    }
    
  • We then create topic consumer and write the configuration file:
    server:
      port: 8012
    spring:
      #Project name
      application:
        name: topic-consumer
      #Configuring rabbitMQ
      rabbitmq:
        host: 116.62.155.206
        port: 5672
        username: guest
        password: guest
    
  • Create topicrabbitmqconfig java:
    /**
     * Theme switch configuration class: Consumer
     */
    @Configuration
    public class TopicRabbitMqConfig {
        /**
         * Define binding keys
         */
        public final static String FirstRounting = "topic.first";
        public final static String SecondRounting = "topic.second";
    
        /**
         * Test theme switch
         * @return
         */
        @Bean
        public TopicExchange TestTopicExchange(){
            return new TopicExchange("TestTopicExchange");
        }
    
        /**
         * Topic test queue I
         * @return
         */
        @Bean
        public Queue TestFirstQueue(){
            return new Queue("TestFirstQueue");
        }
    
        /**
         * Topic test queue II
         * @return
         */
        @Bean
        public Queue TestSecondQueue(){
            return new Queue("TestSecondQueue");
        }
    
        /**
         * Test queue binding theme switch
         * Bind TestFirstQueue and TestTopicExchange, and the binding key value is topic first
         * In this way, as long as the routing key carried by the message is topic First, will be distributed to the queue
         * @return
         */
        @Bean
        public Binding firstQueueBindingTopicExchange(){
            return BindingBuilder.bind(TestFirstQueue()).to(TestTopicExchange()).with(FirstRounting);
        }
    
        /**
         * Test queue two binding subject switch
         * Bind TestSecondQueue and TestTopicExchange, and the key value of the binding is to use the generic routing key rule topic#
         * In this way, as long as the routing key carried by the message is topic At the beginning, it will be distributed to the queue
         * @return
         */
        @Bean
        public Binding secondQueueBindingTopicExchange(){
            return BindingBuilder.bind(TestSecondQueue()).to(TestTopicExchange()).with("topic.#");
        }
    }
    
  • Create two listening classes, TopicFirstQueueListener and TopicSecondQueueListener:
    /**
     * As long as the routing key carried by the message is topic First, will be distributed to the queue
     */
    @Component
    @RabbitListener(queues = "TestFirstQueue")
    public class TopicFirstQueueListener {
        @RabbitHandler
        public void process(Map message) {
            System.out.println("Topic First Queue Listener Consumer receives message  : " + message.toString());
        }
    }
    
    /**
     * As long as the routing key carried by the message is topic At the beginning, it will be distributed to the queue
     */
    @Component
    @RabbitListener(queues = "TestSecondQueue")
    public class TopicSecondQueueListener {
        @RabbitHandler
        public void process(Map message) {
            System.out.println("Topic Second Queue Listener Consumer receives message  : " + message.toString());
        }
    }
    
  • Run the project to test:

    As can be seen from the above figure, the Routingkey we give is topic First, so both queues receive messages:
  • Run consumer:
  • Next, let's push another message:

    The RoutingKey of the push message is topic# Test. At this time, only the second queue should receive the message and be consumed:

Fanout Exchange

  • Create the project fanout privoder and write the configuration file:

    server:
      port: 8021
    spring:
      #Project name
      application:
        name: fanout-provider
      #Configuring rabbitMQ
      rabbitmq:
        host: 116.62.155.206
        port: 5672
        username: guest
        password: guest
    
  • Create fanoutrabbit mqconfig java:

    /**
     * Sector switch profile
     *
     */
    @Configuration
    public class FanoutRabbitMqConfig {
        /**
         * Test sector switch
         * @return
         */
        @Bean
        public FanoutExchange TestFanoutExchange(){
            return new FanoutExchange("TestFanoutExchange");
        }
    
        /**
         * Queue A
         * @return
         */
        @Bean
        public Queue queueA(){
            return new Queue("fanout.queue.A");
        }
    
        /**
         * Queue B
         * @return
         */
        @Bean
        public Queue queueB(){
            return new Queue("fanout.queue.B");
        }
    
        /**
         * Queue C
         * @return
         */
        @Bean
        public Queue queueC(){
            return new Queue("fanout.queue.C");
        }
    
        /**
         * Queue A bound sector switch
         */
        @Bean
        public Binding queueABindingExchange(){
            return BindingBuilder.bind(queueA()).to(TestFanoutExchange());
        }
    
        /**
         * Queue B bound sector switch
         */
        @Bean
        public Binding queueBBindingExchange(){
            return BindingBuilder.bind(queueB()).to(TestFanoutExchange());
        }
    
        /**
         * Queue C bound sector switch
         */
        @Bean
        public Binding queueCBindingExchange(){
            return BindingBuilder.bind(queueC()).to(TestFanoutExchange());
        }
    }
    
    
  • Write an interface to test:

    /**
     - Fan switch message push test
     */
    @RestController
    public class FanoutSendMessageController {
        /**
         * Using the RabbitTemplate, it provides methods such as receiving / sending
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * Test interface of theme switch
         * @return
         */
        @RequestMapping("/sendFanoutMessage")
        public String sendTopicMessage() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "message: test FanoutExchange !!!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String,Object> map=new HashMap<>();
            map.put("messageId",messageId);
            map.put("messageData",messageData);
            map.put("createTime",createTime);
            //Push message to server
            //Send the message with the binding key value: routingKey to the switch TestFaoutExchange
            rabbitTemplate.convertAndSend("TestFanoutExchange", null, map);
            return "ok";
        }
    }
    
  • Then we create the project fanout consumer and write three listening classes to listen to queuea, B and C respectively:

    @Component
    	@RabbitListener(queues = "fanout.queue.A")
    	public class FanoutConsumerQueueA {
    	    @RabbitHandler
    	    public void process(Map message) {
    	        System.out.println("FanoutConsumerQueueA Consumer listening queue A,Received message  : " + message.toString());
    	    }
    	}
    	
    	@Component
    	@RabbitListener(queues = "fanout.queue.B")
    	public class FanoutConsumerQueueB {
    	    @RabbitHandler
    	    public void process(Map message) {
    	        System.out.println("FanoutConsumerQueueB Consumer listening queue B,Received message  : " + message.toString());
    	    }
    	}
    	
    	@Component
    	@RabbitListener(queues = "fanout.queue.C")
    	public class FanoutConsumerQueueC {
    	    @RabbitHandler
    	    public void process(Map message) {
    	        System.out.println("FanoutConsumerQueueC Consumer listening queue C,Received message  : " + message.toString());
    	    }
    	}
    
  • Start the producer project call interface push message:

  • You can see that there is a message waiting for consumption in each queue. We start the consumer:

    You can see that as long as the message is sent to the fanoutExchange fan switch, the three queues are bound to the switch, so the three message receiving classes have listened to the message

Message confirmation of producer push message

  • Create the project ack provider, add the configuration file and add the configuration of message confirmation:

    server:
      port: 8031
    spring:
      #Project name
      application:
        name: ack-provider
      #Configuring rabbitMQ
      rabbitmq:
        host: 116.62.155.206
        port: 5672
        username: guest
        password: guest
    
        #Message confirmation configuration
        #The confirmation message has been sent to the switch
        publisher-confirm-type: correlated
        #The confirmation message has been sent to the queue
        publisher-returns: true
    
  • This step is also our most important step, creating providerackrabbit mqconfig Java, configure relevant message confirmation callback functions:

    /**
     * Producer: message confirmation profile
     */
    @Configuration
    public class ProviderAckRabbitMqConfig {
        /**
         * Create a direct switch without binding queues
         *
         * @return
         */
        @Bean
        public DirectExchange noBindingQueueExchange() {
            return new DirectExchange("noBindingQueueExchange");
        }
    
    
        @Bean
        public RabbitTemplate creatRabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //The callback function can only be triggered when the Mandatory is enabled. The callback function is forced to be called regardless of the message push result
            rabbitTemplate.setMandatory(true);
    
            /**
             * Callback function: ConfirmCallback
             */
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ConfirmCallback:     " + "Relevant data:" + correlationData);
                    System.out.println("ConfirmCallback:     " + "Confirmation:" + ack);
                    System.out.println("ConfirmCallback:     " + "reason:" + cause);
    
                }
            });
    
            /**
             * Callback function: ReturnCallback
             */
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("ReturnCallback:     " + "Message:" + message);
                    System.out.println("ReturnCallback:     " + "Response code:" + replyCode);
                    System.out.println("ReturnCallback:     " + "Response information:" + replyText);
                    System.out.println("ReturnCallback:     " + "Switch:" + exchange);
                    System.out.println("ReturnCallback:     " + "Routing key:" + routingKey);
                }
            });
    
    
            return rabbitTemplate;
        }
    }
    

    In the configuration file category, only rabbittemplate setMandatory(true); Can trigger the callback function. The callback function is forced to be called regardless of the message push result. We can see that there are two callback functions written above, one is ConfirmCallback and the other is RetrunCallback.

  • So now we want to explore when these two callback functions are triggered?
    (1) The message was pushed to the server, but the switch could not be found in the server
    (2) The message was pushed to the server, the switch was found, but the queue was not found
    (3) When the message is pushed to the server, the switch and queue are not found, which is roughly the same as the switch not found above
    (4) Message push succeeded
    Next, I will write several interfaces to test these situations:

  • The message was pushed to the server, but the switch could not be found in the server:
    (1) Create test interface:

     /**
         * (1)The message is pushed to the server, but the switch cannot be found in the server
         *
         * @return
         */
        @PostMapping("/TestMessageAck1")
        public String TestMessageAck1() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "AckMessage: Message push server,But in server Switch not found in!!!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            /**
             * Push the message to the switch named 'non-existing-exchange' (this switch is not created or configured)
             */
            rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);
            return "ok";
        }
    

    (2) In this interface, the message is pushed to the non-existent exchange. I didn't create this switch. Next, call the interface:

    (3) From the output result, it means that the switch cannot be found, so it is concluded that the ConfirmCallback callback function is called when the switch cannot be found.

  • The message was pushed to the server, the switch was found, but the queue was not found
    (1) In this case, we first create a direct switch in the previous configuration file, but supply its binding queue:

     /**
         * Create a direct switch without binding queues
         *
         * @return
         */
        @Bean
        public DirectExchange noBindingQueueExchange() {
            return new DirectExchange("noBindingQueueExchange");
        }
    

    (2) Create test interface:

     /**
         * (2)The message is pushed to the server. The switch is found, but the queue is not found
         *
         * @return
         */
        @PostMapping("/TestMessageAck2")
        public String TestMessageAck2() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "AckMessage: Message push server,The switch was found, but the queue was not found!!!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            /**
             * Push the message to the switch named 'noBindingQueueExchange' (this switch has no binding queue)
             */
            rabbitTemplate.convertAndSend("noBindingQueueExchange", "TestDirectRouting", map);
            return "ok";
        }
    

    (3) Conduct interface test:
    From the results, we can see that the message is successfully pushed to the server, so the confirmation of the message by ConfirmCallback is true,
    As can be seen from the print parameters of the RetrunCallback callback function, the message was successfully pushed to the switch, but when the route was distributed to the queue, the queue could not be found, so an error no was reported_ ROUTE .
    Conclusion: two callback functions, ConfirmCallback and RetrunCallback, are triggered in this case.

  • When the message is pushed to the server, the switch and queue are not found. This situation is consistent with the return of the first situation. There are no examples here.

  • Message push succeeded
    (1) The test is simple:

        /**
         * (3)Push message succeeded
         */
        @PostMapping("/TestMessageAck3")
        public String TestMessageAck3() {
            String messageId = String.valueOf(UUID.randomUUID());
            String messageData = "AckMessage: Push message succeeded!!!";
            String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
            Map<String, Object> map = new HashMap<>();
            map.put("messageId", messageId);
            map.put("messageData", messageData);
            map.put("createTime", createTime);
            /**
             * Push the message to the switch named 'TestDirectExchange' (this switch is a binding queue)
             * BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectBinding");
             */
            rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectBinding", map);
            return "ok";
        }
    

    (2) Results:

    Conclusion: this situation triggers the ConfirmCallback callback function.
    The above is the use of the message confirmation callback function of the producer's push message. Some business processing can be added to the callback function

Message confirmation mechanism for consumers receiving messages

  • The message confirmation mechanism of the consumer is different from that of the producer, because the message reception is originally monitoring the message, and the qualified message will be consumed. Therefore, there are three main modes for the confirmation mechanism of consumer message reception:
    (1) Automatic confirmation, which is also the default message confirmation. AcknowledgeMode.NONE. RabbitMQ successfully sends the message (that is, the message is successfully written into the TCP Socket), and immediately considers that this delivery has been handled correctly, regardless of whether the consumer successfully handles this delivery. Therefore, in this case, if the consumer logic throws an exception, that is, the consumer fails to process the message successfully, it is equivalent to losing the message. Generally, in this case, we use try catch to catch exceptions and print logs to track data, so as to find out the corresponding data for subsequent processing.
    (2) Manual confirmation is also the mode most of us choose when we configure the receiving message confirmation mechanism. After receiving the message, the consumer manually calls basic ack/basic. nack/basic. After reject, RabbitMQ will consider this delivery successful after receiving these messages.
    [1] basic.ack is used for positive confirmation
    [2] basic.nack is used for negative acknowledgement (Note: This is the RabbitMQ extension of AMQP 0-9-1)
    [3] basic.reject is used for negative confirmation, but it is different from basic Compared with NACK, NACK has one limitation: it can only reject a single message at a time
    The above three methods on the consumer side indicate that the message has been delivered correctly, but basic ACK indicates that the message has been processed correctly.
    And basic nack,basic.reject indicates that it has not been handled correctly. Let's focus on basic Reject and basic nack.

  • channel.basicReject(deliveryTag, true)
    Refuse to consume the current message. The first parameter is the unique id of the data received by the current message. The second parameter, if true is passed, is to throw the data back into the queue. Then the message will be consumed next time. Setting false is to tell the server that I already know the message data, reject it for some reasons, and the server will throw away the message. I don't want to consume the message next time.

  • channel.basicNack(deliveryTag, false, true)
    The first parameter is the unique id of the data received by the current message, and the second parameter is whether to target multiple messages; If it is true, that is, if the tagID of the message for the current channel at one time is less than the current message, it will refuse to confirm. The third parameter refers to whether to re list, that is, whether to throw the unconfirmed message back to the queue.

Note: be cautious when using the confirmation mode of re listing after unconfirmed, because there may also be a backlog of messages that have been thrown back due to poor consideration

  • Create the ACK consumer project and add simplemessagelistenercontainer java:

    /**
     * Consumer: message confirmation profile
     */
    @Configuration
    public class ConsumerAckRabbitMqConfig {
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        //Message receiving and processing class
        @Autowired
        private MyAckListener myAckListener;
    
    
        /**
         * Configure message reception -- > manual confirmation
         */
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer(){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(1);
            // RabbitMQ defaults to automatic confirmation, which is changed to manual confirmation
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
            //Set up a queue
            container.setQueueNames("TestDirectQueue");
            //If multiple queues are set at the same time, they are as follows: the premise is that all queues must have been created and exist
            //  container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
    
    
            //Another way to set queues is to use addQueues if you want to set multiple queues
            //container.setQueues(new Queue("TestDirectQueue",true));
            //container.addQueues(new Queue("TestDirectQueue2",true));
            //container.addQueues(new Queue("TestDirectQueue3",true));
            container.setMessageListener(myAckListener);
    
            return container;
        }
    }
    

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);RabbitMQ defaults to automatic confirmation, which is changed to manual confirmation here. container.setQueueNames("TestDirectQueue"); Set the queue. container.setMessageListener(myAckListener); Set the manual confirmation message listening class.

  • Next, we write a manual message listener class: myacklistener Java (the manual confirmation mode needs to implement ChannelAwareMessageListener):

    /**
     * Manually listening for message classes
     */
    @Component
    public class MyAckListener implements ChannelAwareMessageListener {
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //Because the map is used for Message delivery, some processing needs to be done to take the map out of the Message
                String msg = message.toString();
    
                if ("TestDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) {
                    System.out.println("The consumed message comes from a queue named:" + message.getMessageProperties().getConsumerQueue());
                    System.out.println("MyAckListener(Manually listening for message classes): " + msg);
                    System.out.println("Consumption message from:" + message.getMessageProperties().getConsumerQueue());
                    /**
                     * Processing business
                     */
                    System.out.println("implement TestDirectQueue Business process flow of messages in......");
                }
    
                //The second parameter, manual confirmation, can be batch processed. When this parameter is true, delivery can be confirmed at one time_ All messages with tag less than or equal to the incoming value
                channel.basicAck(deliveryTag, true);
                //The second parameter, true, will be put back in the queue, so you need to judge when to use rejection according to the business logic
                //channel.basicReject(deliveryTag, true);
            } catch (Exception e) {
                channel.basicReject(deliveryTag, false);
                e.printStackTrace();
            }
    
        }
    }
    
  • Next, we send a message to listen:

    Here, the article is basically finished, and the address of the project is pasted below

github address

SpringBoot-RabbitMQ

Tags: RabbitMQ Spring Boot

Posted by james_holden on Sun, 08 May 2022 11:40:29 +0300