标签:进程间通信 消息队列 message queue systemv消息队列 systemv message queu
System V消息队列相对于POSIX消息队列的区别主要是:
--POSIX消息队列的读操作总是返回消息队列中优先级最高的最早消息,而对于System V消息队列可以返回任意指定优先级(通过消息类型)的消息。
--当向一个空消息队列中写入一个消息时,POSIX消息队列允许产生一个信号或启动一个线程,System V消息队列不提供类似的机制。
系统内核都会为每一个System V消息队列维护一个信息结构,在Linux 2.6.32中的定义如下:
#include<bits/msq.h> /* Obsolete, used only for backwards compatibility and libc5 compiles */ struct msqid_ds { struct ipc_perm msg_perm; /*IPC对象的属性信息和访问权限 */ struct msg *msg_first; /* first message on queue,unused */ struct msg *msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time */ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* 当前队列中消息的字节数 */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* 队列允许存放的最大字节数 */ __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ };
/* Include the definition of msqid64_ds */ #include <asm/msgbuf.h> /* message buffer for msgsnd and msgrcv calls */ struct msgbuf { long mtype; /* type of message */ char mtext[1]; /* message text */ };
#include <sys/msg.h> //成功返回非负消息队列描述符,失败返回-1 int msgget(key_t key, int flag);
key:消息队列的键,用来创建一个消息队列。System V IPC都有一个key,作为IPC的外部标识符,创建成功后返回的描述符作为IPC的内部标识符使用。key的主要目的就是使不同进程在同一IPC汇合。key具体说可以有三种方式生成:
· 不同的进程约定好的一个值;
· 通过相同的路径名和项目ID,调用ftok()函数,生成一个键;该函数为key_t ftok(const char *pathname, int proj_id);该函数把从pathname导出的信息与id低8位组合成一个整数IPC键, 调用时pathname必须存在,若不存在ftok调用失败,返回-1,成功返回该整数IPC键值;
· 还可以设置为IPC_PRIVATE,这样就会创建一个新的,唯一的IPC对象;然后将返回的描述符通过某种方式传递给其他进程;
flag:为该消息队列的读写权限组合,可以与IPC_CREAT 或IPC_EXCL相与,其中创建对列时都要使用IPC_CREAT。指定创建或打开消息队列的标志和读写权限(ipc_perm中的mode成员)。我们知道System V IPC定义了自己的操作标志和权限设置标志,而且都是通过该参数传递,这和open函数存在差别,open函数第三个参数mode用于传递文件的权限标志。System V IPC的操作标志包含:IPC_CREAT,IPC_EXCL,权限设置标志如下图:
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int main() { key_t lKey; int nMsgId; if ((lKey = ftok("/etc/profile", 1)) == -1) { perror("ftok"); exit(1); } //带参数IPC_CREAT和IPC_EXCL,如果队列不存在则创建队列,已存在则返回EEXIST if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1) { if (errno != EEXIST)//创建失败且不是由于队列已存在 { perror("msgget"); exit(2); } if ((nMsgId = msgget(lKey, 0)) == -1)//已存在 { perror("msgget"); exit(3); } } printf("MsgID=%d\n", nMsgId); return 0; }输出:
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int main() { key_t lKey; int nMsgId; if ((lKey = ftok("/etc/profile", 1)) == -1) { perror("ftok"); exit(1); } //带参数IPC_CREAT和IPC_EXCL,如果队列不存在则创建队列,已存在则返回EEXIST if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1) { if (errno != EEXIST)//创建失败且不是由于队列已存在 { perror("msgget"); exit(2); } if ((nMsgId = msgget(lKey, 0)) == -1)//已存在 { perror("msgget"); exit(3); } } printf("MsgID=%d\n", nMsgId); return 0; }输出:
实现System V IPC的任何系统都提供两个特殊的程序ipcs和ipcrm。ipcs输出IPC的各种信息,ipcrm则用于删除各种System V IPC。由于System V IPC不属于POSIX标准,所以这两个命令也未被标准化。下面是通过ipcs命令来查看刚刚创建的消息队列。
[root@MiWiFi-R1CM csdnblog]# ipcs -q -i 32769Message Queue msqid=32769
uid=0 gid=0 cuid=0 cgid=0 mode=0666
cbytes=0 qbytes=65536 qnum=0 lspid=0 lrpid=0
send_time=Not set
rcv_time=Not set
change_time=Wed Jun 17 22:49:00 2015
#include <sys/msg.h> //成功返回0,失败返回-1 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
此函数发送消息到指定的消息对列
msqid:消息队列的描述符;
msgp:指向存放消息的缓冲区,该缓冲区中包含消息类型和消息体两部分内容。该缓冲区的结构是由用户定义的,在<sys/msg.h>中有关于该缓冲区结构定义的参考模版:
struct msgbuf
{
long int mtype; /* type of received/sent message */
char mtext[1]; /* text of the message */
};
缓冲区的开头是一个long型的消息类型,该消息类型必须是一个非负数。紧跟在消息类型后面的是消息体部分(如果消息长度大于0),参考模版中定义的mtext只是说明消息体,该部分可以自定义长度。我们自己的应用都会定义特定的消息结构。
msgsz:缓冲区中消息体部分的长度;
msgflg:设置操作标志。可以为0,IPC_NOWAIT;用于在消息队列中没有可用的空间时,调用线程采用何种操作方式。
标志为IPC_NOWAIT,表示msgsnd操作以非阻塞的方式进行,在消息队列中没有可用的空间时,msgsnd操作会立刻返回。并指定EAGAIN错误;
标志为0,表示msgsnd操作以阻塞的方式进行,这种情况下在消息队列中没有可用的空间时调用线程会被阻塞,直到下面的情况发生:
--等到有存放消息的空间;
--消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
--调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <string.h> typedef struct { long int nType; char szText[256]; }MSG; int main() { key_t lKey; int nMsgId; MSG msg; if ((lKey = ftok("/etc/profile", 1)) == -1)//生成键值 { perror("ftok"); exit(1); } if ((nMsgId = msgget(lKey, IPC_CREAT | IPC_EXCL | 0666)) == -1)//创建消息队列 { if (errno != EEXIST) { perror("msgget"); exit(2); } if ((nMsgId = msgget(lKey, 0)) == -1) { perror("msgget"); exit(3); } } memset(&msg, 0x00, sizeof(MSG));//清空队列 msg.nType = 2;//指定消息类型为2 memcpy(msg.szText, "123456", 6);//指定消息内容 if (msgsnd(nMsgId, (const void *)&msg, strlen(msg.szText), IPC_NOWAIT) < 0)//非阻塞发送消息 { perror("msgsnd"); } return 0; }输出:
System V消息队列的读取消息使用下面的函数接口:
#include <sys/msg.h> ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); //成功返回接收到的消息的消息体的字节数,失败返回-1
msqid:消息队列的描述符;
msgp:指向待存放消息的缓冲区,该缓冲区中将会存放接收到的消息的消息类型和消息体两部分内容。该缓冲区的结构是由用户定义的,和msgsnd相对应。
msgsz:缓冲区中能存放消息体部分的最大长度,这也是该函数能返回的最大数据量;该字段的大小应该是sizeof(msg buffer) - sizeof(long);
msgtyp:希望从消息队列中获取的消息类型。
--msgtyp为0,返回消息队列中的第一个消息;
--msgtyp > 0,返回该消息类型的第一个消息;
--msgtyp < 0,返回小于或等于msgtyp绝对值的消息中类型最小的第一个消息;
msgflg:设置操作标志。可以为0,IPC_NOWAIT,MSG_NOERROR;用于在消息队列中没有可用的指定消息时,调用线程采用何种操作方式。
标志为IPC_NOWAIT,表示msgrcv操作以非阻塞的方式进行,在消息队列中没有可用的指定消息时,msgrcv操作会立刻返回,并设定errno为ENOMSG。
标志为0,表示msgrcv操作是阻塞操作,直到下面的情况发生:
--消息队列中有一个所请求的消息类型可以获取;
--消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
--调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;
标志为MSG_NOERROR,表示接收到的消息的消息体的长度大于msgsz长度时,msgrcv采取的操作。如果设置了该标志msgrcv在这种情况下回截断数据部分,而不返回错误,否则返回一个E2BIG错误。
测试代码:#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <string.h> typedef struct { long int nType; char szText[256]; }MSG; main() { key_t lKey; int n,nMsgId; MSG msg; if((lKey = ftok("/etc/profile",1)) == -1) { perror("ftok"); exit(1); } if((nMsgId = msgget(lKey,0)) == -1) { perror("ftok"); exit(2); } memset(&msg,0x00,sizeof(MSG)); if((n = msgrcv(nMsgId,(void *)&msg,sizeof(msg.szText),2L,0)) < 0)//从队列接收消息,读出以后就不存在了 { perror("msgrcv"); } else { printf("msgrcv return length=[%d] text=[%s]\n",n,msg.szText);//输出 } return 0; }输出:
对System V消息队列的删除,属性的设置和获取等控制操作要使用下面的函数接口:
#include <sys/msg.h> int msgctl(int msqid, int cmd, struct msqid_ds *buf); //成功返回0,失败返回-1
msqid:消息队列的描述符;
cmd:控制操作的命令,SUS标准提供以下三个命令:
· IPC_RMID,删除msgid指定的消息队列。执行该命令系统会立刻把该消息队列从内核中删除,该消息队列中的所有消息将会被丢弃。对于该命令,buf参数可忽略。
这和已经讨论过的POSIX消息队列有很大差别,POSIX消息队列通过调用mq_unlink来从内核中删除一个消息队列,但消息队列的真正析构会在最后一个mq_close结束后发生。
· IPC_SET,根据buf的所指的值来设置消息队列msqid_ds结构中的msg_perm.uid,msg_perm.gid,msg_perm.mode,msg_qbytes四个成员。
· IPC_STAT,通过buf返回当前消息队列的msqid_ds结构。
· 在Linux下还有例如IPC_INFO,MSG_INFO等命令,具体可以参考Linux手册;
buf:指向msqid_ds结构的指针;
测试代码1:#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <string.h> typedef struct { long int nType; char szText[256]; }MSG; main() { key_t lKey; int n,nMsgId; MSG msg; struct msqid_ds qds; if((lKey = ftok("/etc/profile",1)) == -1) { perror("ftok"); exit(1); } if((nMsgId = msgget(lKey,0)) == -1) { perror("ftok"); exit(2); } memset(&qds,0x00,sizeof(struct msqid_ds)); if(msgctl(nMsgId,IPC_STAT,&qds) < 0)//获取消息队列属性,获取状态放pds中 { perror("msgctl IPC_STAT"); exit(3); } printf("msg_perm.mode=%d\n",qds.msg_perm.mode); qds.msg_perm.mode &= (~0222);//去除消息队列的写权限 if(msgctl(nMsgId,IPC_SET,&qds) < 0)//设置消息队列权限 { perror("msgctl IPC_SET"); exit(4); } memset(&msg,0x00,sizeof(MSG)); msg.nType = 2; memcpy(msg.szText,"12345",5); if(msgsnd(nMsgId,(void *)&msg,5,0) < 0)//发送消息 { perror("msgsnd"); } if(msgctl(nMsgId,IPC_RMID,NULL) < 0)//删除消息 { perror("msgctl IPC_RMID"); exit(5); } return 0; }
说明: (~0222)取反后做与实际上就是去除其他用户的写权限,在C语言中,八进制常用用前缀表示。
测试代码二:
#include <iostream> #include <cstring> #include <errno.h> #include <unistd.h> #include <fcntl.h> #include <sys/msg.h> using namespace std; #define PATH_NAME "/tmp/anonymQueue" key_t CreateKey(const char *pathName) { int fd; if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0) { cout<<"open file "<<PATH_NAME<<"failed."; cout<<strerror(errno)<<endl; return -1; } close(fd); return ftok(PATH_NAME, 0); } int main(int argc, char **argv) { key_t key; key = CreateKey(PATH_NAME); int msgID; if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0) { cout<<"open message queue failed..."; cout<<strerror(errno)<<endl; return -1; } msqid_ds msgInfo; msgctl(msgID, IPC_STAT, &msgInfo); cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl; cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl; cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl; return 0; }输出:
关于消息队列中允许存放最大的字节数可以通过IPC_SET命令进行修改,该修改只能针对本消息队列生效。如下测试代码:
int main(int argc, char **argv) { key_t key; key = CreateKey(PATH_NAME); int msgID; if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0) { cout<<"open message queue failed..."; cout<<strerror(errno)<<endl; return -1; } msqid_ds msgInfo; msgctl(msgID, IPC_STAT, &msgInfo); cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl; cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl; cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl; msgInfo.msg_qbytes = 6553600; if (msgctl(msgID, IPC_SET, &msgInfo) < 0) { cout<<"set message queue failed..."; cout<<strerror(errno)<<endl; return -1; } msgctl(msgID, IPC_STAT, &msgInfo); cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl; cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl; cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl; return 0; }输出:
对System V IPC,系统往往会存在一些限制,对于消息队列,在Linux2.6.32中,系统内核存在以下限制:
[root@MiWiFi-R1CM csdnblog]# sysctl -a|grep msg
kernel.msgmax = 65536 //每个消息的最大字节数
kernel.msgmni = 1736 //系统范围内允许存在的最大消息队列数
kernel.msgmnb = 65536 //一个消息队列上允许的最大字节数
kernel.auto_msgmni = 1
fs.mqueue.msg_max = 10
fs.mqueue.msgsize_max = 8192
fs.mqueue.msg_default = 10
fs.mqueue.msgsize_default = 8192
对于System V消息队列一般内核还有一个限制:系统范围内的最大消息数,在Linux下这个限制由msgmnb*msgmni决定。
上面已经说过可以通过IPC_SET来设置使用中的消息队列的最大字节数。但是要在系统范围内对内核限制进行修改,在Linux下面可以通过修改/etc/sysctl.conf内核参数配置文件,然后配合sysctl命令来对内核参数进行设置。
标签:进程间通信 消息队列 message queue systemv消息队列 systemv message queu
原文地址:http://blog.csdn.net/shltsh/article/details/46539335