RocketMQ Architecture Principle

Original address: RocketMQ sample - always on the way

1 basic example

In the basic example, we provide the following function scenarios:

  • Use RocketMQ to send three types of messages: synchronous message, asynchronous message and one-way message. The first two messages are reliable because there will be a response whether the transmission is successful or not.
  • Use RocketMQ to consume the received messages.

1.1 adding dependencies:

maven:

<!-- RocketMQ Java client used by every example below -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

gradle

// 'compile' is deprecated and removed in Gradle 7+; 'implementation' is its replacement
implementation 'org.apache.rocketmq:rocketmq-client:4.3.0'

1.2 message sending

1. The Producer sends a synchronous message

This reliable synchronous sending method is widely used, such as important message notification and short message notification.

/**
 * Sends 100 messages one after another with the synchronous {@code send} call and
 * prints the {@code SendResult} returned for each of them.
 */
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Create the producer for its group, point it at the NameServer, then start it
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        int sent = 0;
        while (sent < 100) {
            // Encode the body first, then wrap it together with topic and tag
            byte[] payload = ("Hello RocketMQ " + sent).getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message msg = new Message("TopicTest", "TagA", payload);

            // The delivery outcome is reported through the returned SendResult
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
            sent++;
        }

        // No more messages to send: release the producer
        producer.shutdown();
    }
}

2. Send asynchronous message

Asynchronous messages are usually used in business scenarios that are sensitive to response time, that is, the sender cannot tolerate waiting for the Broker's response for a long time.

/**
 * Sends messages asynchronously; the outcome of every send is delivered to a
 * {@code SendCallback} instead of being returned by {@code send}.
 */
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate message Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the address of NameServer
        producer.setNamesrvAddr("localhost:9876");
        // Start Producer instance
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        // One count per message: every callback (success or failure) counts the latch down,
        // so await() below returns as soon as all callbacks have fired
        final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Create a message and specify Topic, Tag, Key and message body
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback receives the result asynchronously
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                        sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Wait until every callback has been invoked, but for no longer than 5s
        countDownLatch.await(5, TimeUnit.SECONDS);
        // If you no longer send messages, close the Producer instance.
        producer.shutdown();
    }
}

3. Send one-way message

This method is mainly used in scenarios that do not particularly care about sending results, such as log sending.

/**
 * Sends 100 messages with {@code sendOneway}, which hands the message over without
 * returning any result to the caller.
 */
public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // Producer bound to its group and to the NameServer address
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        int seq = 0;
        do {
            // Topic, tag and encoded body make up the message
            byte[] payload = ("Hello RocketMQ " + seq).getBytes(RemotingHelper.DEFAULT_CHARSET);
            Message msg = new Message("TopicTest", "TagA", payload);
            // Fire and forget: nothing is returned for a one-way send
            producer.sendOneway(msg);
            seq++;
        } while (seq < 100);

        // Done sending: close the producer instance
        producer.shutdown();
    }
}

1.3 consuming messages

