Implementing ActiveMQ communication in Java

1, ActiveMQ port

By default, ActiveMQ uses port 61616 to provide JMS services

By default, ActiveMQ uses port 8161 to provide management console services
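As a small illustration, the two ports can be kept in one place and turned into the addresses a client and a browser would use. This is only a sketch: the class BrokerEndpoints and its methods are hypothetical helpers, not part of ActiveMQ, the /admin path assumes the default web console location, and the host is the one used in the listings of this article.

```java
// Hypothetical helper (not part of ActiveMQ): builds the two addresses from a host name.
final class BrokerEndpoints {
    static final int JMS_PORT = 61616;     // default port for JMS clients
    static final int CONSOLE_PORT = 8161;  // default port for the management console

    private BrokerEndpoints() {}

    // URL that a JMS client passes to the connection factory
    static String jmsUrl(String host) {
        return "tcp://" + host + ":" + JMS_PORT;
    }

    // URL opened in a browser; assumes the console lives under /admin
    static String consoleUrl(String host) {
        return "http://" + host + ":" + CONSOLE_PORT + "/admin";
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "192.168.16.106";
        System.out.println(jmsUrl(host));
        System.out.println(consoleUrl(host));
    }
}
```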

2, JMS overall architecture

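JMS stands for Java Message Service.

Roughly speaking, a producer sends messages to a destination and a consumer receives them from it. A destination is either a queue or a topic.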

3, Basic steps of JMS development

1: Create a connection factory

2: Create JMS connection through connection factory

3: Start JMS connection

4: Create JMS session through JMS connection

5: Create JMS destination (destination queue / topic)

6: Create a JMS producer or a JMS consumer for the destination

7: On the consumer side, optionally register a JMS message listener

8: Send or receive JMS messages

9: Close all JMS resources

4, Queue mode

Message producer code

// Assumes ActiveMQ 5.x, which uses the javax.jms API
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsProduce {
    public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        //1. Create a connection factory for the given URL, using the default user name and password
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2. Get the connection and start it
        Connection connection = factory.createConnection();
        connection.start();

        //3. Create a session
        //Two parameters: ① transacted ② acknowledgment mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4. Create the destination (a queue in this case; it could also be a topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5. Create the message producer
        MessageProducer messageProducer = session.createProducer(queue);
        //6. Use messageProducer to send three messages to the MQ queue
        for (int i = 0; i < 3; i++) {
            //7. Create a text message
            TextMessage textMessage = session.createTextMessage("msg---" + i);
            //8. Send it to MQ through messageProducer
            messageProducer.send(textMessage);
        }
        //9. Close resources in reverse order of creation
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("---Publish message to mq---");
    }
}

Consumer code 1 (blocking consumer)

// Assumes ActiveMQ 5.x, which uses the javax.jms API
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        //1. Create a connection factory for the given URL, using the default user name and password
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2. Get the connection and start it
        Connection connection = factory.createConnection();
        connection.start();

        //3. Create a session
        //Two parameters: ① transacted ② acknowledgment mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4. Create the destination (a queue in this case; it could also be a topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5. Create the consumer
        MessageConsumer messageConsumer = session.createConsumer(queue);
        /*
         Synchronous blocking mode (receive())
         The subscriber or receiver calls the MessageConsumer's receive() method to receive a message.
         receive() blocks indefinitely until a message arrives; receive(long timeoutMillis)
         returns null once the timeout expires, which is what would end the loop below.
         */
        while (true) {
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage != null) {
                System.out.println("**Consumers receive information:" + textMessage.getText());
            } else {
                break;
            }
        }
        //6. Close resources in reverse order of creation
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

Consumer code 2 (asynchronous listening consumer)

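// Assumes ActiveMQ 5.x, which uses the javax.jms API
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsConsumer {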
    public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        //1. Create a connection factory and use the default user name and password according to the given url address
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2. Get the connection and start it
        Connection connection = factory.createConnection();
        connection.start();

        //3. Create a session
        //Two parameters: ① transacted ② acknowledgment mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4. Create destination (specifically queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5. Create consumers
        MessageConsumer messageConsumer = session.createConsumer(queue);

  /*
   Asynchronous non-blocking mode (listener, onMessage)
        The subscriber or consumer registers a message listener on the consumer object via setMessageListener.
        When a message arrives, the system automatically calls the listener's onMessage method.
        Inside onMessage we only need to check the message type to read the message.
   */
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("Received text Message:"+(textMessage.getText()));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //Keep the program running by waiting for console input; otherwise it could exit and close the connection before any message is processed
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();

    }
}

Console description

Number Of Pending Messages = messages waiting to be consumed, i.e. messages that have not yet left the queue. Formula: pending = total enqueued - total dequeued.
Number Of Consumers = number of consumers currently connected to the queue.
Messages Enqueued = total number of messages that have entered the queue, including those that have already left it. This number only increases.
Messages Dequeued = number of messages that have left the queue, which can be understood as the number consumed by consumers.
Summary:
When one message enters the queue, pending is 1 and enqueued is 1.
When that message is consumed, pending is 0, enqueued is 1, and dequeued is 1.
When another message arrives, pending is 1 and enqueued is 2.
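The relationship between the three counters can be sketched as a tiny in-memory model. This is not ActiveMQ code; the class QueueCounters and its method names are hypothetical and only mirror the formula pending = enqueued - dequeued described above.

```java
// In-memory model of the console counters; hypothetical, not ActiveMQ code.
final class QueueCounters {
    private long enqueued;  // "Messages Enqueued": only ever grows
    private long dequeued;  // "Messages Dequeued": messages consumed so far

    // A producer sent one message to the queue
    void enqueue() {
        enqueued++;
    }

    // A consumer consumed one message
    void dequeue() {
        if (pending() == 0) {
            throw new IllegalStateException("no pending message to consume");
        }
        dequeued++;
    }

    // "Number Of Pending Messages" = enqueued - dequeued
    long pending() {
        return enqueued - dequeued;
    }

    long getEnqueued() {
        return enqueued;
    }

    long getDequeued() {
        return dequeued;
    }

    @Override
    public String toString() {
        return "pending=" + pending() + ", enqueued=" + enqueued + ", dequeued=" + dequeued;
    }

    public static void main(String[] args) {
        QueueCounters counters = new QueueCounters();
        counters.enqueue();            // first message arrives
        System.out.println(counters);
        counters.dequeue();            // it is consumed
        System.out.println(counters);
        counters.enqueue();            // another message arrives
        System.out.println(counters);
    }
}
```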

Comparison of two consumption modes

Synchronous blocking mode

The subscriber or receiver uses the receive() method of MessageConsumer to receive the message. The receive method will be blocked until the message can be received (or timeout).

Asynchronous non blocking mode (listener onMessage())

The subscriber or receiver registers a message listener through the setMessageListener(MessageListener listener) method of the MessageConsumer. When a message arrives, the system automatically calls the onMessage(Message message) method of that MessageListener.
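The difference between the two styles can be illustrated without a broker. The sketch below is only an in-memory analogy built on java.util.concurrent and uses no JMS classes: the hypothetical receive helper stands in for a blocking pull with a timeout, and the hypothetical listen helper stands in for registering a callback that is invoked for each message.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// In-memory analogy only: hypothetical helpers, no JMS classes involved.
final class ConsumptionModes {

    // Pull style: block until a message arrives or the timeout expires (null on timeout).
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Push style: a background thread calls onMessage for every message that arrives.
    static Thread listen(BlockingQueue<String> queue, Consumer<String> onMessage) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    onMessage.accept(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: stop listening
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        queue.add("msg---0");
        System.out.println("pulled: " + receive(queue, 100));
        System.out.println("pulled from empty queue: " + receive(queue, 100));

        List<String> seen = new CopyOnWriteArrayList<>();
        Thread listener = listen(queue, seen::add);
        queue.add("msg---1");
        Thread.sleep(200); // the caller is free to do other work meanwhile
        System.out.println("pushed to listener: " + seen);
        listener.interrupt();
    }
}
```

In the pull style the calling thread waits inside receive, while in the push style the calling thread continues and the callback runs on another thread, which is why the listener examples in this article keep the program alive with System.in.read().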

5, Topic mode

The characteristics of publish / subscribe messaging domain are as follows:

(1) The producer publishes messages to a topic. Each message can have multiple consumers, which is a 1:N relationship;

(2) Producers and consumers are coupled in time. A consumer that subscribes to a topic can only consume messages published after it subscribed.

(3) A topic does not store messages when the producer publishes them. It is stateless and nothing is persisted, so a message published while nobody is subscribed is simply wasted. For this reason, generally start the consumer first and then the producer.

The JMS specification allows clients to create durable (persistent) subscriptions, which relaxes this timing requirement to some extent. A durable subscription lets a consumer receive the messages that were published while it was inactive. In short, it works much like subscribing to a WeChat official account.
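The timing rules above can be sketched with a small in-memory model. It is a simplification and not ActiveMQ code: the class TopicModel and its methods are hypothetical, and the model only tracks which subscriptions exist at publish time. In JMS itself, a durable subscription is created with Session.createDurableSubscriber(topic, name) on a connection that has a client ID set.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// In-memory model of topic delivery; hypothetical, not ActiveMQ code.
final class TopicModel {
    // subscription name -> messages waiting for that subscriber
    private final Map<String, List<String>> inboxes = new HashMap<>();
    // subscriptions that survive a disconnect
    private final Set<String> durableNames = new HashSet<>();

    void subscribe(String name, boolean isDurable) {
        inboxes.putIfAbsent(name, new ArrayList<>());
        if (isDurable) {
            durableNames.add(name);
        }
    }

    // A non-durable subscription disappears; a durable one keeps collecting messages.
    void disconnect(String name) {
        if (!durableNames.contains(name)) {
            inboxes.remove(name);
        }
    }

    // Copies the message to every existing subscription; returns how many copies were made.
    // A return value of 0 means the message was discarded.
    int publish(String message) {
        for (List<String> inbox : inboxes.values()) {
            inbox.add(message);
        }
        return inboxes.size();
    }

    // Returns and clears the messages waiting for the given subscriber.
    List<String> drain(String name) {
        List<String> inbox = inboxes.get(name);
        if (inbox == null) {
            return new ArrayList<>();
        }
        List<String> delivered = new ArrayList<>(inbox);
        inbox.clear();
        return delivered;
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel();
        System.out.println("copies of m0: " + topic.publish("m0")); // nobody subscribed yet
        topic.subscribe("plain", false);
        topic.subscribe("durable", true);
        topic.publish("m1");
        System.out.println("plain got " + topic.drain("plain"));
        System.out.println("durable got " + topic.drain("durable"));
        topic.disconnect("plain");
        topic.disconnect("durable");
        topic.publish("m2");            // published while both are offline
        topic.subscribe("plain", false);
        topic.publish("m3");
        System.out.println("plain got " + topic.drain("plain"));
        System.out.println("durable got " + topic.drain("durable"));
    }
}
```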

Producer code

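// Assumes ActiveMQ 5.x, which uses the javax.jms API
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsProduce {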
    public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
    public static final String TOPIC_NAME = "topic-atguigu";
    public static void main(String[] args) throws JMSException {
        //1. Create a connection factory and use the default user name and password according to the given url address
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2. Get the connection and start it
        Connection connection = factory.createConnection();
        connection.start();

        //3. Create a session
        //Two parameters: ① transacted ② acknowledgment mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4. Create destination (specifically queue or topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5. Create the producer of the message
        MessageProducer messageProducer = session.createProducer(topic);
        //6. Use messageProducer to publish three messages to the MQ topic
        for(int i=0;i<3;i++){
            //7. Create message
            TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
            //8. Send to mq through messageProducer
            messageProducer.send(textMessage);
        }
        //9. Close resources
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("---TOPIC_NAME Publish message to mq---");
    }
}

Consumer code

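// Assumes ActiveMQ 5.x, which uses the javax.jms API
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsConsumer {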
    public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616";
    public static final String TOPIC_NAME = "topic-atguigu";
    public static void main(String[] args) throws JMSException, IOException {
        /*
        In the IDE: Edit Configurations --> Allow parallel run, so that several instances of this program can run at the same time
         */
        System.out.println("----------Consumer 1---------------");
        //1. Create a connection factory and use the default user name and password according to the given url address
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2. Get the connection and start it
        Connection connection = factory.createConnection();
        connection.start();

        //3. Create a session
        //Two parameters: ① transacted ② acknowledgment mode
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4. Create destination (specifically queue or topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5. Create consumers
        MessageConsumer messageConsumer = session.createConsumer(topic);
  /*
   Asynchronous non-blocking mode (listener, onMessage)
        The subscriber or consumer registers a message listener on the consumer object via setMessageListener.
        When a message arrives, the system automatically calls the listener's onMessage method.
        Inside onMessage we only need to check the message type to read the message.
   */
        messageConsumer.setMessageListener(message -> {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("Message received:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //Keep the program running by waiting for console input; otherwise it could exit and close the connection before any message is processed
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();

    }
}

6, Comparison of two modes

Comparison item: Working mode
Queue mode: "Load balancing" mode. If there is currently no consumer, the message is not discarded. If there are multiple consumers, each message is sent to only one of them, and that consumer is required to acknowledge (ack) it.
Topic mode: "Publish/subscribe" mode. If there is currently no subscriber, the message is discarded. If there are multiple subscribers, all of them receive the message.

Comparison item: Presence or absence of state
Queue mode: Queue data is saved as files on the MQ server by default. For example, ActiveMQ usually saves it under $AMQ_HOME\data\kr-store\data; it can also be configured to use DB storage.
Topic mode: Stateless.

Comparison item: Transfer integrity
Queue mode: Messages are not discarded.
Topic mode: If there is no subscriber, the message is discarded.

Comparison item: Processing efficiency
Queue mode: Since each message is sent to only one consumer, performance does not drop significantly even with many consumers. The actual performance also differs between message protocols.
Topic mode: Because each message has to be copied once per subscriber, processing performance drops noticeably as the number of subscribers grows. The performance differences between message protocols should also be taken into account.
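The difference in delivery work between the two modes can be sketched in memory. This is not ActiveMQ code: the class DeliveryModes and its methods are hypothetical, and plain round-robin is used only as a simple stand-in for the broker's load balancing across queue consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory comparison of queue and topic delivery; hypothetical, not ActiveMQ code.
final class DeliveryModes {

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    // Queue: each message goes to exactly one consumer (round-robin as a stand-in).
    static List<List<String>> deliverAsQueue(List<String> messages, int consumers) {
        List<List<String>> received = emptyInboxes(consumers);
        if (consumers == 0) {
            return received; // nothing delivered; a real queue would keep the messages
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumers).add(messages.get(i));
        }
        return received;
    }

    // Topic: each message is copied to every subscriber; with no subscribers it is dropped.
    static List<List<String>> deliverAsTopic(List<String> messages, int subscribers) {
        List<List<String>> received = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : received) {
                inbox.add(message);
            }
        }
        return received;
    }

    // Total number of deliveries the broker had to perform
    static int totalDeliveries(List<List<String>> received) {
        int total = 0;
        for (List<String> inbox : received) {
            total += inbox.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("msg---0", "msg---1", "msg---2");
        List<List<String>> viaQueue = deliverAsQueue(messages, 2);
        List<List<String>> viaTopic = deliverAsTopic(messages, 2);
        System.out.println("queue: " + viaQueue + " deliveries=" + totalDeliveries(viaQueue));
        System.out.println("topic: " + viaTopic + " deliveries=" + totalDeliveries(viaTopic));
    }
}
```

With N messages and M receivers, the queue performs N deliveries in total while the topic performs N x M, which is the copying cost mentioned in the comparison above.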

Tags: Java ActiveMQ programming language

Posted by goldberg on Sat, 21 May 2022 00:10:07 +0300