Implementing a blocking queue with Redis in practice

In day-to-day feature development, you will inevitably run into tasks that should be handled asynchronously, such as bulk email, bulk SMS, and data import. Users should not be left staring at a loading spinner while these run, so the work is processed asynchronously: the data to be processed is added to a queue and then handled in queue order.

The queue can be a dedicated message queue such as RocketMQ or RabbitMQ, but in an ordinary project, bringing one in purely for asynchronous processing is overkill.
You can also use an in-memory queue inside the JVM, but the queued data is lost if the application restarts.
Most projects already use Redis as a cache, so using Redis's list structure to implement the queue is a very suitable choice.

This article therefore explains how to implement an asynchronous queue on top of Redis.

This article was first published on the author's personal technical blog: https://nullpointer.pw/redis-block-queue.html

There are several ways to implement a queue with a Redis list. The first is not recommended: produce messages with LPUSH and consume them by calling RPOP inside a while(true) loop. It works, but the constant polling wastes system resources even when the queue is empty.

The second method is also not recommended: produce messages with LPUSH and consume them with the blocking BRPOP. Compared with the first method, this removes the useless polling and reduces resource consumption. However, messages can be lost: once a message has been popped it is gone from the list, so if processing fails, that message is lost.

The third method is the one introduced below. Messages are produced with LPUSH, and the consumer blocks on BRPOPLPUSH, which waits for a new message and atomically pushes it onto a second, backup list as it is popped. This keeps the advantage of the second method (no useless polling) and adds a backup copy, so a message whose processing fails is not lost. Once processing succeeds, the message is removed from the backup list with LREM. For background, see "Pattern: Reliable queue" in the Redis documentation for RPOPLPUSH.
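The difference between popping a message outright and popping it into a backup list can be seen without a Redis server. The following is a minimal in-memory sketch, assuming two `ArrayDeque`s as stand-ins for the Redis lists; the class and method names are hypothetical and it does not call Redis at all.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** In-memory model of the two consumption styles; it does not talk to Redis. */
class ReliableQueueModel {
    // The head of each deque models the head (left end) of a Redis list.
    public final Deque<String> queue = new ArrayDeque<>();
    public final Deque<String> backup = new ArrayDeque<>();

    /** Models LPUSH: insert at the head of the queue. */
    public void lpush(String value) {
        queue.addFirst(value);
    }

    /** Models BRPOP followed by processing: a failing handler loses the message. */
    public void consumeWithPop(Consumer<String> handler) {
        String msg = queue.pollLast();
        if (msg == null) {
            return;
        }
        try {
            handler.accept(msg);
        } catch (RuntimeException e) {
            // The message has already left the list; there is nothing to recover.
        }
    }

    /** Models BRPOPLPUSH followed by LREM on success. */
    public void consumeWithBackup(Consumer<String> handler) {
        String msg = queue.pollLast();
        if (msg == null) {
            return;
        }
        backup.addFirst(msg); // the "push onto destination" half of BRPOPLPUSH
        try {
            handler.accept(msg);
            backup.removeFirstOccurrence(msg); // models LREM backup 1 msg
        } catch (RuntimeException e) {
            // Processing failed: the message is still in the backup list.
        }
    }

    public static void main(String[] args) {
        Consumer<String> failing = m -> { throw new IllegalStateException("boom"); };

        ReliableQueueModel plain = new ReliableQueueModel();
        plain.lpush("Hello-0");
        plain.consumeWithPop(failing);
        System.out.println("pop:    queue=" + plain.queue + " backup=" + plain.backup);

        ReliableQueueModel reliable = new ReliableQueueModel();
        reliable.lpush("Hello-0");
        reliable.consumeWithBackup(failing);
        System.out.println("backup: queue=" + reliable.queue + " backup=" + reliable.backup);
    }
}
```

With a handler that always fails, the first model ends with both lists empty, while the second still holds `Hello-0` in the backup list, which is what makes later recovery possible.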

Redis basics

# Insert one or more values at the head of the list stored at key
LPUSH key value [value ...]

# Block until an element is available, then pop the last (tail) element of the list source, push it onto the head of the list destination, and return it to the client. timeout is given in seconds; a timeout of 0 blocks indefinitely. Deprecated since Redis 6.2 in favor of BLMOVE.
BRPOPLPUSH source destination timeout

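# Remove elements equal to value from the list: count > 0 removes up to count matches searching from head to tail, count < 0 removes up to |count| matches searching from tail to head, and count = 0 removes all matches.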
LREM key count value
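The meaning of LREM's count argument is easy to misread, so here is a small in-memory sketch of it. It assumes a plain `List<String>` as a stand-in for a Redis list; `LremModel` and its `lrem` method are hypothetical names, not part of any Redis client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory model of LREM's count semantics; it does not talk to Redis. */
class LremModel {
    /** Removes matches of value as LREM would and returns how many were removed. */
    public static int lrem(List<String> list, int count, String value) {
        // count == 0 means "remove all matches"; otherwise remove at most |count|.
        long limit = count == 0 ? Long.MAX_VALUE : Math.abs((long) count);
        int removed = 0;
        if (count >= 0) {
            // Search from head to tail.
            for (int i = 0; i < list.size() && removed < limit; ) {
                if (list.get(i).equals(value)) {
                    list.remove(i);
                    removed++;
                } else {
                    i++;
                }
            }
        } else {
            // Search from tail to head.
            for (int i = list.size() - 1; i >= 0 && removed < limit; i--) {
                if (list.get(i).equals(value)) {
                    list.remove(i);
                    removed++;
                }
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>(Arrays.asList("a", "b", "a", "c", "a"));
        lrem(list, 1, "a");
        System.out.println(list); // the first "a" from the head is gone
        lrem(list, -1, "a");
        System.out.println(list); // the last "a" from the tail is gone
    }
}
```

Because BRPOPLPUSH pushes each message onto the head of the backup list, a count of 1 finds the message that was just backed up after scanning only the entries pushed after it.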

Code implementation: queue message producer

The author uses Spring's APIs to issue the Redis commands. First, implement the message-producing code and wrap it in a utility-class method. It is very simple: call lPush to add the serialized value to the list stored at the serialized key.

@Resource
private RedisConnectionFactory connectionFactory;

public void lPush(@Nonnull String key, @Nonnull String value) {
  RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
  try {
    byte[] byteKey = RedisSerializer.string().serialize(getKey(key));
    byte[] byteValue = RedisSerializer.string().serialize(value);
    assert byteKey != null;
    connection.lPush(byteKey, byteValue);
  } finally {
    RedisConnectionUtils.releaseConnection(connection, connectionFactory);
  }
}
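The snippets in this article call `getKey` and `getBackupKey`, which are not shown. One plausible shape for them, purely as an assumption, is a pair of helpers that derive the queue key and its backup key from the same logical name. The prefix and suffix below are hypothetical values, not taken from the original project.

```java
/** Hypothetical key helpers; the prefix and suffix are illustrative assumptions. */
class QueueKeys {
    private static final String PREFIX = "queue:";
    private static final String BACKUP_SUFFIX = ":backup";

    /** Key of the list that producers push to and consumers pop from. */
    public static String getKey(String key) {
        return PREFIX + key;
    }

    /** Key of the backup list that holds messages while they are being processed. */
    public static String getBackupKey(String key) {
        return PREFIX + key + BACKUP_SUFFIX;
    }

    public static void main(String[] args) {
        System.out.println(getKey("queueA"));
        System.out.println(getBackupKey("queueA"));
    }
}
```

Deriving both keys from one name keeps each queue paired with exactly one backup list, so the consumer can never back up into another queue's list by mistake.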

Code implementation: queue message consumer

Consuming messages from the queue takes a fair amount of code, and rewriting it everywhere that needs blocking consumption is impractical. A Java 8 functional interface is therefore used to pass in the handling logic, and the code that blocks waiting for messages runs asynchronously on a separate thread.

Some readers will object on seeing the code below: wasn't the point to avoid while(true)? A short explanation: Spring Boot applications usually configure a global Redis command timeout. Even if the BRPOPLPUSH timeout is set to 0 (wait forever), a QueryTimeoutException is thrown once that global timeout is exceeded, which would make the thread exit. So try/catch is added to catch and ignore the exception, and while(true) ensures the thread keeps running. Unlike the first method, each iteration blocks inside Redis rather than spinning.
The code records whether the current message was processed successfully; if so, the message is deleted from the backup queue.

public void bRPopLPush(@Nonnull String key, Consumer<String> consumer) {
  CompletableFuture.runAsync(() -> {
    RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
    try {
      byte[] srcKey = RedisSerializer.string().serialize(getKey(key));
      byte[] dstKey = RedisSerializer.string().serialize(getBackupKey(key));
      assert srcKey != null;
      assert dstKey != null;
      while (true) {
        byte[] byteValue = new byte[0];
        boolean success = false;
        try {
          byteValue = connection.bRPopLPush(0, srcKey, dstKey);
          if (byteValue != null && byteValue.length != 0) {
            consumer.accept(RedisSerializer.string().deserialize(byteValue));
            success = true;
          }
        } catch (Exception ignored) {
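          // Swallow the QueryTimeoutException thrown when the blocking call exceeds the global timeout, so the loop keeps running.
          // Exceptions thrown by the consumer also land here; success stays false and the message remains in the backup queue.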
        } finally {
          if (success) {
            // Delete the message from the backup queue only after it has been processed successfully
            connection.lRem(dstKey, 1, byteValue);
          }
        }
      }
    } finally {
      RedisConnectionUtils.releaseConnection(connection, connectionFactory);
    }
  });
}
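One design point worth considering: `CompletableFuture.runAsync` without an explicit executor typically runs on the shared `ForkJoinPool.commonPool()`, and the loop above never returns. A possible alternative, sketched here under assumptions, is a dedicated daemon thread with a stop flag. A `BlockingQueue` with a timed `poll` stands in for the blocking Redis call, and every name in this block is hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Consumer loop on its own thread; the BlockingQueue is a stand-in for Redis. */
class DedicatedConsumerLoop {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "queue-consumer");
        t.setDaemon(true); // do not keep the JVM alive just for this loop
        return t;
    });
    private volatile boolean running = true;

    /** Starts the loop; source.poll(...) models a blocking pop with a finite timeout. */
    public void start(BlockingQueue<String> source, Consumer<String> handler) {
        executor.execute(() -> {
            while (running) {
                try {
                    String msg = source.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        handler.accept(msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                    return;
                } catch (RuntimeException e) {
                    // A failing handler must not end the loop; real code would log here.
                }
            }
        });
    }

    /** Asks the loop to stop and reports whether it ended within one second. */
    public boolean stop() {
        running = false;
        executor.shutdownNow();
        try {
            return executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Runs a tiny end-to-end demo and returns the messages the handler saw. */
    public static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        DedicatedConsumerLoop loop = new DedicatedConsumerLoop();
        loop.start(queue, msg -> { seen.add(msg); done.countDown(); });
        queue.add("Hello-0");
        queue.add("Hello-1");
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.stop();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a Spring application, `stop()` could be called from a `@PreDestroy` method so the loop ends when the context shuts down. Whether that is worth the extra code depends on how the project manages its threads.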

Test code

@Test
public void testLPush() throws InterruptedException {
  String queueA = "queueA";
  int i = 0;
  while (true) {
    String msg = "Hello-" + i++;
    redisBlockQueue.lPush(queueA, msg);
    System.out.println("lPush: " + msg);
    Thread.sleep(3000);
  }
}

@Test
public void testBRPopLPush() {
  String queueA = "queueA";
  redisBlockQueue.bRPopLPush(queueA, (val) -> {
    // The specific business logic is handled here
    System.out.println("val: " + val);
  });

  // Prevent the JUnit process from exiting
  LockSupport.park();
}

Project usage

For convenience, I extracted this into a utility class that can be used via Spring injection.
To start blocking consumption when the project starts, register the listener as follows; it then listens on the queue and waits for messages.

@Resource
private RedisBlockQueue redisBlockQueue;

@PostConstruct
public void init() {
   redisBlockQueue.bRPopLPush(xx, (value) -> {
     //...
   });
}

Posted by misteryoji on Mon, 23 May 2022 10:22:08 +0300