码迷,mamicode.com
首页 > 其他好文 > 详细

消息队列、信号量、共享存储

时间:2015-07-07 22:55:25      阅读:153      评论:0      收藏:0      [点我收藏+]

标签:消息队列   信号量   共享存储   

消息队列、信号量、共享存储是IPC进程间通信的三种形式,它们功能不同,但有一些相似点,下面先介绍它们相类似的特征,然后再逐一说明。

1、相似点

每个内核中的IPC结构(消息队列、信号量、共享存储)都用一个非负整数的标识符加以引用,与文件描述符不同,当一个IPC结构被创建,以后又被删除时,与这种结构相关的标识符连续加1,直至达到一个整型数的最大正直,然后又回转到0。标识符是IPC对象的内部名,还有一个外部名称为键,数据类型是key_t,用于多个合作进程能够在同一IPC对象上会合,键由内核转换成标识符。ftok函数可以把一个路径名和一个项目ID转换为一个键,如下:

#include <sys/ipc.h> 

key_t ftok(const char *pathname, int proj_id);

消息队列、信号量及共享存储都有自己的get函数,msgget、semget及shmget,它们创建IPC结构的方法类似,通常由服务器进程创建。

每一个IPC结构都设置了一个ipc_perm结构,该结构规定了权限和所有者:

struct ipc_perm { 
               key_t          __key;       /* Key supplied to msgget(2) */ 
               uid_t          uid;         /* Effective UID of owner */ 
               gid_t          gid;         /* Effective GID of owner */ 
               uid_t          cuid;        /* Effective UID of creator */ 
               gid_t          cgid;        /* Effective GID of creator */ 
               unsigned short mode;        /* Permissions */ 
               unsigned short __seq;       /* Sequence number */ 
};

在创建IPC结构时,对所有字段都赋初值,以后可以调用msgctl、semctl及shmctl修改uid、gid和mode字段,为了改变这些值,调用进程必须是IPC结构的创建者或超级用户。

消息队列、信号量、共享存储都有自己的内置限制,这些限制的大多数可以通过重新配置内核而加以更改,如sysctl命令。在Linux中,可以运行“ipcs -l”命令查看相关的限制,如下:

$ipcs -l
------ Shared Memory Limits -------- 
max number of segments = 4096 
max seg size (kbytes) = 32768 
max total shared memory (kbytes) = 8388608 
min seg size (bytes) = 1 

------ Semaphore Limits -------- 
max number of arrays = 128 
max semaphores per array = 250 
max semaphores system wide = 32000 
max ops per semop call = 32 
semaphore max value = 32767 

------ Messages Limits -------- 
max queues system wide = 31844 
max size of message (bytes) = 8192 
default max size of queue (bytes) = 16384

2、消息队列

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/msg.h> 

int msgget(key_t key, int msgflg);
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); 
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, 
                      int msgflg); 

struct msqid_ds { 
               struct ipc_perm msg_perm;     /* Ownership and permissions */ 
               time_t          msg_stime;    /* Time of last msgsnd(2) */ 
               time_t          msg_rtime;    /* Time of last msgrcv(2) */ 
               time_t          msg_ctime;    /* Time of last change */ 
               unsigned long   __msg_cbytes; /* Current number of bytes in 
                                                queue (nonstandard) */ 
               msgqnum_t       msg_qnum;     /* Current number of messages 
                                                in queue */ 
               msglen_t        msg_qbytes;   /* Maximum number of bytes 
                                                allowed in queue */ 
               pid_t           msg_lspid;    /* PID of last msgsnd(2) */ 
               pid_t           msg_lrpid;    /* PID of last msgrcv(2) */ 
};

消息队列是消息的链接表,存放在内核中并由消息队列标识符标识。msgget用于创建一个新队列或打开一个现存的队列。msgsnd将新消息添加到队列尾端。msgrcv用于从队列中取消息。每个消息队列都有一个msqid_dt结构与其相关联,msgctl函数则可以对消息队列的这种结构进行操作。
下面例子说明消息队列的用法,msgsnd.c发送消息,msgrcv.c接收消息,当输入“quit”时结束。

