参考资料:
http://blog.csdn.net/JXH_123/article/details/23450031 秒杀多线程系列
http://www.cnblogs.com/egmkang/archive/2012/11/17/2763295.html 合理的设计与使用消息队列
http://blog.csdn.net/yand789/article/details/17095993 UDP多线程通信server程序
http://blog.csdn.net/no_mame/article/details/17437273 UDP 多线程客户端与单线程服务器端
http://so.csdn.net/search?utf8=%E2%9C%93&t=&q=+udp%E5%A4%9A%E7%BA%BF%E7%A8%8B&commit=%E6%90%9C+%E7%B4%A2&sort= WaitforMultipleObjects 使用详解
http://blog.csdn.net/lyd_253261362/article/details/4450202 WaitforMultipleObjects 使用详解
http://bbs.csdn.net/topics/30350133 请教关于WaitforMultipleObjects的用法
http://blog.csdn.net/chw1989/article/details/7453217 多线程套接字编程
http://www.cnblogs.com/shootingstars/archive/2004/07/15/24602.html 在主线程中慎用WaitforMultipleObjects
http://blog.sina.com.cn/s/blog_7dc67d520100t2fb.html C++中queue等的使用方法
http://www.cnblogs.com/ZXYloveFR/p/3738155.html 一家人的周末餐与多线程----起步
问题:如何利用多线程实现UDP多服务器端的数据的融合,并保证周期为100ms以内。
方案二:
/************************************************************************/
/*
程序思路:先考虑一次传输过程,利用事件内核,来同步子线程与主线程,
利用临界区来实现锁的机制。
在利用事件时,我们利用自动复位事件实现,SetEvent来激活WaitForMultipleObjects,
当两个线程都执行完,程序执行到下一步,进入临界区,这里必须用锁,不然资源就会
发生冲突。
在临界区里面,主要是obs等一些变量的清零,还有数据的求并集,以及发送操作。
在利用UDP发送操作时,无谓的初始化可以省略,不要多搞。
*/
/************************************************************************/
最后,利用方案二实现。
遇到的问题:
到目前为止,我们都是等待其来临,自己去触发事件。但现实中有一难点无法处理,8线雷达,其每次都发送四层数据的信息
,需要两帧数据才可以发送数据。也就是说,八线雷达其数据周期为160ms,工作频率为12.5hz,我在想,假如我提高其频率,
将两帧数据放在八线雷达本身的客户端去处理,返回给我的还是一样的频率,这个可能会好点。提高到25hz即可。两帧数据位一周期,理论上应该可以解决这个问题。
也就是说,数据同步问题,还需要斟酌的去实现。
下面贴出代码:
.h文件 :
1: #ifndef NORMALNODE
2: #define NORMALNODE
3: #include <iostream>
4: #include <sstream>
5: #include <queue>
6: #include <Windows.h>
7: //socket头文件
8: #include "winsock.h"
9: //socket库的lib
10: #pragma comment(lib,"ws2_32.lib")
11: using namespace std;
12:
13: typedef unsigned short int uint;
14: typedef unsigned char uchar;
15: #define MAXSIZE 4000 // scan data模式 获得的最多点的个数
16:
17: const UINT16 Port1 = 8060;
18: const UINT16 Port2 = 8080;
19:
20: DWORD WINAPI Child0Func(LPVOID);
21: DWORD WINAPI Child1Func(LPVOID);
22:
23: char szbuffer[2][2000] = {0};//缓冲区用于存放字节流数据
24:
25: const int startoffset = 4;
26: const int PointLength = 6;
27:
28: CRITICAL_SECTION g_csThreadCode;
29:
30: typedef struct Point //点转换为矩阵的x与y坐标。
31: {
32: uint x;
33: uint y;
34: uchar value;
35: uchar U;
36: } Point;
37:
38: typedef struct Matrix //稀疏矩阵数据结构
39: {
40: int Num;
41: Point point[MAXSIZE];
42:
43: } Matrix;
44:
45: UINT16 count1 = 0;
46: Matrix matrix1;
47: Matrix matrix2;
48: #endif
.cpp 文件:
1: // ThreadDemo6.cpp : 定义控制台应用程序的入口点。
2: //
3: #include "stdafx.h"
4: #include "Demo.h"
5: int _tmain(int argc, _TCHAR* argv[])
6: {
7: //初始化socket库
8: WSADATA wsa = {0}; //WinSockApi 取WSA+DATA组成套接字结构体
9: WSAStartup(MAKEWORD(2,2),&wsa);
10:
11: HANDLE hChild[4];
12: HANDLE hEvent[4];
13: DWORD threadId[4];
14: int obs1 = 0;
15: int obs2 = 0;
16: bool bDone = false;
17:
18: DWORD dwStart = GetTickCount();
19: InitializeCriticalSection(&g_csThreadCode);
20: //CreateEvent创建一个事件的时候,最后一个字符串是该事件的名字,
21: //这样就可以在其余的地方通过这个名字来找到相应的事件了,等于是一个标识的作用
22:
23: //第二个参数为true,所以为手动置位 。ResetEvent,使事件处于未触发状态
24: //一般情况下,当其处于触发状态,也就是SetEvent时,其他人可以更改
25: //第二个参数为false,表示自动置位,也就是不用ResetEvent。
26: //创建事件
27: hEvent[0] = CreateEventW(NULL,FALSE,FALSE,_T("ChildEvent0"));
28: hEvent[1] = CreateEventW(NULL,FALSE,FALSE,_T("ChildEvent1"));
29:
30: hChild[0] = CreateThread(NULL,0,Child0Func,0,0,&threadId[0]);//ID号与句柄
31: hChild[1] = CreateThread(NULL,0,Child1Func,0,0,&threadId[1]);
32:
33: /************************************************************************/
34: /*UDP部分 */
35: /************************************************************************/
36: SOCKET sClient;
37: int iLen; //服务器地址长度
38: int iSend; //接收数据的缓冲
39: struct sockaddr_in ser; //服务器端地址
40: //建立服务器端地址
41: ser.sin_family=AF_INET;
42: ser.sin_port=htons(8060);
43: ser.sin_addr.s_addr=inet_addr("127.0.0.1"); // 本机IP地址,测试用
44: //ser.sin_addr.s_addr = inet_addr("192.168.1.3"); //决策机IP地址
45:
46: //建立客户端数据报套接口
47: sClient=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP);
48:
49: if(sClient==INVALID_SOCKET)
50: {
51: printf("socket()Failed:%d\n",WSAGetLastError());
52: return 0;
53: }
54: iLen=sizeof(ser);
55: /********************************以上是UDP发送部分程序*****************************/
56:
57:
58: while(!bDone)
59: {
60: //下面这个方法有两种状态,一个是一个个去响应事件,一个是总的去响应事件。
61: DWORD dwStatus = WaitForMultipleObjects(2,hEvent,TRUE,INFINITE);//infinite,不等待,setEvent可以使其返回为0
62: if ((dwStatus >= WAIT_OBJECT_0)&&(dwStatus <= WAIT_OBJECT_0 + 1 ) )
63: {
64: DWORD dwStart = GetTickCount();
65: EnterCriticalSection(&g_csThreadCode);//进入关键段 临界区
66: //数据处理
67: UINT8 str1[2000] = {0};
68: UINT8 str2[2000] = {0};
69: //开辟两个无符号类型,这里是加了锁机制处理,string 默认并不是无符号类型
70: for (int i = 0; i < 2000; ++i)
71: {
72: str1[i] = szbuffer[0][i];
73: str2[i] = szbuffer[1][i];
74: }
75: matrix1.Num = str1[3]*16*16*16 + str1[2]*16*16 + str1[1]*16 + str1[0];
76: matrix2.Num = str2[3]*16*16*16 + str2[2]*16*16 + str2[1]*16 + str2[0];
77: for (int i = 0; i < matrix1.Num; ++i)
78: {
79: matrix1.point[obs1].x = str1[startoffset + 1 + i*PointLength]*16 + str1[startoffset + 0 + i*PointLength];
80: matrix1.point[obs1].y = str1[startoffset + 3 + i*PointLength]*16 + str1[startoffset + 2 + i*PointLength];
81: matrix1.point[obs1].value = str1[startoffset + 4 + i*PointLength];
82: matrix1.point[obs1].U = str1[startoffset + 5 + i*PointLength];
83: obs1++; //注意清零
84: }
85: for (int i = 0; i < matrix2.Num; ++i)
86: {
87: matrix2.point[obs2].x = str1[startoffset + 1 + i*PointLength]*16 + str1[startoffset + 0 + i*PointLength];
88: matrix2.point[obs2].y = str1[startoffset + 3 + i*PointLength]*16 + str1[startoffset + 2 + i*PointLength];
89: matrix2.point[obs2].value = str1[startoffset + 4 + i*PointLength];
90: matrix2.point[obs2].U = str1[startoffset + 5 + i*PointLength];
91: obs2++; //注意清零
92: }
93: obs1 = 0;
94: obs2 = 0;
95: Point poi;
96: //数据求并集然后发送
97: for (int i = 0; i < matrix1.Num; ++i)
98: {
99: poi = matrix1.point[i]; //四线雷达数据集
100: for (int j = 0; j < matrix2.Num; ++j)
101: {
102: if ((poi.x == matrix2.point[j].x) && (poi.y == matrix2.point[j].y))
103: {
104: matrix2.point[j] = matrix2.point[j+1];//大的替换小的
105: matrix2.Num--;
106: break;
107: }
108: }
109: }
110:
111: for (int i = 0; i < matrix2.Num; ++i)
112: {
113: matrix1.point[matrix1.Num + i] = matrix2.point[i];
114: }
115: matrix1.Num = matrix1.Num + matrix2.Num;
116:
117: uint len = matrix1.Num*sizeof(Point) + 4;
118: char buffer[10000];
119: memcpy(buffer,(char*)&matrix1,len);
120: iSend=sendto(sClient,buffer,len-1,0,(struct sockaddr*)&ser,iLen);
121: cout << "*************Total time:" << GetTickCount() - dwStart << endl;
122: LeaveCriticalSection(&g_csThreadCode);//离开关键段
123: }
124: }
125:
126:
127: for (int j = 0; j < 4; ++j)
128: {
129: CloseHandle(hEvent[j]);
130: CloseHandle(hChild[j]);
131: }
132: DeleteCriticalSection(&g_csThreadCode);
133: cout << "*************Total time:" << GetTickCount() - dwStart << endl;
134: return 0;
135: }
136:
137:
138: //子线程可以将主线程创建的Event激活
139: DWORD WINAPI Child0Func(LPVOID p)
140: {
141: HANDLE hEvent;
142: hEvent = OpenEventW(EVENT_ALL_ACCESS,FALSE,_T("ChildEvent0"));
143: SOCKET socksvr = socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP);
144: if (INVALID_SOCKET == socksvr)
145: {
146: return 0;
147: }
148: struct sockaddr_in svraddr = {0};
149: svraddr.sin_family = AF_INET;
150: svraddr.sin_port = htons(5070);
151: svraddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
152: if ( bind(socksvr,(struct sockaddr*)&svraddr,sizeof(svraddr)) == SOCKET_ERROR)//地址与套接字绑定
153: {
154: cout << "套接字绑定错误" << endl;
155: return 0;
156: }
157: struct sockaddr_in clientaddr = {0};
158: int nLen = sizeof(clientaddr);
159: while (true)
160: {
161: recvfrom(socksvr,(char*)szbuffer[0],2000,0,(struct sockaddr*)&clientaddr,&nLen);//构造ip地址
162: // cout << "flag1: "<<count1++ <<endl;
163: SetEvent(hEvent); //将事件置为有信号状态,WaitForObject返回为WAIT_OBJECT_0
164: }
165: }
166:
167: //子线程可以将主线程创建的Event激活
168: DWORD WINAPI Child1Func(LPVOID p)
169: {
170: HANDLE hEvent;
171: hEvent = OpenEventW(EVENT_ALL_ACCESS,FALSE,_T("ChildEvent1"));
172: SOCKET socksvr = socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP);
173: if (INVALID_SOCKET == socksvr)
174: {
175: return 0;
176: }
177: struct sockaddr_in svraddr = {0};
178: svraddr.sin_family = AF_INET;
179: svraddr.sin_port = htons(5080);
180: svraddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
181: if ( bind(socksvr,(struct sockaddr*)&svraddr,sizeof(svraddr)) == SOCKET_ERROR)//地址与套接字绑定
182: {
183: cout << "套接字绑定错误" << endl;
184: return 0;
185: }
186: struct sockaddr_in clientaddr = {0};
187: int nLen = sizeof(clientaddr);
188: while (true)
189: {
190: recvfrom(socksvr,(char*)szbuffer[1],2000,0,(struct sockaddr*)&clientaddr,&nLen);//构造ip地址
191: // cout << "flag2: "<<count1++ <<endl;
192: SetEvent(hEvent); //将事件置为有信号状态,WaitForObject返回为WAIT_OBJECT_0
193: }
194: }
原文地址:http://www.cnblogs.com/zhuxuekui/p/3764434.html