RabbitMQ introduction details of Message Oriented Middleware

1.RabbitMQ

1.1. Introduction:

The full name of MQ is Message Queue, that is, Message Queue. RabbitMQ is a Message Queue developed by erlang language and implemented based on AMQP protocol. It is a communication method between applications. Message Queue is widely used in distributed systems.

RabbitMQ official address: http://www.rabbitmq.com/

erlang: erlang language is specially designed for open high concurrency programs

AMQP: Advanced Message Queue protocol

1.2. Application scenario

Message queues in development usually have the following application scenarios

1. Asynchronous task processing

The message queue notifies the message receiver of the long-time operations that do not need synchronous processing for asynchronous processing, which improves the response time of the application

2. Application decoupling

MQ is equivalent to an intermediary. The producer exchanges with the consumer through MQ, which decouples the application relationship

3. Common message queues

ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ,Redis

4. Why use RabbitMQ?

1. Simple to use and powerful

2. Based on AMQP protocol

3. Active community and complete documents

4. High concurrency and good performance, which is mainly beneficial to erlang language

5. Spring Boot inherits RabbitMQ by default

2. Quick start

2.1. How rabbitmq works

Basic structure of RabbitMQ

The components are described as follows:

  • Broker: Message Queuing service process, which consists of two parts: Exchange and Queue
  • Exchange: message queue switch, which routes and forwards messages to a queue according to certain rules and filters messages
  • Queue: message queue, which stores messages. Messages arrive at the queue and are forwarded to the specified consumer
  • Producer: the message producer, that is, the producer client, which sends the consumption to MQ
  • Consumer: the message consumer, that is, the consumer client, receives messages forwarded by MQ
  • Connection: connection
  • Channel: session channel

Message publishing and receiving process:

Send message:

1. The producer establishes a TCP connection with the Broker

2. Establish pipelines between producers and brokers

3. The producer sends the message to the Broker through the pipeline, and the Exchange forwards the message

4. Exchange forwards messages to the specified Queue

Receive message:

1. The consumer establishes a TCP connection with the Broker

2. Establish channels between consumers and brokers

3. The consumer listens to the specified Queue

4. When a message arrives at the Queue, the Broker pushes the message to the consumer by default

5. Consumer receives message

2.2. Installing RabbitMQ

Installing RabbitMQ using Docker

# Try the rabbitmq version, which includes the web control page
docker search rabbitmq:management
# Pull away
docker pull rabbitmq:management
# Configure mirroring
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
# Enable mirroring
docker start rabbitmq

After the RabbitMQ environment is set up, visit http://localhost:15672/#/ View the control page; The default user name and password are guest. Click login and you will see the following interface:

2.3.RabbitMQ starter < code >

Create a SpringBoot project in POM Introducing corresponding dependencies into XML files

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
</dependency>
2.3.1. Write message producer (message sender)

Sender operation process:

1. Create connection

2. Create channel

3. Declaration queue

4. Send message

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author ScholarTang
 * @Date 2020/10/9 3:09 afternoon
 * @Desc rabbitmq Getting started -- message producer 1
 */

public class Producer {
  //Define queue
  private static final String QUEUE = "HelloWorld";

  public static void main(String[] args) throws Exception {
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Configure connection parameters
    connectionFactory.setHost("127.0.0.1");//ip
    connectionFactory.setPort(5672);//port
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    connectionFactory.setVirtualHost("/");//Virtual machine. A mq service can have multiple virtual machines, and each virtual machine is an independent mq
    //Establish a new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel
    Channel channel = connection.createChannel();
    /**
     * Declare the queue (if the queue does not exist in mq, it will be created)
     *
     * Parameter information: String queue, Boolean durable, Boolean exclusive, Boolean autodelete, map < string, Object > arguments
     * Parameter details:
     * 1,queue: Queue name
     * 2,durable: Whether it is persistent; If persistent, the queue will remain after mq restart
     * 3,exclusive: Exclusive (exclusive connection). The queue is only allowed to be accessed in this connection. If the connection is closed, the queue will also be deleted automatically. If this parameter is set to true, it can be used for temporary queues
     * 4,autoDelete: Auto delete: whether to delete the queue when it is no longer in use. If this parameter and exclusive are set to true, the temporary queue can be realized (temporary queue: automatically delete the queue when it is no longer in use)
     * 5,arguments: Parameter, which is used to set the extension parameters of the queue, such as survival time
     */
    channel.queueDeclare(QUEUE, true, false, false, null);
    /**
     * send message
     *
     * Parameter information: String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body
     * Parameter details:
     * 1,exchange: If the switch is not pointed to, the mq default switch is used ("" is an empty string indicating that it is not pointed to)
     * 2,routingKey: Routing key: the switch sends the message to the specified queue according to the routing key. If the default switch is used, the routing key uses the name of the queue
     * 3,props: Message properties
     * 4,body: Message content
     */
    String message = "hello,This is a message";
    channel.basicPublish("", QUEUE, null, message.getBytes());
    System.out.println("send to mq: " + message);
    //Close connection
    connection.close();
  }
}

After executing the main method, control to print send to mq: hello, which is a message. The message producer will generate a message to be consumed in the specified queue, as shown in the following figure

Click the queue name to view the messages in the queue

2.3.2. Write message consumer (message consumer)

Consumer operation process:

1. Create a connection

2. Create channels

3. Claim queue

4. Listening queue

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ScholarTang
 * @Date 2020/10/10 12:35 afternoon
 * @Desc rabbitmq Entry program -- message consumer
 */

public class Consumer {
  //Define queue name
  private static final String QUEUE = "HelloWorld";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();
    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE,true,defaultConsumer);

    /**
     * Does the consumer need to close the connection?
     * No, the consumer needs to stay connected, listen to the queue all the time, and consume the messages in the queue.
     */
  }
}

After executing the main method, the console will print the monitored message content. The received message content is: hello, this is a message. Then look at the query status of RabbitMQ management page, as shown in the following figure:

