1, ActiveMQ port
ActiveMQ uses port 616 to provide JMS services
ActiveMQ uses port 8161 to provide management console services
2, JMS overall architecture
Java Message Service
Roughly speaking, Destination queue and topic
3, Basic steps of JMS development
Basic steps of JMS development
1: Create a connection factory
2: Create JMS connection through connection factory
3: Start JMS connection
4: Create JMS session through JMS connection
5: Create JMS destination (destination queue / topic)
6: Create a JMS producer or create a JMS consume r and set the destination
7: Create a JMS consumer or register a JMS message listener
8: Send or receive JMS messages
9: Close all JMS resources
4, Queue mode
Message producer code
public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { //1. Create a connection factory and use the default user name and password according to the given url address ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2. Get the connection and start it Connection connection = factory.createConnection(); connection.start(); //3. Create a session //Two parameters: ① transaction ② sign in Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. Create destination (specifically queue or topic) Queue queue = session.createQueue(QUEUE_NAME); //5. Create the producer of the message MessageProducer messageProducer = session.createProducer(queue); //6. Generate three queues of messages sent to MQ by using messageProducer for(int i=0;i<3;i++){ //7. Create message //text type TextMessage textMessage = session.createTextMessage("mag---" + i); //8. Send to mq through messageProducer messageProducer.send(textMessage); } //9. Close resources messageProducer.close(); session.close(); connection.close(); System.out.println("---Publish message to mq---"); }
Consumer code 1 (blocking consumer)
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { //1. Create a connection factory and use the default user name and password according to the given url address ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2. Get the connection and start it Connection connection = factory.createConnection(); connection.start(); //3. Create a session //Two parameters: ① transaction ② sign in Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. Create destination (specifically queue or topic) Queue queue = session.createQueue(QUEUE_NAME); //5. Create consumers MessageConsumer messageConsumer = session.createConsumer(queue); /* Synchronous blocking mode (receive()) The subscriber or receiver calls the MessageConsumer's receive() method to receive the message. The receive method receives the message before it can receive the message (or Will be blocked until timeout */ while (true){ TextMessage testMessage =(TextMessage) messageConsumer.receive(); if(testMessage!=null){ System.out.println("**Consumers receive information:"+testMessage.getText()); }else { break; } } } messageConsumer.close(); session.close(); connection.close(); }
Consumer code 2 (asynchronous listening consumer)
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616"; public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { //1. Create a connection factory and use the default user name and password according to the given url address ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2. Get the connection and start it Connection connection = factory.createConnection(); connection.start(); //3. Create a session //Two parameters: ① transaction ② sign in Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. Create destination (specifically queue or topic) Queue queue = session.createQueue(QUEUE_NAME); //5. Create consumers MessageConsumer messageConsumer = session.createConsumer(queue); /* Asynchronous non blocking mode listener (onMessage) The subscriber or consumer registers the message listener setMessageListener for the consumer through the created consumer object, When there is a message in the message, the system will automatically call the onMessage method of MessageListener class We only need to judge the message type in the onMessage method to get the message */ messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("Received text Message:"+(textMessage.getText())); } catch (JMSException e) { e.printStackTrace(); } } } }); //If the console is not destroyed or added, the message will be processed before the program ends System.in.read(); messageConsumer.close(); session.close(); connection.close(); } }
Console description
Number Of Pending Messages=Messages waiting to be consumed. This is the number of messages not out of the queue. The formula is=Total received-Total number of outgoing queues. Number Of Consumers=Number of consumers, the number of consumers on the consumer side. Messages Enqueued=The number of incoming messages, the total number of messages in the queue, including the number of messages out of the queue. This number only increases but not decreases. Messages Dequeued=The number of messages out of the queue can be understood as the number consumed by consumers. Summary: When a message enters the queue, the message waiting to be consumed is 1 and the message entering the queue is 1. When the message is consumed, the message waiting to be consumed is 0, the message entering the queue is 1, and the message leaving the queue is 1. When there is another message, the message waiting to be consumed is 1 and the message entering the queue is 2.
Comparison of two consumption modes
Synchronous blocking mode
The subscriber or receiver uses the receive() method of MessageConsumer to receive the message. The receive method will be blocked until the message can be received (or timeout).
Asynchronous non blocking mode (listener onMessage())
The subscriber or receiver registers a message listener through the setMessageListener(MessageListener listener) of the MessageConsumer,
When the message arrives, the system will automatically call the onMessage(Message message) method of the listener MessageListener.
5, Topic queue mode
The characteristics of publish / subscribe messaging domain are as follows:
(1) The producer publishes messages to topic. Each message can have multiple consumers, belonging to a 1: N relationship;
(2) There is a temporal correlation between producers and consumers. Consumers who subscribe to a topic can only consume the messages published since its subscription.
(3) When the producer produces, the topic does not save the message. It is stateless and does not land. If no one subscribes to produce, it is a waste message. Therefore, generally start the consumer first and then the producer.
The JMS specification allows customers to create persistent subscriptions, which eases the time dependency requirement to a certain extent. Persistent subscriptions allow consumers to consume messages they send when they are not activated. In a word, it's like our wechat official account subscription
Producer code
public class JmsProduce { public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616"; public static final String TOPIC_NAME = "topic-atguigu"; public static void main(String[] args) throws JMSException { //1. Create a connection factory and use the default user name and password according to the given url address ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2. Get the connection and start it Connection connection = factory.createConnection(); connection.start(); //3. Create a session //Two parameters: ① transaction ② sign in Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. Create destination (specifically queue or topic) Topic topic = session.createTopic(TOPIC_NAME); //5. Create the producer of the message MessageProducer messageProducer = session.createProducer(topic); //6. Generate three queues of messages sent to MQ by using messageProducer for(int i=0;i<3;i++){ //7. Create message TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i); //8. Send to mq through messageProducer messageProducer.send(textMessage); } //9. Close resources messageProducer.close(); session.close(); connection.close(); System.out.println("---TOPIC_NAME Publish message to mq---"); } }
Consumer code
public class JmsConsumer { public static final String ACTIVEMQ_URL = "tcp://192.168.16.106:61616"; public static final String TOPIC_NAME = "topic-atguigu"; public static void main(String[] args) throws JMSException, IOException { /* edit configurations --> allow parallel run A program starts multiple instances at the same time */ System.out.println("----------Consumer 1---------------"); //1. Create a connection factory and use the default user name and password according to the given url address ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2. Get the connection and start it Connection connection = factory.createConnection(); connection.start(); //3. Create a session //Two parameters: ① transaction ② sign in Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4. Create destination (specifically queue or topic) Topic topic = session.createTopic(TOPIC_NAME); //5. Create consumers MessageConsumer messageConsumer = session.createConsumer(topic); /* Asynchronous non blocking mode listener (onMessage) The subscriber or consumer registers the message listener setMessageListener for the consumer through the created consumer object, When there is a message in the message, the system will automatically call the onMessage method of MessageListener class We only need to judge the message type in the onMessage method to get the message */ messageConsumer.setMessageListener(message -> { if(null != message && message instanceof TextMessage){ TextMessage textMessage = (TextMessage) message; try { System.out.println("Message received:"+((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); System.in.read();//If the console is not destroyed or added, the message will be processed before the program ends messageConsumer.close(); session.close(); connection.close(); } }
6, Comparison of two modes
Comparison items | Working mode | Presence or absence of status | Transfer integrity | Processing efficiency |
Queue queue | In the "load balancing" mode, if there is no consumer at present, the message will not be discarded; If there are multiple consumers, a message will only be sent to one of them and ask the consumer to ack the information | The Queue data will be saved as a file on the mq server by default. For example, Active MQ is usually saved in $AMQ_ Under home \ data \ Kr store \ data, it can also be configured as DB storage | Messages are not discarded | Since a message is only sent to one consumer, even if there are more consumers, the performance will not be significantly reduced. Of course, the specific performance of different message protocols is also different |
Mode Topic queue mode | In the "subscribe publish" mode, if there is no subscriber at present, the message will be discarded. If there are multiple subscribers, these subscribers will receive the message | Stateless | If there is no subscriber, the message is discarded | Because the message should be copied according to the number of subscribers, the processing performance will decrease significantly with the increase of subscribers, and the performance differences of different message protocols should be combined |