// msgsnd.c 
#include <stdio.h> 
#include <stdlib.h> 
#include <stdbool.h> 
#include <string.h> 
#include <errno.h> 
#include <sys/msg.h> 
#include <unistd.h> 

struct msg_st 
{ 
    long int msg_type; 
    char text[BUFSIZ]; 
}; 

int main(void) 
{ 
    struct msg_st data; 
    data.msg_type = 1; 
    char buf[BUFSIZ]; 
    key_t akey = 1000; 
    int msgid = -1; 
    bool running = true; 

    // create message queue 
    msgid = msgget(akey, 0666 | IPC_CREAT); 
    if (-1 == msgid) { 
        fprintf(stderr, "msgget failed with error: %d\n", errno); 
        exit(EXIT_FAILURE); 
    } 

    // loop for sending data to message queue 
    while (running) { 
        printf("Input text: "); 
        fgets(buf, BUFSIZ, stdin); 
        strcpy(data.text, buf); 
        // send data 
        if (-1 == msgsnd(msgid, (void*)&data, BUFSIZ, 0)) 
        { 
            fprintf(stderr, "msgsnd failed\n"); 
            exit(EXIT_FAILURE); 
        } 
        // input "quit" to finish 
        if(0 == strncmp(buf, "quit", 4)) { 
            running = false; 
        } 
        sleep(1); 
    } 

    exit(EXIT_SUCCESS); 
} 

// msgrcv.c 
#include <stdio.h> 
#include <stdlib.h> 
#include <stdbool.h> 
#include <string.h> 
#include <errno.h> 
#include <sys/msg.h> 
#include <unistd.h> 

struct msg_st 
{ 
    long int msg_type; 
    char text[BUFSIZ]; 
}; 

int main(void) 
{ 

    struct msg_st data; 
    data.msg_type = 0; 
    key_t akey = 1000; 
    int msgid = -1; 
    bool running = true; 

    // create messge queue 
    msgid = msgget(akey, 0666 | IPC_CREAT); 
    if (-1 == msgid) { 
        fprintf(stderr, "msgget failed with error: %d\n", errno); 
        exit(EXIT_FAILURE); 
    } 

    // loop for getting data from message queue 
    while (running) { 
        // receive data 
        if(-1 == msgrcv(msgid, (void*)&data, BUFSIZ, data.msg_type, 0)) 
        { 
            fprintf(stderr, "msgrcv failed with errno: %d\n", errno); 
            exit(EXIT_FAILURE); 
        } 
        printf("Receive text: %s\n",data.text); 
        // receive "quit" to finish 
        if(0 == strncmp(data.text, "quit", 4)) { 
            running = false; 
        } 
    } 

    // delete message queue 
    if (-1 == msgctl(msgid, IPC_RMID, 0)) 
    { 
        fprintf(stderr, "msgctl(IPC_RMID) failed\n"); 
        exit(EXIT_FAILURE); 
    } 

    exit(EXIT_SUCCESS); 
} 

3、信号量

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/sem.h> 

int semget(key_t key, int nsems, int semflg);
int semctl(int semid, int semnum, int cmd, ...); 
int semop(int semid, struct sembuf *sops, unsigned nsops);

信号量是一个计数器,用于多进程对共享数据对象的访问。为了获得共享资源,进程需要执行下列操作:
(1)测试控制该资源的信号量。
(2)若此信号量的值为正,则进程可以使用该资源,进程将信号量值减1,表示它使用了一个资源单位。
(3)若此信号量的值为0,则进程进入休眠状态,直至信号量值大于0,进程被唤醒后,它返回第(1)步。

当进程不再使用由一个信号量控制的共享资源时,该信号量值增1,如果有进程正在休眠等待此信号量,则唤醒它们。为了正确地实现信号量,信号量值的测试及减1操作应当是原子操作,为此,信号量通常是在内核中实现的。常用的信号量形式被成为二元信号量或双态信号量,它控制单个资源,初始值为1,但是一般而言,信号量的初值可以是任一正值,该值说明有多少个共享资源单位是在内核中实现的。