As shown in the figure above, we can see that the news just produced by producers has been consumed by consumers.

3.RabbitMQ working mode

RabbitMQ can be divided into six working modes, as follows:

1.WorkQueues: working mode

2.Publish/subscribe: publish subscribe mode

3.Routing: routing mode

4.Topics: wildcard mode

5.Header: header repeater

6.RPC: remote procedure call

3.1.WorkQueues working mode

3.1.1.WorkQueues work mode structure diagram:

Compared with the entry program, the Work Queues mode has one more consumer, and two consumers consume messages in the same queue together. Its characteristics are as follows:

A producer sends messages to a queue. Multiple consumers listen to messages in a queue. Messages cannot be consumed repeatedly. RabbitMQ sends messages to each consumer by polling.

Work Queues mode experience, we can test the experience on the RabbitMQ entry program. We only need to open multiple consumers to listen to queue consumption messages at the same time.

For example, three consumer terminals have been opened:

C1,C2,C3

When the message producer sends the message for the first time, the message will be consumed by C1 (C2 and C3 do not receive the message)

The second time, C2 consumes messages (C1 and C3 received messages), and the third time, C3 (C1 and C2 did not receive messages)... Poll in turn.

3.2.Publish/subscribe: publish subscribe mode

3.2.1. Results of publish / subscribe publishing mode:

Publish subscribe mode:
1. A producer sends a message to the switch

2. There are multiple queues bound to the switch, and each consumer listens to its own queue

3. The producer sends the message to the switch, and the switch forwards the message to each queue bound to the switch, and each queue bound to the switch will receive the message

4. If a message is sent to a switch without a bound queue, the message will be lost

3.2.2. Differences between workqueues working mode and Publish/subscribe publish / subscribe mode:

1. publish/subscribe can define that a switch can bind multiple queues, and a message can be sent to multiple queues and consumed by multiple consumers

2. work queues does not need to define a switch. A message can only be sent to one queue at a time

3. publish/subscribe is more powerful than work queues. publish/subscribe can also enable multiple consumers to monitor the same queue to realize the function of work queues

3.2.3. Application scenario

Use a case to reflect the application scenario of publishing and definition mode. Case:

User notification: when the user recharge is successful or the transfer is completed, the system notifies the user. The notification methods include SMS and email.

The final effect of the case:

The message producer produces a message, which is equivalent to the user's successful recharge or transfer. Then the message is sent to the switch, which forwards the message to the two queues bound to it (the queue for sending e-mail and the queue for sending SMS). When the consumer listening to the queue listens to the message, the corresponding service code is executed.

code

Message producer code

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @Author ScholarTang
 * @Date 2020/10/9 3:09 afternoon
 * @Desc rabbitmq-publish/subscribe Working mode message producer
 */

public class ProducerPublish {
  //Defines the name of the queue that is used to receive e-mail messages
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //Defines the queue name, which indicates that it is used to receive SMS messages
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //Define switch name
  private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

  public static void main(String[] args) throws Exception {
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Configure connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    connectionFactory.setVirtualHost("/");//Virtual machine. A mq service can have multiple virtual machines, and each virtual machine is an independent mq
    //Establish a new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel
    Channel channel = connection.createChannel();
    /**
     * Declaration notification email queue, declaration notification SMS queue
     * Parameter information: String queue, Boolean durable, Boolean exclusive, Boolean autodelete, map < string, Object > arguments
     * Parameter details:
     * 1,queue: Queue name
     * 2,durable: Whether it is persistent; If persistent, the queue will remain after mq restart
     * 3,exclusive: Exclusive (exclusive connection). The queue is only allowed to be accessed in this connection. If the connection is closed, the queue will also be deleted automatically. If this parameter is set to true, it can be used for temporary queues
     * 4,autoDelete: Auto delete: whether to delete the queue when it is no longer in use. If this parameter and exclusive are set to true, the temporary queue can be realized (temporary queue: automatically delete the queue when it is no longer in use)
     * 5,arguments: Parameter, which is used to set the extension parameters of the queue, such as survival time
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Define replacement machine
     *
     * Description of method parameters:
     * String exchange, BuiltinExchangeType type
     * 1,exchange: Switch name
     * 2,type: Switch Type 
     *  DIRECT: This corresponds to the Routing mode
     *  FANOUT :The corresponding rabbitmq work type is the publish/subscribe work mode
     *  TOPIC: Corresponding to Topics working mode
     *  HEADERS: Corresponding to Header mode
     */
    channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

    /**
     * Switch and queue binding
     * Description of method parameters:
     * String queue, String exchange, String routingKey
     * 1,queue: Queue name
     * 2,exchange: Switch name
     * 3,routingKey: Routing key, which is used by the switch to forward to the specified queue according to the route, and set an empty string in the publish subscribe mode
     */
    channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
    channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");

    /**
     * send message
     *
     * Parameter information: String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body
     * Parameter details:
     * 1,exchange: If the switch is not pointed to, the mq default switch is used ("" is an empty string indicating that it is not pointed to)
     * 2,routingKey: Routing key: the switch sends the message to the specified queue according to the routing key. If the default switch is used, the routing key uses the name of the queue
     * 3,props: Message properties
     * 4,body: Message content
     */
    for (int i = 0; i < 10; i++) {
      String message = "hello,This is a message" + i;
      channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
      System.out.println("send to mq: " + message);
    }
    //Close session channel
    channel.close();
    //Close connection
    connection.close();
  }
}

Message consumer_ email, in this business scenario, is used to listen to and consume messages in the queue for sending mail

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ScholarTang
 * @Date 2020/10/10 12:35 afternoon
 * @Desc rabbitmq-publish/subscribe Working mode message consumer
 */

public class ConsumerSubscribeEmail {
  //Defines the name of the queue that is used to receive e-mail messages
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //Define switch name
  private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();

    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

