Spring Boot RabbitMQ integration: message persistence in RabbitMQ

Note: the source code in this document comes from spring-rabbit-2.1.8.RELEASE.jar.

Message persistence in RabbitMQ involves three things:

1. Exchange persistence

2. Queue persistence

3. Message persistence


Persistence of exchange

When an exchange is declared, it takes a durable parameter. If durable is true, the exchange is persisted and still exists after the RabbitMQ server restarts. In Spring AMQP, the constructor that takes only a name sets durable to true, as the following source shows:

public class DirectExchange extends AbstractExchange {
    public static final DirectExchange DEFAULT = new DirectExchange("");

    public DirectExchange(String name) {
        super(name);
    }

    public DirectExchange(String name, boolean durable, boolean autoDelete) {
        super(name, durable, autoDelete);
    }

    public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        super(name, durable, autoDelete, arguments);
    }

    public final String getType() {
        return "direct";
    }
}

public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    private final String name;
    private final boolean durable;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile boolean delayed;
    private boolean internal;

    public AbstractExchange(String name) {
        this(name, true, false);
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete) {
        this(name, durable, autoDelete, (Map)null);
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        this.name = name;
        this.durable = durable;
        this.autoDelete = autoDelete;
        if (arguments != null) {
            this.arguments = arguments;
        } else {
            this.arguments = new HashMap();
        }

    }

    // The remaining members of the class are omitted here.
}
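As a minimal sketch of how these constructors might be used, the configuration class below declares one durable and one non-durable direct exchange. The class name, bean method names and exchange names are made up for illustration, and the sketch assumes spring-rabbit is on the classpath.

```java
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; every name in it is illustrative only.
@Configuration
public class ExchangeConfig {

    // The name-only constructor delegates to (name, true, false),
    // so this exchange is durable and not auto-deleted.
    @Bean
    public DirectExchange durableExchange() {
        return new DirectExchange("ex.durable");
    }

    // Passing durable = false declares an exchange that does not
    // survive a restart of the RabbitMQ server.
    @Bean
    public DirectExchange transientExchange() {
        return new DirectExchange("ex.transient", false, false);
    }
}
```

Calling isDurable() on either bean returns the value that was passed to (or defaulted by) the constructor.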

 

Persistence of queue

A queue declaration also takes a durable parameter. If durable is true, the queue is persisted and still exists after the RabbitMQ server restarts. The Queue(String name) constructor likewise sets durable to true:

    public Queue(String name) {
        this(name, true, false, false);
    }

    public Queue(String name, boolean durable) {
        this(name, durable, false, false, (Map)null);
    }

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, (Map)null);
    }

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
        Assert.notNull(name, "'name' cannot be null");
        this.name = name;
        this.actualName = StringUtils.hasText(name) ? name : Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration";
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
        this.arguments = (Map)(arguments != null ? arguments : new HashMap());
    }
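The queue constructors above could be used along the following lines. This is only a sketch: the class name and queue names are hypothetical, the exchange name and routing key are borrowed from the client example in this article, and spring-rabbit is assumed to be on the classpath.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; every name in it is illustrative only.
@Configuration
public class QueueConfig {

    // Queue(String) delegates to (name, true, false, false): durable,
    // not exclusive, not auto-deleted.
    @Bean
    public Queue durableQueue() {
        return new Queue("queue.durable");
    }

    // Queue(String, boolean) with durable = false: the queue is lost
    // when the RabbitMQ server restarts.
    @Bean
    public Queue transientQueue() {
        return new Queue("queue.transient", false);
    }

    // Durable direct exchange that the durable queue is bound to.
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("ex.pc");
    }

    // Routes messages published to "ex.pc" with key "key.pc" to the durable queue.
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(durableQueue()).to(exchange()).with("key.pc");
    }
}
```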

 

Persistence of message

That covers exchanges and queues, so how is a message persisted? Note that a persistent message only survives a broker restart if it has been routed to a durable queue.

With the RabbitMQ Java client, a message is marked as persistent by setting deliveryMode to 2 in its BasicProperties:

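import com.rabbitmq.client.AMQP;
import java.nio.charset.StandardCharsets;

// deliveryMode 2 marks the message as persistent (1 means non-persistent)
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .build();

// channel is an already opened com.rabbitmq.client.Channel
channel.basicPublish("ex.pc", "key.pc", properties, "hello world".getBytes(StandardCharsets.UTF_8));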

So how is persistence handled when messages are sent through RabbitTemplate in a Spring Boot application?

Start with the most commonly used sending method, convertAndSend in the RabbitTemplate class:

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
        convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }

It delegates to an overloaded convertAndSend in the same class, which converts the object into a Message:

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
            @Nullable CorrelationData correlationData) throws AmqpException {

        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }

During the conversion, a new MessageProperties object is passed to the message converter:

    protected Message convertMessageIfNecessary(final Object object) {
        if (object instanceof Message) {
            return (Message) object;
        }
        return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

MessageProperties has a deliveryMode property whose default value is MessageDeliveryMode.PERSISTENT:

    public MessageProperties() {
        this.deliveryMode = DEFAULT_DELIVERY_MODE;
        this.priority = DEFAULT_PRIORITY;
    }

    static {
        DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
        DEFAULT_PRIORITY = 0;
    }
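The default can be checked with a few lines of code. The following is a small sketch, assuming spring-amqp is on the classpath; the class and method names are invented for this example. It prints the delivery mode of a new MessageProperties together with the numeric AMQP value that MessageDeliveryMode.toInt maps it to, and then does the same after switching to non-persistent.

```java
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;

// Hypothetical demo class; not part of spring-amqp.
class DeliveryModeDefaults {

    // Returns the delivery mode of a freshly created MessageProperties.
    static MessageDeliveryMode defaultMode() {
        return new MessageProperties().getDeliveryMode();
    }

    public static void main(String[] args) {
        MessageDeliveryMode mode = defaultMode();
        // toInt maps the enum to the AMQP delivery-mode value.
        System.out.println(mode + " -> " + MessageDeliveryMode.toInt(mode));

        // Override the default on a single MessageProperties instance.
        MessageProperties props = new MessageProperties();
        props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        System.out.println(props.getDeliveryMode() + " -> "
                + MessageDeliveryMode.toInt(props.getDeliveryMode()));
    }
}
```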

 

After the conversion, the send method of the same class is called:

    @Override
    public void send(final String exchange, final String routingKey,
            final Message message, @Nullable final CorrelationData correlationData)
            throws AmqpException {
        execute(channel -> {
            doSend(channel, exchange, routingKey, message,
                    (RabbitTemplate.this.returnCallback != null
                            || (correlationData != null && StringUtils.hasText(correlationData.getId())))
                            && RabbitTemplate.this.mandatoryExpression.getValue(
                                    RabbitTemplate.this.evaluationContext, message, Boolean.class),
                    correlationData);
            return null;
        }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

 

That method in turn calls doSend:

    public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
            boolean mandatory, @Nullable CorrelationData correlationData)
                    throws Exception { // NOSONAR TODO: change to IOException in 2.2.

        String exch = exchangeArg;
        String rKey = routingKeyArg;
        if (exch == null) {
            exch = this.exchange;
        }
        if (rKey == null) {
            rKey = this.routingKey;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Publishing message " + message
                    + "on exchange [" + exch + "], routingKey = [" + rKey + "]");
        }

        Message messageToUse = message;
        MessageProperties messageProperties = messageToUse.getMessageProperties();
        if (mandatory) {
            messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
        }
        if (this.beforePublishPostProcessors != null) {
            for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
                messageToUse = processor.postProcessMessage(messageToUse, correlationData);
            }
        }
        setupConfirm(channel, messageToUse, correlationData);
        if (this.userIdExpression != null && messageProperties.getUserId() == null) {
            String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
            if (userId != null) {
                messageProperties.setUserId(userId);
            }
        }
        sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
        // Check if commit needed
        if (isChannelLocallyTransacted(channel)) {
            // Transacted channel created by this template -> commit.
            RabbitUtils.commitIfNecessary(channel);
        }
    }

In doSend we finally reach the call that publishes the message to RabbitMQ: sendToRabbit. It uses the MessagePropertiesConverter to turn the MessageProperties object into BasicProperties and then calls channel.basicPublish. This is how message persistence works in spring-rabbit: unless the delivery mode is changed, messages are sent as persistent.

    protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
            Message message) throws IOException {
        BasicProperties convertedMessageProperties = this.messagePropertiesConverter
                .fromMessageProperties(message.getMessageProperties(), this.encoding);
        channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
    }
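The conversion step can be tried in isolation, without a broker. The sketch below assumes that the template uses the stock DefaultMessagePropertiesConverter and the "UTF-8" encoding; the class and method names are invented for this example. It returns the deliveryMode that ends up in the BasicProperties handed to basicPublish.

```java
import com.rabbitmq.client.AMQP.BasicProperties;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;

// Hypothetical demo class; not part of spring-rabbit.
class PropertiesConversionDemo {

    // Converts Spring's MessageProperties to the client's BasicProperties
    // and returns the resulting numeric delivery mode.
    static Integer convertedDeliveryMode(MessageProperties props) {
        MessagePropertiesConverter converter = new DefaultMessagePropertiesConverter();
        BasicProperties basic = converter.fromMessageProperties(props, "UTF-8");
        return basic.getDeliveryMode();
    }

    public static void main(String[] args) {
        // Default MessageProperties: persistent.
        System.out.println(convertedDeliveryMode(new MessageProperties()));

        // Explicitly non-persistent.
        MessageProperties props = new MessageProperties();
        props.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        System.out.println(convertedDeliveryMode(props));
    }
}
```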

 

How to change the persistence property of message?

As the source code analysis above shows, messages sent through Spring are persistent by default. The delivery mode can be changed in three ways:

1. Send with the send method and set deliveryMode on the MessageProperties of the Message yourself.

2. Provide a custom MessageConverter that sets deliveryMode on the MessageProperties during message conversion.

3. Provide a custom MessagePropertiesConverter that sets deliveryMode when the MessageProperties object is converted to BasicProperties.
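The three options above might look roughly like this. It is a sketch rather than a recommended implementation: all class and method names are invented, the exchange and routing key are reused from the client example, and it assumes the converters extend the stock SimpleMessageConverter and DefaultMessagePropertiesConverter.

```java
import com.rabbitmq.client.AMQP.BasicProperties;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;

// Hypothetical helper class; every name in it is illustrative only.
class NonPersistentSending {

    // Option 1: build the Message yourself and pass it to send().
    static Message nonPersistentMessage(String text) {
        return MessageBuilder.withBody(text.getBytes(StandardCharsets.UTF_8))
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                .build();
    }

    // Option 2: a MessageConverter that changes the delivery mode
    // before the message is created.
    static class NonPersistentMessageConverter extends SimpleMessageConverter {
        @Override
        protected Message createMessage(Object object, MessageProperties messageProperties) {
            messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return super.createMessage(object, messageProperties);
        }
    }

    // Option 3: a MessagePropertiesConverter that overrides deliveryMode
    // in the BasicProperties, whatever MessageProperties says.
    static class NonPersistentPropertiesConverter extends DefaultMessagePropertiesConverter {
        @Override
        public BasicProperties fromMessageProperties(MessageProperties source, String charset) {
            return super.fromMessageProperties(source, charset)
                    .builder()
                    .deliveryMode(1) // 1 = non-persistent
                    .build();
        }
    }

    // Shows where each option plugs into a RabbitTemplate. Options 2 and 3
    // affect every message sent through the template afterwards.
    static void configureAndSend(RabbitTemplate template) {
        template.send("ex.pc", "key.pc", nonPersistentMessage("hello world"));

        template.setMessageConverter(new NonPersistentMessageConverter());
        template.setMessagePropertiesConverter(new NonPersistentPropertiesConverter());
        template.convertAndSend("ex.pc", "key.pc", "hello world");
    }
}
```

Option 1 changes a single message, whereas options 2 and 3 change the behaviour of the whole template, so in practice one of the two is usually enough.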

Tags: RabbitMQ Spring Boot

Posted by discofreakboot on Thu, 05 May 2022 15:50:47 +0300