标签:ipc
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
struct mq_attr *attr);
Link with -lrt.
#include <mqueue.h>
int mq_close(mqd_t mqdes);
Link with -lrt.
#include <mqueue.h>
int mq_unlink(const char *name);
Link with -lrt.
#include <mqueue.h>
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
Link with -lrt.
/*
 * Attributes of a message queue. This is the structure passed to mq_open()
 * when creating a queue and exchanged through mq_getattr() / mq_setattr().
 */
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Maximum number of messages the queue can hold */
long mq_msgsize; /* Maximum size of a single message, in bytes */
long mq_curmsgs; /* Number of messages currently in the queue */
};
#include <mqueue.h>
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
Link with -lrt.
/*
 * Server side: create the message queue SLN_IPC_MQ_NAME (or open it if it
 * already exists), print its attributes, then receive and print messages
 * in an endless loop.
 *
 * The loop only ends when mq_receive() fails with an error other than
 * EINTR; in that case, and on any setup failure, the queue descriptor and
 * the receive buffer are released and -1 is returned. The function never
 * returns 0 because a healthy loop never terminates.
 */
int sln_ipc_mq_loop(void)
{
    mqd_t mqd;
    struct mq_attr setattr, attr;
    char *recvbuf = NULL;
    unsigned int prio;
    ssize_t recvlen;

    /* Zero the whole structure so no indeterminate field is handed to mq_open(). */
    memset(&setattr, 0, sizeof(setattr));
    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;

    /* Create the message queue and set its attributes. */
    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr);
    if ((mqd == (mqd_t)-1) && (errno == EEXIST)) {
        /* The queue already exists: just open it. */
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
    }
    if (mqd == (mqd_t)-1) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    /* Fetch the attributes actually in effect for the queue. */
    if (mq_getattr(mqd, &attr) < 0) {
        fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
        goto out_close;
    }

    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n",
           attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /*
     * The buffer must hold the largest message the queue allows. One extra
     * byte is reserved so the payload can be NUL-terminated before it is
     * printed with %s (mq_receive() does not terminate it).
     */
    recvbuf = malloc((size_t)attr.mq_msgsize + 1);
    if (NULL == recvbuf) {
        goto out_close;
    }

    for (;;) {
        /* Read the next message from the queue. */
        recvlen = mq_receive(mqd, recvbuf, (size_t)attr.mq_msgsize, &prio);
        if (recvlen < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal: simply retry */
            }
            /* Any other error would repeat forever; stop instead of spinning. */
            fprintf(stderr, "mq_receive: %s\n", strerror(errno));
            break;
        }

        recvbuf[recvlen] = '\0';
        printf("recvive length: %zd, prio: %u, recvbuf: %s\n", recvlen, prio, recvbuf);
    }

    free(recvbuf);
out_close:
    mq_close(mqd);
    return -1;
}
/*
 * Client side: open the existing message queue SLN_IPC_MQ_NAME write-only
 * and add a single message to it.
 *
 * sendbuf - bytes of the message to send
 * sendlen - number of bytes taken from sendbuf
 * prio    - priority handed to mq_send() for this message
 *
 * Returns 0 on success, -1 if the queue cannot be opened or the message
 * cannot be sent (the reason is printed to stderr). The queue descriptor
 * is closed before returning in every case.
 */
int sln_ipc_mq_send(const char *sendbuf, int sendlen, int prio)
{
    mqd_t mqd;
    int ret = 0;

    /* The client process opens the message queue. */
    mqd = mq_open(SLN_IPC_MQ_NAME, O_WRONLY);
    if (mqd == (mqd_t)-1) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    /* The client process adds one message to the queue. */
    if (mq_send(mqd, sendbuf, (size_t)sendlen, (unsigned int)prio) < 0) {
        fprintf(stderr, "mq_send: %s\n", strerror(errno));
        ret = -1;
    }

    /* Release the descriptor; otherwise every call leaks one. */
    (void)mq_close(mqd);
    return ret;
}
/*
 * Server side (slow-consumer variant): create the message queue
 * SLN_IPC_MQ_NAME (or open it if it already exists), wait 10 seconds so
 * that clients can fill the queue, then drain it at one message per second,
 * printing the queue depth before each receive.
 *
 * The loop only ends when mq_getattr() fails or mq_receive() fails with an
 * error other than EINTR; in that case, and on any setup failure, the
 * queue descriptor and the receive buffer are released and -1 is returned.
 * The function never returns 0 because a healthy loop never terminates.
 */