    /**
     * Bind queue to switch
     */
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");

    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
  }
}

Message consumer_ sms, in this business scenario, is used to monitor and consume messages in the queue for sending sms

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ScholarTang
 * @Date 2020/10/10 12:35 afternoon
 * @Desc rabbitmq Entry program -- message consumer
 */

public class ConsumerSubscribeSms {
  //Defines the queue name, which indicates that it is used to receive SMS messages
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //Define switch name
  private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();
    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

    /**
     * Bind queue to switch
     */
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
  }
}

Execute the message producer side and the message consumer side in turn_ email and message consumers_ The main method of sms, you will see the following scene on the console:

// The message producer sent 10 messages to the switch
send to mq: hello,This is a message 0
send to mq: hello,This is a message 1
send to mq: hello,This is a message 2
send to mq: hello,This is a message 3
send to mq: hello,This is a message 4
send to mq: hello,This is a message 5
send to mq: hello,This is a message 6
send to mq: hello,This is a message 7
send to mq: hello,This is a message 8
send to mq: hello,This is a message 9

//The switch forwards the message to the queue bound to it. Here is QUEUE_INFORM_EMAIL,QUEUE_INFORM_SMS two queues

//Listen to queue_ INFORM_ Consumers in the email queue consumed 10 messages in the queue
 The received message contents are: hello,This is a message 0
 The received message contents are: hello,This is a message 1
 The received message contents are: hello,This is a message 2
 The received message contents are: hello,This is a message 3
 The received message contents are: hello,This is a message 4
 The received message contents are: hello,This is a message 5
 The received message contents are: hello,This is a message 6
 The received message contents are: hello,This is a message 7
 The received message contents are: hello,This is a message 8
 The received message contents are: hello,This is a message 9

//Listen to queue_ INFORM_ The consumer of the SMS queue received 10 messages in the queue
 The received message contents are: hello,This is a message 0
 The received message contents are: hello,This is a message 1
 The received message contents are: hello,This is a message 2
 The received message contents are: hello,This is a message 3
 The received message contents are: hello,This is a message 4
 The received message contents are: hello,This is a message 5
 The received message contents are: hello,This is a message 6
 The received message contents are: hello,This is a message 7
 The received message contents are: hello,This is a message 8
 The received message contents are: hello,This is a message 9

Then open the RabbitMQ management page to view

1. View the connection channel

2. View the switch

​ 1. Click Exchanges to view a list of all switches

​ 2. Find the newly created switch according to the switch name and click to view the switch details

​ 3. Click the Bindings hidden item to view the queue information bound to the switch

be careful:

There will be a special situation at this time. In the above case, what if a queue is monitored by multiple consumers?

The messages in this queue will be polled and consumed by these consumers.

3.3.Routing: routing mode

3.3.1. Structure diagram of routing operation mode:

Routing mode:

1. A switch is bound to multiple queues, each queue is set with routingKey, and a queue can set multiple routingkeys

2. Each consumer listens to their own queue

3. The producer sends the message to the switch. When sending the message, the value of the routingKey needs to be specified. The switch determines that the value of the changed routingKey is equal to the routingKey of that queue. If it is equal, the message will be forwarded to the queue.

3.3.2. The difference between routing mode and Publish/subscribe

1. In the publish/subscribe mode, the routingKey does not need to be specified when binding the switch, and the message will be sent to the queue of each bound switch

2. In the Routing mode, you need to specify the routingKey (that is, the routingKey of the queue) when binding the switch, specify the routingKey of the message when sending the message, and the switch forwards the message to the queue of the corresponding routingKey

3. Routing mode can fully realize the function of Publish/subscribe publish / subscribe mode, which is more powerful than Publish/subscribe mode.

3.3.3. Demonstration example of routing working mode

Take the user notification requirement as an example: when the user recharge is successful or the transfer is completed, the system notifies the user. The notification methods include SMS and email.

Use the Routing mode to achieve the final effect:

According to business requirements, there will be two queues in this scenario, which are used to receive and temporarily store messages of SMS notification and email notification respectively. When the queue is bound with the switch, the routingKey is bound for each queue. When the message sender specifies the routingKey of the message when sending the message, the message is sent to the switch. The switch matches the routingKey of the message with the routingKey of the bound queue. If it is equal, the message is forwarded to the corresponding queue. At this time, the consumer listening to the queue will execute the relevant business code when listening to the message

Code part

message sender

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @Author ScholarTang
 * @Date 2020/10/9 3:09 afternoon
 * @Desc rabbitmq-Routing Working mode message producer
 */

public class ProducerRouting {
  //The name of the queue that receives the mail message
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //The first routingKey specified when the mail message queue is bound to the switch: routing_key_email_one
  private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE = "routing_key_email_one";
  //The second routingKey specified when the mail message queue is bound to the switch: routing_key_email_two
  private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO = "routing_key_email_two";
  //Queue name of receiving SMS message
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //routingKey specified when the SMS message queue is bound to the switch: routing_key_sms
  private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "routing_key_sms";
  //Define switch name
  private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

