Integrating RabbitMQ with go-micro: practice and principles

In go-micro, asynchronous messages are sent and received through the Broker component. The underlying implementation can be RabbitMQ, Kafka, Redis, and others. This article covers how to send and receive data with RabbitMQ in go-micro, and how it works under the hood.

Core functions of Broker

The core functions of Broker are Publish and Subscribe. They are defined as:

Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

Publish

The first parameter of Publish is topic, which identifies a category of messages.

The published data is carried in a Message, which consists of a header and a body. It is defined as follows:

type Message struct {
	Header map[string]string
	Body   []byte
}

The message header is a map, that is, a set of key-value pairs.

The message body is a byte slice, so developers must encode the payload when sending and decode it when receiving.
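As a minimal sketch of that encode and decode step, the following uses JSON from the standard library. The Message type is redeclared locally so the example compiles without go-micro. The user type, the encode and decode helpers, and the Content-Type header key are assumptions made for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the struct quoted above; it is redeclared here only so
// the sketch compiles without importing go-micro.
type Message struct {
	Header map[string]string
	Body   []byte
}

// user is a hypothetical payload type used only for illustration.
type user struct {
	Id   int64
	Name string
}

// encode marshals the payload to JSON and notes the format in the header.
func encode(u user) (*Message, error) {
	body, err := json.Marshal(u)
	if err != nil {
		return nil, err
	}
	return &Message{
		Header: map[string]string{"Content-Type": "application/json"},
		Body:   body,
	}, nil
}

// decode reverses encode on the receiving side.
func decode(m *Message) (user, error) {
	var u user
	err := json.Unmarshal(m.Body, &u)
	return u, err
}

func main() {
	m, err := encode(user{Id: 1, Name: "Zhang San"})
	if err != nil {
		panic(err)
	}
	u, err := decode(m)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(m.Body), u.Name)
}
```

Any other encoding (protobuf, for example) works the same way, as long as publisher and subscriber agree on it.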

Subscribe

The first parameter of Subscribe is also topic, which selects the messages to receive.

Received data is processed by a Handler, a function type defined as follows:

type Handler func(Event) error

The parameter Event is an interface that each concrete Broker must implement. It is defined as follows:

type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
}
  • Topic() returns the topic of the current message, which is the topic the publisher sent it under.
  • Message() returns the message sent by the publisher, including Header and Body.
  • Ack() notifies the Broker that the message has been received and can be deleted. It can be used to ensure that a message is consumed at least once.
  • Error() returns the error, if any, recorded while the message was being processed.

When subscribing, developers implement a Handler function that receives an Event instance and extracts the data for processing. Depending on the broker, the handler may also need to call Ack() itself, or return an error when processing fails.
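To make the Handler contract concrete, here is a small self-contained sketch. The Message, Event and Handler types are redeclared locally from the definitions above. fakeEvent and printAndAck are hypothetical names introduced only for this example, and whether a real handler should call Ack() itself depends on the broker and its options.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the types quoted above, so the sketch needs no go-micro import.
type Message struct {
	Header map[string]string
	Body   []byte
}

type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
}

type Handler func(Event) error

// fakeEvent is a hypothetical in-memory Event used to exercise a handler.
type fakeEvent struct {
	topic string
	msg   *Message
	acked bool
}

func (e *fakeEvent) Topic() string     { return e.topic }
func (e *fakeEvent) Message() *Message { return e.msg }
func (e *fakeEvent) Ack() error        { e.acked = true; return nil }
func (e *fakeEvent) Error() error      { return nil }

// printAndAck rejects empty bodies and acknowledges everything else.
func printAndAck(e Event) error {
	if len(e.Message().Body) == 0 {
		return errors.New("empty message body")
	}
	fmt.Printf("topic=%s body=%s\n", e.Topic(), e.Message().Body)
	return e.Ack()
}

func main() {
	var h Handler = printAndAck
	ev := &fakeEvent{topic: "test", msg: &Message{Body: []byte(`{"Id":1}`)}}
	if err := h(ev); err != nil {
		fmt.Println("handler error:", err)
	}
	fmt.Println("acked:", ev.acked)
}
```

Returning the error instead of acknowledging leaves the decision about redelivery to the broker.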

Hands-on: go-micro with RabbitMQ

Now that the Broker definition is clear, let's see how to use go-micro to send and receive RabbitMQ messages.

Start RabbitMQ

If you already have a RabbitMQ server, skip this step.

Here is a quick way to start RabbitMQ with Docker. This assumes Docker is already installed.

Run the following command to start a RabbitMQ container:

docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq

Then enter the container and make some settings:

docker exec -it rabbitmq1 /bin/bash

Enable the management plug-in and re-enable metrics collection (with the collector disabled, some management API calls return HTTP 500 errors):

rabbitmq-plugins enable rabbitmq_management
 
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf

Then restart the container:

docker restart rabbitmq1

Finally, open http://127.0.0.1:15672 in a browser. The default user name and password are both guest.

Write the publish and receive functions

For the demonstration, first define the functions that publish and receive messages. The publish function uses the Event type provided by go-micro; other types also offer a Publish method. The data sent here is a JSON string. The receiving function can have any name, but its parameters and return value must follow the signature shown in the code below. It can also be defined as a method on a type.

// loopPublish publishes one message per second.
func loopPublish(event micro.Event) {
	for {
		time.Sleep(time.Second)

		curUnix := strconv.FormatInt(time.Now().Unix(), 10)
		msg := "{\"Id\":" + curUnix + ",\"Name\":\"Zhang San\"}"
		// Log a failed publish instead of silently dropping the error.
		if err := event.Publish(context.TODO(), msg); err != nil {
			log.Println("publish failed:", err)
		}
	}
}

// Define a function to receive messages: print the received messages
func handle(ctx context.Context, msg interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errors.New(fmt.Sprint(r))
			log.Println(err)
		}
	}()

	b, err := json.Marshal(msg)
	if err != nil {
		log.Println(err)
		return
	}

	log.Println(string(b))
	return
}

Write the main program

Here is the code. The comments give a first overview; the details are explained afterwards.

func main() {
	// Connection parameters of RabbitMQ
	rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/"
	exchangeName := "amq.topic"
	subcribeTopic := "test"
	queueName := "rabbitmqdemo_test"

	// The default is application/protobuf. This demo uses JSON, so it needs to be changed
	server.DefaultContentType = "application/json"

	// Create the RabbitMQ Broker
	b := rabbitmq.NewBroker(
		broker.Addrs(rabbitmqUrl),           // RabbitMQ address, including the vhost
		rabbitmq.ExchangeName(exchangeName), // Name of the exchange
		rabbitmq.DurableExchange(),          // Declare the exchange as durable (it survives a RabbitMQ restart)
		rabbitmq.PrefetchCount(1),           // Maximum number of unacknowledged messages delivered to the consumer at a time
	)

	// Create the service. NewService performs internal initialization that must happen before NewSubscribeOptions
	service := micro.NewService(
		micro.Broker(b),
	)
	service.Init()

	// Build the subscription options (optional; defaults apply if omitted)
	subOpts := broker.NewSubscribeOptions(
		rabbitmq.DurableQueue(),   // Durable queue: messages stay queued after the consumer disconnects
		rabbitmq.RequeueOnError(), // When the handler returns an error, the message is put back on the queue
		rabbitmq.AckOnSuccess(),   // When the handler returns no error, go-micro sends an Ack to RabbitMQ
	)

	// Register the subscriber
	if err := micro.RegisterSubscriber(
		subcribeTopic,    // Topic to subscribe to
		service.Server(), // The rpcServer created by NewService
		handle,           // Message handler function
		server.SubscriberContext(subOpts.Context), // Optional; a default subscription context is used if omitted
		server.SubscriberQueue(queueName),         // Queue name
	); err != nil {
		log.Fatal(err)
	}

	// Publish event messages
	event := micro.NewEvent(subcribeTopic, service.Client())
	go loopPublish(event)

	log.Println("Service is running ...")
	if err := service.Run(); err != nil {
		log.Println(err)
	}
}

The main logic is:

1. First, create a RabbitMQ Broker, which implements the standard Broker interface. The main parameters are the RabbitMQ address and the RabbitMQ exchange. PrefetchCount is used by subscribers (consumers).

2. Then create a go-micro service with NewService and pass the broker in. A lot is initialized here; most importantly, an rpcServer is created and bound to the broker.

3. Then register the subscription with RegisterSubscriber. Registration does two things: first, if the queue does not exist on RabbitMQ, it creates the queue and subscribes it to the specified topic; second, it defines how the go-micro program handles data received from the RabbitMQ queue.

Here are the subscription parameters:

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
  • topic: go-micro uses a topic model. The publisher specifies a topic when sending a message, and a subscriber receives only the topics it has subscribed to;
  • s: messages received from RabbitMQ enter this Server for processing. It is created internally by NewService;
  • h: the handle function defined in the previous step. A method inside the Server calls it for each message;
  • opts: subscription options. The name of the RabbitMQ queue must be specified here. In addition, SubscriberContext defines some subscription behavior. DurableQueue makes the RabbitMQ queue durable: we generally do not want to lose messages, and with this setting messages stay in the RabbitMQ queue even while the program is disconnected. AckOnSuccess and RequeueOnError define what happens after the handler runs: if handle returns an error, the message is returned to RabbitMQ and delivered to the program again.

4. Then, for the demonstration, an Event is created with NewEvent and used to send a message every second.

5. Finally, service.Run() starts the program.
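The interaction of AckOnSuccess and RequeueOnError from step 3 can be modeled as a small decision function. This is a simplified sketch under the assumption that manual acknowledgement is in effect; it is not the plug-in's actual code, and the names action, settle and errHandler are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// action is what the consumer tells RabbitMQ about one delivery.
type action string

const (
	ack         action = "ack"
	nackRequeue action = "nack, requeue"
	nackDrop    action = "nack, no requeue"
	none        action = "none"
)

// errHandler is a sample handler failure used in the demo below.
var errHandler = errors.New("handler failed")

// settle is an assumption-based model of how the two options combine.
func settle(handlerErr error, ackOnSuccess, requeueOnError bool) action {
	switch {
	case handlerErr == nil && ackOnSuccess:
		return ack
	case handlerErr == nil:
		return none // the handler is expected to acknowledge by itself
	case requeueOnError:
		return nackRequeue
	default:
		return nackDrop
	}
}

func main() {
	fmt.Println(settle(nil, true, true))        // handler succeeded
	fmt.Println(settle(errHandler, true, true)) // handler failed, message goes back to the queue
	fmt.Println(settle(errHandler, true, false))
}
```

With both options set, as in the demo program, a message is removed only after the handler succeeds, which gives at-least-once delivery.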

With everything in place, run the program; it logs each received message, roughly one per second:

Note that publishers and subscribers usually live in different programs. They share one program here only to keep the demonstration simple. If you only publish messages, you do not need the subscription code; if you only subscribe, you do not need the publishing code. Trim the example to fit your needs.

How messages flow between go-micro and RabbitMQ

In this section, let's see how messages flow through go-micro and RabbitMQ. I drew a schematic diagram:

The diagram is a little complicated, so here is a detailed explanation.

First, it is divided into three parts, distinguished by color: RabbitMQ, the message publishing part, and the message receiving part.

  • RabbitMQ is not the focus of this article. Just treat it as a black box.
  • Message publishing part: the flow starts when the producer program calls event.Publish, which in turn calls client.Publish. Up to this point, processing happens in the go-micro core module. Next, Broker.Publish is called. The Broker here is the Broker instance of the RabbitMQ plug-in, so from this point on the flow is inside the plug-in, which sends the message to RabbitMQ through the Publish method of the RabbitMQ Connection and the Publish method of the RabbitMQ Channel.
  • Message receiving part: service.Run internally calls rpcServer.Start, which calls broker.Subscribe. This method is defined in the RabbitMQ plug-in. It reads the RabbitMQ queue settings supplied to RegisterSubscriber and passes them in turn to the Consume method of the RabbitMQ Connection and the ConsumeQueue method of the RabbitMQ Channel, which finally connect to RabbitMQ and set up the queue subscription there. These methods also return a Go channel of type amqp.Delivery. broker.Subscribe keeps reading from this Go channel and hands each message to the handler that was passed in when broker.Subscribe was called, namely rpcServer.HandleEvent. After some processing, the message enters the routing module inside rpcServer, router.ProcessMessage. This method uses the topic of the current message to find the subscription registered by RegisterSubscriber and finally calls the function registered there to receive the message.
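The receiving path described above can be sketched in a few lines: deliveries arrive on a Go channel and are dispatched to the handlers registered for their topic. This is a simplified model, not go-micro's implementation. The delivery, handlerFunc and router types and the consume function are hypothetical stand-ins.

```go
package main

import "fmt"

// delivery is a hypothetical stand-in for an AMQP delivery.
type delivery struct {
	Header map[string]string
	Body   []byte
}

// handlerFunc is the business function registered for a topic.
type handlerFunc func(body []byte) error

// router maps a topic to the handlers registered for it.
type router struct {
	subscribers map[string][]handlerFunc
}

func newRouter() *router {
	return &router{subscribers: map[string][]handlerFunc{}}
}

func (r *router) register(topic string, h handlerFunc) {
	r.subscribers[topic] = append(r.subscribers[topic], h)
}

// process looks up handlers by exact topic match and calls each of them.
// It reports whether any handler was registered for the topic.
func (r *router) process(d delivery) (bool, error) {
	subs, ok := r.subscribers[d.Header["Micro-Topic"]]
	if !ok {
		return false, nil
	}
	for _, h := range subs {
		if err := h(d.Body); err != nil {
			return true, err
		}
	}
	return true, nil
}

// consume reads deliveries until the channel is closed and returns how
// many were handled successfully.
func consume(r *router, deliveries <-chan delivery) int {
	handled := 0
	for d := range deliveries {
		if ok, err := r.process(d); ok && err == nil {
			handled++
		}
	}
	return handled
}

func main() {
	r := newRouter()
	r.register("test", func(body []byte) error {
		fmt.Printf("received: %s\n", body)
		return nil
	})

	deliveries := make(chan delivery, 2)
	deliveries <- delivery{Header: map[string]string{"Micro-Topic": "test"}, Body: []byte(`{"Id":1}`)}
	deliveries <- delivery{Header: map[string]string{"Micro-Topic": "other"}, Body: []byte(`{"Id":2}`)}
	close(deliveries)

	fmt.Println("handled:", consume(r, deliveries))
}
```

The second delivery is skipped because no handler is registered under its topic, which is the exact-match behavior that matters for the first pitfall discussed later.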

The same process can also be divided into a business part, a core module part, and a plug-in part.

  • First, the plug-in's Broker implementation is created and registered with the rpcServer of the core module;
  • A message being sent travels from the business part to the core module part, and then to the plug-in part that implements the Broker;
  • A message being received first enters the plug-in part, then flows to the core module part, and then to the business part.

As the diagram shows, every message passes through the RabbitMQ plug-in. In fact, you can use this plug-in on its own to send and receive messages. I have submitted the demo code to GitHub; the repository address is at the end of the article.

This division makes the designer's overall approach easier to understand. Knowing the key nodes helps you use the framework well and locate problems quickly.

Pitfalls I ran into and fixed

Messages published by other frameworks cannot be received

This is because router.ProcessMessage uses a go-micro-specific header to look up subscriptions:

// get the subscribers by topic
	subs, ok := router.subscribers[msg.Topic()]

Here msg.Topic() returns the topic field set in the following code:

	rpcMsg := &rpcMessage{
		topic:       msg.Header["Micro-Topic"],
		contentType: ct,
		payload:     &raw.Frame{Data: msg.Body},
		codec:       cf,
		header:      msg.Header,
		body:        msg.Body,
	}

Other frameworks do not set such a header unless they have been specially adapted to go-micro.

When RabbitMQ is used, development centers on RabbitMQ, and go-micro's processing logic does not take RabbitMQ subscription wildcards into account: the topic of the published message, the topic of the subscription, and the value of Micro-Topic are all matched by strict equality. The routing key of the RabbitMQ message can therefore be used to fill in this header. In rbroker.Subscribe of the RabbitMQ plug-in, after a message is received, the header can be set like this:

// Messages sent to RabbitMQ by other frameworks do not have this header.
// The RoutingKey of the message can be used as its value, so the message
// can be delivered to the subscriber bound to this topic.
msgTopic := header["Micro-Topic"]
if msgTopic == "" {
	header["Micro-Topic"] = msg.RoutingKey
}

In this way, the consumer program developed by go micro can receive the messages published by other frameworks, and other frameworks do not need to be adapted.
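The effect of that fallback can be checked in isolation. The sketch below is a standalone illustration rather than plug-in code; topicOf and the subscribers map are names introduced only for the example.

```go
package main

import "fmt"

// topicOf returns the topic used for subscriber lookup: the Micro-Topic
// header when present, otherwise the AMQP routing key.
func topicOf(header map[string]string, routingKey string) string {
	if t := header["Micro-Topic"]; t != "" {
		return t
	}
	return routingKey
}

func main() {
	// One subscription registered under the topic "test".
	subscribers := map[string]string{"test": "handle"}

	// A message published by go-micro carries the header.
	fromMicro := topicOf(map[string]string{"Micro-Topic": "test"}, "test")
	// A message published by another framework carries only the routing key.
	fromOther := topicOf(map[string]string{}, "test")

	_, okMicro := subscribers[fromMicro]
	_, okOther := subscribers[fromOther]
	fmt.Println(okMicro, okOther)
}
```

Because the lookup is an exact match, a routing key that relies on a wildcard binding would still need a subscription registered under the concrete key.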

Subscribers and publishers block indefinitely after RabbitMQ restarts

The RabbitMQ plug-in of go-micro is built on another library: github.com/streadway/amqp

For publishers: when the RabbitMQ connection drops, the amqp library notifies go-micro synchronously over Go channels, and go-micro can then reconnect. The problem lies in this synchronous notification. The RabbitMQ plug-in registers for close notifications of both the connection and the channel, but handles only one notification before reconnecting. The other Go channel therefore blocks forever, which in turn keeps a lock from being released. Publishing needs that lock, so the publisher blocks indefinitely. The fix is to wrap the receive in a loop and reconnect only after all notifications have been received.
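The idea behind the publisher fix can be shown with plain Go channels. This is a sketch of the pattern only, not the code from the actual commit: waitAllClosed and simulateShutdown are hypothetical names, and the goroutine in simulateShutdown merely plays the role of the amqp library sending on unbuffered notification channels.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitAllClosed drains both notification channels until each has been
// closed by the sender, and returns how many errors were received.
// Setting a channel variable to nil disables its select case.
func waitAllClosed(connClosed, chanClosed <-chan error) int {
	received := 0
	for connClosed != nil || chanClosed != nil {
		select {
		case err, ok := <-connClosed:
			if !ok {
				connClosed = nil
				continue
			}
			if err != nil {
				received++
			}
		case err, ok := <-chanClosed:
			if !ok {
				chanClosed = nil
				continue
			}
			if err != nil {
				received++
			}
		}
	}
	return received
}

// simulateShutdown imitates a sender that notifies synchronously on two
// unbuffered channels. It reports the number of notifications received
// and whether the sender was able to finish.
func simulateShutdown() (int, bool) {
	connClosed := make(chan error)
	chanClosed := make(chan error)
	done := make(chan struct{})
	go func() {
		connClosed <- errors.New("connection closed")
		close(connClosed)
		chanClosed <- errors.New("channel closed") // blocks forever if nobody reads
		close(chanClosed)
		close(done)
	}()

	received := waitAllClosed(connClosed, chanClosed)
	select {
	case <-done:
		return received, true
	case <-time.After(time.Second):
		return received, false
	}
}

func main() {
	n, finished := simulateShutdown()
	fmt.Println("notifications:", n, "sender finished:", finished)
	// Only at this point would a real client start reconnecting.
}
```

If the receiver stopped after the first notification, the sender would stay stuck on the second send, which corresponds to the lock that is never released in the description above.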

For subscribers: when the RabbitMQ connection drops, the subscriber blocks on a Go channel until it yields a value, which signals that the connection has been re-established and the consumer channel can be rebuilt. The problem is again the blocked Go channel: it is reassigned every time an amqp close notification arrives, so the subscriber may be waiting on a stale channel that never yields and block indefinitely. The fix is to add a time.After case to the select, so the wait periodically gets a chance to pick up the new channel.
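The subscriber fix can be sketched the same way. The following is an illustration of the polling pattern under stated assumptions, not the plug-in's code: connState, current, reset, waitReconnected and demo are hypothetical names, and the overall timeout is added only so the example always terminates.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// connState is a hypothetical stand-in for the plug-in's connection object.
// ready is closed once a connection is established and is replaced by a
// fresh channel on every disconnect.
type connState struct {
	mu    sync.Mutex
	ready chan struct{}
}

// current returns the channel that signals "connected" right now.
func (c *connState) current() chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.ready
}

// reset swaps in a new channel, as happens on each close notification.
func (c *connState) reset() chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ready = make(chan struct{})
	return c.ready
}

// waitReconnected re-reads the current channel on every poll tick, so a
// channel that was replaced in the meantime is eventually noticed.
func waitReconnected(c *connState, poll, timeout time.Duration) bool {
	deadline := time.After(timeout)
	for {
		select {
		case <-c.current():
			return true
		case <-time.After(poll):
			// Loop again and pick up the latest channel.
		case <-deadline:
			return false
		}
	}
}

// demo starts with a stale channel that is never closed, then replaces it
// and signals on the new one.
func demo() bool {
	c := &connState{ready: make(chan struct{})}
	go func() {
		time.Sleep(20 * time.Millisecond)
		close(c.reset())
	}()
	return waitReconnected(c, 10*time.Millisecond, time.Second)
}

func main() {
	fmt.Println("reconnected:", demo())
}
```

Without the time.After case, the select would keep waiting on the stale channel captured at the first iteration and never return.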

The code will not be posted. If you are interested, you can see it in Github: https://github.com/go-micro/plugins/commit/9f64710807221f3cc649ba4fe05f75b07c66c00c

The fixes for both issues have been merged into the official repository. Just pull the latest code.

With these two pitfalls fixed, the plug-in basically meets my needs. There may be others, of course. For example, the RabbitMQ plug-in of go-micro does not seem to support publisher confirms; adding that would take some more thought.

That's the main content of this article.

As usual, the code has been uploaded to GitHub: https://github.com/bosima/go-demo/tree/main/go-micro-broker-rabbitmq

For more on software architecture, follow the WeChat official account "Firefly Architecture". This is original content; please credit the source when reprinting.

Tags: Go go-micro

Posted by newzub on Sat, 07 May 2022 05:26:18 +0300