RabbitMQ's core concept, read must understand!

Author: Haixiang source: cnblogs com/haixiang/p/10853467. html

RabbitMQ features

Compared with other message queues, RabbitMQ has a series of measures to prevent message loss and has strong high availability performance. Its throughput may not be as large as other message queues, but its message security is outstanding and is widely used in financial services.

AMQP protocol

AMQP: Advanced Message Queuing Protocol

AMQP definition: it is a binary protocol with modern characteristics. It is an application layer standard and advanced message queuing protocol that provides unified messaging services. It is an open standard of application layer protocol and is designed for message oriented middleware.

Erlang language was originally based on the architecture mode in the switch field, which makes the performance of RabbitMQ in data interaction between brokers very excellent. The advantage of Erlang: Erlang has the same latency as the native Socket.

RabbitMQ is an open source message broker and queue server, which is used to share data between completely different applications through common protocols. RabbitMQ is written in Erlang language, and RabbitMQ is based on AMQP protocol. Follow the official account Java technology stack to get a series of RabbitMQ tutorials.

RabbitMQ messaging mechanism

The producer sends the message to the specified exchange. Exchange sends the message to 0 - n queues according to its own type (direct, topic, etc.) and routing key, and the queue forwards the message to the consumer.

Server: also known as Broker, it accepts client connections and implements AMQP entity services. Here, it refers to RabbitMQ server

Connection: connection, the network connection between the application and the Broker.

Channel: network channel. Almost all operations are carried out in channel. Channel is the channel for message reading and writing. The client can establish multiple channels: each channel represents a session task.

Virtual Host: virtual address, which is used for logical isolation. It is the top-level message routing. A Virtual Host can have several exchanges and queues. A Virtual Host cannot have exchanges or queues with the same name. The minimum granularity of permission control is Virtual Host.

Binding: the virtual connection between exchange and Queue. The binding can contain routing key.

Routing key: a routing rule that the virtual machine can use to determine how to route a specific message, that is, the key that the switch binds to the Queue.

Queue: also known as Message Queue, Message Queue saves messages and forwards them to consumers.

Message

Message, the data transmitted between the server and the application, is composed of properties and Body. Properties can modify messages, such as message priority, delay and other advanced features;, The Body is the content of the message Body.

In properties, we can set the message expiration time and whether to persist, or pass in the custom map attribute, which can also be obtained on the consumer side.

producer

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;

public class MessageProducer {
    public static void main(String[] args) throws Exception {
        //1. Create a ConnectionFactory and set it
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2. Create a connection through the connection factory
        Connection connection = factory.newConnection();

        //3. Create a Channel through Connection
        Channel channel = connection.createChannel();

        //4. Declare to use the default switch and take the queue name as the routing key
        String queueName = "msg_queue";

        /**
         * deliverMode When set to 2, it represents persistent messages
         * expiration It means to set the validity period of the message. If it is not received by the consumer for more than 10 seconds, it will be automatically deleted
         * headers Some custom attributes
         * */
        //5. Sending
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("myhead1", "111");
        headers.put("myhead2", "222");

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("100000")
                .headers(headers)
                .build();
        String msg = "test message";
        channel.basicPublish("", queueName, properties, msg.getBytes());
        System.out.println("Send message : " + msg);

        //6. Close the connection
        channel.close();
        connection.close();

    }
}

consumer

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;

public class MessageConsumer {
    public static void main(String[] args) throws Exception{
        //1. Create a ConnectionFactory and set it
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. Create a connection through the connection factory
        Connection connection = factory.newConnection();

        //3. Create a Channel through Connection
        Channel channel = connection.createChannel();

        //4. Declaration
        String queueName = "msg_queue";
        channel.queueDeclare(queueName, false, false, false, null);

        //5. Create consumers and receive messages
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                Map<String, Object> headers = properties.getHeaders();
                System.out.println("head: " + headers.get("myhead1"));
                System.out.println(" [x] Received '" + message + "'");
                System.out.println("expiration : "+ properties.getExpiration());
            }
        };

        //6. Set Channel consumer binding queue
        channel.basicConsume(queueName, true, consumer);
    }
}
Send message : test message

head: 111
 [x] Received 'test message'
100000

Exchange

1. Introduction

Exchange is a switch that receives messages and forwards them to the bound Queue according to the Routing key. Many messages enter exchange. Exchange distributes messages to different queues according to the Routing key.

2. Type

There are many types of Exchange in RabbitMQ. Different types and Message distribution mechanisms are as follows:

  • fanout: broadcast mode. This type of Exchange distributes messages to all queues bound to the Exchange.

  • direct: this type of Exchange will distribute messages to the specified Queue according to the Routing key.

  • Topic: this type of Exchange will distribute messages to the specified Queue according to the Routing key (fuzzy matching).

  • headers: the theme switch is a little similar, but different from the theme switch, the routing is based on the routing key, and the routing value of the header switch is based on the header data of the message. The routing key of the theme switch is only a string, while the header switch can be an integer and hash value

3. Properties

/**
* Declare an exchange, via an interface that allows the complete set of
* arguments.
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @param autoDelete true if the server should delete the exchange when it is no longer in use
* @param internal true if the exchange is internal, i.e. can't be directly
* published to by a client.
* @param arguments other properties (construction arguments) for the exchange
* @return a declaration-confirm method to indicate the exchange was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeclareOk exchangeDeclare(String exchange,
                                 String type,boolean durable,
                                 boolean autoDelete,boolean internal,
                                 Map<String, Object> arguments) throws IOException;
  • Name: switch name

  • Switch Type: direct, topic, fax Type

  • Durability: whether persistence is required. true means persistence

  • Auto Delete: when the last one is bound to Exchange Automatically delete the Exchange when the queue on is deleted

  • Internal: whether the current Exchange is used for RabbitMQ internal use. The default is False

  • Arguments: extension parameter, used to extend the AMQP protocol for self-made and customized use

Recent hot article recommendations:

1.Finally got the IntelliJ IDEA activation code through the open source project. It's really fragrant!

2.I wrote a paragraph of logic in Java 8. My colleagues can't understand it. Try it..

3.Hanging Tomcat, the performance of underwow is very explosive!!

4.Chinese people open source a super easy-to-use Redis client, which is really fragrant!!

5.Java development manual (Songshan version) is the latest release. Download it quickly!

Feel good, don't forget to like + forward!

Posted by riddhi on Sun, 15 May 2022 13:47:58 +0300