  public static void main(String[] args) throws Exception {
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Configure connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    connectionFactory.setVirtualHost("/");//Virtual machine. A mq service can have multiple virtual machines, and each virtual machine is an independent mq
    //Establish a new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel
    Channel channel = connection.createChannel();
    /**
     * Declaration notification email queue, declaration notification SMS queue
     * Parameter information: String queue, Boolean durable, Boolean exclusive, Boolean autodelete, map < string, Object > arguments
     * Parameter details:
     * 1,queue: Queue name
     * 2,durable: Whether it is persistent; If persistent, the queue will remain after mq restart
     * 3,exclusive: Exclusive (exclusive connection). The queue is only allowed to be accessed in this connection. If the connection is closed, the queue will also be deleted automatically. If this parameter is set to true, it can be used for temporary queues
     * 4,autoDelete: Auto delete: whether to delete the queue when it is no longer in use. If this parameter and exclusive are set to true, the temporary queue can be realized (temporary queue: automatically delete the queue when it is no longer in use)
     * 5,arguments: Parameter, which is used to set the extension parameters of the queue, such as survival time
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Define replacement machine
     *
     * Description of method parameters:
     * String exchange, BuiltinExchangeType type
     * 1,exchange: Switch name
     * 2,type: Switch Type 
     *  DIRECT: This corresponds to the Routing mode
     *  FANOUT :The corresponding rabbitmq work type is the publish/subscribe work mode
     *  TOPIC: Corresponding to Topics working mode
     *  HEADERS: Corresponding to Header mode
     */
    channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

    /**
     * The queue is bound to the switch, and its routingKey can be set during binding (a queue can be set with the opposite song routingKey)
     * Description of method parameters:
     * String queue, String exchange, String routingKey
     * 1,queue: Queue name
     * 2,exchange: Switch name
     * 3,routingKey: Routing key, which is used by the switch to forward to the specified queue according to the route, and set an empty string in the publish subscribe mode
     */
    channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE);
    channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO);
    channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS_ROUTING_KEY);

    /**
     * send message
     *
     * Parameter information: String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body
     * Parameter details:
     * 1,exchange: If the switch is not pointed to, the mq default switch is used ("" is an empty string indicating that it is not pointed to)
     * 2,routingKey: Routing key: the switch sends the message to the specified queue according to the routing key. If the default switch is used, the routing key uses the name of the queue
     * 3,props: Message properties
     * 4,body: Message content
     */
    for (int i = 0; i < 10; i++) {
      String message = "hello,This is a message" + i;
      channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE, null, message.getBytes());
      channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS_ROUTING_KEY, null, message.getBytes());
      System.out.println("send to mq: " + message);
    }
    //Close session channel
    channel.close();
    //Close connection
    connection.close();
  }
}

Execute the main method of the message sender, and the idea console prints the sent message content. Let's take another look at the RabbitMQ management page, as shown in the following figure:

As shown in the figure:

1. Click Exchanges to view the list of all switches

2. Find the newly created switch according to the switch name and click to view the switch details

3. Click the Bindings hidden item to view the queue information bound to the switch

4. You can see that the queue has a specified routingKey (a queue can have multiple routingkeys)

Click the Queue button again to see the Queue information, as shown in the following figure:

In fact, it is not difficult to see that after the message sender executes, the switch matches the routingKey of the message with the routingKey of the bound queue, and the message is forwarded to the corresponding queue, which obtains the queue_inform_email and queue_ inform_ There are 10 messages in the SMS queue, which meets the expected results. At this time, the consumer is sent to consume the message and write the consumer.

Consumer side_ Email is used to listen and consume messages in the email notification queue

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ScholarTang
 * @Date 2020/10/21 12:35 afternoon
 * @Desc Consumer side_ Email is used to listen and consume messages in the email notification queue
 */

public class ConsumerRoutingEmail {
  //The name of the queue that receives the mail message
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //The first routingKey specified when the mail message queue is bound to the switch: routing_key_email_one
  private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE = "routing_key_email_one";
  //The second routingKey specified when the mail message queue is bound to the switch: routing_key_email_two
  private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO = "routing_key_email_two";
  //Define switch name
  private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();

    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

    /**
     * Bind queue to switch
     */
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL_ROUTING_KEY_ONE);
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL_ROUTING_KEY_TWO);

    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
  }
}

Message consumer_ sms, monitor and consume messages in the sms notification queue

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ScholarTang
 * @Date 2020/10/21 12:35 afternoon
 * @Desc Message consumer_ sms, monitor and consume messages in the sms notification queue
 */

public class ConsumerRoutingSms {
  //Queue name of receiving SMS message
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //routingKey specified when the SMS message queue is bound to the switch: routing_key_sms
  private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "routing_key_sms";
  //Define switch name
  private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();

    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

    /**
     * Bind queue to switch
     */
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS_ROUTING_KEY);

    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
  }
}

Execute the main method of the two consumer segments to consume the messages in their respective listening queues.

Console printing

//ConsumerRoutingEmail
 The received message contents are: hello,This is a message 0
 The received message contents are: hello,This is a message 1
 The received message contents are: hello,This is a message 2
 The received message contents are: hello,This is a message 3
 The received message contents are: hello,This is a message 4
 The received message contents are: hello,This is a message 5
 The received message contents are: hello,This is a message 6
 The received message contents are: hello,This is a message 7
 The received message contents are: hello,This is a message 8
 The received message contents are: hello,This is a message 9

//ConsumerRoutingSms
 The received message contents are: hello,This is a message 0
 The received message contents are: hello,This is a message 1
 The received message contents are: hello,This is a message 2
 The received message contents are: hello,This is a message 3
 The received message contents are: hello,This is a message 4
 The received message contents are: hello,This is a message 5
 The received message contents are: hello,This is a message 6
 The received message contents are: hello,This is a message 7
 The received message contents are: hello,This is a message 8
 The received message contents are: hello,This is a message 9

Take another look at the RabbitMQ management page. As shown in the figure below, the messages in both queues have been consumed

3.4.Topics wildcard working mode

3.4.1. Structure diagram of topics wildcard working mode:

Topics wildcard working mode:
1. A switch can bind multiple queues, and each queue can set one or more wildcard routingkeys

2. The producer sends the message to the switch. The switch matches the queue according to the value of routingKey. The wildcard method is used for matching, and the message is successfully forwarded to the specified queue

3.4.2. The difference between topics and Routing

The basic principles of Topics and Routing are the same, that is, the producer sends the message to the switch, and the switch forwards the message to the queue matching the routingKey according to the routingKey

The difference is the matching method of routingKey. Routing mode is equal matching, and Topics mode is wildcard matching

