RabbitMQ
[Dark horse programmer RabbitMQ full set of tutorials, rabbitmq message middleware to actual combat]
Day 1 Basics
4 RabbitMQ working modes
4.4 Topic wildcard mode
4.4.1 Mode Description
First, look at the documentation:
Like Direct, a Topic exchange routes messages to different queues according to the routing key. The difference is that a Topic exchange lets a queue use wildcards in the routing key it binds with.
A routing key generally consists of one or more words separated by ".", for example: item.insert
Wildcard rules: # matches zero or more words, * matches exactly one word. For example, item.# matches item.insert.abc as well as item.insert, while item.* matches item.insert but not item.insert.abc.
Nothing wrong there, very clear.
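To make the wildcard rules concrete, here is a minimal sketch of the matching logic in plain Java. It is only an illustration of the rules described above, not how RabbitMQ implements them internally; the class name TopicMatchSketch and the method matches are hypothetical, and edge cases such as an empty routing key are not handled.

```java
class TopicMatchSketch {

    /** Returns true if the routing key matches the binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compare pattern words p[i..] against routing key words k[j..].
    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: a match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or one word and then tries again.
            return matches(p, i + 1, k, j)
                    || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            // Pattern still expects a word, but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' takes exactly one word; a literal must be equal.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("item.#", "item.insert.abc")); // true
        System.out.println(matches("item.#", "item.insert"));     // true
        System.out.println(matches("item.*", "item.insert"));     // true
        System.out.println(matches("item.*", "item.insert.abc")); // false
    }
}
```

Because # may also match zero words, matches("item.#", "item") returns true in this sketch, while matches("item.*", "item") returns false.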
4.4.2 Code writing
Here is a small requirement:
Previously, using the Direct (routing) mode, we were able to store error-level logs in the database.
Now we also want all order-related logs, no matter what level, to be stored in the database in the same way.
[producer]
package com.dingjiaxiong.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ClassName: Producer_Topics
 * date: 2022/11/16 10:58
 * Send a message
 *
 * @author DingJiaxiong
 */
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        //2. Set parameters
        factory.setHost("xxxxxxxxxxxxxxxxxx"); // Server IP [default localhost]
        factory.setPort(5672); // Port [the default is also 5672]
        factory.setVirtualHost("/ding"); // Virtual host [default is /]
        factory.setUsername("dingjiaxiong"); // Username [default guest]
        factory.setPassword("12345"); // Password [default guest]

        //3. Create a connection
        Connection connection = factory.newConnection();

        //4. Create a channel
        Channel channel = connection.createChannel();

        //5. Create the exchange
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * Parameter explanation:
         * 1. exchange: exchange name
         * 2. type: exchange type [enum]
         *      DIRECT("direct"): routing by exact key
         *      FANOUT("fanout"): broadcast [sends the message to every bound queue]
         *      TOPIC("topic"): wildcard matching
         *      HEADERS("headers"): header (parameter) matching
         * 3. durable: whether to persist
         * 4. autoDelete: whether to delete automatically
         * 5. internal: internal use [generally false]
         * 6. arguments: additional arguments
         */
        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);

        //6. Create the queues [two]
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);

        //7. Bind the queues to the exchange
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * Parameter explanation:
         * 1. queue: queue name
         * 2. exchange: exchange name
         * 3. routingKey: routing key [binding rule] [if the exchange type is fanout, use the empty string]
         */
        // Routing key format: <system name>.<log level>
        // Requirement: all error-level logs are stored in the database, and all order system logs are stored in the database
        channel.queueBind(queue1Name, exchangeName, "#.error");
        channel.queueBind(queue1Name, exchangeName, "order.*");
        channel.queueBind(queue2Name, exchangeName, "*.*");

        //8. Send a message
        String body = "This is a log message...[order system]";
        channel.basicPublish(exchangeName, "order.info", null, body.getBytes());

        //9. Release resources
        channel.close();
        connection.close();
    }
}
Run it directly.
Then check the management console.
The exchange has been created successfully. Check its bindings.
No problem there.
Now look at the queues.
You can see that both queues have received the message.
[consumer]
package com.dingjiaxiong.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ClassName: Consumer_Topic1
 * date: 2022/11/16 12:37
 *
 * @author DingJiaxiong
 */
public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();

        //2. Set parameters
        factory.setHost("xxxxxxxxxxxxxxxxx"); // Server IP [default localhost]
        factory.setPort(5672); // Port [the default is also 5672]
        factory.setVirtualHost("/ding"); // Virtual host [default is /]
        factory.setUsername("dingjiaxiong"); // Username [default guest]
        factory.setPassword("12345"); // Password [default guest]

        //3. Create a connection
        Connection connection = factory.newConnection();

        //4. Create a channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        // Receive messages [consume messages]
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * Parameter explanation:
         * 1. queue: queue name
         * 2. autoAck: whether to acknowledge automatically
         * 3. callback: callback object
         */
        Consumer consumer = new DefaultConsumer(channel) {
            // Callback method: executed automatically when a message is received
            /*
             * 1. consumerTag: identifier of the consumer
             * 2. envelope: delivery information [exchange, routing key...]
             * 3. properties: message properties
             * 4. body: message data
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // System.out.println("consumerTag: " + consumerTag);
                // System.out.println("Exchange: " + envelope.getExchange());
                // System.out.println("RoutingKey: " + envelope.getRoutingKey());
                // System.out.println("properties: " + properties);
                System.out.println("body:" + new String(body));
                System.out.println("save logs to database...");
            }
        };
        channel.basicConsume(queue1Name, true, consumer);

        // Do not close the resources; keep listening
    }
}
Next, consumer 2.
Run the consumers directly.
As expected: the order.info message is not only printed to the console, but also stored in the database.
Now send a log from the goods system.
Send it directly.
Then another one, goods.error.
As expected. [This is the Topic working mode]
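The routing results above can be reproduced offline with a small simulation of the three bindings declared in the producer. This is a rough sketch, not RabbitMQ's own matching code: the class TopicBindingSketch and its methods are hypothetical, each binding pattern is translated to a regular expression, and goods.info is an assumed routing key for the "goods log" (the text does not state its exact key).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class TopicBindingSketch {

    // {binding pattern, queue name}, mirroring the queueBind calls in the producer
    static final String[][] BINDINGS = {
        {"#.error", "test_topic_queue1"},
        {"order.*", "test_topic_queue1"},
        {"*.*", "test_topic_queue2"},
    };

    // Translate a binding pattern into a regex over "word." units.
    // The routing key is matched with a trailing "." appended, so every
    // word (including the last) ends with a dot.
    public static Pattern toRegex(String bindingKey) {
        StringBuilder regex = new StringBuilder();
        for (String word : bindingKey.split("\\.")) {
            if (word.equals("#")) {
                regex.append("(?:[^.]+\\.)*");   // zero or more words
            } else if (word.equals("*")) {
                regex.append("[^.]+\\.");        // exactly one word
            } else {
                regex.append(Pattern.quote(word)).append("\\.");
            }
        }
        return Pattern.compile(regex.toString());
    }

    // Returns the queues that would receive a message with this routing key.
    public static List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (String[] binding : BINDINGS) {
            boolean hit = toRegex(binding[0]).matcher(routingKey + ".").matches();
            if (hit && !queues.contains(binding[1])) {
                queues.add(binding[1]); // a queue receives the message only once
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"order.info", "goods.info", "goods.error"}) {
            System.out.println(key + " -> " + route(key));
        }
    }
}
```

Under these assumptions, order.info and goods.error reach both queues, while goods.info reaches only test_topic_queue2, so it is printed to the console but not stored in the database.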
4.4.3 Summary
Topic mode can do everything the Pub/Sub (publish/subscribe) mode and the Routing mode can do, and because wildcards can be used when configuring the routing key, it is more flexible.
4.5 Summary of working modes
- Simple mode HelloWorld
One producer, one consumer. No exchange needs to be declared (the default exchange is used).
- Work queue mode Work Queue
One producer, multiple consumers (competing with each other). No exchange needs to be declared (the default exchange is used).
- Publish/subscribe mode Publish/Subscribe
Requires an exchange of type fanout with the queues bound to it. After a message is sent to the exchange, the exchange delivers it to every bound queue.
- Routing mode Routing
Requires an exchange of type direct, with the queues bound to it and a routing key specified for each binding. When a message is sent to the exchange, the exchange delivers it to the queues whose routing key matches exactly.
- Wildcard mode Topic
Requires an exchange of type topic, with the queues bound to it and a routing key containing wildcards specified for each binding. When a message is sent to the exchange, the exchange delivers it to the queues whose binding pattern matches the routing key.