RabbitMQ and data synchronization

learning target

  • Learn about common MQ products
  • Understand RabbitMQ's 5 message model

1. Problems with search and commodity services

  • The original data of the commodity is stored in the database, and additions, deletions, modifications, and searches are completed in the database.
  • The data source of the search service is the index database. If the products in the database change, the index database data cannot be updated in time.
  • The product details are made static on the page, and the static page data will not change with the database products.

If we modify the price of the product in the background, the search page and product detail page still display the old price, which is obviously wrong. How to solve it?

There are two solutions here:

  • Option 1: Whenever a product is added, deleted or modified in the background, the index database data and static pages must be modified at the same time.

  • Option 2: The search service and the product page service provide external operation interfaces, and the interface is called after the addition, deletion and modification of products in the background.

The above two methods have the same serious problem: code coupling, search and product page services need to be embedded in the background service, which violates the independence principle of microservices.

1.2, message queue (MQ)

1.2.1. What is a message queue

Message queue, namely MQ, Message Queue.

Message queues are typical: producer, consumer model. Producers keep producing messages to the message queue, and consumers keep getting messages from the queue. Because the production and consumption of messages are asynchronous, and only care about the sending and receiving of messages, there is no intrusion of business logic, which realizes the decoupling of producers and consumers.

Combining the previous questions:

  • After the commodity service adds, deletes or modifies the commodity, there is no need to operate the index library or static page, just send a message, and do not care who receives the message.
  • The search service and the static page service receive the message and process the index library and the static page respectively.

If there are other systems that also rely on the data of commodity services in the future, you can also monitor the messages, and commodity services do not need any code modification.

1.2.2.AMQP and JMS

MQ is a model of message communication, and it is implemented concurrently. There are two mainstream ways to implement MQ: AMQP and JMS.

Differences and connections between the two:

  • JMS defines a unified interface to unify message operations; AMQP unifies the format of data interaction by specifying protocols
  • JMS restricts the use of the Java language; AMQP is just a protocol, not an implementation, so it is cross-language.
  • JMS specifies two message models; AMQP's message model is richer.

1.2.3. Common MQ products

  • ActiveMQ: based on JMS
  • RabbitMQ: Based on AMQP protocol, developed in erlang language, with good stability
  • RocketMQ: Based on JMS, Alibaba products, currently handed over to the Apache Foundation
  • Kafka: Distributed Messaging System, High Throughput

1.2.4.RabbitMQ

RabbitMQ is a message management system based on AMQP

Official website: http://www.rabbitmq.com/

Official tutorial: http://www.rabbitmq.com/getstarted.html

RabbitMQ is developed based on Erlang language:

1.3, download and installation

1.3.1. Download

Official website download address: http://www.rabbitmq.com/download.html

1.4, five message models

RabbitMQ provides 6 message models, but the 6th one is actually RPC, not MQ, so I won't learn it. Then there are only 5 left.

But in fact, 3, 4, and 5 belong to the subscription model, but the way of routing is different.

1.4.1. Import the demo project

We use a demo project to understand how RabbitMQ works:

Import project:

After importing:

1.4.2. Basic message model

illustrate

Official documentation states:

RabbitMQ is a Message Broker: it receives messages and delivers them.

You can think of it as a post office: when you deliver mail to a mailbox, you are pretty sure the postman will eventually deliver the mail to your recipient. Similarly, RabbitMQ can be a mailbox, post office, and postman at the same time.

The difference is: RabbitMQ does not deliver paper mail, but binary data

Basic message model diagram:

In the model shown above, there are the following concepts:

  • P: Producer, that is, the program that wants to send the message
  • C: Consumer: The receiver of the message, will always wait for the message to arrive.
  • queue: message queue, the red part in the figure. Similar to a mailbox, messages can be cached; producers deliver messages to it, and consumers get messages from it.

producer

Connection tool class:

