1. Message producer setup
1.1 ActiveMQ download and installation
Official download address: http://activemq.apache.org/download-archives
ActiveMQ version used in this article: 5.15.13
Spring Boot version: 2.2.1.RELEASE
Installation steps:
Unzip the downloaded ActiveMQ archive.
Enter the extracted directory.
Depending on your operating system, open the matching winXX folder (win32 or win64, under bin) and double-click activemq.bat.
Successful startup interface:
Default connection port: 61616
Default web console port: 8161
Default administrator account: admin, password: admin
Browser access to the console: http://127.0.0.1:8161
Click "Manage ActiveMQ broker" to open the home page, then click "Queues" to view detailed queue information.
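Before moving on, it can help to confirm that the broker is actually listening on its default ports. The following is a minimal sketch, assuming a broker on 127.0.0.1 with the default ports listed above; the class BrokerPortCheck and its method isPortOpen are hypothetical helpers written for this article, not part of ActiveMQ or Spring.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of ActiveMQ): checks whether a TCP port accepts connections.
class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, host unreachable, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        // Default ports from this article: 61616 (broker) and 8161 (web console)
        System.out.println("Broker port 61616 open: " + isPortOpen("127.0.0.1", 61616, 1000));
        System.out.println("Console port 8161 open: " + isPortOpen("127.0.0.1", 8161, 1000));
    }
}
```

If either line prints false, the broker has probably not finished starting, or another process is already using the port.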
1.2 pom.xml dependencies
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
        <version>2.2.1.RELEASE</version>
    </dependency>
    <!-- Spring Boot 2.1 and later: connection pool dependency -->
    <dependency>
        <groupId>org.messaginghub</groupId>
        <artifactId>pooled-jms</artifactId>
    </dependency>
    <!-- Spring Boot 2.0 and earlier: connection pool dependency (include only the one that matches your version) -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>
1.3 application.yml
    spring:
      activemq:
        # MQ broker address
        broker-url: tcp://127.0.0.1:61616
        # Whether to use the embedded in-memory broker
        in-memory: false
        # Connection pool settings
        pool:
          # Whether to enable the connection pool; when false, a new connection is created for every message sent
          enabled: true
          # Maximum number of pooled connections
          max-connections: 10
          # Connection idle timeout in milliseconds (default: 30 seconds)
          idle-timeout: 30000
1.4 Enable JMS in the startup class with @EnableJms
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.jms.annotation.EnableJms;

    @EnableJms // Enable JMS listener support (@JmsListener)
    @SpringBootApplication
    public class ProviderApplication {
        public static void main(String[] args) {
            SpringApplication.run(ProviderApplication.class, args);
        }
    }
1.5 BeanConfig: create the message queue (check that Queue is imported from javax.jms)
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import javax.jms.Queue;

    @Configuration
    public class BeanConfig {
        @Bean
        public Queue queue() {
            return new ActiveMQQueue("Queue1");
        }
    }
1.6 ProviderController
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    import javax.jms.Queue;

    @RestController
    public class ProviderController {

        // The queue bean defined in BeanConfig (step 1.5)
        @Autowired
        Queue queue;

        // Messaging helper provided by Spring Boot
        @Autowired
        JmsMessagingTemplate jmsMessagingTemplate;

        @GetMapping("send")
        public String send(String msg) {
            // Send the message to the injected queue (Queue1)
            jmsMessagingTemplate.convertAndSend(queue, msg);
            // Sending by destination name makes step 1.5 unnecessary:
            // a queue named Queue2 is created automatically when this method is called
            jmsMessagingTemplate.convertAndSend("Queue2", msg);
            return "OK";
        }
    }
1.7 Call the endpoint with Postman
The message is sent to the queue successfully.
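The same request can also be made from plain Java instead of Postman. The following is a minimal sketch, assuming the producer application runs on Spring Boot's default port 8080 (the article does not state the port); SendEndpointClient, buildSendUrl, and callSend are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /send endpoint of ProviderController.
class SendEndpointClient {

    /** Builds the request URL and URL-encodes the msg query parameter. */
    static String buildSendUrl(String baseUrl, String msg) {
        try {
            return baseUrl + "/send?msg=" + URLEncoder.encode(msg, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    /** Sends a GET request to the endpoint and returns the HTTP status code. */
    static int callSend(String baseUrl, String msg) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(buildSendUrl(baseUrl, msg)).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        try {
            // Assumption: the application listens on 127.0.0.1:8080
            System.out.println("HTTP status: " + callSend("http://127.0.0.1:8080", "hello world"));
        } catch (IOException e) {
            System.err.println("Request failed: " + e.getMessage());
        }
    }
}
```

Encoding the parameter matters because a message containing spaces or characters such as & would otherwise be cut off or misread by the server.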
Message producer: native JMS implementation
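    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    // The class name is illustrative; the original snippet shows only the members
    public class NativeProducer {

        // MQ broker address
        public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        // Name of the target queue
        public static final String QUEUE_NAME = "myQueue";

        public boolean sendMsg(String msg) {
            // Create the connection factory
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = null;
            try {
                // Get a connection (start() is only required for receiving messages)
                connection = activeMQConnectionFactory.createConnection();
                // Create a session (first parameter: transacted; second parameter: acknowledgement mode)
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (queue or topic)
                Queue queue = session.createQueue(QUEUE_NAME);
                // Create the message producer
                MessageProducer messageProducer = session.createProducer(queue);
                // Create the message to send
                TextMessage textMessage = session.createTextMessage(msg);
                // Send it to the broker through the producer
                messageProducer.send(textMessage);
                // Release resources
                messageProducer.close();
                session.close();
                System.out.println("Message sent successfully");
                return true;
            } catch (JMSException e) {
                System.err.println("Message sending failed: " + e.getMessage());
                return false;
            } finally {
                // Close the connection even when sending fails; this also closes its sessions and producers
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException ignored) {
                        // Nothing more can be done at this point
                    }
                }
            }
        }
    }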
2. Message consumer setup
Repeat steps 1.2 to 1.5 of the message producer.
2.6 ConsumerListener
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Component;

    @Component
    public class ConsumerListener {

        @Autowired
        JmsMessagingTemplate jmsMessagingTemplate;

        // Listen on a queue with @JmsListener; destination is the name of the queue to listen on
        @JmsListener(destination = "myQueue")
        public void handleMessage(String msg) {
            System.out.println("Message received successfully==>" + msg);
        }
    }
After startup, check the number of consumers in the MQ console.
After the producer sends a message to the queue, the listener prints it to the console.
Message consumer: native JMS implementation
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.*;
    import java.io.IOException;

    public class ConsumerListener {
        public static void main(String[] args) {
            // MQ broker address
            String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
            // Queue to listen on
            String QUEUE_NAME = "myQueue";
            // Create the connection factory
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            try {
                // Get a connection and start it so that messages can be delivered
                Connection connection = activeMQConnectionFactory.createConnection();
                connection.start();
                // Create a session (first parameter: transacted; second parameter: acknowledgement mode)
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (queue or topic)
                Queue queue = session.createQueue(QUEUE_NAME);
                // Create the consumer
                MessageConsumer messageConsumer = session.createConsumer(queue);

                // Approach 1: receive messages synchronously
                /*
                while (true) {
                    TextMessage textMessage = (TextMessage) messageConsumer.receive();
                    if (null != textMessage)
                        System.out.println("Received message ===> " + textMessage.getText());
                    else
                        break;
                }
                messageConsumer.close();
                session.close();
                connection.close();
                System.out.println("-1");
                */

                // Approach 2: receive messages through a listener
                messageConsumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            try {
                                System.out.println("The listener received the message==>" + textMessage.getText());
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
                // Block on console input so the process stays alive while the listener runs
                System.in.read();
                messageConsumer.close();
                session.close();
                connection.close();
            } catch (JMSException | IOException e) {
                System.err.println("Failed to get message");
                System.out.println("-2");
            }
        }
    }
Basic steps of JMS development
- Create a connection factory
- Create JMS connection through connection factory
- Start JMS connection
- Create JMS session through connection
- Create JMS destination
- Create a JMS producer or create a JMS message and set the destination
- Create a JMS consumer or register a JMS message listener
- Send or receive JMS messages
- Close all JMS resources (connection, session, producer, consumer, etc.)
Notes:
- Queue mode:
If the producer sends 100 messages while two consumers are listening on the queue, the two consumers share them: each message is delivered to only one consumer, and the broker hands messages out in turn (round-robin), so each consumer processes about 50.
- Topic mode:
Messages published to a topic before a consumer subscribes are not delivered to that consumer.
A consumer receives and processes only the messages published after it has subscribed to the topic.
If two consumers are subscribed to the topic when the producer publishes 100 messages, each of the two consumers receives all 100 messages.
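The difference between the two modes can be illustrated with a small in-memory model. This is only a sketch of the delivery rules described above, assuming strict round-robin dispatch for queues; it does not use ActiveMQ or JMS, and DeliveryModelDemo, dispatchQueue, and publishTopic are hypothetical names.

```java
import java.util.Arrays;

// In-memory model of the delivery rules; it does not talk to a broker.
class DeliveryModelDemo {

    /** Queue mode: each message goes to exactly one consumer, in round-robin order. */
    static int[] dispatchQueue(int messageCount, int consumerCount) {
        int[] received = new int[consumerCount];
        for (int i = 0; i < messageCount; i++) {
            received[i % consumerCount]++;
        }
        return received;
    }

    /**
     * Topic mode: every consumer that is already subscribed gets its own copy.
     * subscribedAt[c] is the index of the first message published after consumer c subscribed.
     */
    static int[] publishTopic(int messageCount, int[] subscribedAt) {
        int[] received = new int[subscribedAt.length];
        for (int i = 0; i < messageCount; i++) {
            for (int c = 0; c < subscribedAt.length; c++) {
                if (subscribedAt[c] <= i) {
                    received[c]++;
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // Two consumers share 100 queue messages
        System.out.println(Arrays.toString(dispatchQueue(100, 2)));                // [50, 50]
        // Two subscribers present from the start each get all 100 topic messages
        System.out.println(Arrays.toString(publishTopic(100, new int[] {0, 0})));   // [100, 100]
        // A subscriber that joins after all 100 messages were published gets none
        System.out.println(Arrays.toString(publishTopic(100, new int[] {0, 100}))); // [100, 0]
    }
}
```

On a real broker the queue split may be less even than this model suggests, because distribution also depends on when each consumer connects and how many messages it prefetches.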