//Wildcard pattern matching
1,Symbol # : Match one or more words, such as inform.#, you can match information sms,inform.email,inform.email.sms
2,Symbol * : Only one word can be matched, such as inform.*,Can match inform.sms,inform.email
3.4.3.Topics wildcard working mode demonstration example

Case:
Set to notify the user according to the user's notification. Set the user receiving Email to receive Email only, and set the user receiving sms to receive sms only. If both notification types are set to receive, both notifications are valid

code

Message producer program code

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @Author ScholarTang
 * @Date 2020/10/21 3:09 afternoon
 * @Desc rabbitmq-topics Working mode message producer
 */

public class ProducerTopics {
  //The name of the queue that receives the mail message
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //Queue name of receiving SMS message
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //The general routingKey match specified when the mail message queue is bound to the switch
  private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY = "inform.#.email.#";
  //The routingKey wildcard specified when the SMS message queue is bound to the switch
  private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "inform.#.sms.#";
  //Define switch name
  private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

  public static void main(String[] args) throws Exception {
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Configure connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    connectionFactory.setVirtualHost("/");//Virtual machine. A mq service can have multiple virtual machines, and each virtual machine is an independent mq
    //Establish a new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel
    Channel channel = connection.createChannel();
    /**
     * Declaration notification email queue, declaration notification SMS queue
     * Parameter information: String queue, Boolean durable, Boolean exclusive, Boolean autodelete, map < string, Object > arguments
     * Parameter details:
     * 1,queue: Queue name
     * 2,durable: Whether it is persistent; If persistent, the queue will remain after mq restart
     * 3,exclusive: Exclusive (exclusive connection). The queue is only allowed to be accessed in this connection. If the connection is closed, the queue will also be deleted automatically. If this parameter is set to true, it can be used for temporary queues
     * 4,autoDelete: Auto delete: whether to delete the queue when it is no longer in use. If this parameter and exclusive are set to true, the temporary queue can be realized (temporary queue: automatically delete the queue when it is no longer in use)
     * 5,arguments: Parameter, which is used to set the extension parameters of the queue, such as survival time
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Define replacement machine
     *
     * Description of method parameters:
     * String exchange, BuiltinExchangeType type
     * 1,exchange: Switch name
     * 2,type: Switch Type 
     *  DIRECT: This corresponds to the Routing mode
     *  FANOUT :The corresponding rabbitmq work type is the publish/subscribe work mode
     *  TOPIC: Corresponding to Topics working mode
     *  HEADERS: Corresponding to Header mode
     */
    channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);

    /**
     * The queue is bound to the switch, and its routingKey can be set during binding (a queue can be set with the opposite song routingKey)
     * Description of method parameters:
     * String queue, String exchange, String routingKey
     * 1,queue: Queue name
     * 2,exchange: Switch name
     * 3,routingKey: Routing key, which is used by the switch to forward to the specified queue according to the route, and set an empty string in the publish subscribe mode
     */
    channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM, QUEUE_INFORM_EMAIL_ROUTING_KEY);
    channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM, QUEUE_INFORM_SMS_ROUTING_KEY);

    /**
     * send message
     *
     * Parameter information: String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body
     * Parameter details:
     * 1,exchange: If the switch is not pointed to, the mq default switch is used ("" is an empty string indicating that it is not pointed to)
     * 2,routingKey: Routing key: the switch sends the message to the specified queue according to the routing key. If the default switch is used, the routing key uses the name of the queue
     * 3,props: Message properties
     * 4,body: Message content
     */
    //Send mail messages only
//    for (int i = 0; i < 10; i++) {
//      String message = "this is a mail message" + i;
//      channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());
//      System.out.println("send to mq: " + message);
//    }

    //Send SMS messages only
//    for (int i = 0; i < 10; i++) {
//      String message = "this is a short message" + i;
//      channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());
//      System.out.println("send to mq: " + message);
//    }

    //Send email and SMS messages
    for (int i = 0; i < 10; i++) {
      String message = "Email and SMS messages were sent at the same time:" + i;
      channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email.sms", null, message.getBytes());
      System.out.println("send to mq: " + message);
    }

    //Close session channel
    channel.close();
    //Close connection
    connection.close();
  }
}			

Similarly, run the message producer program, and the message content sent will be printed on the console, and then return to the RabbitMQ management page to view. Find the created switch according to the switch name defined in the code. Click in to see that the switch is bound to two queues, and the queues have different routingKey wildcard rules, as shown in the following figure:

Back to the code, specify the routingKey of the message when sending the message. When the message is sent to the switch, the switch will match the routingKey of the message with the routingKey of the bound queue according to the routingKey of the message. If it is matched, it will be forwarded to this queue. Then, the messages in the queue are consumed by the consumers listening to the queue

Consumer side_ email program code

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ScholarTang
 * @Date 2020/10/10 12:35 afternoon
 * @Desc Message consumer_ email
 */

public class ConsumerTopicsEmail {
  //Defines the queue name used for notification email
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //Define switch name
  private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
  //The general routingKey match specified when the mail message queue is bound to the switch
  private static final String QUEUE_INFORM_EMAIL_ROUTING_KEY = "inform.#.email.#";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();

    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);

    /**
     * Bind queue to switch
     */
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,QUEUE_INFORM_EMAIL_ROUTING_KEY);

    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
  }
}

Consumer side_ sms program code

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author ScholarTang
 * @Date 2020/10/10 12:35 afternoon
 * @Desc Consumer side_ sms
 */

public class ConsumerTopicsSms {
  //Defines the queue name used for notification email
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //Define switch name
  private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
  //The general routingKey match specified when the mail message queue is bound to the switch
  private static final String QUEUE_INFORM_SMS_ROUTING_KEY = "inform.#.sms.#";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();

    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);

    /**
     * Bind queue to switch
     */
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,QUEUE_INFORM_SMS_ROUTING_KEY);

    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
  }
}

