RabbitMQ delay queue (Python version)

Author: Bge's blog

Original link: https://blog.csdn.net/weixin_43437629/article/details/87196729

Recently I was working on a task where some data had to be processed after a delay. My first approach was to store the data in a database and run a script that scanned the table every five minutes and processed whatever was due. In practice this did not work well. Since the system already used RabbitMQ as the middleware for asynchronous tasks, I wondered whether RabbitMQ could provide a delay queue. RabbitMQ has no built-in delay queue, but one can be built from two of its features: 1. Time To Live (TTL), the message expiry mechanism; 2. Dead Letter Exchanges (DLX), which re-route dead letters. The principle and the implementation code are described in detail below.

Basic principle of delay queue

Time To Live(TTL)

RabbitMQ can control the lifetime of a message either with the x-message-ttl argument on the queue or with the expiration property on the individual message. (x-expires is a different setting: it controls when an unused queue itself is deleted, and it does not dead-letter the messages in that queue.) When a message's lifetime runs out, the message becomes a dead letter.
There are two ways to set the expiration time of RabbitMQ messages.

  • Set x-message-ttl on the queue, so that all messages in the queue share the same expiration time. (This is the scheme used for the delay queue in this post.)
  • Set the expiration property on each message, so that every message can have its own TTL.

If both are used, the smaller of the two TTL values applies. Once a message has been in the queue longer than its TTL, it becomes a dead letter.
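
As a minimal sketch of the two options, assuming pika 1.x and a channel that is already open: the helper names below (queue_ttl_arguments, per_message_expiration, effective_ttl_ms, declare_ttl_queue, publish_with_ttl) are made up for illustration and are not part of pika. The detail worth noticing is that x-message-ttl is an integer number of milliseconds, while the per-message expiration property is a string.

```python
def queue_ttl_arguments(ttl_ms):
    """Queue-level TTL: every message in the queue expires after ttl_ms milliseconds."""
    if ttl_ms < 0:
        raise ValueError("TTL must be non-negative")
    return {"x-message-ttl": int(ttl_ms)}  # integer, in milliseconds


def per_message_expiration(ttl_ms):
    """Per-message TTL: the expiration property is a string of milliseconds."""
    if ttl_ms < 0:
        raise ValueError("TTL must be non-negative")
    return str(int(ttl_ms))


def effective_ttl_ms(queue_ttl_ms=None, message_ttl_ms=None):
    """When both TTLs are set, the smaller one applies; None means 'not set'."""
    candidates = [t for t in (queue_ttl_ms, message_ttl_ms) if t is not None]
    return min(candidates) if candidates else None


def declare_ttl_queue(channel, queue, ttl_ms):
    """Declare a durable queue with a queue-level message TTL.

    `channel` is assumed to be an open pika channel.
    """
    channel.queue_declare(queue=queue, durable=True,
                          arguments=queue_ttl_arguments(ttl_ms))


def publish_with_ttl(channel, queue, body, ttl_ms):
    """Publish one persistent message with its own TTL via the default exchange."""
    import pika  # imported lazily so the pure helpers above work without pika

    channel.basic_publish(
        exchange="",
        routing_key=queue,
        body=body,
        properties=pika.BasicProperties(
            delivery_mode=2,
            expiration=per_message_expiration(ttl_ms),
        ),
    )
```

One caveat with per-message TTL: RabbitMQ only expires messages as they reach the head of the queue, so a short-TTL message queued behind a long-TTL one is held back until the message ahead of it expires. With a single queue-level TTL this cannot happen, because messages expire in arrival order.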

Dead Letter Exchanges(DLX)

A RabbitMQ queue can be configured with two arguments: x-dead-letter-exchange and x-dead-letter-routing-key (optional). When a message in the queue becomes a dead letter, it is republished to the specified exchange, which then routes it to the queues bound to it.

  • x-dead-letter-exchange: the exchange that dead letters are republished to
  • x-dead-letter-routing-key: the routing key used when republishing a dead letter; if omitted, the message keeps its original routing key
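
A small sketch of how these two arguments might be assembled and passed to queue_declare. The helper names dead_letter_arguments and declare_with_dlx are hypothetical, and channel is assumed to be an open pika channel.

```python
def dead_letter_arguments(dlx, routing_key=None):
    """Build the queue arguments that enable dead-lettering."""
    arguments = {"x-dead-letter-exchange": dlx}
    if routing_key is not None:
        # Optional: without it, the dead letter keeps its original routing key.
        arguments["x-dead-letter-routing-key"] = routing_key
    return arguments


def declare_with_dlx(channel, queue, dlx, routing_key=None):
    """Declare a durable queue whose dead letters are republished to `dlx`."""
    channel.queue_declare(queue=queue,
                          durable=True,
                          arguments=dead_letter_arguments(dlx, routing_key))
```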

A message in a queue becomes a dead letter when:

  • The message's TTL has expired. (This is the case the delay queue relies on.)
  • The queue has reached its maximum length.
  • The message is rejected by the consumer (basic.reject or basic.nack) with requeue = false.

Combining these two features gives a delay queue: set a TTL on a queue that has no consumers. When a message's TTL expires it becomes a dead letter, and the DLX setting forwards it to another exchange and queue, where it is finally consumed. The result is delayed consumption.
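
To make the flow concrete, here is a toy in-memory model of it. This is not RabbitMQ and uses no RabbitMQ API: the class ToyDelayBroker and its methods are invented for illustration, and time is passed in explicitly rather than read from a clock.

```python
from collections import deque


class ToyDelayBroker:
    """In-memory model of a TTL queue that dead-letters into a work queue."""

    def __init__(self, ttl_ms):
        self.ttl_ms = ttl_ms
        self.delay_queue = deque()  # (expires_at_ms, body) pairs, FIFO, no consumers
        self.work_queue = deque()   # where dead letters end up and consumers read

    def publish(self, body, now_ms):
        """Put a message into the delay queue; it expires ttl_ms later."""
        self.delay_queue.append((now_ms + self.ttl_ms, body))

    def tick(self, now_ms):
        """Dead-letter every expired message at the head of the delay queue."""
        while self.delay_queue and self.delay_queue[0][0] <= now_ms:
            _, body = self.delay_queue.popleft()
            self.work_queue.append(body)

    def consume(self):
        """Return the next message that is ready, or None if nothing is ready."""
        return self.work_queue.popleft() if self.work_queue else None


if __name__ == "__main__":
    broker = ToyDelayBroker(ttl_ms=10000)
    broker.publish("job-1", now_ms=0)
    broker.tick(now_ms=5000)
    print(broker.consume())   # nothing is ready after 5 s
    broker.tick(now_ms=10000)
    print(broker.consume())   # the message is available after 10 s
```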


Design and implementation of delay queue (Python)

From the above description, the implementation of delay queue is roughly divided into two steps:

  1. Generate the dead letters. There are two ways to do this: per-message TTL and queue TTL. Because my requirement is that all messages share the same delay, this implementation sets the TTL on the queue. If messages in the queue need different delays, use per-message TTL instead (see the official documentation).

  2. Set the forwarding rules for dead letters using the Dead Letter Exchanges settings (see the official documentation).

The complete code is as follows:

"""
Created on Fri Aug  3 17:00:44 2018

@author: Bge
"""
import logging

import pika
class RabbitMQClient:
    def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
        self.exchange_type = "direct"
        self.connection_string = conn_str
        self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
        self.channel = self.connection.channel()
        self._declare_retry_queue() #RetryQueue and RetryExchange
        logging.debug("connection established")
    def close_connection(self):
        self.connection.close()
        logging.debug("connection closed")
    def declare_exchange(self, exchange):
        self.channel.exchange_declare(exchange=exchange,
                                      exchange_type=self.exchange_type,
                                      durable=True)
    def declare_queue(self, queue):
        self.channel.queue_declare(queue=queue,
                                   durable=True,)
    def declare_delay_queue(self, queue, DLX='RetryExchange', TTL=60000):
        """
        Create a delay queue.
        :param queue: name of the delay queue
        :param DLX: dead-letter exchange that expired messages are forwarded to
        :param TTL: message TTL in milliseconds; TTL=60000 means 60 s
        :return:
        """
        arguments = {}
        if DLX:
            # Dead-letter exchange that expired messages are republished to
            arguments['x-dead-letter-exchange'] = DLX
        if TTL:
            # Queue-level message TTL, in milliseconds
            arguments['x-message-ttl'] = TTL
        logging.debug("delay queue arguments: %s", arguments)
        self.channel.queue_declare(queue=queue,
                                   durable=True,
                                   arguments=arguments)
    def _declare_retry_queue(self):
        """
        Create the retry exchange and queue that receive dead-lettered messages.
        :return:
        """
        self.channel.exchange_declare(exchange='RetryExchange',
                                      exchange_type='fanout',
                                      durable=True)
        self.channel.queue_declare(queue='RetryQueue',
                                   durable=True)
        self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
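    def publish_message(self, routing_key, msg, exchange='', delay=0, TTL=None):
        """
        Publish a message to the given exchange.
        :param routing_key: routing key; with the default exchange this is the queue name
        :param msg: message body, a serialized JSON string
        :param exchange: RabbitMQ exchange; '' is the default exchange
        :param delay: 0 publishes to a normal queue, any other value to a delay queue
        :param TTL: delay in milliseconds; if None, no TTL is set and messages never expire
        :return:
        """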
        if delay==0:
            self.declare_queue(routing_key)
        else:
            self.declare_delay_queue(routing_key,TTL=TTL)
        if exchange!='':
            self.declare_exchange(exchange)
        self.channel.basic_publish(exchange=exchange,
                                   routing_key=routing_key,
                                   body=msg,
                                   properties=pika.BasicProperties(
                                       delivery_mode=2,
                                       type=exchange
                                   ))
        # The connection is closed after every publish, so use a new client for each message
        self.close_connection()
        print("message sent to exchange %r" % exchange)
        logging.debug("message sent to exchange %r", exchange)
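    def start_consume(self, callback, queue='#', delay=1):
        """
        Start a consumer and begin consuming messages from RabbitMQ.
        :param callback: function called as callback(ch, method, properties, body) for each message
        :param queue: queue to consume from when delay is not 1
        :param delay: 1 consumes from 'RetryQueue', where the delayed messages arrive
        :return:
        """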
        if delay==1:
            queue='RetryQueue'
        else:
            self.declare_queue(queue)
        self.channel.basic_qos(prefetch_count=1)
        try:
            # pika >= 1.0 signature; pika 0.x took the callback as the first positional argument
            self.channel.basic_consume(
                queue=queue,  # queue to receive messages from
                on_message_callback=callback,  # called for every message received
            )
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.stop_consuming()
    def stop_consuming(self):
        self.channel.stop_consuming()
        self.close_connection()
    @staticmethod
    def message_handle_successfully(channel, method):
        """
        Call this once a message has been processed successfully. Without the ack,
        RabbitMQ treats the message as unprocessed and redelivers it after the
        channel or connection closes.
        :param channel: the channel argument of the callback function
        :param method: the method argument of the callback function
        :return:
        """
        channel.basic_ack(delivery_tag=method.delivery_tag)
    @staticmethod
    def message_handle_failed(channel, method):
        """
        Call this when message processing fails. The message is rejected without
        requeueing, so it is dead-lettered if the queue has a dead-letter exchange
        configured and discarded otherwise.
        :param channel: the channel argument of the callback function
        :param method: the method argument of the callback function
        :return:
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

The code for publishing a message is as follows:

from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")
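
One thing to watch for: publish_message declares the delay queue on every call, and RabbitMQ refuses to redeclare an existing queue with different arguments (the channel is closed with a PRECONDITION_FAILED error). So publishing to 'test-delay' with a different TTL than the one it was created with will fail. One possible workaround, sketched here with a hypothetical helper delay_queue_name, is to encode the TTL in the queue name so that each delay gets its own queue.

```python
def delay_queue_name(base, ttl_ms):
    """Derive a queue name that is unique per TTL, e.g. 'test-delay.10000ms'."""
    if ttl_ms <= 0:
        raise ValueError("TTL must be a positive number of milliseconds")
    return "%s.%dms" % (base, ttl_ms)


# Intended use with the client above (requires a running broker):
#   ttl = 10000
#   client.publish_message(delay_queue_name('test-delay', ttl), msg1, delay=1, TTL=ttl)
```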

The consumer code is as follows:

from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
    msg = body.decode()
    print(msg)
    # After successful processing, ack the message to tell RabbitMQ it has been handled.
    RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)
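
The callback above only covers the success path. A sketch of a callback that also handles failures might look like the following; make_callback and handler are hypothetical names, and the message body is assumed to be JSON as in the publishing example. Note that rejecting with requeue=False only dead-letters the message if the consumed queue has a dead-letter exchange configured. RetryQueue as declared above has none, so a rejected message there is simply discarded.

```python
import json


def make_callback(handler):
    """Wrap `handler` so that each message is acked on success and rejected on failure."""

    def callback(ch, method, properties, body):
        try:
            handler(json.loads(body.decode("utf-8")))
        except Exception:
            # Processing (or JSON decoding) failed: reject without requeueing
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        else:
            # Processing succeeded: acknowledge so RabbitMQ can drop the message
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback


# Intended use with the client above (requires a running broker):
#   client.start_consume(make_callback(print), "RetryQueue", delay=0)
```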

 


Tags: RabbitMQ

Posted by zeroecko on Wed, 25 May 2022 16:55:13 +0300