int sln_ipc_mq_loop(void)
{
    mqd_t mqd;
    struct mq_attr setattr, attr;
    char *recvbuf = NULL;
    size_t bufsize;
    unsigned int prio;
    ssize_t recvlen;

    memset(&setattr, 0, sizeof(setattr));
    setattr.mq_maxmsg = SLN_IPC_MQ_MAXMSG;
    setattr.mq_msgsize = SLN_IPC_MQ_MSGSIZE;

    /*
     * Create the queue with explicit attributes. Passing NULL instead of
     * &setattr would create it with the system default attributes.
     */
    mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR | O_CREAT | O_EXCL, 0644, &setattr);
    if ((mqd == (mqd_t)-1) && (errno == EEXIST)) {
        /* The name already exists: open the existing queue. */
        mqd = mq_open(SLN_IPC_MQ_NAME, O_RDWR);
    }
    if (mqd == (mqd_t)-1) {
        fprintf(stderr, "mq_open: %s\n", strerror(errno));
        return -1;
    }

    if (mq_getattr(mqd, &attr) < 0) {
        fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
        goto out_close;
    }

    printf("flags: %ld, maxmsg: %ld, msgsize: %ld, curmsgs: %ld\n",
           attr.mq_flags, attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);

    /*
     * Remember the buffer capacity separately: attr is refreshed on every
     * loop iteration, while the allocation is made only once. One extra
     * byte is reserved so the payload can be NUL-terminated before it is
     * printed with %s (mq_receive() does not terminate it).
     */
    bufsize = (size_t)attr.mq_msgsize;
    recvbuf = malloc(bufsize + 1);
    if (NULL == recvbuf) {
        goto out_close;
    }

    /* Wait 10 seconds here so the client processes can enqueue many messages at once. */
    sleep(10);

    for (;;) {
        if (mq_getattr(mqd, &attr) < 0) {
            fprintf(stderr, "mq_getattr: %s\n", strerror(errno));
            break;
        }

        printf("msgsize: %ld, curmsgs: %ld\n", attr.mq_msgsize, attr.mq_curmsgs);

        recvlen = mq_receive(mqd, recvbuf, bufsize, &prio);
        if (recvlen < 0) {
            if (errno == EINTR) {
                continue; /* interrupted by a signal: simply retry */
            }
            /* Any other error would repeat forever; stop instead of spinning. */
            fprintf(stderr, "mq_receive: %s\n", strerror(errno));
            break;
        }

        recvbuf[recvlen] = '\0';
        printf("recvive-> prio: %u, recvbuf: %s\n", prio, recvbuf);

        sleep(1); /* handle one message per second */
    }

    free(recvbuf);
out_close:
    mq_close(mqd);
    return -1;
}
# ./server
flags: 0, maxmsg: 10, msgsize: 1024, curmsgs: 0
msgsize: 1024, curmsgs: 10
recvive-> prio: 10, recvbuf: asdf
msgsize: 1024, curmsgs: 10
recvive-> prio: 11, recvbuf: 1234
msgsize: 1024, curmsgs: 10
recvive-> prio: 12, recvbuf: asdf
msgsize: 1024, curmsgs: 9
recvive-> prio: 9, recvbuf: 1234
msgsize: 1024, curmsgs: 8
recvive-> prio: 8, recvbuf: asdf
msgsize: 1024, curmsgs: 7
recvive-> prio: 7, recvbuf: 1234
msgsize: 1024, curmsgs: 6
recvive-> prio: 6, recvbuf: asdf
msgsize: 1024, curmsgs: 5
recvive-> prio: 5, recvbuf: 1234
msgsize: 1024, curmsgs: 4
recvive-> prio: 4, recvbuf: asdf
msgsize: 1024, curmsgs: 3
recvive-> prio: 3, recvbuf: 1234
msgsize: 1024, curmsgs: 2
recvive-> prio: 2, recvbuf: asdf
msgsize: 1024, curmsgs: 1
recvive-> prio: 1, recvbuf: 1234
msgsize: 1024, curmsgs: 0
标签:ipc
原文地址:http://blog.csdn.net/shallnet/article/details/41749613