Run two consumer programs. If the message sent by the message producer is a mail message, the message will be consumed by the consumer listening to the mail.

Or the message sent by the message producer is a short message message, and the corresponding message will be consumed by the consumers listening to the short message.

If the message producer sends e-mail and SMS messages at the same time (the routingKey of the message can be matched by the routingKey wildcard of the two queues at the same time), there are the same messages in both queues, and the messages are consumed by the consumers listening to them.

3.5.Header mode

3.5.1.Header mode structure diagram:

The difference between the header mode and the routing mode is that the header mode cancels the routingKey and uses the key/value (key value pair) in the header to match the queue.

3.5.2. Demonstration example of header working mode

We still use the example of topics working mode. The header working mode, routing and topics working modes are also the operation of changing the soup without changing the dressing. In the routing mode, the routingKey of the message is compared with the routingKey of the queue in the switch, and if it is equal, it is forwarded to the queue. Topics adds a matching rule to the routing, while the header discards the routingKey and uses the form of a key value pair. When sending a message, it sets a key value pair for the message. The switch matches its bound queue according to the key value pair of the message, and forwards it to this queue. Finally, the consumer listening to the queue consumes the messages in the queue.

Case:
Set to notify the user according to the user's notification. Set the user receiving Email to receive Email only, and set the user receiving sms to receive sms only. If both notification types are set to receive, both notifications are valid

Code

Message producer program code

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;


/**
 * @Author ScholarTang
 * @Date 2020/10/21 3:09 afternoon
 * @Desc rabbitmq-header Working mode message producer
 */

public class ProducerHeader {
  //The name of the queue that receives the mail message
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //Queue name of receiving SMS message
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //Define switch name
  private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform";

  public static void main(String[] args) throws Exception {
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Configure connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    connectionFactory.setVirtualHost("/");//Virtual machine. A mq service can have multiple virtual machines, and each virtual machine is an independent mq
    //Establish a new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel
    Channel channel = connection.createChannel();
    /**
     * Declaration notification email queue, declaration notification SMS queue
     * Parameter information: String queue, Boolean durable, Boolean exclusive, Boolean autodelete, map < string, Object > arguments
     * Parameter details:
     * 1,queue: Queue name
     * 2,durable: Whether it is persistent; If persistent, the queue will remain after mq restart
     * 3,exclusive: Exclusive (exclusive connection). The queue is only allowed to be accessed in this connection. If the connection is closed, the queue will also be deleted automatically. If this parameter is set to true, it can be used for temporary queues
     * 4,autoDelete: Auto delete: whether to delete the queue when it is no longer in use. If this parameter and exclusive are set to true, the temporary queue can be realized (temporary queue: automatically delete the queue when it is no longer in use)
     * 5,arguments: Parameter, which is used to set the extension parameters of the queue, such as survival time
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Define replacement machine
     *
     * Description of method parameters:
     * String exchange, BuiltinExchangeType type
     * 1,exchange: Switch name
     * 2,type: Switch Type 
     *  DIRECT: This corresponds to the Routing mode
     *  FANOUT :The corresponding rabbitmq work type is the publish/subscribe work mode
     *  TOPIC: Corresponding to Topics working mode
     *  HEADERS: Corresponding to Header mode
     */
    channel.exchangeDeclare(EXCHANGE_HEADER_INFORM, BuiltinExchangeType.HEADERS);

    /**
     * The queue is bound to the switch, and its routingKey can be set during binding (a queue can be set with the opposite song routingKey)
     * Description of method parameters:
     * String queue, String exchange, String routingKey,Map<String, Object> arguments
     * 1,queue: Queue name
     * 2,exchange: Switch name
     * 3,routingKey: Routing key, which is used by the switch to forward to the specified queue according to the route, and set an empty string in the publish subscribe mode
     * 4,arguments: Parameter key/value key value pair
     */
    Map<String, Object> emailMap = new HashMap<>();
    emailMap.put("inform.email","email");

    Map<String, Object> smsMap = new HashMap<>();
    smsMap.put("inform.sms","sms");
    channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_HEADER_INFORM, "",emailMap);
    channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_HEADER_INFORM, "",smsMap);

    /**
     * send message
     *
     * Parameter information: String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body
     * Parameter details:
     * 1,exchange: If the switch is not pointed to, the mq default switch is used ("" is an empty string indicating that it is not pointed to)
     * 2,routingKey: Routing key: the switch sends the message to the specified queue according to the routing key. If the default switch is used, the routing key uses the name of the queue
     * 3,props: Message properties
     * 4,body: Message content
     */

    //send mail message
    for (int i = 0; i < 10; i++) {
      Map<String, Object> map = new HashMap<>();
      map.put("inform.email","email");
      String message = "Send mail message:" + i;
      AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
      properties.headers(map);
      channel.basicPublish(EXCHANGE_HEADER_INFORM, "", properties.build(), message.getBytes());
      System.out.println("send to mq: " + message);
    }

    //Send SMS message
    for (int i = 0; i < 10; i++) {
      Map<String, Object> map = new HashMap<>();
      map.put("inform.sms","sms");
      String message = "Send SMS message:" + i;
      AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
      properties.headers(map);
      channel.basicPublish(EXCHANGE_HEADER_INFORM, "", properties.build(), message.getBytes());
      System.out.println("send to mq: " + message);
    }
    //Send email and SMS messages
    for (int i = 0; i < 10; i++) {
      Map<String, Object> map = new HashMap<>();
      map.put("inform.email","email");
      map.put("inform.sms","sms");
      String message = "Email and SMS messages were sent at the same time:" + i;
      AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
      properties.headers(map);
      channel.basicPublish(EXCHANGE_HEADER_INFORM, "", properties.build(), message.getBytes());
      System.out.println("send to mq: " + message);
    }
    //Close session channel
    channel.close();
    //Close connection
    connection.close();
  }
}

