linux system inter-process communication method: message queue

Introduction

This article continues the series on inter-process communication in Linux with another mechanism: message queues.
A message queue is also a System V IPC object: it lives in the kernel, has its own ID, and is bound to a unique key.

linux provides some api for us to use this IPC. The following is an example to demonstrate how two processes communicate through message queues.

1. Design Intent

Use message queues to communicate between two processes, one process writes data to the queue, and the other reads data from the queue.

2. Design ideas

1. First, there are two programs: p1.c, p2.c;

2. After the two programs run successively, the system will create two processes (hereinafter collectively referred to as p1, p2);

3. p1 first creates (or opens) a message queue and two semaphores

4. p2 opens the message queue and opens the semaphore

5. p1 gets the string from the keyboard, encapsulates a message, and then sends the message to the queue

6. p2 reads the message from the queue and prints out the message body

7. Use POSIX named semaphores to synchronize the two processes (that is, p2 reads from the queue only after p1 has written to it, and p1 writes the next message only after p2 has read the previous one)

Code

p1.c

#include <stdio.h>
#include <sys/types.h>
#include <string.h>
#include <unistd.h>


#include <sys/ipc.h>
#include <sys/msg.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <semaphore.h>


/* Message layout handed to msgsnd(): a type tag followed by the payload. */
struct msgbuf {
	long mtype;		/* type tag of the message; has to be greater than 0 */
	char mtext[100];	/* payload bytes carried by the message */
};


/*
 * p1: the sending side of the message-queue demo.
 *
 * Creates (or opens) the System V message queue with key 123 and the two
 * POSIX named semaphores "sem1"/"sem2", then repeatedly reads one line from
 * stdin and sends it as a type-1 message. sem1 gates the writer (initial
 * value 1), sem2 tells the reader that a message is ready (initial value 0).
 * A line starting with "quit" ends the loop; p1 then prints some queue
 * statistics, waits two seconds and releases the IPC resources.
 *
 * Returns 0 on a normal shutdown, -1 if an IPC object cannot be created,
 * -2 if reading stdin fails, -3 if the message cannot be sent and -4 if a
 * semaphore operation fails.
 */
int main(void)
{
	int msqid;	//id (identifier) of the message queue
	sem_t *sem1 = NULL;	//semaphore that lets p1 write the next message
	sem_t *sem2 = NULL;	//semaphore that tells p2 a message is waiting

	//First apply for a message queue
	msqid = msgget((key_t)123, IPC_CREAT|0666);
	if(msqid < 0)
	{
		perror("creat a msg queue fail");
		return -1;
	}

	//Create or open a POSIX named semaphore
	sem1 = sem_open("sem1", O_CREAT|O_RDWR, 0666, 1);
	if(sem1 == SEM_FAILED)
	{
		perror("open sem1 fail");
		return -1;
	}

	//Create or open a POSIX named semaphore
	sem2 = sem_open("sem2", O_CREAT|O_RDWR, 0666, 0);
	if(sem2 == SEM_FAILED)
	{
		perror("open sem2 fail");
		return -1;
	}

	struct msgbuf msg_to_send;
	msg_to_send.mtype = 1;

	while(1)
	{
		//Waiting for the semaphore to be valid, P operation
		if(sem_wait(sem1) < 0)
		{
			perror("wait sem1 fail");
			return -4;
		}

		//Use the whole buffer: a small fixed limit would split longer lines
		//into several messages, and a chunk that happens to start with
		//"quit" would end the session by accident
		if(fgets(msg_to_send.mtext, sizeof msg_to_send.mtext, stdin) == NULL)
		{
			perror("get char from stdin fail");
			return -2;
		}

		//Send only the string and its terminating '\0'; the rest of the
		//buffer is uninitialised and must not be copied into the queue
		if(msgsnd(msqid, &msg_to_send, strlen(msg_to_send.mtext) + 1, 0) < 0)
		{
			perror("send msg to queue fail");
			return -3;
		}

		//Release semaphore, V operation
		if(sem_post(sem2) < 0)
		{
			perror("post sem2 fail");
			return -4;
		}

		if(strncmp("quit", msg_to_send.mtext, 4)==0)
		{
			printf("p1 readys to quit\n");
			break;
		}
	}

	struct msqid_ds msg_ds;
	struct msginfo msg_info;

	//Status of our own queue; print it only if the call filled in msg_ds
	if(msgctl(msqid, IPC_STAT, &msg_ds) < 0)
	{
		perror("get msg queue status fail");
	}
	else
	{
		printf("==%lu,%lu,%lu==\n", (unsigned long)msg_ds.__msg_cbytes,
			(unsigned long)msg_ds.msg_qnum, (unsigned long)msg_ds.msg_qbytes);
	}

	//System-wide usage; the return value is used below as the index for MSG_STAT
	int array_id = msgctl(msqid, MSG_INFO, (struct msqid_ds *)&msg_info);
	if(array_id < 0)
	{
		perror("get msg queue info fail");
	}
	else
	{
		printf("==%d,%d,%d,%d==\n", msg_info.msgpool,msg_info.msgmap,msg_info.msgtql,array_id);

		if(msgctl(array_id, MSG_STAT, &msg_ds) < 0)
		{
			perror("get msg queue status by index fail");
		}
		else
		{
			printf("==%lu,%lu,%lu==\n", (unsigned long)msg_ds.__msg_cbytes,
				(unsigned long)msg_ds.msg_qnum, (unsigned long)msg_ds.msg_qbytes);
		}
	}

	//Give p2 time to fetch the last message before the queue is removed
	sleep(2);

	//release various resources
	sem_close(sem1);
	sem_close(sem2);

	//Remove the semaphore names as well, so that the next run starts from
	//the initial values passed to sem_open() instead of stale counts
	sem_unlink("sem1");
	sem_unlink("sem2");

	if(msgctl(msqid, IPC_RMID, NULL) < 0)
	{
		perror("remove msg queue fail");
	}

	return 0;
}

p2.c

#include <stdio.h>
#include <sys/types.h>
#include <string.h>

#include <sys/ipc.h>
#include <sys/msg.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <semaphore.h>

/* Message layout filled in by msgrcv(): a type tag followed by the payload. */
struct msgbuf {
	long mtype;		/* type tag of the message; has to be greater than 0 */
	char mtext[100];	/* payload bytes carried by the message */
};


/*
 * p2: the receiving side of the message-queue demo.
 *
 * Opens (or creates) the System V message queue with key 123 and the two
 * POSIX named semaphores "sem1"/"sem2". It then waits on sem2 until a
 * message is available, receives one type-1 message, prints it and posts
 * sem1 so that the sender may write the next one. A message starting with
 * "quit" ends the loop.
 *
 * Returns 0 on a normal shutdown, -1 if an IPC object cannot be opened,
 * -3 if receiving fails and -4 if a semaphore operation fails.
 */
int main(void)
{
	int msqid;	//id (identifier) of the message queue
	sem_t *sem1 = NULL;	//semaphore that lets the sender write the next message
	sem_t *sem2 = NULL;	//semaphore that signals a message is waiting
	ssize_t rcv_bytes;	//Holds the number of bytes read from the queue

	//First apply for a message queue
	msqid = msgget((key_t)123, IPC_CREAT|0666);
	if(msqid < 0)
	{
		perror("creat a msg queue fail");
		return -1;
	}

	//Create or open a POSIX named semaphore
	sem1 = sem_open("sem1", O_CREAT|O_RDWR, 0666, 1);
	if(sem1 == SEM_FAILED)
	{
		perror("open sem1 fail");
		return -1;
	}

	//Create or open a POSIX named semaphore
	sem2 = sem_open("sem2", O_CREAT|O_RDWR, 0666, 0);
	if(sem2 == SEM_FAILED)
	{
		perror("open sem2 fail");
		return -1;
	}

	struct msgbuf msg_to_rcv;

	while(1)
	{
		//Waiting for the semaphore to be valid, P operation
		if(sem_wait(sem2) < 0)
		{
			perror("wait sem2 fail");
			return -4;
		}

		//Keep the result in an ssize_t: that is what msgrcv() returns
		rcv_bytes = msgrcv(msqid, &msg_to_rcv, sizeof msg_to_rcv.mtext, 1, 0);
		if(rcv_bytes < 0)
		{
			perror("receive msg from queue fail");
			return -3;
		}

		//The payload is printed with %s, so make sure it is terminated even
		//if the sender did not include a '\0' in the bytes it sent
		size_t end = (size_t)rcv_bytes < sizeof msg_to_rcv.mtext
			? (size_t)rcv_bytes : sizeof msg_to_rcv.mtext - 1;
		msg_to_rcv.mtext[end] = '\0';

		printf("received a msg[%d bytes]:%s\n", (int)rcv_bytes, msg_to_rcv.mtext);

		//Release semaphore, V operation: the sender may write again
		if(sem_post(sem1) < 0)
		{
			perror("post sem1 fail");
			return -4;
		}

		if(strncmp("quit", msg_to_rcv.mtext, 4)==0)
		{
			printf("p2 readys to quit\n");
			break;
		}
	}

	//release various resources
	//(only this process's handles are closed here; the queue itself is not removed by p2)
	sem_close(sem1);
	sem_close(sem2);

	return 0;
}

operation result

p1 process:

p2 process:

extend

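  • The layout of the message structure (a leading long mtype followed by the message data) is dictated by the Linux kernel and cannot be changed; the value of mtype itself is chosen by the sender and must be greater than 0;
  • In this example only one message is in the queue at a time; in fact, the queue can hold several messages, so a sender may send more messages before the earlier ones have been read;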
  • And we can use the msgctl function to set or get the status or information of the message queue.

The prototype of the msgctl function is as follows:

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

We can get IPC status, IPC information, etc. through some commands.
These states or information will be placed in the structure struct msqid_ds,

/* Status record of one message queue, as returned by msgctl() with IPC_STAT. */
struct msqid_ds {
               struct ipc_perm msg_perm;     /* Owner and permissions */
               time_t          msg_stime;    /* Time the most recent message was sent */
               time_t          msg_rtime;    /* Time the most recent message was received */
               time_t          msg_ctime;    /* Time of the last change to the queue (e.g. via IPC_SET) */
               unsigned long   __msg_cbytes; /* The number of bytes currently in the queue (nonstandard) */
               msgqnum_t       msg_qnum;     /* The current number of messages in the queue */
               msglen_t        msg_qbytes;   /* The maximum number of bytes allowed in the queue */
               pid_t           msg_lspid;    /* PID of the process that sent the last message */
               pid_t           msg_lrpid;    /* PID of the process that received the last message */
           };

The struct ipc_perm structure has the following members:

/* Ownership and permission record; embedded in struct msqid_ds as msg_perm. */
struct ipc_perm {
               key_t          __key;       /* Key supplied to msgget(2) */
               uid_t          uid;         /* Effective UID of owner */
               gid_t          gid;         /* Effective GID of owner */
               uid_t          cuid;        /* Effective UID of creator */
               gid_t          cgid;        /* Effective GID of creator */
               unsigned short mode;        /* Permissions */
               unsigned short __seq;       /* Sequence number */
           };

I will not explain the structures above field by field; the original English comments are probably clearer (and more accurate) than a translation would be.

We can use the following commands to manipulate message queues, which are briefly described here:

IPC_STAT: This command can obtain the status of the specified IPC, and all status information is returned through the above msqid_ds structure.

IPC_SET: This command can set some properties of IPC, such as:

msg_qbytes,
msg_perm.uid, 
msg_perm.gid,  
msg_perm.mode.

IPC_RMID: This command can delete the message queue.

IPC_INFO: This command obtains the system-wide limits and parameters for message queues. The information is returned through the following structure (note that the pointer to it must be cast to struct msqid_ds *, the type of the third parameter of msgctl):

/* System-wide message queue information, filled in by msgctl() with
   IPC_INFO (limits and parameters) or MSG_INFO (resources in use). */
struct msginfo {
                      int msgpool; /* Size in kibibytes of buffer pool
                                      used to hold message data;
                                      unused within kernel */
                      int msgmap;  /* Maximum number of entries in message
                                      map; unused within kernel */
                      int msgmax;  /* Maximum number of bytes that can be
                                      written in a single message */
                      int msgmnb;  /* Maximum number of bytes that can be
                                      written to queue; used to initialize
                                      msg_qbytes during queue creation
                                      (msgget(2)) */
                      int msgmni;  /* Maximum number of message queues */
                      int msgssz;  /* Message segment size;
                                      unused within kernel */
                      int msgtql;  /* Maximum number of messages on all queues
                                      in system; unused within kernel */
                      unsigned short int msgseg;
                                   /* Maximum number of segments;
                                      unused within kernel */
                  };

MSG_INFO: This command obtains the system resources currently consumed by message queues and returns them through the same msginfo structure (again, the pointer must be cast to struct msqid_ds *, the type of the third parameter). In this case the following fields have a different meaning:

msgpool - The number of message queues currently existing in the system
msgmap - The total number of messages in all current message queues in the system
msgtql - The total number of bytes of all messages in all message queues of the system

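MSG_STAT: This command returns a msqid_ds structure just like IPC_STAT, except that the first argument is not a queue ID but an index into the kernel's internal array of message queues (for example, the value returned by MSG_INFO); by iterating over the indexes it can be used to get information about all message queues.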

Summarize

This article introduces the role of the message queue in detail, how to achieve inter-process communication through it; and introduces some operations on the message queue.

Posted by kaisaj on Thu, 05 May 2022 18:04:54 +0300