📃 Personal homepage: Pikachu moving forward
🌞Blog description: Dreams may be out of reach, but the important thing is the process of chasing dreams. Use blogs to record your own growth and record your step-by-step marks of climbing up.
🔥Personal column: message middleware
1. Basic introduction to subscription model
- P: Producer, send message to switch
- C: Consumer, receiving messages
- X: An exchange, which on the one hand receives the message sent by the producer, and on the other hand knows what to do with the message, should it be attached to a specific queue? Should it be appended to multiple queues? Or it should be discarded. Its rules are defined by the exchange type.
- Queue: message queue, receive messages, cache messages
- Each consumer listens to its own queue
- The producer sends the message to the broker, and the exchange forwards the message to each queue bound to the exchange, and each queue bound to the exchange will receive the message.
2. Switch
- The core idea of ​​the messaging model in RabbitMQ is that the producer never sends any messages directly to the queue. In fact, many times, the producer doesn't even know if the message will be delivered to any queue. Instead, producers can only send messages to the _exchange_. The work of a switch is a very simple matter. On the one hand, it receives messages from producers and on the other hand pushes them to the queue. The exchange must know exactly what to do with the messages it receives. Should it be attached to a specific queue? Should it be appended to multiple queues? Or it should be discarded. Its rules are defined by _Exchange Type_.
- The Exchange is only responsible for forwarding messages, and does not have the ability to store messages, so if there is no queue bound to the Exchange, or there is no queue that meets the routing rules, then the message will be lost!
switch type
- Fanout: broadcast, deliver the message to all queues bound to the exchange
- Direct: Direct, send the message to the queue that matches the specified routing key
- Topic: wildcard, send the message to the queue that conforms to the routing pattern (routing pattern)
3. Publish-subscribe model
3.1 Basic introduction
To configure a fanout type switch, it is not necessary to specify the corresponding routing key, and the message will be routed to each message queue at the same time. Each message queue can store the same message, which is then associated by its own message queue. of consumer spending
3.2 Producers
public class Producer { public static String FANOUT_EXCHANGE = " fanout_exchange"; public static String FANOUT_QUEUE_1 = "fanout_queue_1"; public static String FANOUT_QUEUE_2 = "fanout_queue_2"; public static void main(String[] args) { try { Channel channel = ConnectUtil.getChannel(); //declare switch (switch name, switch type) channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //declare queue channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null); channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null); //Bind exchanges to queues channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,""); channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,""); //Send a message for (int i = 1; i <=10 ; i++) { String msg="hello bunny, publish subscribe model : "+i; channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes()); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
3.3 Consumers
Consumer 1
public class Consumer1 { public static void main(String[] args) { try { Channel channel = ConnectUtil.getChannel(); channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null); channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //Bind the queue to the switch Queue name, switch name, routing key channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, ""); //receive message DefaultConsumer consumer = new DefaultConsumer(channel) { /** * Consumption callback function, when a message is received, this method will be automatically executed * @param consumerTag consumer identification * @param envelope The content of the message packet (such as switch, routing key, message id, etc.) * @param properties attribute information * @param body message data * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Message received by messager 1:" + new String(body, "UTF-8")); } }; //Listen for messages (queue name, whether to automatically confirm the message, consumer object) channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
Consumer 2
public class Consumer2 { public static void main(String[] args) { try { Channel channel = ConnectUtil.getChannel(); channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null); channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); //Bind the queue to the switch Queue name, switch name, routing key channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, ""); //receive message DefaultConsumer consumer = new DefaultConsumer(channel) { /** * Consumption callback function, when a message is received, this method will be automatically executed * @param consumerTag consumer identification * @param envelope The content of the message packet (such as switch, routing key, message id, etc.) * @param properties attribute information * @param body message data * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Message received by messager 2:" + new String(body, "UTF-8")); } }; //Listen for messages (queue name, whether to automatically confirm the message, consumer object) channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
3.4 Testing