Consumer side_ email program code

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author ScholarTang
 * @Date 2020/10/10 12:35 afternoon
 * @Desc Consumer side_ email
 */

public class ConsumerHeaderEmail {
  //Defines the queue name used for notification email
  private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //Define switch name
  private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();

    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_HEADER_INFORM, BuiltinExchangeType.HEADERS);

    /**
     * Bind queue to switch
     */
    Map<String, Object> smsMap = new HashMap<>();
    smsMap.put("inform.email","email");
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADER_INFORM,"",smsMap);

    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
  }
}

Consumer side_ sms program code

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author ScholarTang
 * @Date 2020/10/10 12:35 afternoon
 * @Desc Consumer side_ sms
 */

public class ConsumerHeaderSms {
  //Queue name of receiving SMS message
  private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //Define switch name
  private static final String EXCHANGE_HEADER_INFORM = "exchange_header_inform";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Set connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setVirtualHost("/");//Virtual machine ip
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    //make new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel < all communication between producer and mp service is completed in channel channel >
    Channel channel = connection.createChannel();

    /**
     * Create queue
     */
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    /**
     * Create switch
     */
    channel.exchangeDeclare(EXCHANGE_HEADER_INFORM, BuiltinExchangeType.HEADERS);

    /**
     * Bind queue to switch
     */
    Map<String, Object> smsMap = new HashMap<>();
    smsMap.put("inform.sms","sms");
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADER_INFORM,"",smsMap);

    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
      /**
       * When the supervisor hears the message, this method will be called
       * @param consumerTag The consumer tag is used to identify the consumer. It can be set when listening to the queue
       * @param envelope Envelope. You can obtain the switch, message ID, etc. through the envelope (message ID, mq is used to identify the ID of the message in the channel channel, and can be used to determine whether the message is received)
       * @param properties Message properties (message properties set in the sender)
       * @param body Message content
       * @throws IOException
       */
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("The received message contents are:" + new String(body, "utf-8"));
      }
    };

    /**
     * listen queue 
     * Parameter information: String queue, boolean autoAck, Consumer callback
     * Parameter details:
     * 1,queue: queue
     * 2,autoAck: Automatic reply: after receiving the message, the consumer tells mq that the message has been received. If this parameter is set to true, it means automatic reply. If it is set to false, it needs to be programmed to reply
     * 3,callback: Consumption method: the method to be executed when the consumer receives the message
     */
    channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
  }
}

3.6.RCP operating mode

3.6.1.RCP working mode structure diagram

RPC working mode:
RPC is the method by which the client remotely calls the server. Using MQ, the asynchronous call of RPC can be realized based on the Direct switch. The process is as follows:
1. The client is both a producer and a consumer. It sends RPC call messages to the RPC request queue and listens to the RPC response queue at the same time

2. The server listens to the RPC request queue message and executes the method of the server after receiving the message to get the return result of the method

3. The server sends the results of the RPC method to the RPC response queue

3.6.2.RPC working mode demonstration example

Case:

Take the structure diagram as an example, simulate a client sending a message to the server, and the server receives the message and replies

Code

RpcClient

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Scanner;

/**
 * @Author ScholarTang
 * @Date 2020/10/21 8:18 afternoon
 * @Desc rabbitmq_rpc Mode client
 */

public class RpcClient {

  //Define rcp queue name
  private static final String QUEUE_INFORM_RPC = "queue_inform_rpc";
  //rpc queue definition response name
  private static final String QUEUE_INFORM_RPC_RETURN = "queue_inform_rpc_return";

  public static void main(String[] args) throws Exception{
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Configure connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    connectionFactory.setVirtualHost("/");//Virtual machine. A mq service can have multiple virtual machines, and each virtual machine is an independent mq
    //Establish a new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel
    Channel channel = connection.createChannel();
    //Create a new channel and use this channel to save the connection with the rpc response queue
    Channel channelByRpcReturn = connection.createChannel();
    //Declare rpc queue
    channel.queueDeclare(QUEUE_INFORM_RPC, true, false, false, null);
    //Declare the rpc response queue. This operation is to ensure that the queue exists
    channelByRpcReturn.queueDeclare(QUEUE_INFORM_RPC_RETURN, true, false, false, null);
    sendMessage(channel,QUEUE_INFORM_RPC);
    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channelByRpcReturn){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("Server response:" + new String(body, "utf-8"));
        System.out.println();
        sendMessage(channel,QUEUE_INFORM_RPC);
      }
    };
    //Consume messages in rpc response queue
    channelByRpcReturn.basicConsume(QUEUE_INFORM_RPC_RETURN,true,defaultConsumer);
  }

  public static void sendMessage(Channel channel,String queue) throws IOException {
    System.out.print("Send message to server:");
    Scanner scanner = new Scanner(System.in);
    String message = scanner.next();
    System.out.println("client:" + message);
    //Send message to rpc queue
    channel.basicPublish("",queue,null,message.getBytes());
  }
}

RpcServer

package com.xuecheng.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Scanner;

/**
 * @Author ScholarTang
 * @Date 2020/10/21 8:38 afternoon
 * @Desc rabbitmq_rpc Mode server
 */

public class RpcServer {
  //Define rcp queue name
  private static final String QUEUE_INFORM_RPC = "queue_inform_rpc";
  //rpc queue definition response name
  private static final String QUEUE_INFORM_RPC_RETURN = "queue_inform_rpc_return";

