进程间通信--POSIX消息队列

相关函数:

 mqd_t mq_open(const char *name, int oflag);
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
mqd_t mq_close(mqd_t mqdes);
mqd_t mq_unlink(const char *name);

POSIX通信机制比System V的更加通用!

一.消息队列管道和FIFO的区别,主要有以下两点:

①一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,而管道和FIFO是相反的,进程向其中写消息时,
管道和FIFO必需已经打开来读,那么内核会产生SIGPIPE信号。
②IPC的持续性不同。管道和FIFO是随进程的持续性,当管道和FIFO最后一次关闭发生时,仍在管道和FIFO中的数据会被丢弃。消息队列
是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没
有重新自举,消息队列没有被删除。

二.消息队列中的每条消息通常具有以下属性:
  一个表示优先级的整数;
  消息的数据部分的长度;
  消息数据本身;

进程间通信--POSIX消息队列

/*成功返回消息队列描述符,失败返回-1*/
1. mqd_t mq_open(const char *name, int oflag);
/*成功返回0,失败返回-1*/
1. mqd_t mq_open(const char *name, int oflag, mode_t mode,struct mq_attr *attr);

mq_open用于打开或创建一个消息队列
name:表示消息队列的名字,它符合POSIX IPC的名字规则。
oflag:表示打开的方式,和open函数的类似。
有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。
mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。
attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,
如果是空指针,那么就采用默认属性。
mq_open返回值是mqd_t类型的值,被称为消息队列描述符,类型为整型.

2. mqd_t mq_close(mqd_t mqdes);
3. mqd_t mq_unlink(const char *name);

mq_close:用于关闭一个消息队列,和文件的close类似,关闭后,消息队列并不从系统中删除。一个进程结束,会自动关闭打开着的
消息队列。
mq_unlink:用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存
当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。
POSIX消息队列的名字所创建的真正路径名和具体的系统实现有关,关于具体POSIX IPC的名字规则可以参考《UNIX 网络编程 卷2:进程间通信》的P14。
经过测试,在Linux 2.6.18中,所创建的POSIX消息队列不会在文件系统中创建真正的路径名。且POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’。

2 POSIX消息队列的属性
POSIX标准规定消息队列属性mq_attr必须要含有以下四个内容:
long mq_flags //消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞
long mq_maxmsg //消息队列的最大消息数
long mq_msgsize //消息队列中每个消息的最大字节数
long mq_curmsgs //消息队列中当前的消息数目

mq_attr结构的定义如下:
#include <bits/mqueue.h>
struct mq_attr {
  long int mq_flags;    /* Message queue flags. */
  long int mq_maxmsg;  /* Maximum number of messages. */
  long int mq_msgsize; /* Maximum message size. */
  long int mq_curmsgs; /* Number of messages currently queued. */
  long int __pad[4];
};

POSIX消息队列的属性设置和获取通过下面两个函数实现:
#include <mqueue.h>
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);

成功返回0,失败返回-1
mq_getattr:用于获取当前消息队列的属性。
mq_setattr:用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。
mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。mq_maxmsg
和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属
性只能被获取而不能被设置。

三. POSIX消息队列的使用
POSIX消息队列可以通过以下两个函数来进行发送和接收消息:
#include <mqueue.h>
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

如果消息队列已满(即队列上的消息数量等于队列的mq_maxmsg属性),那么默认情况下,mq_send()阻塞,直到有足够的空间可用于允许消息排队或直到调用被信
号处理程序中断。 如果为消息队列描述启用了O_NONBLOCK标志,则调用会立即失败返回,并errno为EAGAIN。

成功返回0,出错返回-1

mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

如果队列为空,那么默认情况下,mq_receive()阻塞,直到消息变为可用,或者调用被信号处理程序中断。 如果为消息队列描述启用了O_NONBLOCK标志,
那么调用会立即失败,errno为错误EAGAIN。

成功返回接收到消息的字节数,出错返回-1

#ifdef __USE_XOPEN2K
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr,size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout);
mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout);
#endif

mq_send(): 向消息队列中写入一条消息
mq_receive(): 从消息队列中读取一条消息。
参数:
mqdes:消息队列描述符;
msg_ptr:指向消息体缓冲区的指针;
msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大长度,即一定要大于等于该队列的mq_attr结构
中mq_msgsize。如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。
msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。
POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早的消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio
为0,mq_receive的msg_prio置为NULL。
默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。

还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。

 /* send.c */
#include <mqueue.h>
#include <stdbool.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include <string.h> #define MAXSIZE 1024
int main(int argc,char**argv)
{
if(argc!=)
{
printf("Usage: %s /mqname \n", argv[]);
return -;
}
char Msg[MAXSIZE]="Hello World";
char *name = argv[]; int flags = O_RDWR | O_CREAT | O_EXCL ;
mode_t mode = S_IRUSR | S_IWUSR| S_IRGRP |S_IROTH; struct mq_attr attr;
attr.mq_flags=;
attr.mq_maxmsg=;
attr.mq_msgsize=sizeof(Msg);
attr.mq_curmsgs=; mqd_t mqid = mq_open(name,flags,mode,&attr);
if(mqid==-)
{
printf("mq_open() failed! error %s (%d)\r\n",strerror(errno),errno);
return -;
} int i;
for(i=;i<;i++)
{
if(mq_send(mqid,Msg,strlen(Msg),i)==-)
{
perror("mq_send error");
return -;
}
} mq_close(mqid); return ;
}
 /* recv.c */
#include <mqueue.h>
#include <stdbool.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <errno.h>
#include <string.h> int main(int argc,char**argv)
{
if(argc !=)
{
printf("Usage: %s /mqname \r\n", argv[]);
return -;
}
const int MAXSIZE =;
char RecvBuff[MAXSIZE];
int prio;
ssize_t n;
char* name = argv[];
int flags = O_RDONLY; memset(RecvBuff, , sizeof(RecvBuff));
mqd_t mqid = mq_open(name, flags); struct mq_attr attr;
if(mqid == -)
{
printf("mq_open error %s (%d)\r\n",strerror(errno),errno);
return -;
}
while(true)
{
if(mq_getattr(mqid,&attr) == -)
{
printf("get attr error\r\n");
break;
}
if(attr.mq_curmsgs == (long))
{
printf("no messages in queue\r\n");
break;
} if((n = mq_receive(mqid,RecvBuff,sizeof(RecvBuff),&prio)) == -)
{
perror("mq_receive error");
return -;
}
printf("read %ld bytes\r\n", (long)n);
printf("prio is %d\r\n", prio);
printf("%s \r\n\n", RecvBuff);
} mq_close(mqid);
mq_unlink(name); return ;
}
/* Makefile */

all:
gcc send.c -o send -lrt
gcc recv.c -o recv -lrt
/*运行结果*/

read  bytes
prio is
Hello World read bytes
prio is
Hello World read bytes
prio is
Hello World read bytes
prio is
Hello World read bytes
prio is
Hello World
上一篇:SAP 金税接口代码 供参考


下一篇:有关binlog的那点事(三)(mysql5.7.13)