原文地址:http://blog.csdn.net/jsd2honey/article/details/59663748
IPC(Inter-Process Communication,进程间通信)机制,也被称为System V IPC 。
下面将介绍以下内容:
信号量:用于管理对资源的访问。
共享内存:用于在程序之间高效地共享数据。
消息队列:在程序之间传递数据的一种简单方法。
信号量
编写程序时使用了线程,不管它是运行在多用户系统上、多进程系统上,还是运行在多用户多进程系统上,程序中存在着一部分临界代码,我们需要确保只有一个进程(或一个执行线程)可以进入这个临界代码并拥有对资源独占式的访问权。
要想编写通用的代码,以确保对某个特定的资源具有独占式的访问权是非常困难的。虽然有一个名为Dekker算法的解决方法,但这个算法依赖于“忙等待”或自旋锁。也就是说,一个进程要持续不断地运行以等待某个内存位置被改变。像linux这样的多任务环境下,人们并不愿意使用这种浪费CPU资源的处理方法。但如果硬件支持独占式访问(一般是通过特定的CPU指令的形式),情况就变得简单多了
。一个硬件支持的例子是:用一条指令以原子方式访问并增加寄存器的值,在这个读取/增加/写入操作执行的过程中不会有其他指令(甚至一个中断)发生。
使用带O_EXCL标志的open函数来创建锁文件,他提供了原子化的文件创建方法。他允许一个进程通过获取一个令牌(即新创建的文件)来取得成功。这个方法比较适合于处理简单的问题,但对于复杂的例子,他就显得比较杂乱且缺乏效率。
信号量的一个更正式的定义是:它是一个特殊变量,只允许对它进行等待(wait)和发送信号(signal)这两种操作。在linux编程中,“等待”和“发送信号”都已具有特殊的含义,所以我们将用原先定义的符号来表示这两种操作。
P(信号量变量):用于等待
W(信号量变量):用于发送信号
信号量的定义
最简单的信号量是只能取值0和1的变量,即二进制信号量。可以取多个正整数值的信号量被称为通用信号量。
PV操作的定义非常简单。假设有一个信号量变量sv,这两个操作的定义如下:
P(sv) 如果sv的值大于0,就给它减去1;如果它的值等于0,就挂起该进程的执行。
V(sv) 如果有其他进程因等待sv而被挂起,就让它恢复运行;如果没有进程因等待sv而被挂起,就给它加1。
一个理论性的例子
我们用一个简单的理论性的例子来说明其工作原理。假设有两个进程proc1和proc2,这两个进程都需要在其执行过程中的某一时刻对一个数据库进行独占式的访问。我们定义一个二进制信号量sv,该变量的初始值为1,两个进程都可以访问它。要想对代码中的临界区域进行访问,这两个进程都需要执行相同的处理步骤,事实上,这两个进程可以只是同一个程序的两个不同执行实例。
两个进程共享信号量变量sv。一旦其中一个进程执行了P(sv)操作,他将得到信号量,并可以进入临界区。而第二个进程将被阻止进入临界区,因为当它试图执行P(sv)操作时,他会被挂起以第一个进程离开临界区并执行了V(sv)操作释放信号量。
Linux的信号量机制
信号量函数的定义如下所示:
#include <sys/sem.h>
int semctl(int sem_id,int sem_num,int command,...);
int semget(key_t key,int num_sems,int sem_flags,...);
in semop(int sem_id,struct sembuf * sem_ops,size_t num_sem_ops...);
头文件sys/sem.h通常依赖于另两个头文件sys/types.h和sys/ipc.h。一般情况下,它们都会被sys/sem.h自动包含,因此不需要为它们明确添加相应的#include语句。
这些函数都是用来对成组的信号值进行操作的。这使得对它们的操作要比单个信号值所需要的操作复杂得多。参数key的作用很像一个文件名,它代表程序可能要使用的某个资源,如果多个程序使用相同的key值,它将负责协调工作。由semget函数返回的并用在其他共享内存函数中的标识符也与fopen返回的FILE*文件流很相似,进程需要通过它来访问共享文件。此外,类似于文件的使用情况,不同的进程可以用不同的信号量标识符来指向同一个信号量。对于我们将要介绍的所有IPC机制来说,这种建加上一个标识符的用法是很常见的,尽管每个机制都使用独立的键和标识符。
semget函数
semget函数的作用是创建一个新信号量或取得一个已有信号量的键:
int semget(key_t key,int num_sems,int sem_flags);
第一个参数key是整数值,不相关的进程可以通过它访问同一个信号量。程序对所有信号量的访问都是间接的,它先提供一个键,再由系统生成一个相应的信号量标识符。只有semget函数才直接使用信号键,所以其他的信号量函数都是使用由semget函数返回的信号量标识符。
有一个特殊的信号量键值IPC_PRIVATE,它的作用是创建一个只有创建进程才可以访问的信号量,但这个键值很少有实际的用途。在创建新的信号量时,你需要给键提供一个唯一的非零整数。
num_sems参数指定需要的信号量数目。它几乎总是取值1。
sem_flags参数是一组标志,它与open函数的标志非常相似。它低端的9个比特是该信号量的权限,其作用类似于文件的访问权限。此外,它们还可以和值IPC_CREAT做按位或操作,来创建一个新信号量。即使在设置了IPC_CREAT标志后给出的键时一个已有信号量的键,也不会产生错误。如果函数用不到IPC_CREAT标志,该标志就会被悄悄地忽略掉。我们可以通过联合使用标志IPC_CREAT和IPC_EXCL来确保创建出的是一个新的、唯一的信号量。如果该信号量已存在,它将返回一个错误。
semget函数在成功时返回一个正数值,它就是其他信号量函数将用到的信号量标识符。如果失败,则返回-1。
semop函数
semop函数用于改变信号量的值,它的定义如下所示:
int semop(int sem_id,struct sembuf * sem_ops,size_t num_sem_ops);
第一个参数sem_id是由semget返回的信号量标识符。第二个参数sem_ops是指向一个结构数组的指针,每个数组元素至少包含以下几个成员:
struct sembuf{
short sem_num;
short sem_op;
short sem_flg;
}
第一个成员sem_num是信号量编号,除非你需要使用一组信号量,否则它的值一般为0。sem_op成员的值是信号量在一次操作中需要改变的数值(你可以用一个非1的数值来改变信号量的值)。通常只会用到两个值,一个是-1,也就是P操作,它等待信号量变为可用;一个是+1,也就是V操作,它发送信号表示信号量现在可用。
最后一个成员sem_flg通常被设置为SEM_UNDO。它将使得操作系统跟踪当前进程对这个信号量的修改情况,如果这个进程在没有释放该信号量的情况下终止,操作系统将自动释放该进程持有的信号量。除非你对信号量的行为有特殊的要求,否则应该养成设置sem_flg为SEM_UNDO的好习惯。如果决定使用一个非SEM_UNDO的值,那就一定要注意保持设置的一致性,否则你很可能会搞不清楚内核是否会在进程退出时清理信号量。
semop调用的一切动作都是一次性完成的,这是为了避免出现因使用多个信号量而可能发生的竞争现象。
semctl函数
semctl函数用来直接控制信号量信息,它的定义如下所示:
int semctl(int sem_id,int sem_num,int command,...);
第一个参数sem_id是由semget返回的信号量标识符。sem_num参数是信号量编号,当需要用到成组的信号量时,就要用到这个参数,它一般取值为0,表示这是第一个也是唯一的一个信号量。command参数是将要采取的动作。如果还有第四个参数,它将会是一个union semun结构,根据X/OPEN规范的定义,它至少包含以下几个成员:
union semun{
int val;
struct semid_ds * buf;
unsigned short * array;
}
虽然X/OPEN规范中指出,semun联合结构必须由程序员自己定义,但大多数linux版本会在某个头文件(一般是sem.h)中给出该结构的定义。如果你发现确实需要自己来定义该结构。
semctl函数中的command参数可以设置许多不同的值,但只有下面介绍的两个值最常用。
SETVAL:用来把信号量初始化为一个已知的值。这个值通过union semun中的val成员设置。其作用是在信号量第一次使用之前对它进行设置。
IPC_RMID:用于删除一个已经无需继续使用的信号量标识符。
semctl函数将根据command参数的不同而返回不同的值。对于SETVAL和IPC_RMID,成功时返回0,失败时返回-1。
使用信号量
大部分需要使用信号量来解决的问题只需使用一个最简单的二进制信号量即可。程序sem1.c来试验信号量,该程序可以被多次调用。通过一个可选的参数来指定程序是负责创建信号量还是负责删除信号量。用两个不同字符的输出来表示进入和离开临界区域。如果程序启动时带有一个参数,它将在进入和退出临界区域时打印字符X;而程序的其它运行实例将在进入和退出临界区域时打印字符O。因为在任一给定时刻,只能有一个进程可以进入临界区域,所以字符X和O应该是成对出现的。
实验:信号量 包含了必需的系统头文件之后,我们包含了头文件semun.h。如果系统头文件sys/sem.h没有定义X/OPEN规范所需的联合semun,这个头文件包含了对它的定义。然后是函数原型的声明和全局变量的定义,接着就到了main函数的定义。我们调用semget来创建一个信号量,该函数将返回一个信号量标识符。如果程序是第一个被调用的(也就是说它在被调用时带有一个参数,使得argc>1),就调用set_semvalue初始化信号量并将op_char设置为X:
semun.h
1
2
3
4
5
6
7
8
|
#ifndef SEMUN_H_ #define SEMUN_H_ union semun{ int val; struct semid_ds * buf; unsigned short * array; }; #endif /* SEMUN_H_ */ |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <sys/sem.h> #include "semun.h" static int set_semvalue( void ); static void del_semvalue( void ); static int semaphore_p( void ); static int semaphore_v( void ); static int sem_id; int main( int argc, char **argv) { int i; int pause_time; char op_char = ‘O‘ ; srand ((unsigned int ) getpid()); sem_id = semget((key_t) 1234, 1, 0666 | IPC_CREAT); if (argc > 1) { if (!set_semvalue()) { perror ( "Failed to initiallize semaphore\n" ); exit (1); } op_char = ‘X‘ ; sleep(2); } /* *接下来是一个循环,它进入和离开临界区10次。在每次循环的开始,首先调用semaphore_p函数,它在 *程序将进入临界区时设置信号量以等待进入 */ for (i = 0; i < 10; i++) { if (!semaphore_p()) exit (1); printf ( "%c" , op_char); fflush (stdout); pause_time = rand () % 3; sleep(pause_time); printf ( "%c" , op_char); fflush (stdout); /* * 在临界区之后,调用semaphore_v来将信号量设置为可用,然后等待一段随机的时间,再进入下一次循环 * 。在整个循环语句执行完毕后,调用del_semvalue函数来清理代码: */ if (!semaphore_v()) exit (1); pause_time = rand () % 2; sleep(pause_time); } printf ( "n%d - finished\n" , getpid()); if (argc > 1) { sleep(10); del_semvalue(); } exit (0); } /* * 函数set_semvalue通过将semctl调用的command参数设置为SETVAL来初始化信号量。 * 在使用信号量之前必须要这样做: * */ static int set_semvalue() { union semun sem_union; sem_union.val=1; if (semctl(sem_id,0,SETVAL,sem_union)==-1) { return 0; } return 1; } /* * 函数del_semvalue的形式与上面的函数几乎一样, * 只不过它通过将semctl调用的command设置为IPC_RMID来删除信号量ID: * */ static void del_semvalue() { union semun sem_union; if (semctl(sem_id,0,IPC_RMID,sem_union)==-1) { perror ( "Failed to delete semaphore\n" ); } } //semaphore_p对信号量做减1操作 static int semaphore_p() { struct sembuf sem_b; sem_b.sem_num=0; sem_b.sem_op=-1; sem_b.sem_flg=SEM_UNDO; if (semop(sem_id,&sem_b,1)==-1) { perror ( "semaphore_p failed" ); return 0; } return 1; } //semaphore_p对信号量做加1操作 static int semaphore_v() { struct sembuf sem_b; sem_b.sem_num=0; sem_b.sem_op=1; sem_b.sem_flg=SEM_UNDO; if (semop(sem_id,&sem_b,1)==-1) { perror ( "semaphore_v failed" ); return 0; } return 1; } |
运行:
1
2
3
|
[root@localhost linux_C] # ./sem1 OOXXOOXXOOXXOOXXOOXXOOXXOOXXOOXXOOXXOOn2182 - finished X[root@localhost linux_C] # Xn2181 - finished |
字符“O”和“X”分别代表程序的第一个和第二个调用实例。因为每个程序都在其进入和离开临界区打印,所以每个字符都应该成对出现。如果这个程序在你的系统上不能正常工作,你可能需要在启动程序之前执行命令stty -tostop,以确保产生tty输出的后台程序不会引发系统生成一个信号。
共享内存
共享内存是3个IPC机制中的第二个。它允许两个不相关的进程访问同一个逻辑内存。共享内存是在两个正在运行的进程之间传递数据的一种非常有效的方式。虽然X/Open标准并没有对它做出要求,但大多数共享内存的具体实现,都把由不同进程之间共享的内存安排为同一段物理内存。
共享内存是由IPC为进程创建的一个特殊的地址范围,它将出现在该进程的地址空间中。其他进程可以将同一段共享内存连接到它们自己的地址空间中。所有进程都可以访问共享内存中的地址,就好像它们是由malloc分配的一样。如果某个进程向共享内存写入了数据,所做的改动将立刻被可以访问同一段共享内存的任何其他进程看到。
共享内存为在多个进程之间共享和传递数据提供了一种有效的方式。由于它并未提供同步机制,所以我们通常需要用其他的机制来同步对共享内存的访问。我们一般是用共享内存来提供对大块内存区域的有效访问,同时通过传递小消息来同步对该内存的访问。
在第一个进程结束对共享内存的写操作之前,并无自动的机制可以阻止第二个进程开始对它进行读取。对共享内存访问的同步控制必须由程序员来负责。
共享内存使用的函数类似于信号量的函数,它们的定义如下:
#include <sys/shm.h>
void * shmat(int shm_id,const void * shm_addr,int shmflg);
int shmctl(int shm_id,int cmd,struct shmid_ds * buf);
int shmget(key_t key,size_t size,int shmflg);
与信号量的情况一样,头文件sys/types.h和sys/ipc.h通常被shm.h自动包含进程序。
shmget函数
我们用shmget函数来创建共享内存:
int shmget(key_t key,size_t size,int shmflg);
与信号量一样,程序需要提供一个参数key,它有效地为共享内存段命名,shmget函数返回一个共享内存标识符,该标识符将用于后续的共享内存函数。有一个特殊的键值IPC_PRIVATE,它用于创建一个只属于创建进程的共享内存。通常你不会用到这个值,而且你可能会发现在一些linux系统中,私有的共享内存其实不是真正的私有。
第二个参数size以字节为单位指定需要共享的内存容量。
第三个参数shmflg包含9个比特的权限标志,它们的作用与创建文件时使用的mode标志一样。由IPC_CREAT定义的一个特殊比特必须和权限标志按位或才能创建一个新的共享内存段。设置IPC_CREAT标志的同时 ,给shmget函数传递一个已有共享内存段的键并不是一个错误,如果无需用到IPC_CREAT标志,该标志就会被悄悄地忽略掉。
权限标志对共享内存非常有用,因为它们允许一个进程创建的共享内存可以被共享内存的创建者所拥有的进程写入,同时其他用户创建的进程只能读取该共享内存。我们可以利用这个功能来提供一种有效的对数据进行只读访问的方法,通过将数据放入共享内存并设置它的权限,就可以避免数据被其他用户修改。
如果共享内存创建成功,shmget返回一个非负整数,即共享内存标识符;如果失败,返回-1。
shmat函数
第一次创建共享内存段时,它不能被任何进程访问。要想启动对该共享内存的访问,必须将其连接到一个进程的地址空间中。这项工作由shmat函数来完成,它的定义如下所示:
void * shmat(int shm_id,const void * shm_addr,int shmflg);
第一个参数shm_id是由shmget返回的共享内存标识符。
第二个参数shm_addr指定的是共享内存连接到当前进程中的地址位置。它通常是一个空指针,表示让系统来选择共享内存出现的地址。
第三个参数shmflg是一组位标志。它的两个可能取值是SHM_RND(这个标志与shm_addr联合使用,用来控制共享内存连接的地址)和SHM_RDONLY(它使得连接的内存只读)。很少需要控制共享内存连接的地址,通常都是让系统来选择一个地址,否则就会使应用程序对硬件的依赖性过高。
如果shmat调用成功,它返回一个指向共享内存第一个字节的指针;如果失败,它就返回-1。
共享内存的读写权限由它的属主(共享内存的创建者)、它的访问权限和当前进程的属主决定。共享内存的访问权限类似于文件的访问权限。当shmflg & SHM_RDONLY为true时的情况。此时即使该共享内存的访问权限允许写操作,它都不能被写入。
shmdt函数
shmdt函数的作用是将共享内存从当前进程中分离。它的参数是shmat返回的地址指针。成功时它返回0,失败返回-1。注意,将共享内存分离并未删除它,只是使得该共享内存对当前进程不再可用。
shmctl函数
共享内存的控制函数,它的定义如下:
int shmctl(int shm_id,int command,struct shmid_ds * buf);
shmid_ds结构至少包含以下成员:
struct shmid_ds{
uid_t shm_perm.uid;
uid_t shm_perm.gid;
mode_t shm_perm.mode;
}
第一个参数shm_id是shmget返回的共享内存标识符。
第二个参数command是要采取的动作,它可以取3个值:
IPC_STAT 把shmid_ds结构中的数据设置为共享内存的当前关联值
IPC_SET 如果进程有足够的权限,就把共享内存的当前关联值设置为shmid_ds结构中给出的值
IPC_RMID 删除共享内存段
第三个参数buf是一个指针,它指向包含共享内存模式和访问权限的结构。成功时返回0,失败返回-1.X/Open规范没有定义当你试图删除一个正处于连接状态的共享内存字段时将会发生的情况。通常这个已经被删除的处于连接状态的共享内存段还能继续使用,直到它从最后一个进程中分离为止。但因为这个行为并未在规范中定义,所以最好不要依赖它。
实验:共享内存
第一个程序(消费者)将创建一个共享内存段,然后把写到它里面的数据都显示出来。第二个程序(生产者)将连接一个已有的共享内存段,并允许我们向其中输入数据。
(1)我们首先创建一个公共的头文件,来定义我们希望分发的共享内存。我们将其命名为
shm_com.h:
1
2
3
4
5
6
7
8
|
#ifndef SHM_COM_H_ #define SHM_COM_H_ #define TEXT_SZ 2048 struct shared_use_st{ int written_by_you; char some_text[TEXT_SZ]; }; #endif /* SHM_COM_H_ */ |
这里定义的结构在消费者和生产者程序中都会用到。当有数据写入这个结构时,我们用该结构中的一个整型标志wirtten_by_you来通知消费者。需要传输的文本长度2K是由我们随意决定的。
(2)第一个程序shm1.c是消费者程序。在头文件之后,通过设置了IPC_CREAT标志位的shmget调用来创建共享内存段(其长度就是我们的共享内存结构的长度):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <sys/shm.h> #include "shm_com.h" int main( int argc, char **argv) { int running=1; void * shared_memory=NULL; struct shared_use_st * shared_stuff; int shmid; srand ((unsigned int )getpid()); shmid=shmget((key_t)1234, sizeof ( struct shared_use_st),0666|IPC_CREAT); if (shmid==-1) { perror ( "shmget failed" ); exit (1); } /* * (3)让程序可以访问这个共享内存 * */ shared_memory=shmat(shmid,NULL,0); if (shared_memory==( void *)-1) { perror ( "shmat failed" ); exit (1); } printf ( "Memory attached at %X\n" ,( int )shared_memory); /* * (4)程序的下一部分将shared_memory分配给shared_stuff,然后它输出written_by_you中 * 的文本。循环将一直执行到在written_by_you中找到end字符串为止。sleep调用强迫消费者程序 * 在临界区域多待一会儿,让生产者程序等待: * */ shared_stuff=( struct shared_use_st *)shared_memory; shared_stuff->written_by_you=0; while (running) { if (shared_stuff->written_by_you) { printf ( "You wrote:%s" ,shared_stuff->some_text); sleep( rand ()%4); shared_stuff->written_by_you=0; if ( strncmp (shared_stuff->some_text, "end" ,3)==0) { running=0; } } } /* * 最后,共享内存被分离,然后被删除: * */ if (shmdt(shared_memory)==-1) { perror ( "shmdt failed" ); exit (1); } if (shmctl(shmid,IPC_RMID,0)==-1) { perror ( "shmctl failed" ); exit (1); } exit (0); } |
第二个程序shm2.c是生产者程序,我们通过它向消费者程序输入数据。它与shm1.c很类似
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
#include <unistd.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #include <sys/shm.h> #include "shm_com.h" int main( int argc, char **argv) { int running=1; void * shared_memory=NULL; struct shared_use_st * shared_stuff; char buffer[BUFSIZ]; int shmid; shmid=shmget((key_t)1234, sizeof ( struct shared_use_st),0666|IPC_CREAT); if (shmid==-1) { perror ( "shmget failed" ); exit (1); } shared_memory=shmat(shmid,NULL,0); if (shared_memory==( void *)-1) { perror ( "shmat failed" ); exit (1); } printf ( "Memory attached at %X\n" ,( int )shared_memory); shared_stuff=( struct shared_use_st *)shared_memory; while (running) { while (shared_stuff->written_by_you==1) { sleep(1); printf ( "waiting for client...\n" ); } printf ( "Enter some text:" ); fgets (buffer,BUFSIZ,stdin); strncpy (shared_stuff->some_text,buffer,TEXT_SZ); shared_stuff->written_by_you=1; if ( strncmp (buffer, "end" ,3)==0) { running=0; } } if (shmdt(shared_memory)==-1) { perror ( "shmdt failed" ); exit (1); } exit (0); } |
运行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
[root@localhost linux_C] # ./shm1 & [7] 14327 [root@localhost linux_C] # Memory attached at 5B0B7000 . /shm2 Memory attached at 77C19000 Enter some text:yao You wrote:yao waiting for client... waiting for client... Enter some text:xia You wrote:xia waiting for client... waiting for client... waiting for client... Enter some text:end You wrote:end [7]+ Done . /shm1 |
实验解析:
第一个程序shm1创建共享内存段,然后将它连接到自己的地址空间中。我们在共享内存的开始处使用了一个结构shared_use_st。该结构中有两个标志written_by_you,当共享内存中有数据写入时,就设置这个标志。这个标志被设置时,程序就从共享内存中读取文本,将它打印出来,然后清楚这个标志表示已经读完数据。我们用一个特殊字符串end来退出循环。接下来,程序分离共享内存段并删除它。
第二个程序shm2使用相同的键1234来取得并连接同一个共享内存段。然后它提示用户输入一些文本。如果标志written_by_you被设置,shm2就知道客户进程还未读完上一次的数据,因此就继续等待。当其他进程清除了这个标志后,shm2写入新数据并设置该标志。它还使用字符串end来终止并分离共享内存段。
我们只能提供自己的、非常简陋的同步标志written_to_you,它包括一个非常缺乏效率的忙等待(不停地循环)。这可以使得我们的示例比较简单,但在实际编程中,我们应该使用信号量或通过传递消息(使用管道或IPC消息)、生成信号的方法来提供应用程序读、写部分之间的一种更有效率的同步机制。
消息队列
消息队列与命名管道有许多相似之处,但少了在打开和关闭管道方面的复杂性。但使用消息队列并未解决我们在使用命名管道时遇到的一些问题,比如管道满时的阻塞问题。
消息队列提供了一种在两个不相关的进程之间传递数据的相当简单且有效的方法。与命名管道相比,消息队列的优势在于,它独立于发送和接收进程而存在,这消除了在同步命名管道的打开和关闭时可能产生的一些困难。
消息队列提供了一种从一个进程向一个进程发送一个数据块的方法。而且,每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型值的数据块。好消息是,我们可以通过发送消息来几乎完全避免命名管道的同步和阻塞问题。更好的是,我们可以用一些方法来提前查看紧急消息。坏消息是:与管道一样,每个数据块都有一个最大长度的限制,系统中所有队列所包含的全部数据的总长度也有一个上限。
虽然X/Open规范说明这些限制是强制的,但它并未提供发现这些限制的方法,只是告诉我们超过这些限制是引起一些消息队列函数失败的原因之一。linux系统有两个宏定义MSGMAX和MSGMNB,它们以字节为单位分别定义了一条消息的最大长度和一个队列的最大长度。其他系统中的这些宏定义可能会不一样或甚至根本就不存在。
消息队列函数的定义如下所示:
#include <sys/msg.h>
int msgctl(int msqid,int cmd,struct msqid_ds * buf);
int msgget(key_t key,int msgflg);
int msgrcv(int msqid,void * msg_ptr,size_t msg_sz,long int msgtype,int msgflg);
int msgsnd(int msqid,const void * msg_ptr,size_t msg_sz,int msgflg);
与信号量和共享内存一样,头文件sys/types.h和sys/ipc.h通常被msg.h自动包含进程序。
msgget函数
用msgget函数来创建和访问一个消息队列:
int msgget(key_t key,int msgflg);
与其他IPC机制一样,程序必须提供一个键值来命名某个特定的消息队列。特殊键值IPC_PRIVATE用于创建私有队列,从理论上来说,它应该只能被当前进程访问,但同信号量和共享内存的情况一样,消息队列在某些linux系统中事实上并非私有。由于私有队列没有什么用处,所以这并不是一个很严重的问题。与以前一样,第二个参数msgflg由9个权限标志组成。由IPC_CREAT定义的一个特殊位必须和权限标志按位或才能创建一个新的消息队列。在设置IPC_CREAT标志时,如果给出的是一个已有消息队列的键也不会产生错误。如果消息队列已有,则IPC_CREAT标志就被悄悄地忽略掉。
成功时msgget函数返回一个正整数,即队列标识符,失败时返回-1。
msgsnd函数
msgsnd函数用来把消息添加到消息队列中:
int msgsnd(int msqid,const void * msg_ptr,size_t msg_sz,int msgflg);
消息的结构受到两方面的约束,首先它的长度必须小于系统规定的上限;其次,它必须以一个长整型成员变量开始,接收函数将用这个成员变量来确定消息的类型。当使用消息时,最好把消息结构定义为下面这样:
struct my_message{
long int message_type;
}
由于在消息的接收中要用到message_type,所以你不能忽略它。你必须在声明自己的数据结构时包含它,并且最好将它初始化为一个已知值。
第一个参数msqid是由msgget函数返回的消息队列标识符。
第二个参数msg_ptr是一个指向准备发送消息的指针,消息必须像刚才说的那样以一个长整型成员变量开始。
第三个参数msg_sz是msg_ptr指向的消息的长度。这个长度不能包括长整型消息类型成员变量的长度。
第四个参数msgflg控制在当前信息队列满或队列消息到达系统范围的限制时将要发生的事情。如果msgflg中设置了IPC_NOWAIT标志,函数将立刻返回,不发送消息并且返回值-1。如果哦msgflg中的IPC_NOWAIT标志被清除,则发送进程将挂起以等待队列中腾出可用空间。
成功时这个函数返回0,失败时返回-1。如果调用成功,消息数据的一份副本将被放到消息队列中。
msgrcv函数
msgrcv函数从一个消息队列中获取消息:
int msgrcv(int msqid,void * msg_ptr,size_t msg_sz,long int msgtype,int msgflg);
第一个参数msqid是由msgget函数返回的消息队列标识符。
第二个参数msg_ptr是一个指向准备接收消息的指针,消息必须像前面msgsnd函数中介绍的那样以一个长整型成员变量开始。
第三个参数msg_sz是msg_ptr指向的消息的长度,它不包括长整型消息类型成员变量的长度。
第四个参数msgtype是一个长整数,它可以实现一种简单形式的接收优先级。如果msgtype的值为0,就获取队列中的第一个可用消息。如果它的值大于0,将获取具有相同消息类型的第一个消息。如果它的值小于0,将获取消息类型等于或小于msgtype的绝对值的第一个消息。
如果只想按照消息发送的顺序来接收它们,就把msgtype设置为0。如果只想获取某一特定类型的消息,就把msgtype设置为相应的类型值。如果想接收类型等于或小于n的消息,就把msgtype设置为-n。
第5个参数msgflg用于控制当队列中没有相应类型的消息可以接收时将发生的事情。如果msgflg中的IPC_NOWAIT标志被设置,函数将会立刻返回,返回值是-1。如果msgflg中的IPC_NOWAIT标志被清除,进程将会挂起以等待一条相应类型的消息到达。
成功时msgrcv函数返回放到接收缓冲区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓冲区中,然后删除消息队列中的对应消息。失败时返回-1。
msgctl函数
msgctl函数,它的作用与共享内存的控制函数非常相似:
int msgctl(int msqid,int command,struct msqid_ds * buf);
msqid_ds结构至少包含以下成员:
struct msqid_ds {
uid_t msg_perm.uid;
uid_t msg_perm.gid;
mode_t msg_perm.mode;
}
第一个参数msqid是由msgget返回的消息队列标识符。
第二个参数command是将要采取的动作。它可以取3个值,如下所示:
IPC_STAT:把msqid_ds结构中的数据设置为消息队列的当前关联值
IPC_SET:如果进程有足够的权限,就把消息队列中的当前关联值设置为msqid_ds结构中给出的值
IPC_RMID:删除消息队列
成功时它返回0,失败时返回-1。如果删除消息队列时,某个进程正在msgsnd或msgrcv函数中等待,这两个函数将失败。
实验:消息队列
我们将编写两个程序msg1.c用于接收消息,msg2.c用于发送消息。我们将允许两个程序都可以创建消息队列,但只有接收者在接收完最后一个消息之后可以删除它。
msg1.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/msg.h> struct my_msg_st { long int my_msg_type; char some_text[BUFSIZ]; }; int main( int argc, char **argv) { int running = 1; int msgid; struct my_msg_st some_data; long int msg_to_receive = 0; //首先建立消息队列 msgid = msgget((key_t) 1234, 0666 | IPC_CREAT); if (msgid == -1) { perror ( "msgget failed" ); exit (1); } //然后从队列中获取消息,直到遇见end消息为止。最后,删除消息队列: while (running) { if (msgrcv(msgid, ( void *) &some_data, BUFSIZ, msg_to_receive, 0) == -1) { perror ( "msgrcv failed" ); exit (1); } printf ( "You wrote:%s" , some_data.some_text); if ( strncmp (some_data.some_text, "end" , 3) == 0) { running = 0; } } if (msgctl(msgid, IPC_RMID, 0) == -1) { perror ( "msgctl failed" ); exit (1); } exit (0); } |
发送者程序msg2.c与msg1.c很相似。在main函数的变量定义部分,删除了对msg_to_receive的定义并把它替换为buffer[BUFSIZ]。去掉删除消息队列的语句,在running循环中做如下的改动。我们现在通过调用msgsnd来发送用户输入的文本到消息队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <sys/msg.h> #define MAX_TEXT 512 struct my_msg_st{ long int my_msg_type; char some_text[MAX_TEXT]; }; int main( int argc, char **argv) { int running=1; struct my_msg_st some_data; int msgid; char buffer[BUFSIZ]; msgid=msgget((key_t)1234,0666|IPC_CREAT); if (msgid==-1) { perror ( "msgget failed" ); exit (1); } while (running) { printf ( "Enter some text:" ); fgets (buffer,BUFSIZ,stdin); some_data.my_msg_type=1; strcpy (some_data.some_text,buffer); if (msgsnd(msgid,( void *)&some_data,MAX_TEXT,0)==-1) { perror ( "msgsnd failed" ); exit (1); } if ( strncmp (buffer, "end" ,3)==0) { running=0; } } exit (0); } |
假设消息队列中有空间,发送者可以创建队列,放一些数据到队列中,然后在接收者启动之前就退出。我们将先运行发送者msg2。
运行:
1
2
3
4
5
6
7
8
|
[root@localhost linux_C] # ./msg2 Enter some text:yao Enter some text:xia Enter some text:end [root@localhost linux_C] # ./msg1 You wrote:yao You wrote:hesdf You wrote:end |
实验解析:
发送者程序通过msgget来创建一个消息队列,然后用msgsnd向队列中增加消息。接收者用msgget获得消息队列标识符,然后开始接收消息,直到接收到特殊的文本end为止。然后它用msgctl来删除消息队列以完成清理工作