  public static void main(String[] args) throws Exception {
    //Create a new connection through the connection factory and establish a connection with MQ
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //Configure connection parameters
    connectionFactory.setHost("127.0.0.1");//Host ip. The ip address of the device where the mq service is located
    connectionFactory.setPort(5672);//port
    connectionFactory.setUsername("guest");//user name
    connectionFactory.setPassword("guest");//password
    connectionFactory.setVirtualHost("/");//Virtual machine. A mq service can have multiple virtual machines, and each virtual machine is an independent mq
    //Establish a new connection
    Connection connection = connectionFactory.newConnection();
    //Create session channel
    Channel channel = connection.createChannel();
    //Create a new channel and use this channel to keep contact with the rpc response queue
    Channel channelByRpcReturn = connection.createChannel();
    //Declare rpc queue
    channel.queueDeclare(QUEUE_INFORM_RPC, true, false, false, null);
    //Declare the rpc response queue. This operation is to ensure that the queue exists
    channelByRpcReturn.queueDeclare(QUEUE_INFORM_RPC_RETURN, true, false, false, null);
    //Realizing consumption method
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("Received message from client:" + new String(body, "utf-8"));
        sendMessage(channelByRpcReturn,QUEUE_INFORM_RPC_RETURN);
      }
    };
    //Consume messages in rpc response queue
    channelByRpcReturn.basicConsume(QUEUE_INFORM_RPC, true, defaultConsumer);
  }
  public static void sendMessage(Channel channel,String queue) throws IOException {
    System.out.print("Response message to client:");
    Scanner scanner = new Scanner(System.in);
    String message = scanner.next();
    System.out.println("Server:" + message);
    //Send message to rpc queue
    channel.basicPublish("",queue,null,message.getBytes());
  }
}

Final effect:

4.SpringBoot integrates RabbitMQ

1. Build SpringBoot project

In order to see the message more intuitively during the test, it is also necessary to build a message consumer and message producer when building the project

2. Introduce RabbitMQ related dependencies into the pom file of the project

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3. Configure RabbitMQ connection information in yml file

# ProducerServer
server:
  port: 44000
spring:
  application:
    name: test‐rabbitmq‐producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    
# ConsumerServer
server:
  port: 44000
spring:
  application:
    name: test‐rabbitmq‐consumer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

4. Write a configuration class to configure switch, queue and other information

We use this configuration class on both the message production side and the message consumption side to ensure that no matter which side gets up first, these queues and switches will be created

RabbitMqConfig.java

package com.xuecheng.test.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author ScholarTang
 * @Date 2020/10/22 10:36 morning
 * @Desc rabbitmq Configuration class
 */
@Configuration
public class RabbitMqConfig {

  //TODO usually writes this information in the configuration. Here, for convenience, it is directly defined as a static constant

  //The name of the queue that receives the mail message
  public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
  //Queue name of receiving SMS message
  public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
  //The general routingKey match specified when the mail message queue is bound to the switch
  public static final String QUEUE_INFORM_EMAIL_ROUTING_KEY = "inform.#.email.#";
  //The routingKey wildcard specified when the SMS message queue is bound to the switch
  public static final String QUEUE_INFORM_SMS_ROUTING_KEY = "inform.#.sms.#";
  //Define switch name
  public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

  /**
   * Declaration switch
   * @return
   */
  @Bean(EXCHANGE_TOPICS_INFORM)
  public Exchange EXCHANGE_TOPICS_INFORM(){
    return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
  }

  /**
   * Claim queue - mailbox queue
   * @return
   */
  @Bean(QUEUE_INFORM_EMAIL)
  public Queue QUEUE_INFORM_EMAIL(){
    return new Queue(QUEUE_INFORM_EMAIL);
  }

    /**
     * Declaration queue - SMS queue
   * @return
     */
  @Bean(QUEUE_INFORM_SMS)
  public Queue QUEUE_INFORM_SMS(){
    return new Queue(QUEUE_INFORM_EMAIL);
  }


  /**
   * Queue binding switch - mail queue binding switch
   * @param queue Queue instance
   * @param exchange Switch instance
   * @return
   */
  @Bean
  public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(QUEUE_INFORM_EMAIL_ROUTING_KEY).noargs();
  }

    /**
     * Queue bound switch
   * @param queue
     * @param exchange
     * @return
     */
  @Bean
  public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with(QUEUE_INFORM_SMS_ROUTING_KEY).noargs();
  }
}

5. Write a message producer program at the message production end to send messages to the switch

We use RabbitTemplate to send messages. Here, I directly complete the operation of sending messages in the SpringBoot test class

/**
 * @Author ScholarTang
 * @Date 2020/10/22 11:00 morning
 * @Desc SpringBoot Program test class
 */

@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRabbitMqApplicationTest {

  /**
   * Inject rabbitTemplate and use rabbitTemplate to send messages
   */
  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Test
  public void test() {
    String message = "This is an email message";
    /**
     * send message
     * Method parameter description: String exchange, String routingKey, Object message
     * 1,exchange: Switch
     * 2,routingKey: Routing Key
     * 3,message: Message content
     */
    rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_TOPICS_INFORM, RabbitMqConfig.QUEUE_INFORM_EMAIL_ROUTING_KEY, message);
  }
}

6. Write a message consumer program at the message consumer to listen to the queue consumption messages

Here, I define the consumer side as a Bean. When the SpringBoot program starts, it will be managed by the Spring container. An annotation is used in the class to listen for messages in the queue and then consume messages

package com.xuecheng.test.rabbitmq.mq;

import com.xuecheng.test.rabbitmq.config.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author ScholarTang
 * @Date 2020/10/22 11:38 morning
 * @Desc Message consumer
 */

@Slf4j
@Component
public class ReceiveHandler {

  /**
   * @RabbitListener Annotations are used to listen to queues
   * @param message
   */
  @RabbitListener(queues = {RabbitMqConfig.QUEUE_INFORM_EMAIL})
  public void consumer(String message){
    log.info("Message received:{}", message);
  }
}

7. Test

1. First start the SpringBoot program on the message consumer side, and the SpringBoot scan bean loads it into the Spring container. At this time, the queue and switch will be created and the queue will be monitored. When there are messages in the queue, they will be consumed

2. Run the test method of the message production side to send the message

3. The message consumer receives the message

Tags: Java RabbitMQ

Posted by Joe_Dean on Tue, 10 May 2022 22:13:31 +0300