semop函数是个原子操作,自动执行信号量集合上的操作数组。

如果拿带undo(SEM_UNDO标志,进程终止时内核检查尚未处理的信号量调整值并作对应的处理)的信号量与记录锁比较,在Linux上,记录锁耗时较长。虽然记录锁慢于信号量锁,但如果只需锁一个资源(例如共享存储)并且不需要使用信号量的所有花哨的功能,则宁可使用记录锁。理由是使用简易,且进程终止时系统会处理任何遗留下来的锁。

先来看一个不使用信号量的例子:

// semaphore2.c 
#include <stdio.h> 
#include <stdlib.h> 

int main(int argc, char *argv[]) 
{ 
    char message = ‘X‘; 
    int i = 0;   

    if (argc > 1) { 
        message = argv[1][0]; 
    } 

    for (i = 0; i < 10; ++i) { 
        printf("%c", message); 
        fflush(stdout); 
        sleep(rand() % 3); 
        printf("%c", message); 
        fflush(stdout); 
        sleep(rand() % 3); 
    } 

    sleep(10); 
    printf("\n%d - finished\n", getpid()); 

    exit(EXIT_SUCCESS); 
}

编译运行:

$gcc -o sem semaphore2.c
$./sem A & ./sem
[1] 5647 
AXAXAXAXXAAXAXAXAAXXAXXAXAAXAXAXAXAXAXXA 

5648 - finished 
5647 - finished 
[1]+  Done                    ./sem A 

一个进程在for循环中连续两次输出A,并启动到后台运行,另一个进程在for循环中连续两次输出X,从上面的结果可以看出,它们相互竞争,结果是乱序的,并不是两个连续的A或者X,下面用信号量改写上面的例子:

#include <unistd.h> 
#include <sys/types.h> 
#include <sys/stat.h> 
#include <fcntl.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <string.h> 
#include <sys/sem.h> 

union semun 
{ 
    int val; 
    struct semid_ds *buf; 
    unsigned short *arry; 
}; 

static int sem_id = 0; 

int main(int argc, char *argv[]) 
{ 
    key_t akey = 1000; 
    char message = ‘X‘; 
    int i = 0; 

    // create semaphore 
    sem_id = semget(akey, 1, 0666 | IPC_CREAT); 
    if (-1 == sem_id) { 
        fprintf(stderr, "Failed to create semaphore\n"); 
        exit(EXIT_FAILURE); 
    } 

    if (argc > 1) { 
        // semaphore initialization, must 
        union semun sem_union; 
        sem_union.val = 1; 
        if (-1 == semctl(sem_id, 0, SETVAL, sem_union)) { 
            fprintf(stderr, "Failed to initialize semaphore\n"); 
            exit(EXIT_FAILURE); 
        } 

        message = argv[1][0]; 
        sleep(1); 
    } 

    for (i = 0; i < 10; ++i) { 
        // go into critical zone 
        struct sembuf sem_i; 
        sem_i.sem_num = 0; 
        sem_i.sem_op = -1; 
        sem_i.sem_flg = SEM_UNDO; 
        if (-1 == semop(sem_id, &sem_i, 1)) 
        { 
            perror("semop in failed\n"); 
            exit(EXIT_FAILURE); 
        } 

        printf("%c", message); 
        fflush(stdout); 
        sleep(rand() % 3); 
        printf("%c", message); 
        fflush(stdout); 

        // leave critical zone 
        struct sembuf sem_o; 
        sem_o.sem_num = 0; 
        sem_o.sem_op = 1; 
        sem_o.sem_flg = SEM_UNDO; 
        if (-1 == semop(sem_id, &sem_o, 1)) { 
            perror("semop out failed\n"); 
            exit(EXIT_FAILURE); 
        } 

        sleep(rand() % 3); 
    } 

    sleep(10); 
    printf("\n%d - finished\n", getpid()); 

    if (argc > 1) { 
        // delete samaphore 
        sleep(3); 
        union semun sem_union; 
        if (-1 == semctl(sem_id, 0, IPC_RMID, sem_union)) { 
            fprintf(stderr, "Failed to delete semaphore\n"); 
        } 
    } 

    exit(EXIT_SUCCESS); 
} 