/**
 * Push consumer that subscribes to every tag of {@code TopicTest} and prints each
 * list of messages handed to its listener.
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Callback invoked with the messages delivered for the subscription
        MessageListenerConcurrently printingListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                String threadName = Thread.currentThread().getName();
                System.out.printf("%s Receive New Messages: %s %n", threadName, msgs);
                // Report the batch as successfully consumed
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };

        // Consumer bound to its group and to the NameServer address
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("localhost:9876");

        // "*" subscribes to all tags of the topic
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(printingListener);

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

2 example of sequential message

Message ordering means that messages are consumed in the same order in which they were sent (FIFO). RocketMQ can strictly guarantee message order, either as partition order or as global order.

How sequential consumption works: by default, messages are sent to different queues (partition queues) in Round Robin fashion, and consumers pull messages from multiple queues, so the order of sending and consumption cannot be guaranteed. However, if messages are sent one after another to the same queue only, and are pulled one after another from that queue only during consumption, the order is guaranteed. When only one queue takes part in sending and consumption, the order is global; if multiple queues take part, the order is per partition, that is, messages are ordered relative to each queue.

The following example uses orders to demonstrate partition ordering. The steps of an order are, in sequence: create, pay, push and complete. Messages with the same order number are sent to the same queue one after another, so when consuming, messages with the same OrderId are always taken from the same queue.

2.1 sequential message production

package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Producer that sends sequential messages: the queue for each message is chosen
 * from its order id, so all steps of one order land in the same queue.
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // Order list
        List<OrderStep> orderList = new Producer().buildOrders();

        // Selector is stateless, so one instance is shared by every send:
        // the order id (passed as arg) modulo the queue count picks the queue
        MessageQueueSelector byOrderId = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Long id = (Long) arg;
                long index = id % mqs.size();
                return mqs.get((int) index);
            }
        };

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        // Iterate over the actual list size rather than a hard-coded count
        for (int i = 0; i < orderList.size(); i++) {
            OrderStep step = orderList.get(i);
            // Add a time prefix
            String body = dateStr + " Hello RocketMQ " + step;
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, byOrderId, step.getOrderId());

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                sendResult.getSendStatus(),
                sendResult.getMessageQueue().getQueueId(),
                body));
        }

        producer.shutdown();
    }

    /**
     * One step (description) in the life cycle of the order identified by {@code orderId}.
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
        }
    }

    /**
     * Generate simulated order data: the steps of three orders, interleaved.
     *
     * @return the order steps in the sequence in which they are to be sent
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        orderList.add(newStep(15103111039L, "establish"));
        orderList.add(newStep(15103111065L, "establish"));
        orderList.add(newStep(15103111039L, "payment"));
        orderList.add(newStep(15103117235L, "establish"));
        orderList.add(newStep(15103111065L, "payment"));
        orderList.add(newStep(15103117235L, "payment"));
        orderList.add(newStep(15103111065L, "complete"));
        orderList.add(newStep(15103111039L, "Push"));
        orderList.add(newStep(15103117235L, "complete"));
        orderList.add(newStep(15103111039L, "complete"));

        return orderList;
    }

    /** Creates an order step with the given order id and description. */
    private static OrderStep newStep(long orderId, String desc) {
        OrderStep step = new OrderStep();
        step.setOrderId(orderId);
        step.setDesc(desc);
        return step;
    }
}

2.2 sequential message consumption

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Consumes the sequential messages of {@code TopicTest} through a
 * {@code MessageListenerOrderly}, with auto-commit switched on in the consume context.
 */
public class ConsumerInOrder {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // Where to begin on the very first start; the setting chosen here is the first offset
        // of the queue. Later starts continue from the last consumption position.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        MessageListenerOrderly orderlyListener = new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);

                // The output shows which thread handled which queue, so the per-queue
                // (partition) ordering of the orders can be checked by eye
                for (int pos = 0; pos < msgs.size(); pos++) {
                    MessageExt msg = msgs.get(pos);
                    String threadName = Thread.currentThread().getName();
                    String content = new String(msg.getBody());
                    System.out.println("consumeThread=" + threadName + "queueId=" + msg.getQueueId() + ", content:" + content);
                }

                // Simulate business processing with a random pause of 0-9 seconds
                try {
                    int pauseSeconds = random.nextInt(10);
                    TimeUnit.SECONDS.sleep(pauseSeconds);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        };
        consumer.registerMessageListener(orderlyListener);

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

3 example of delay message

3.1 start consumer waiting for incoming subscription message

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

/**
 * Consumer for the delayed-message example: for every message received from
 * {@code TestTopic} it prints how long after the store timestamp the message arrived.
 */
public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // All tags of the topic
        consumer.subscribe("TestTopic", "*");

        MessageListenerConcurrently delayPrinter = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (int pos = 0; pos < messages.size(); pos++) {
                    MessageExt message = messages.get(pos);
                    String msgId = message.getMsgId();
                    // Approximate delay: now minus the time the message was stored
                    long elapsedMillis = System.currentTimeMillis() - message.getStoreTimestamp();
                    System.out.println("Receive message[msgId=" + msgId + "] " + elapsedMillis + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
        consumer.registerMessageListener(delayPrinter);

        consumer.start();
    }
}

3.2 sending delay message

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * Producer for the delayed-message example: sends 100 messages to {@code TestTopic},
 * each marked with delay level 3.
 */
public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.start();

        final int totalMessagesToSend = 100;
        int sent = 0;
        while (sent < totalMessagesToSend) {
            byte[] payload = ("Hello scheduled message " + sent).getBytes();
            Message message = new Message("TestTopic", payload);
            // Only fixed delay levels are supported (see messageDelayLevel); level 3 is 10s
            message.setDelayTimeLevel(3);
            producer.send(message);
            sent++;
        }

