1. Message format of JMS
A JMS message consists of the following three parts:
- Message header:
Each header field has corresponding getter and setter methods.
- Message properties:
If you need to carry values beyond the header fields, you can use message properties.
- Message body:
The message types defined by JMS are TextMessage, MapMessage, BytesMessage, StreamMessage and ObjectMessage.
2. Message reliability mechanism
A message is considered successfully consumed only after it has been acknowledged. Successful consumption usually involves three stages: the client receives the message, the client processes the message, and the message is acknowledged. In a transacted session, acknowledgement happens automatically when the transaction is committed. In a non-transacted session, when a message is acknowledged depends on the acknowledgement mode chosen when the session is created. This parameter has the following three optional values:
Session.AUTO_ACKNOWLEDGE: the session automatically acknowledges a message when the client returns successfully from the receive method, or when the onMessage method of the MessageListener returns successfully.
Session.CLIENT_ACKNOWLEDGE: the client acknowledges a message by calling the message's acknowledge method. Note that in this mode acknowledgement happens at the session level: acknowledging one consumed message automatically acknowledges all messages the session has consumed. For example, if a consumer consumes 10 messages and then acknowledges the fifth, all 10 messages are acknowledged.
Session.DUPS_OK_ACKNOWLEDGE: this option lets the session acknowledge messages lazily. If the JMS Provider fails, some messages may be delivered more than once, so consumers must tolerate duplicates. When a message is redelivered, the JMS Provider must set the JMSRedelivered header field to true.
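The session-level behaviour of CLIENT_ACKNOWLEDGE is easy to misread, so here is a minimal plain-Java model of it. It does not use the JMS API: ModelSession and ModelMessage are hypothetical stand-ins invented for this sketch, and it only illustrates the rule that acknowledging one message acknowledges everything the session has consumed so far.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of session-level acknowledgement; not the JMS API. */
final class ClientAckModel {

    /** Hypothetical stand-in for a session in CLIENT_ACKNOWLEDGE mode. */
    static final class ModelSession {
        private final List<String> unacknowledged = new ArrayList<>();

        /** Stages 1 and 2: the client receives and processes a message. */
        ModelMessage consume(String body) {
            unacknowledged.add(body);
            return new ModelMessage(this, body);
        }

        /** Stage 3: acknowledgement covers everything consumed so far. */
        void acknowledgeAll() {
            unacknowledged.clear();
        }

        int unacknowledgedCount() {
            return unacknowledged.size();
        }
    }

    /** Hypothetical stand-in for a consumed message. */
    static final class ModelMessage {
        private final ModelSession session;
        final String body;

        ModelMessage(ModelSession session, String body) {
            this.session = session;
            this.body = body;
        }

        /** Acknowledging one message acknowledges the whole session. */
        void acknowledge() {
            session.acknowledgeAll();
        }
    }

    /** Consumes total messages, acknowledges the one at ackIndex, returns what is left. */
    static int unackedAfter(int total, int ackIndex) {
        ModelSession session = new ModelSession();
        List<ModelMessage> consumed = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            consumed.add(session.consume("hi: " + i));
        }
        consumed.get(ackIndex).acknowledge();
        return session.unacknowledgedCount();
    }

    public static void main(String[] args) {
        // Ten messages consumed, only the fifth one acknowledged.
        System.out.println("unacknowledged: " + unackedAfter(10, 4)); // unacknowledged: 0
    }
}
```

In practice this means a consumer should call acknowledge only once every message it has received so far has been processed.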
2.1 priority
You can use message priority to instruct the JMS Provider to deliver urgent messages first. There are 10 priority levels, from 0 (lowest) to 9 (highest). If no priority is specified, the default level is 4. Note that the JMS Provider does not strictly guarantee that messages are delivered in priority order.
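The ordering that priority asks for can be sketched with a plain-Java model. This is not the JMS API (there the priority is set on the producer, for example with MessageProducer.setPriority); PriorityModel and its methods are names made up for the illustration, and a real provider is free to deviate from this strict ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java model of priority-ordered delivery; not the JMS API. */
final class PriorityModel {
    static final int DEFAULT_PRIORITY = 4; // level used when none is specified

    private static final class Msg {
        final String body;
        final int priority;
        final long sequence; // arrival order, used to break ties

        Msg(String body, int priority, long sequence) {
            if (priority < 0 || priority > 9) {
                throw new IllegalArgumentException("priority must be between 0 and 9");
            }
            this.body = body;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    private final PriorityQueue<Msg> queue = new PriorityQueue<>(
            (a, b) -> a.priority != b.priority
                    ? Integer.compare(b.priority, a.priority)  // higher priority first
                    : Long.compare(a.sequence, b.sequence));   // then first in, first out
    private long nextSequence = 0;

    void send(String body) {
        send(body, DEFAULT_PRIORITY);
    }

    void send(String body, int priority) {
        queue.add(new Msg(body, priority, nextSequence++));
    }

    /** Returns the message bodies in the order this model would deliver them. */
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        while (!queue.isEmpty()) {
            delivered.add(queue.poll().body);
        }
        return delivered;
    }

    public static void main(String[] args) {
        PriorityModel model = new PriorityModel();
        model.send("routine-1");
        model.send("urgent", 9);
        model.send("background", 0);
        model.send("routine-2");
        System.out.println(model.drain()); // [urgent, routine-1, routine-2, background]
    }
}
```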
2.2 message expiration
You can set the message to expire after a certain time. By default, it will never expire.
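The arithmetic behind expiration is simple enough to sketch in plain Java. In the JMS API the time to live is set on the producer (for example with MessageProducer.setTimeToLive), and a value of 0 means the message never expires. The class and method names below are hypothetical, and the exact moment a provider discards an expired message is an assumption of this model.

```java
/** Plain-Java model of time-to-live arithmetic; not the JMS API. */
final class ExpirationModel {

    /** Expiration timestamp for a message; 0 means "never expires". */
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        if (timeToLiveMillis < 0) {
            throw new IllegalArgumentException("time to live must not be negative");
        }
        return timeToLiveMillis == 0 ? 0L : sendTimeMillis + timeToLiveMillis;
    }

    /** A message with expiration 0 never expires; otherwise compare against the clock. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0L && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiresAt = expirationFor(sentAt, 5_000L); // five-second time to live
        System.out.println("expired after 1 s: " + isExpired(expiresAt, sentAt + 1_000L));
        System.out.println("expired after 6 s: " + isExpired(expiresAt, sentAt + 6_000L));
        System.out.println("no TTL, one day later: "
                + isExpired(expirationFor(sentAt, 0L), sentAt + 86_400_000L));
    }
}
```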
2.3 temporary destination
Temporary destinations can be created with the createTemporaryQueue and createTemporaryTopic methods of the session. They exist only as long as the connection that created them. Only message consumers on the connection that created a temporary destination can consume messages from it.
3. What is ActiveMQ
ActiveMQ is an open source message middleware that implements the JMS (Java Message Service) specification. Its design goal is to provide standard, message-oriented middleware for application integration that can span multiple languages and systems.
Official website address: activemq.apache.org/
3.1 storage mode
1. KahaDB storage: KahaDB is the default persistence mechanism. All messages are appended sequentially to log files. Alongside them there is an index file that records where each message is stored in those logs, and a transaction log used for message replay. It is a solution built specifically for message persistence and optimized for typical messaging usage patterns.
Characteristics:
(1) Messages are stored in log form;
(2) The message index is stored in a B-Tree structure and can be updated quickly;
(3) JMS transactions are fully supported;
(4) Multiple recovery mechanisms are supported. KahaDB can limit the size of each data file; this limit applies per file and does not cap the total data capacity.
2. AMQ storage: only applicable to version 5.3 and earlier. AMQ is also a file-based store: messages are ultimately written to files, with some data cached in memory.
3. JDBC storage: with JDBC persistence, three tables are created in the database by default. The purpose of each table is as follows:
activemq_msgs: stores both queue and topic messages
activemq_acks: stores durable subscription information and the ID of the last message each durable subscriber received
activemq_lock: similar to KahaDB's lock file, it ensures that only one broker accesses the database at any given time
4. LevelDB storage: LevelDB persistence performs better than KahaDB, but the ActiveMQ official website states that LevelDB is deprecated and no longer supported; KahaDB is recommended instead.
5. Memory message storage: as the name suggests, messages are kept in memory only. Setting persistent="false" on the broker element disables persistent storage, so messages are held directly in memory.
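To make the log-plus-index idea behind the KahaDB storage described above more concrete, here is a toy in-memory model. It is not KahaDB's code or file format: JournalModel is a hypothetical class, the "data files" are byte buffers, and a TreeMap stands in for the B-Tree index. It shows sequential appends, index lookups, and a size limit that applies per data file and not to the total.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of an append-only journal with a sorted index; not KahaDB itself. */
final class JournalModel {
    private final int maxFileBytes; // size limit of a single data file
    private final List<ByteArrayOutputStream> dataFiles = new ArrayList<>();
    // message id -> {file number, offset, length}; TreeMap stands in for the B-Tree
    private final Map<String, int[]> index = new TreeMap<>();

    JournalModel(int maxFileBytes) {
        this.maxFileBytes = maxFileBytes;
        dataFiles.add(new ByteArrayOutputStream());
    }

    /** Appends the message to the current data file, rolling over when it is full. */
    void append(String messageId, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream current = dataFiles.get(dataFiles.size() - 1);
        if (current.size() > 0 && current.size() + bytes.length > maxFileBytes) {
            current = new ByteArrayOutputStream();
            dataFiles.add(current);
        }
        index.put(messageId, new int[] {dataFiles.size() - 1, current.size(), bytes.length});
        current.write(bytes, 0, bytes.length);
    }

    /** Looks the message up through the index and reads it back from its data file. */
    String read(String messageId) {
        int[] location = index.get(messageId);
        if (location == null) {
            return null;
        }
        byte[] file = dataFiles.get(location[0]).toByteArray();
        return new String(file, location[1], location[2], StandardCharsets.UTF_8);
    }

    int dataFileCount() {
        return dataFiles.size();
    }

    public static void main(String[] args) {
        JournalModel journal = new JournalModel(8); // tiny limit to force a rollover
        journal.append("m1", "hello");
        journal.append("m2", "world");
        journal.append("m3", "hi");
        System.out.println(journal.read("m2") + " in " + journal.dataFileCount() + " data files");
    }
}
```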
3.2 protocols
Official protocol documentation: activemq.apache.org/configuring...
- Transmission Control Protocol (TCP):
1. This is the default Broker configuration. The TCP listening port for clients is 61616.
2. Before data is sent over the network, it must be serialized. Messages are serialized into byte streams by a wire protocol. By default, ActiveMQ's wire protocol is called OpenWire, and its purpose is to make data exchange over the network efficient and fast.
3. URI form of TCP connection: tcp://hostname:port?key=value&key=value
4. Advantages of TCP transmission:
(1) Reliability: the TCP protocol is highly reliable and stable
(2) Efficiency: byte stream transmission is very efficient
(3) Availability: it is widely used and supported on every platform
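The URI form given for TCP connections can be taken apart with the standard java.net.URI class, which is handy when checking a connector address by hand. This is only a sketch: TransportUriDemo and its options helper are hypothetical names, the soTimeout option is included purely as an illustrative second key, and ActiveMQ does its own parsing internally.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

/** Splits a transport URI of the form scheme://hostname:port?key=value&key=value. */
final class TransportUriDemo {

    /** Returns the key=value options of the URI in the order they appear. */
    static Map<String, String> options(URI uri) {
        Map<String, String> result = new LinkedHashMap<>();
        String query = uri.getQuery();
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                result.put(pair, ""); // key without a value
            } else {
                result.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // soTimeout is an illustrative option chosen for this sketch
        URI uri = URI.create("tcp://localhost:61616?trace=true&soTimeout=30000");
        System.out.println(uri.getScheme() + " " + uri.getHost() + " " + uri.getPort());
        System.out.println(options(uri));
    }
}
```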
- New I/O API Protocol (NIO)
1. The NIO protocol is similar to the TCP protocol, but NIO works closer to the underlying I/O operations. It allows more clients to connect to the same resource and lets the server carry more load.
2. Scenarios suited to the NIO protocol:
(1) A large number of clients may connect to the Broker. The number of client connections is generally limited by the number of operating system threads. Because the NIO implementation needs fewer threads than TCP, the NIO protocol is recommended
(2) Network transmission to the Broker may be slow. NIO provides better performance than TCP in this case
3. URI form of NIO connection: nio://hostname:port?key=value
4. Example of Transport Connector configuration:
<transportConnectors>
    <transportConnector name="tcp" uri="tcp://localhost:61616?trace=true" />
    <transportConnector name="nio" uri="nio://localhost:61618?trace=true" />
</transportConnectors>
- User Datagram Protocol (UDP)
1: Difference between UDP and TCP
(1) TCP is a stream-oriented delivery protocol, which means that packet delivery is guaranteed: packets are neither duplicated nor lost. UDP, on the other hand, does not guarantee the delivery of packets
(2) TCP is also a stable and reliable delivery protocol: data is not lost in transit, which ensures reliable transmission between sender and receiver. UDP, by contrast, is only a connectionless protocol, so it offers no reliability guarantee
2: It follows that TCP is used where stability and reliability matter, while UDP is usually used where data must be delivered quickly and losing some data is acceptable. UDP is also used when ActiveMQ has to pass through a firewall and UDP is the only option
3: URI form of UDP connection: udp://hostname:port?key=value
4: Example of Transport Connector configuration:
<transportConnectors>
    <transportConnector name="udp" uri="udp://localhost:61618?trace=true" />
</transportConnectors>
- Secure Sockets Layer Protocol (SSL)
1: URI form of connection: ssl://hostname:port?key=value
2: Example of Transport Connector configuration:
<transportConnectors>
    <transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/>
</transportConnectors>
4. Case (Hello World)
Here we use Windows as an example.
Download address: activemq.apache.org/components/...
4.1 installation and startup
After extracting the archive, run bin/win64/activemq.bat directly.
4.2 web console
http://localhost:8161/ Account and password: admin/admin
4.3 changing the web console port
Modify the jettyPort node in the configuration file conf/jetty.xml under the ActiveMQ installation directory. After modifying the configuration file, save it and restart the ActiveMQ service:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="127.0.0.1"/>
    <property name="port" value="8161"/>
</bean>
4.4 development
1. Add the dependency:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2. Sender:
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: activemq_01
 * @ClassName Sender
 * @description: message sending
 * @author: muxiaonong
 * @create: 2020-10-02 13:01
 * @Version 1.0
 **/
public class Sender {

    public static void main(String[] args) throws Exception {
        // 1. Get the connection factory
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );
        // 2. Get a connection to ActiveMQ
        Connection connection = factory.createConnection();
        // 3. Get a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. Get the destination; the consumer reads from this same destination
        Queue queue = session.createQueue("user");
        // 5.1 Create the message producer
        MessageProducer producer = session.createProducer(queue);
        // 5.2 Create the messages
        for (int i = 0; i < 100; i++) {
            TextMessage textMessage = session.createTextMessage("hi: " + i);
            // 5.3 Write the message to the destination
            producer.send(textMessage);
            Thread.sleep(1000);
        }
        // 6. Close the connection
        connection.close();
        System.out.println("end.....");
    }
}
3. Receiver:
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: activemq_01
 * @ClassName Receiver
 * @description: message receiving
 * @author: muxiaonong
 * @create: 2020-10-02 13:01
 * @Version 1.0
 **/
public class Receiver {

    public static void main(String[] args) throws Exception {
        // 1. Get the connection factory
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );
        // 2. Get a connection to ActiveMQ and start it so that messages are delivered
        Connection connection = factory.createConnection();
        connection.start();
        // 3. Get a non-transacted session with automatic acknowledgement
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. Get the destination; it must be the one the sender writes to
        Destination queue = session.createQueue("user");
        // 5. Create the consumer and receive messages
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            // receive() blocks until the next message arrives
            TextMessage message = (TextMessage) consumer.receive();
            System.out.println("message: " + message.getText());
        }
    }
}
Test results:
message: hi: 38
message: hi: 39
message: hi: 40
message: hi: 41
message: hi: 42
message: hi: 43
message: hi: 44
message: hi: 45
The web console shows that one consumer is connected and has consumed 68 messages, and that no messages are waiting to be consumed in this queue.
5. Summary
That wraps up this installment of the MQ introductory tutorial series. If you are interested, give it a try. If you run into problems or have questions, leave a message below and I will reply as soon as I see it. As message middleware, MQ comes up often in both interviews and day-to-day work, so it is well worth understanding. That's all for today. Thank you for reading, and see you in the next article!