执行结果如下:

XXAAXXAAXXAAXXAAXXAAXXAAXXAAXXAAXXXXAAAA 

可见,使用了信号量,输出结果符合预期,两个A或者两个X连在了一起。

4、共享存储

#include <sys/ipc.h> 
#include <sys/shm.h> 

int shmget(key_t key, size_t size, int shmflg);
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
void *shmat(int shmid, const void *shmaddr, int shmflg);
int shmdt(const void *shmaddr);

共享存储允许两个或多个进程共享一给定的存储区,因为数据不需要在客户进程和服务器进程之间复制,所以这是最快的一种IPC。使用共享存储时需要掌握的唯一窍门是多个进程之间对一给定存储区的同步访问,若服务器进程正在将数据放入共享存储区,则在它做完这一操作之前,客户进程不应当去取这些数据,通常,信号量被用来实现对共享存储访问的同步。

shmat函数用于将共享存储段连接到调用进程指定的地址shmaddr上,但一般应指定shmaddr为0,内核会自动选择合适的地址。当对共享存储段的操作已经结束时,则调用shmdt脱接该段。

#include <fcntl.h> 
#include <stdlib.h> 
#include <stdio.h> 
#include <string.h> 
#include <sys/shm.h> 

#define SIZE 1024 
#define exit_err(str) do { perror(str); exit(EXIT_FAILURE); } while (0); 
#define uint32 unsigned long 

int main(void) 
{ 
    int shmid; 
    char *shmptr; 
    key_t key; 
    pid_t pid; 

    if ((pid = fork()) < 0) { 
        exit_err("fork error"); 
    } 

    if(0 == pid) { 
        printf("child process\n"); 

        if ((key = ftok("/dev/null", O_RDWR)) < 0) { 
            exit_err("ftok error"); 
        } 

        if ((shmid = shmget(key, SIZE, 0600 | IPC_CREAT)) < 0) { 
            exit_err("shmget error"); 
        } 

        if ((shmptr = (char*)shmat(shmid, 0, 0)) == (void*)-1) { 
           exit_err("shmat error"); 
        } 

        sleep(1); 
        printf("child pid is %d, share memory from %lx to %lx, content: %s\n",getpid(), (uint32)shmptr, (uint32)(shmptr + SIZE), shmptr); 
        sleep(1); 

        if ((shmctl(shmid, IPC_RMID, 0) < 0)) { 
             exit_err("shmctl error"); 
        } 
        exit(EXIT_SUCCESS); 
    } 
    else { 
        printf("parent process\n"); 

        if ((key = ftok("/dev/null", O_RDWR)) < 0) { 
            exit_err("ftok error");    
        } 

        if ((shmid = shmget(key, SIZE, 0600 | IPC_CREAT | IPC_EXCL)) < 0) { 
          exit_err("shmget error"); 
        } 

        if((shmptr = (char*)shmat(shmid, 0, 0)) == (void*)-1) { 
          exit_err("shmat error"); 
        } 

        memcpy(shmptr, "hello world", sizeof("hello world")); 
        printf("parent pid is %d, share memory from %lx to %lx, content: %s\n",getpid(),(uint32)shmptr, (uint32)(shmptr + SIZE), shmptr); 
    } 

    waitpid(pid, NULL, 0); 
    exit(EXIT_SUCCESS); 
} 

执行结果如下:

parent process 
parent pid is 7275, share memory from 7fb2ebf4a000 to 7fb2ebf4a400, content: hello world 
child process 
child pid is 7276, share memory from 7fb2ebf4a000 to 7fb2ebf4a400, content: hello world

版权声明:本文为博主原创文章,未经博主允许不得转载。

消息队列、信号量、共享存储

标签:消息队列   信号量   共享存储   

原文地址:http://blog.csdn.net/ieearth/article/details/46793337

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!