        producer.shutdown();
    }
}

3.3 verification

You will see that the message is consumed 10 seconds later than the storage time.

3.4 usage scenario of delay message

For example, in e-commerce, you can send a delay message after submitting an order, and check the status of the order after 1h. If it is still unpaid, cancel the order and release the inventory.

3.5 restrictions on the use of delay messages

// org/apache/rocketmq/store/config/MessageStoreConfig.java

// Space-separated list of the fixed delay durations; the position in the list
// (starting at 1) is the delay level, so level 1 is 1s and level 18 is 2h.
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

At present, RocketMQ does not support arbitrary delay times. Only a fixed set of delay levels can be used, from 1s to 2h, corresponding to levels 1 to 18 respectively.
If message consumption fails, the message enters the delayed message queue. The time at which it is delivered again depends on the configured delay level and the number of retries. See SendMessageProcessor.java in the source code for details.

4 sample batch message

Sending messages in bulk can significantly improve the performance of delivering small messages. The limitation is that these batch messages should have the same topic, the same waitStoreMsgOK, and cannot be delayed messages. In addition, the total size of this batch of messages should not exceed 4MB.

4.1 sending batch messages

If you only send messages of no more than 4MB at a time, it is easy to use batch processing. An example is as follows:

// Build three messages for the same topic and send them as a single batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
for (int n = 0; n < 3; n++) {
    String key = "OrderID00" + (n + 1);
    byte[] payload = ("Hello world " + n).getBytes();
    messages.add(new Message(topic, "TagA", key, payload));
}
try {
    producer.send(messages);
} catch (Exception e) {
    // Handle the send failure here
    e.printStackTrace();
}

4.2 message list segmentation

The complexity increases only when you send a large batch, and you may not be sure whether it exceeds the size limit (4MB). At this time, you'd better split your message list:

/**
 * Splits a message list into consecutive sub-lists whose estimated total size does not
 * exceed {@code SIZE_LIMIT}, so that every sub-list can be sent as one batch.
 *
 * <p>A single message whose estimated size is already above the limit is returned in a
 * sub-list of its own rather than being dropped, so the caller sees the resulting send
 * error instead of silently losing the message.
 */
public class ListSplitter implements Iterator<List<Message>> {
    /** Upper bound for the estimated size of one batch (4MB). */
    private static final int SIZE_LIMIT = 1024 * 1024 * 4;
    /** Per-message allowance added to the estimate for log overhead. */
    private static final int LOG_OVERHEAD = 20;

    private final List<Message> messages;
    /** Index of the first message that has not been handed out yet. */
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    /** @return {@code true} while there are messages that have not been returned by {@link #next()} */
    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    /**
     * Returns the next batch as a view ({@code subList}) of the original list.
     *
     * @return a non-empty sub-list starting at the first message not yet handed out
     * @throws java.util.NoSuchElementException if all messages have already been returned
     */
    @Override
    public List<Message> next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException("no more messages to split");
        }
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            int tmpSize = calcMessageSize(messages.get(nextIndex));
            if (tmpSize > SIZE_LIMIT) {
                // An oversized message can never fit into a batch with others. If it is the
                // first candidate, emit it alone so the splitter keeps making progress;
                // otherwise close the current batch and let the next call emit it.
                if (nextIndex == currIndex) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            }
            totalSize += tmpSize;
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    /**
     * Estimates the size of a message as topic length + body length + the lengths of all
     * property keys and values + a fixed log overhead. String lengths are counted in
     * chars, so this is an approximation of the encoded size.
     */
    private int calcMessageSize(Message message) {
        // body is a byte[]: its size is the 'length' field, not a method
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        return tmpSize + LOG_OVERHEAD;
    }
}
// Break the large list into batches and send the batches one by one
ListSplitter splitter = new ListSplitter(messages);
for (; splitter.hasNext(); ) {
    try {
        List<Message> batch = splitter.next();
        producer.send(batch);
    } catch (Exception e) {
        // Handle the failure of this batch here
        e.printStackTrace();
    }
}

5 example of filtering messages

In most cases, TAG is a simple and useful design that allows you to select the messages you want. For example:

// "||" separates the tags the consumer is interested in
String tagExpression = "TAGA || TAGB || TAGC";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", tagExpression);