public class ConnectionUtil {
    /**
     * Establish a connection to RabbitMQ
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //Define a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        //Set service address
        factory.setHost("192.168.56.101");
        //port
        factory.setPort(5672);
        //Set account information, username, password, vhost
        factory.setVirtualHost("/leyou");
        factory.setUsername("leyou");
        factory.setPassword("leyou");
        // Get connection through project
        Connection connection = factory.newConnection();
        return connection;
    }
}

The producer sends a message:

public class Send {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel from the connection, and use the channel to complete message-related operations
        Channel channel = connection.createChannel();
        // declare (create) a queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Message content
        String message = "Hello World!";
        // Send a message to the specified queue
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        
        System.out.println(" [x] Sent '" + message + "'");

        //Close channels and connections
        channel.close();
        connection.close();
    }
}

console:

View messages in the web console

Entering the queue page, you can see that a new queue has been created: simple_queue

Click the queue name to enter the details page, and you can view the messages:

Viewing the message in the console does not consume the message, so the message is still there.

Consumers get messages

public class Recv {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
            }
        };
        // Listening to the queue, the second parameter: whether to automatically confirm the message.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

console:

At this point, the message in the queue is gone:

Consumer message confirmation mechanism

As can be seen from the previous case, once the message is received by the consumer, the message in the queue will be deleted.

So the question is: how does RabbitMQ know that the message has been received?

This is achieved through the message confirmation mechanism (Acknowlege). When the consumer gets the message, it will send a receipt ACK to RabbitMQ to inform that the message has been received. However, this kind of receipt ACK is divided into two situations:

  • Automatic ACK: Once the message is received, the consumer automatically sends an ACK
  • Manual ACK: After the message is received, the ACK will not be sent, and it needs to be called manually

Which do you think is better?

It depends on the importance of the message:

  • If the message is not too important and loss doesn't matter, then automatic ACK is more convenient
  • If the message is very important, it cannot be lost. Then it is best to manually ACK after the consumption is completed, otherwise it will automatically ACK after receiving the message, and RabbitMQ will delete the message from the queue. If the consumer goes down at this point, the message is lost.

Our previous tests are all automatic ACK, if we want to manually ACK, we need to change our code:

public class Recv2 {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // Create a channel
        final Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [x] received : " + msg + "!");
                // ACK manually
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Listen to the queue, the second parameter is false, manually ACK
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

Notice the last line of code:

channel.basicConsume(QUEUE_NAME, false, consumer);

If the second parameter is true, ACK will be done automatically; if false, manual ACK will be required. Method declaration:

1.4.3.work message model

illustrate

In the basic model just now, one producer, one consumer, the messages produced are directly consumed by consumers. easier.

Work queues, also known as (Task queues), the task model.

When message processing is time-consuming, the speed of message production may be far greater than the speed of message consumption. If things go on like this, the news will pile up more and more and cannot be processed in time. At this point, the work model can be used: let multiple consumers bind to a queue and consume the messages in the queue together. Once the message in the queue is consumed, it will disappear, so the task will not be repeated.

Role:

  • P: Producer: Publisher of the task
  • C1: Consumer, receives the task and completes the task, assuming the completion speed is slower
  • C2: Consumer 2: Pick up the task and complete the task, assuming the completion speed is fast

producer

The producer is almost the same as in case 1:

public class Send {
    private final static String QUEUE_NAME = "test_work_queue";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Cyclic release tasks
        for (int i = 0; i < 50; i++) {
            // Message content
            String message = "task .. " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 2);
        }
        // Close channels and connections
        channel.close();
        connection.close();
    }
}

But here we are sending 50 messages in a loop.

Consumer 1

Consumer 2

It is basically similar to Consumer 1, except that the consumption time is not set.

Here is a simulation that some consumers are fast and some are slower.

Next, the two consumers are started together, and then send 50 messages:

It can be found that the two consumers each consume 25 messages, and they are different from each other, which realizes the distribution of tasks.

Able people should do more work

Is there a problem with the implementation just now?

  • Consumer 1 is less efficient than consumer 2, and a task takes longer
  • However, both of them end up consuming the same number of messages
  • Consumer 2 is idle for a lot of time, and consumer 1 is always busy

The current state is to distribute tasks evenly. The correct approach should be that the faster the consumption, the more consumption.

How to achieve it?

We can modify the settings so that consumers only receive one message at the same time, so that no more messages will be received until the processing is completed, so that people who process fast will receive more messages:

Test again:

1.4.4. Subscription Model Classification

Schematic diagram of subscription model:

In the first 2 cases, there are only 3 roles:

  • P: Producer, that is, the program that wants to send the message
  • C: Consumer: The receiver of the message, will always wait for the message to arrive.
  • queue: message queue, the red part in the figure. Similar to a mailbox, messages can be cached; producers deliver messages to it, and consumers get messages from it.

In the subscription model, an exchange role is added, and the process is slightly changed:

  • P: The producer, that is, the program that wants to send the message, but no longer sends it to the queue, but sends it to X (exchange)
  • C: The consumer, the receiver of the message, will always wait for the message to arrive.
  • Queue: message queue, receiving messages and buffering messages.
  • Exchange: Exchange, X in the picture. On the one hand, it receives messages sent by producers. On the other hand, know how to handle messages, such as delivering to a particular queue, delivering to all queues, or discarding messages. Exactly how it works depends on the type of Exchange. Exchange s are of the following 3 types:
    • Fanout: broadcast, deliver the message to all queues bound to the exchange
    • Direct: Direct, send the message to the queue that matches the specified routing key
    • Topic: wildcard, send the message to the queue that conforms to the routing pattern (routing pattern)

Exchange (exchange) is only responsible for forwarding messages and does not have the ability to store messages, so if there is no queue bound to Exchange, or there is no queue that conforms to the routing rules, then the message will be lost!

1.4.5. Subscription Model - Fanout

Fanout, also known as broadcast.

Flow Description

flow chart:

In broadcast mode, the message sending process is as follows:

  • 1) There can be multiple consumers
  • 2) Each consumer has its own queue (queue)
  • 3) Each queue must be bound to an Exchange (exchange)
  • 4) The message sent by the producer can only be sent to the switch. The switch decides which queue to send to, but the producer cannot decide.
  • 5) The exchange sends the message to all bound queues
  • 6) The consumers of the queue can get the message. Implement a message to be consumed by multiple consumers

producer

Two changes:

  • 1) Declare Exchange, no longer declare Queue
  • 2) Send messages to Exchange, no longer to Queue
public class Send {

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        
        // Declare exchange, specifying the type as fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        
        // Message content
        String message = "Hello everyone";
        // Post message to Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [producer] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

Consumer 1

public class Recv {
    private final static String QUEUE_NAME = "fanout_exchange_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // bind queue to exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 1] received : " + msg + "!");
            }
        };
        // Listen to the queue and automatically return to completion
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

Pay attention to the code: the queue needs to be bound to the switch

Consumer 2

public class Recv2 {
    private final static String QUEUE_NAME = "fanout_exchange_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // bind queue to exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        
        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 2] received : " + msg + "!");
            }
        };
        // Listen to the queue, manually return to complete
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

test

We run two consumers and then send 1 message:

1.4.6. Subscription Model - Direct

illustrate

In Fanout mode, a message will be consumed by all subscribed queues. However, in some scenarios, we want different messages to be consumed by different queues. At this time, the Direct type of Exchange will be used.

Under the Direct model:

  • The binding between the queue and the switch cannot be any binding, but a RoutingKey (routing key) must be specified.
  • The sender of the message must also specify the RoutingKey of the message when sending a message to Exchange.
  • Exchange no longer sends messages to each bound queue, but judges based on the Routing key of the message. Only when the Routing key of the queue is exactly the same as the Routing key of the message will the message be received

flow chart:

Diagram:

  • P: Producer, sends a message to Exchange. When sending a message, a routing key is specified.
  • X: Exchange (exchange), receives the message from the producer, and then submits the message to the queue that exactly matches the routing key
  • C1: Consumer, whose queue specifies a message that requires routing key to be error
  • C2: Consumer, whose queue specifies messages that require routing key s as info, error, warning

producer

Here we simulate the addition, deletion and modification of products, and the RoutingKey s for sending messages are: insert, update, delete

public class Send {
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // Declare exchange, specifying the type as direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Message content
        String message = "product added, id = 1001";
        // Send a message, and specify the routing key as: insert, which represents a new product
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
        System.out.println(" [Goods and Services:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

Consumer 1

We assume here that Consumer 1 receives only two types of messages: update items and delete items.

public class Recv {
    private final static String QUEUE_NAME = "direct_exchange_queue_1";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // Bind the queue to the exchange, and specify the routing key to be subscribed to. Suppose update and delete messages are required here
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 1] received : " + msg + "!");
            }
        };
        // Listening to the queue, automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

Consumer 2

We assume here that consumer 2 receives all types of messages: new items, updated items, and deleted items.

public class Recv2 {
    private final static String QUEUE_NAME = "direct_exchange_queue_2";
    private final static String EXCHANGE_NAME = "direct_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // Bind the queue to the exchange, and specify the routing key to be subscribed to. Subscribe insert, update, delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 2] received : " + msg + "!");
            }
        };
        // Listening to the queue, automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

test

We sent the RoutingKey s added, deleted, and modified respectively, and found the results:

1.4.7. Subscription Model - Topic

illustrate

Compared with Direct, Topic-type Exchange can route messages to different queues according to RoutingKey. It's just that the Topic type Exchange allows the queue to use wildcards when binding the Routing key!

Routingkey is generally composed of one or more words, and multiple words are separated by ".", for example: item.insert

Wildcard rules:

​ #: matches one or more words

​*: Match no more than no less exactly 1 word

Example:

​ audit.#: can match audit.irs.corporate or audit.irs

​ audit.*: can only match audit.irs

Icon:

explain:

  • Red Queue: usa.# is bound, so any routing key that starts with usa. will be matched to
  • Yellow Queue: #.news is bound, so any routing key ending with .news will be matched

producer

Using topic-type Exchange, there are three routing key s for sending messages: item.isnert, item.update, item.delete:

public class Send {
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // Declare exchange, specifying type as topic
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Message content
        String message = "new product : id = 1001";
        // Send a message, and specify the routing key as: insert, which represents a new product
        channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
        System.out.println(" [Goods and Services:] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

Consumer 1

We assume here that consumer 1 only receives two types of messages: update items and delete items

public class Recv {
    private final static String QUEUE_NAME = "topic_exchange_queue_1";
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // Bind the queue to the exchange, and specify the routing key to be subscribed to. Need update, delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 1] received : " + msg + "!");
            }
        };
        // Listening to the queue, automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

Consumer 2

We assume here that consumer 2 receives all types of messages: new items, updated items, and deleted items.

/**
 * Consumer 2
 */
public class Recv2 {
    private final static String QUEUE_NAME = "topic_exchange_queue_2";
    private final static String EXCHANGE_NAME = "topic_exchange_test";

    public static void main(String[] argv) throws Exception {
        // get connected
        Connection connection = ConnectionUtil.getConnection();
        // get channel
        Channel channel = connection.createChannel();
        // declare queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // Bind the queue to the exchange and specify the routing key to be subscribed to. Subscribe insert, update, delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");

        // Define the consumer of the queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Get the message and process it. This method is similar to event monitoring. If there is a message, it will be automatically called
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                // body is the message body
                String msg = new String(body);
                System.out.println(" [Consumer 2] received : " + msg + "!");
            }
        };
        // Listening to the queue, automatic ACK
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

1.5. Persistence

How to avoid message loss?

1) The ACK mechanism of the consumer. Consumers can be prevented from losing messages.

2) However, if MQ goes down before the consumer consumes it, the message is gone.

Is it possible to persist messages?

To persist messages, the premise is that both queues and Exchange are persistent.

switch persistence

queue persistence

message persistence

Tags: Java JavaEE AMQP

Posted by andynightingale on Mon, 23 May 2022 06:25:09 +0300