SpringBoot2 integrates ActiveMQ (queue mode)

1. Message producer setup

1.1ActiveMQ download and installation

Official download address: http://activemq.apache.org/download-archives
This article uses ActiveMQ version number: 5.15.13 RELEASE
SpringBoot version 2.2.1 RELEASE
Operation method:
Unzip the downloaded MQ compressed package
Enter directory

According to their own operating system, enter the corresponding winXX folder and double-click to run ActiveMQ bat
Successful startup interface:

Default connection port: 61616
Default web console port: 8161
Default administrator account: admin Password: admin
Browser access console: http://127.0.0.1:8161

Click Manage ActiveMQ broker to enter the home page and click Queues to view the detailed queue information

1.2pom dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>2.2.1.RELEASE</version>
    </dependency>
    <!--use springboot2.1+When, MQ Connection pool configuration dependency-->
	<dependency>
	    <groupId>org.messaginghub</groupId>
	    <artifactId>pooled-jms</artifactId>
	</dependency>
	<!--use springboot2.0+And below, maven Configuration dependency-->
	<dependency>
	    <groupId>org.apache.activemq</groupId>
	    <artifactId>activemq-pool</artifactId>
	</dependency>

1.3application.yml

spring:
  activemq:
    #MQ service address
    broker-url: tcp://127.0.0.1:61616
    #Whether to use built-in MQ
    in-memory: false
    #MQ connection pool
    pool:
      #Whether to enable the MQ connection pool to create a connection for each piece of data sent when it is false
      enabled: true
      #maximum connection
      max-connections: 10
      #Free time (default: 30 milliseconds)
      idle-timeout: 30000

1.4 start class to open message queue @ EnableJms

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@EnableJms//Open message queue
@SpringBootApplication
public class ProviderApplication {
	public static void main(String[] args) {
		SpringApplication.run(ProviderApplication.class, args);
	}

1.5 beanconfig create message queue (note whether the import is correct)

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;

@Configuration
public class BeanConfig {
    @Bean
    public Queue queue(){
        return new ActiveMQQueue("Queue1");
    }
}

1.6ProviderController

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Queue;

@RestController
public class ProviderController {

    //Inject into the queue where messages are stored
    @Autowired
    Queue queue;
    //Inject the tool class encapsulated by springboot
    @Autowired
    JmsMessagingTemplate jmsMessagingTemplate;

    @GetMapping("send")
    public String send(String msg){
        //Send messages to the message queue defined in the queue
        jmsMessagingTemplate.convertAndSend(queue,msg);

        //Sending a message using this method can omit step 5. When the method is called, it will automatically create a queue named Queue2
        jmsMessagingTemplate.convertAndSend("Queue2",msg);
        return "OK";
    }
}

1.7 after postman calls the interface


Successfully sent a message to the queue

Message producer Native Writing
//MQ service address
public static final String ACTIVEMQ_URL="tcp://127.0.0.1:61616";
//Bound queue name
public static final String QUEUE_NAME="myQueue";

public boolean sendMsg(String msg){
    //Create connection factory
    ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
    try {
        //Get connection and start access
        Connection connection=activeMQConnectionFactory.createConnection();
        //Create a session (the first parameter transaction and the second parameter sign in)
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //Create destination (queue subject)
        Queue queue=session.createQueue(QUEUE_NAME);
        //Create message producer
        MessageProducer messageProducer = session.createProducer(queue);
        //Create sent message
        TextMessage textMessage = session.createTextMessage(msg);
        //Send to MQ by using message producer
        messageProducer.send(textMessage);
		//close resource
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("Message sent successfully");
        return true;
    } catch (JMSException e) {
        System.err.println("Message sending failed");
        return  false;
    }
}

2. Message consumer building process

Same as steps 1.2 ~ 1.5 of message producer

2.6ConsumerListener

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {
    @Autowired
    JmsMessagingTemplate jmsMessagingTemplate;

    // Use JmsListener to listen on the queue. destination: the name of the queue to listen on
    @JmsListener(destination = "myQueue")
    public void handleMessage(String msg) {
        System.out.println("Message accepted successfully==>" + msg);
    }
}

View the number of MQ console consumers after startup
After the producer sends a message

Message consumer Native Writing
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class ConsumerListener {

    public static void main(String[] args) {
        //MQ service address
        String ACTIVEMQ_URL="tcp://127.0.0.1:61616";
        //Listening queue
        String QUEUE_NAME="myQueue";
        //Create connection factory
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        try {
            //Get connection and start access
            Connection connection=activeMQConnectionFactory.createConnection();
            connection.start();
            //Create a session (the first parameter transaction and the second three sign in)
            Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //Create destination (queue subject)
            Queue queue=session.createQueue(QUEUE_NAME);
            //Create consumer
            MessageConsumer messageConsumer = session.createConsumer(queue);
            
            //Get message writing method 1
            /*
            while (true){
                TextMessage textMessage=(TextMessage)messageConsumer.receive();
                if(null!=textMessage)
                    System.out.println("Received message = = = > "+ textmessage. Gettext());
                else
                    break;
            }

            messageConsumer.close();
            session.close();
            connection.close();
            System.out.println("-1");
            */
            //Get message writing method 2 Get messages by listening
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null!=message&&message instanceof TextMessage){
                        TextMessage textMessage= (TextMessage) message;
                        try {
                            System.out.println("The listener received the message==>"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //Keep the console on
            System.in.read();
            messageConsumer.close();
            session.close();
            connection.close();
        } catch (JMSException | IOException e) {
            System.err.println("Failed to get message");
            System.out.println("-2");
        }
    }
}

Basic steps of JMS development

  1. Create a connection factory
  2. Create JMS connection through connection factory
  3. Start JMS connection
  4. Create JMS session through connection
  5. Create JMS destination
  6. Create a JMS producer or create a JMS message and set the destination
  7. Create a JMS consumer or register a JMS message listener
  8. Send or accept JMS message s
  9. Close all JMS resources (connection,session,producer,consumer, etc.)

legend

matters needing attention:

  • Queue mode:
    If the producer generates 100 messages, two consumers will work together to receive the 100 messages at the same time, that is, each consumer receives 50 messages for processing and polling.
  • Topic:
    If the producer generates 100 messages, the consumer will not receive the 100 messages before subscribing to the topic.
    Only after a consumer subscribes to a topic message can the message generated by the producer be received and processed.
    If two more consumers subscribe to this topic message at the same time, when the producer generates 100 messages, the two consumers will receive the 100 messages at the same time.

Tags: ActiveMQ

Posted by X74SY on Thu, 19 May 2022 16:51:18 +0300