The consumer will receive messages tagged with TAGA, TAGB or TAGC. However, the limitation is that a message can only have one tag, which may not be enough in complex scenarios. In this case, you can filter messages using SQL expressions. The SQL feature evaluates the expression against the properties that were set when the message was sent. Some simple logic can be implemented under the syntax defined by RocketMQ. Here is an example:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

5.1 basic grammar

RocketMQ only defines some basic syntax to support this feature. You can also easily expand it.

  • Numerical comparison, such as: >, >=, <, <=, BETWEEN, =;
  • Character comparison, such as: =, <>, IN;
  • IS NULL or IS NOT NULL;
  • Logical symbols AND, OR, NOT;

Constant support types are:

  • Value, for example: 123, 3.1415;
  • Characters, such as 'abc', must be enclosed in single quotation marks;
  • NULL, special constant
  • TRUE or FALSE

Only consumers using push mode can use sql statements of SQL92 standard. The interfaces are as follows:

// Subscribes to a topic, filtering its messages with the given selector (e.g. MessageSelector.bySql)
public void subscribe(final String topic, final MessageSelector messageSelector)

5.2 use examples

1. Producer sample

When sending a message, you can set the properties of the message through putUserProperty

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
// 'tag' and 'i' come from the surrounding code of the full example
byte[] payload = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
Message msg = new Message("TopicTest", tag, payload);
// User property "a" is what the SQL filter on the consumer side is evaluated against
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

2. Consumer sample

Use MessageSelector.bySql to filter messages using SQL

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Only receive messages that carry property a with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // Nothing to do in this sample beyond acknowledging the messages
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

6 example of message transaction

Transaction messages have three states: commit state, rollback state and intermediate state:

  • TransactionStatus.CommitTransaction: commit a transaction that allows the consumer to consume this message.
  • TransactionStatus.RollbackTransaction: rollback transaction, which means that the message will be deleted and cannot be consumed.
  • TransactionStatus.Unknown: intermediate status, which represents the need to check the message queue to determine the status.

6.1 example of sending transaction message

1. Create transactional producer

Using the TransactionMQProducer class to create a producer and specify a unique producer group, you can set up a custom thread pool to process these check requests. After executing a local transaction, you need to reply to the message queue according to the execution result. Please refer to the previous section for the returned transaction status.

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Sends ten transactional messages through a {@code TransactionMQProducer} that uses a
 * custom thread pool and a {@code TransactionListenerImpl}, then stays alive for a long
 * time before shutting the producer down.
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Every thread of the pool gets the same descriptive name
        ThreadFactory namedThreads = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread worker = new Thread(r);
                worker.setName("client-transaction-msg-check-thread");
                return worker;
            }
        };
        ExecutorService executorService = new ThreadPoolExecutor(
            2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), namedThreads);

        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        int seq = 0;
        while (seq < 10) {
            try {
                String tag = tags[seq % tags.length];
                byte[] payload = ("Hello RocketMQ " + seq).getBytes(RemotingHelper.DEFAULT_CHARSET);
                Message msg = new Message("TopicTest1234", tag, "KEY" + seq, payload);
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            seq++;
        }

        // Keep the process running (100000 one-second sleeps) before shutting down
        int remaining = 100000;
        while (remaining > 0) {
            Thread.sleep(1000);
            remaining--;
        }
        producer.shutdown();
    }
}

2. Implement the transaction listening interface

After a half message has been sent successfully, the executeLocalTransaction method is called to execute the local transaction. It returns one of the three transaction states mentioned in the previous section. The checkLocalTransaction method is used to check the local transaction status and respond to the check requests from the message queue. It also returns one of the three transaction states mentioned in the previous section.

/**
 * Sample transaction listener: it records a status (0, 1 or 2, assigned round-robin)
 * per transaction id when the local transaction runs, and maps that status to a
 * {@code LocalTransactionState} when the transaction is checked later.
 */
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Stores the next round-robin status for the message's transaction id and always
     * answers UNKNOW, leaving the decision to {@link #checkLocalTransaction}.
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int status = transactionIndex.getAndIncrement() % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Looks up the recorded status: 0 stays UNKNOW, 2 rolls back, and everything else
     * (including a missing entry) commits.
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer recorded = localTrans.get(msg.getTransactionId());
        if (recorded == null) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        if (recorded == 0) {
            return LocalTransactionState.UNKNOW;
        }
        if (recorded == 2) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

6.2 restrictions on the use of transaction messages

  1. Transaction messages do not support delayed messages and batch messages.
  2. In order to avoid the accumulation of semi queue messages caused by too many times of single message inspection, we limit the number of times of single message inspection to 15 by default, but users can modify this limit through the transactionCheckMax parameter of the Broker configuration file. If a message has been checked more than N times (N = transactionCheckMax), the Broker will discard the message and print the error log at the same time by default. Users can modify this behavior by overriding the AbstractTransactionalMessageCheckListener class.
  3. The transaction message will be checked after a specific length of time such as the parameter transactionTimeout in the Broker configuration file. When sending a transaction message, the user can also set the user attribute CHECK_IMMUNITY_TIME_IN_SECONDS to change this limit. This parameter takes precedence over the transactionTimeout parameter.
  4. Transactional messages may be checked or consumed more than once.
  5. The target topic message submitted to the user may fail. At present, this depends on the log record. Its high availability is guaranteed by RocketMQ's own high availability mechanism. If you want to ensure that transaction messages are not lost and transaction integrity is guaranteed, it is recommended to use synchronous dual write mechanism.
  6. The producer ID of a transaction message cannot be shared with the producer ID of other types of messages. Unlike other types of messages, transaction messages allow reverse queries: the MQ server can query the producer clients through their producer ID.

7 example of logappender

The RocketMQ logappender provides log4j, log4j2 and logback appenders for use by business applications. The following are configuration examples.

7.1 log4j example

Use the log4j property configuration as shown in the following example

# Appender implementation
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
# Where the log messages are sent
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.Tag=yourTag
# Layout of each log line
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n

Follow the example below to use log4j xml configuration to add logs asynchronously

<!-- RocketMQ appender; attributes must be separated by whitespace for the XML to be well-formed -->
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender">
  <param name="Tag" value="yourTag" />
  <param name="Topic" value="yourLogTopic" />
  <param name="ProducerGroup" value="yourLogGroup" />
  <param name="NameServerAddress" value="yourRocketmqNameserverAddress"/>
  <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n" />
  </layout>
</appender>
<!-- Asynchronous wrapper that forwards to mqAppender1 -->
<appender name="mqAsyncAppender1" class="org.apache.log4j.AsyncAppender">
  <param name="BufferSize" value="1024" />
  <param name="Blocking" value="false" />
  <appender-ref ref="mqAppender1"/>
</appender>

7.2 log4j2 example

When using log4j2, the configuration is as follows. If you want logging to be non-blocking, you only need to reference this appender from an asynchronous appender.

<!-- RocketMQ appender for log4j2 -->
<RocketMQ name="rocketmqAppender"
          producerGroup="yourLogGroup"
          nameServerAddress="yourRocketmqNameserverAddress"
          topic="yourLogTopic"
          tag="yourTag">
  <PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>

7.3 logback example

<!-- RocketMQ appender; attributes must be separated by whitespace for the XML to be well-formed -->
<appender name="mqAppender1" class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
  <tag>yourTag</tag>
  <topic>yourLogTopic</topic>
  <producerGroup>yourLogGroup</producerGroup>
  <nameServerAddress>yourRocketmqNameserverAddress</nameServerAddress>
  <layout>
      <pattern>%date %p %t - %m%n</pattern>
  </layout>
</appender>
<!-- Asynchronous wrapper that forwards to mqAppender1 -->
<appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
  <queueSize>1024</queueSize>
  <discardingThreshold>80</discardingThreshold>
  <maxFlushTime>2000</maxFlushTime>
  <neverBlock>true</neverBlock>
  <appender-ref ref="mqAppender1"/>
</appender>

8 OpenMessaging sample

OpenMessaging aims to establish message and stream processing specifications to provide a general framework and industrial guidance scheme for the fields of finance, e-commerce, Internet of things and big data. In a distributed heterogeneous environment, its design principles are cloud oriented, simple, flexible and language independent. Conforming to these specifications will help enterprises easily develop heterogeneous messaging applications across platforms and operating systems. RocketMQ provides a partial implementation of OpenMessaging API 0.3.0-alpha. The following example demonstrates how to access RocketMQ based on OpenMessaging.

8.1 OMSProducer example

The following example demonstrates how to send a message to the RocketMQ agent in synchronous, asynchronous, or one-way transport.

import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;

/**
 * Sends one message to {@code OMS_HELLO_TOPIC} in each of the three modes offered by the
 * OpenMessaging producer: synchronous, asynchronous and one-way.
 */
public class SimpleProducer {
    /**
     * Starts the access point and producer, sends the three messages, waits for the asynchronous
     * callback, and then shuts everything down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
        final Producer producer = messagingAccessPoint.createProducer();
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");
        producer.startup();
        System.out.printf("Producer startup OK%n");

        final Charset utf8 = Charset.forName("UTF-8");
        // Released by the asynchronous send callback, whether the send succeeded or failed.
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            // Synchronous send: the call returns the result directly.
            {
                Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(utf8));
                SendResult sendResult = producer.send(message);
                System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
            }
            // Asynchronous send: the outcome is reported to the listener.
            {
                final Future<SendResult> result =
                    producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(utf8)));
                result.addListener(new FutureListener<SendResult>() {
                    @Override
                    public void operationComplete(Future<SendResult> future) {
                        if (future.getThrowable() != null) {
                            System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage());
                        } else {
                            System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId());
                        }
                        countDownLatch.countDown();
                    }
                });
            }
            // One-way send: no result is returned to the caller.
            {
                producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(utf8)));
                System.out.printf("Send oneway message OK%n");
            }
            countDownLatch.await();
            Thread.sleep(500); // Wait some time to send the message
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost, then fall through to shutdown.
            Thread.currentThread().interrupt();
        } finally {
            // Release resources even if one of the sends above threw.
            producer.shutdown();
            messagingAccessPoint.shutdown();
        }
    }
}

8.2 OMSPullConsumer

The following example uses the OMS PullConsumer to pull messages from a specified queue.

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;

/**
 * Sends one message to the {@code TopicTest} queue and then pulls from that queue with an
 * OpenMessaging {@code PullConsumer} until the message that was just sent is received.
 */
public class SimplePullConsumer {
    /**
     * Sends a single message, then receives and acknowledges messages until one carries the id
     * of the message that was sent.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final MessagingAccessPoint messagingAccessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
        // Start the access point once; the original sample called startup() a second time.
        messagingAccessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        final Producer producer = messagingAccessPoint.createProducer();
        final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
            OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));

        final String queueName = "TopicTest";
        producer.startup();
        // Encode with an explicit charset instead of the platform default.
        Message msg = producer.createBytesMessage(queueName,
            "Hello Open Messaging".getBytes(java.nio.charset.Charset.forName("UTF-8")));
        SendResult sendResult = producer.send(msg);
        System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
        producer.shutdown();

        consumer.attachQueue(queueName);
        consumer.startup();
        System.out.printf("Consumer startup OK%n");

        // Keep pulling until the message sent above has been received.
        boolean stop = false;
        while (!stop) {
            Message message = consumer.receive();
            if (message == null) {
                System.out.printf("Return without any message%n");
                continue;
            }
            String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
            stop = msgId.equalsIgnoreCase(sendResult.messageId());
        }

        consumer.shutdown();
        messagingAccessPoint.shutdown();
    }
}

8.3 OMSPushConsumer

The following example shows how to attach an OMS PushConsumer to a specified queue and consume its messages through a MessageListener.

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;

/**
 * Attaches an OpenMessaging {@code PushConsumer} to {@code OMS_HELLO_TOPIC} and prints and
 * acknowledges every message handed to its listener.
 */
public class SimplePushConsumer {
    /**
     * Starts the access point, registers a JVM shutdown hook that stops the consumer and the
     * access point, attaches the listener, and starts the consumer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final MessagingAccessPoint accessPoint =
            OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
        final PushConsumer pushConsumer = accessPoint.createPushConsumer(
            OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));

        accessPoint.startup();
        System.out.printf("MessagingAccessPoint startup OK%n");

        // Stop the consumer first, then the access point, when the JVM exits.
        final Thread shutdownHook = new Thread(new Runnable() {
            @Override
            public void run() {
                pushConsumer.shutdown();
                accessPoint.shutdown();
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);

        // Prints the id of each delivered message and acknowledges it.
        final MessageListener printAndAck = new MessageListener() {
            @Override
            public void onReceived(Message message, Context context) {
                final String messageId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
                System.out.printf("Received one message: %s%n", messageId);
                context.ack();
            }
        };
        pushConsumer.attachQueue("OMS_HELLO_TOPIC", printAndAck);

        pushConsumer.startup();
        System.out.printf("Consumer startup OK%n");
    }
}

Tags: Java Distribution

Posted by joebarker99 on Mon, 23 May 2022 17:27:34 +0300