标签:
前言:我在某互联网公司实习了几个月,其中约一个月在做基于UDP协议的应用层软件开发,目标是用于流媒体服务器,传输高清视频图像帧。整个开发过程从零开始,最终实现了数百兆字节以上大文件的可靠传输,但传输效率仍有待进一步提升。由于UDP存在丢包问题,这部分网络传输协议的编程确实比较复杂,在此分享一下自己的开发经验。
#ifndef UDPNONBLOCKINGOUTPUT #define UDPNONBLOCKINGOUTPUT #include "winsock.h" #include <iostream> #include <stdio.h> #include <windows.h> #include <tchar.h> #include <time.h> #include <vector> #pragma comment(lib,"wsock32.lib") #define SERVER_PORT 2000 #define CLIENT_PORT 3000 #define BEGIN_NUM 19900711 #define DATA_NUM 20160113 #define CHECK_NUM 20161817 #define END_CHECK_NUM 20160114 #define END_NUM 11700991 #define END_ALL 20160115 #define BLOCK_DATA_SIZE (10 * 1024) #define FILE_HEAD 4 #define BLOCK_HEAD 4 #define DESTADDRESS "192.168.27.170" #define FILENAME "D:\\48M.mp4" //UDT包头 typedef struct _UDP_HEAD_ { UINT m_PacketType; //包类型 1数据包 2确认包 UINT m_PacketLen; //本次数据帧长度(不包括包头大小,仅仅数据部分大小) UINT m_PacketId; //包序列号 UINT m_Check; //效验和 以上字段的相加 }UDP_HEAD; class CUdpNonBlockingOutput { public: CUdpNonBlockingOutput() { m_flagNonBlocking = 0; InitSocket(); } ~CUdpNonBlockingOutput() { closesocket(m_socketClient); WSACleanup(); } // static DWORD WINAPI ThreadFun(LPVOID pM); void InitSocket(); void SendKeyPackage(char *eachBuf, UINT size); void RecvKeyPackage(char *eachBuf, UINT size); void SendFile(); void SendMsg(); // void RecvMsg(int size); private: char m_RecvBuff[1024]; char m_SendBuff[1024]; sockaddr_in Client; sockaddr_in ClientAddr; SOCKET m_socketServer; SOCKET m_socketClient; int m_len; int m_flagNonBlocking; DWORD dwFileSize; BYTE * fileData; HANDLE m_Mutex; static std::vector<UINT> m_lackPackageNum; }; #endif
#include "UdpNonBlockingOutput.h"
#include <string.h>

namespace
{
    // How long, in clock() ticks, to poll for an acknowledgement before resending.
    const clock_t kAckWaitTicks = 5;
    // Number of timed-out waits before SendKeyPackage gives up.
    const int kKeyPacketRetries = 3;
    // Upper bound on blocks retransmitted per request (limit taken from the
    // original code).
    const UINT kMaxResendPerRequest = 510;
    // Receive buffer size for retransmit requests: marker + count + list.
    const int kLackBufSize = 2 * 1024 + BLOCK_HEAD + FILE_HEAD + 1;

    // Writes value into dst[0..3] in big-endian order.
    void PackUint32(BYTE *dst, UINT value)
    {
        dst[0] = (BYTE)((value >> 24) & 0xff);
        dst[1] = (BYTE)((value >> 16) & 0xff);
        dst[2] = (BYTE)((value >> 8) & 0xff);
        dst[3] = (BYTE)(value & 0xff);
    }

    // Reads a big-endian 32-bit value from src[0..3].
    UINT UnpackUint32(const char *src)
    {
        return ((UINT)(UCHAR)src[0] << 24)
             | ((UINT)(UCHAR)src[1] << 16)
             | ((UINT)(UCHAR)src[2] << 8)
             | (UINT)(UCHAR)src[3];
    }
}

// Starts Winsock, creates the send and receive sockets, binds the receive
// socket to CLIENT_PORT and prepares the destination address.
// Failures are reported with printf only; both sockets are left as
// INVALID_SOCKET in that case. WSACleanup is left to the destructor so that
// it is called once per WSAStartup.
void CUdpNonBlockingOutput::InitSocket()
{
    WSADATA WSAData;
    if (WSAStartup(MAKEWORD(2, 2), &WSAData) != 0)
    {
        printf("sock init failed!\n");
        return;
    }

    m_socketServer = socket(AF_INET, SOCK_DGRAM, 0);
    m_socketClient = socket(AF_INET, SOCK_DGRAM, 0);
    // socket() reports failure with INVALID_SOCKET, and either call can fail.
    if (m_socketServer == INVALID_SOCKET || m_socketClient == INVALID_SOCKET)
    {
        printf("sock create failed\n");
        if (m_socketServer != INVALID_SOCKET)
            closesocket(m_socketServer);
        if (m_socketClient != INVALID_SOCKET)
            closesocket(m_socketClient);
        m_socketServer = INVALID_SOCKET;
        m_socketClient = INVALID_SOCKET;
        return;
    }

    memset(&ClientAddr, 0, sizeof(ClientAddr));
    ClientAddr.sin_family = AF_INET;
    ClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    ClientAddr.sin_port = htons(CLIENT_PORT);
    if (bind(m_socketServer, (struct sockaddr*)&ClientAddr, sizeof(struct sockaddr_in)) == SOCKET_ERROR)
    {
        printf("sock bind error!\n");
        closesocket(m_socketServer);
        closesocket(m_socketClient);
        m_socketServer = INVALID_SOCKET;
        m_socketClient = INVALID_SOCKET;
        return;
    }

    memset(&Client, 0, sizeof(Client));
    Client.sin_family = AF_INET;
    Client.sin_addr.s_addr = inet_addr(DESTADDRESS);
    Client.sin_port = htons(SERVER_PORT);
    m_len = sizeof(Client);
}

std::vector<UINT> CUdpNonBlockingOutput::m_lackPackageNum;

// Sends a "key" packet and waits for the peer to acknowledge it.
//
// The acknowledgement is a datagram whose first FILE_HEAD bytes equal the
// first FILE_HEAD bytes of the packet sent. The packet is resent after every
// kAckWaitTicks without a match; after kKeyPacketRetries timed-out waits the
// function returns without reporting the failure.
//
// eachBuf: packet to send; must hold at least FILE_HEAD bytes.
// size:    number of bytes of eachBuf to send.
//
// Side effect: switches m_socketServer to non-blocking mode.
void CUdpNonBlockingOutput::SendKeyPackage(char *eachBuf, UINT size)
{
    if (eachBuf == NULL || size < FILE_HEAD)
        return;

    char keyWord[FILE_HEAD];
    memcpy(keyWord, eachBuf, FILE_HEAD);

    unsigned long nonBlocking = 1;
    if (ioctlsocket(m_socketServer, FIONBIO, &nonBlocking) == SOCKET_ERROR)
    {
        printf("unblock failed!\n");
    }

    sendto(m_socketClient, eachBuf, (int)size, 0, (SOCKADDR*)&Client, m_len);

    for (int attempt = 0; attempt < kKeyPacketRetries; ++attempt)
    {
        const clock_t startClock = clock();
        while (clock() - startClock <= kAckWaitTicks)
        {
            char recvKeyWord[FILE_HEAD] = { 0 };
            // Local address storage: recvfrom overwrites the address and
            // length it is given, and m_len is still needed for sendto.
            sockaddr_in from;
            int fromLen = sizeof(from);
            const int got = recvfrom(m_socketServer, recvKeyWord, FILE_HEAD, 0,
                                     (struct sockaddr*)&from, &fromLen);
            // The markers are raw bytes, not C strings (END_NUM even starts
            // with a 0x00 byte), so they are compared with memcmp.
            if (got >= FILE_HEAD && memcmp(recvKeyWord, keyWord, FILE_HEAD) == 0)
                return;
        }
        std::cout << "123";
        sendto(m_socketClient, eachBuf, (int)size, 0, (SOCKADDR*)&Client, m_len);
    }
}

// Waits up to kAckWaitTicks for one packet, stores it in eachBuf and echoes
// its first FILE_HEAD bytes back to the peer as the acknowledgement.
// If nothing arrives in time no acknowledgement is sent.
//
// eachBuf: destination buffer of at least `size` bytes.
// size:    capacity of eachBuf.
//
// Side effect: switches m_socketServer to non-blocking mode.
void CUdpNonBlockingOutput::RecvKeyPackage(char *eachBuf, UINT size)
{
    unsigned long nonBlocking = 1;
    if (ioctlsocket(m_socketServer, FIONBIO, &nonBlocking) == SOCKET_ERROR)
    {
        printf("unblock failed!\n");
    }

    char recvKeyWord[FILE_HEAD] = { 0 };
    bool received = false;
    clock_t startClock = clock();
    while (clock() - startClock <= kAckWaitTicks)
    {
        sockaddr_in from;
        int fromLen = sizeof(from);
        if (recvfrom(m_socketServer, eachBuf, (int)size, 0,
                     (struct sockaddr*)&from, &fromLen) >= FILE_HEAD)
        {
            memcpy(recvKeyWord, eachBuf, FILE_HEAD);
            received = true;
            break;
        }
    }
    if (!received)
        return;  // acknowledging with an all-zero marker would be bogus

    startClock = clock();
    for (;;)
    {
        if (sendto(m_socketClient, recvKeyWord, sizeof(recvKeyWord), 0,
                   (struct sockaddr*)&Client, m_len) > 0)
            break;
        if (clock() - startClock > kAckWaitTicks)
            break;
    }
}

//void CUdpNonBlockingOutput::RecvMsg(int size)
//{
//    recvfrom(m_socketServer, m_RecvBuff, BLOCK_HEAD + 1, 0, (struct sockaddr*)&Client, &m_len);
//    int DataPos = 0;
//    for (int i = 0; i < 4; i++)
//    {
//        DataPos += ((UCHAR)m_RecvBuff[i]) << (8 * (4 - i - 1)); // read the packet sequence number
//    }
//    m_lackPackageNum.push_back(DataPos);
//    std::cout << DataPos << std::endl;
//}
//
//// Periodic checking thread
//DWORD WINAPI CUdpNonBlockingOutput::ThreadFun(LPVOID lpParameter)
//{
//    printf("开启线程ID号为%d的子线程\n", GetCurrentThreadId());
//    CUdpNonBlockingOutput *pObj = (CUdpNonBlockingOutput *)lpParameter;
//    while (1)
//    {
//        //WaitForSingleObject(pObj->m_Mutex, INFINITE);
//        pObj->RecvMsg(BLOCK_HEAD);
//        // ReleaseMutex(pObj->m_Mutex);
//    }
//    return 0;
//}

// Reads FILENAME into memory and sends it to the peer.
//
// Wire format (all integers big-endian):
//   BEGIN_NUM(4) + file size(4) [+ data]      start packet; carries the whole
//                                             file when it fits in one block
//   DATA_NUM(4)  + sequence(4) + data         one full BLOCK_DATA_SIZE block
//   END_NUM(4)   + block count(4) + tail      last packet with the remainder
// After END_NUM the function answers the peer's control packets:
//   CHECK_NUM(4) + count(4) + sequence list   retransmit the listed blocks,
//                                             then send END_CHECK_NUM 3 times
//   END_ALL(4)                                peer has the file; stop
//
// Files of 4 GiB or more are rejected because the size field is 32 bits.
// NOTE(review): the control loop polls a non-blocking socket and has no
// overall timeout, so it spins forever if the peer disappears.
void CUdpNonBlockingOutput::SendFile()
{
    // 1. Open the file and get its size.
    HANDLE hFile = CreateFile(_T(FILENAME), GENERIC_READ, FILE_SHARE_READ, NULL,
                              OPEN_EXISTING, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
    if (hFile == INVALID_HANDLE_VALUE)
    {
        std::cout << "读取失败" << std::endl;
        return;
    }

    DWORD dwHighSize = 0;
    dwFileSize = GetFileSize(hFile, &dwHighSize);
    if ((dwFileSize == INVALID_FILE_SIZE && GetLastError() != NO_ERROR) || dwHighSize != 0)
    {
        std::cout << "读取失败" << std::endl;
        CloseHandle(hFile);
        dwFileSize = 0;
        return;
    }
    std::cout << "dwFileSize=" << dwFileSize << std::endl;

    // 2. Read the whole file into fileData. At least one byte is requested
    //    so that an empty file does not depend on what malloc(0) returns.
    DWORD dwBytesRead = 0;
    BOOL bSuccess = FALSE;
    fileData = (BYTE*)malloc(dwFileSize != 0 ? dwFileSize : 1);
    if (fileData != NULL)
        bSuccess = ReadFile(hFile, fileData, dwFileSize, &dwBytesRead, NULL);
    CloseHandle(hFile);

    // 3. Make sure the file was read completely.
    if (!bSuccess || (dwBytesRead != dwFileSize))
    {
        std::cout << "读取失败" << std::endl;
        free(fileData);
        fileData = NULL;
        return;
    }

    // Threaded variant, kept for reference:
    /* m_Mutex = CreateMutex(NULL, TRUE, NULL);
    HANDLE ThreadHandle = CreateThread(NULL, 0, &CUdpNonBlockingOutput::ThreadFun, (LPVOID)this, NULL, NULL);
    CloseHandle(ThreadHandle);*/

    // 4. Build the start packet: BEGIN_NUM + file size.
    const int kHeaderSize = FILE_HEAD + BLOCK_HEAD;
    const int kDataPacketSize = BLOCK_DATA_SIZE + kHeaderSize;
    std::vector<BYTE> packet(kDataPacketSize, 0);  // released automatically
    BYTE *eachBuf = &packet[0];
    DWORD retval = 0;

    PackUint32(eachBuf, BEGIN_NUM);
    PackUint32(eachBuf + FILE_HEAD, dwFileSize);

    // Time the whole transfer, not just the last pass of the control loop.
    const clock_t start = clock();

    if (dwFileSize <= (DWORD)BLOCK_DATA_SIZE)
    {
        // 5. The file fits into one block: header and data go out together.
        memcpy(eachBuf + kHeaderSize, fileData, dwFileSize);
        retval = sendto(m_socketClient, (char*)eachBuf, (int)(dwFileSize + kHeaderSize), 0,
                        (SOCKADDR*)&Client, m_len);
    }
    else
    {
        // 6. Multi-packet transfer.
        const DWORD n = dwFileSize / BLOCK_DATA_SIZE;   // number of full blocks
        const DWORD yu = dwFileSize % BLOCK_DATA_SIZE;  // bytes left for the last packet
        std::cout << "n=" << n << "yu=" << yu << "retval=" << retval << std::endl;

        // 6.1 Announce the transfer and wait for the acknowledgement.
        SendKeyPackage((char*)eachBuf, kHeaderSize);

        // 6.2 Send every full block once, unacknowledged.
        for (DWORD seq = 0; seq < n; ++seq)
        {
            PackUint32(eachBuf, DATA_NUM);
            PackUint32(eachBuf + FILE_HEAD, seq);
            memcpy(eachBuf + kHeaderSize, fileData + (size_t)seq * BLOCK_DATA_SIZE, BLOCK_DATA_SIZE);
            if (sendto(m_socketClient, (char*)eachBuf, kDataPacketSize, 0,
                       (SOCKADDR*)&Client, m_len) < 0)
                std::cout << "Sendto error!!!";
        }

        // 6.3 Send the end packet with the remaining bytes, acknowledged.
        PackUint32(eachBuf, END_NUM);
        PackUint32(eachBuf + FILE_HEAD, n);
        memcpy(eachBuf + kHeaderSize, fileData + (size_t)n * BLOCK_DATA_SIZE, yu);
        SendKeyPackage((char*)eachBuf, yu + kHeaderSize);

        // 6.4 Serve retransmit requests until the peer reports completion.
        std::vector<char> lackBuf(kLackBufSize);
        UINT flag_status = 0;
        bool waiting = true;
        while (waiting)
        {
            memset(&lackBuf[0], 0, lackBuf.size());
            sockaddr_in from;
            int fromLen = sizeof(from);
            const int ret = recvfrom(m_socketServer, &lackBuf[0], (int)lackBuf.size(), 0,
                                     (struct sockaddr*)&from, &fromLen);
            if (ret < FILE_HEAD)
                continue;  // nothing pending on the non-blocking socket, or a runt datagram
            std::cout << "Check Recv:" << ret << std::endl;

            flag_status = UnpackUint32(&lackBuf[0]);
            if (flag_status == (UINT)END_ALL)
            {
                std::cout << "end" << std::endl;
                // Echo the marker back as the acknowledgement.
                sendto(m_socketClient, &lackBuf[0], FILE_HEAD, 0, (struct sockaddr*)&Client, m_len);
                waiting = false;
            }
            else if (flag_status == (UINT)CHECK_NUM)
            {
                std::cout << "check" << std::endl;
                sendto(m_socketClient, &lackBuf[0], FILE_HEAD, 0, (struct sockaddr*)&Client, m_len);

                // The count comes from the network: limit it to the
                // per-request cap and to what this datagram really holds.
                UINT lackCount = 0;
                if (ret >= kHeaderSize)
                {
                    lackCount = UnpackUint32(&lackBuf[FILE_HEAD]);
                    const UINT carried = (UINT)(ret - kHeaderSize) / BLOCK_HEAD;
                    if (lackCount > carried)
                        lackCount = carried;
                    if (lackCount > kMaxResendPerRequest)
                        lackCount = kMaxResendPerRequest;
                }

                for (UINT k = 0; k < lackCount; ++k)
                {
                    const UINT seq = UnpackUint32(&lackBuf[kHeaderSize + k * BLOCK_HEAD]);
                    if (seq >= n)
                        continue;  // not a full block of this file; would read past fileData

                    PackUint32(eachBuf, DATA_NUM);
                    PackUint32(eachBuf + FILE_HEAD, seq);
                    memcpy(eachBuf + kHeaderSize, fileData + (size_t)seq * BLOCK_DATA_SIZE, BLOCK_DATA_SIZE);
                    if (sendto(m_socketClient, (char*)eachBuf, kDataPacketSize, 0,
                               (SOCKADDR*)&Client, m_len) < 0)
                    {
                        std::cout << "Check sendto error!!!" << std::endl;
                        break;
                    }
                }

                // Mark the end of this retransmit round. It is sent three
                // times because this packet is not acknowledged.
                PackUint32(eachBuf, END_CHECK_NUM);
                PackUint32(eachBuf + FILE_HEAD, END_CHECK_NUM);
                for (int i = 0; i < 3; i++)
                {
                    if (sendto(m_socketClient, (char*)eachBuf, kHeaderSize, 0,
                               (SOCKADDR*)&Client, m_len) < 0)
                    {
                        std::cout << "Check sendto error!!!" << std::endl;
                        break;
                    }
                }
            }
        }

        std::cout << "flag_status:" << flag_status << "-" << "dwFileSize:" << dwFileSize << std::endl;
        //Sleep(500);
        const clock_t end = clock();
        std::cout << "耗时:" << end - start << std::endl;
    }

    // The file image is no longer needed once the transfer is over.
    free(fileData);
    fileData = NULL;
}
#ifndef UDPNONBLOCKINGINPUT #define UDPNONBLOCKINGINPUT #include "winsock.h" #include <iostream> #include <process.h> #include "time.h" #include "windows.h" #include <stdio.h> #include <vector> #include <algorithm> #pragma comment(lib,"wsock32.lib") #define SERVER_PORT 2000 #define CLIENT_PORT 3000 #define BEGIN_NUM 19900711 #define DATA_NUM 20160113 #define CHECK_NUM 20161817 #define END_CHECK_NUM 20160114 #define END_NUM 11700991 #define END_ALL 20160115 #define BLOCK_DATA_SIZE (10 * 1024) #define FILE_HEAD 4 #define BLOCK_HEAD 4 #define DESTADDRESS "192.168.27.170" #define FILENAME "D:\\file.MP4" //每个包的协议头:数据类型(4字节,起始标识、终结标识、数据标识)+数据帧长度(4)+包序列号(4) //UDT包头 typedef struct _UDP_HEAD_ { UINT m_PacketType; //包类型 1数据包 2确认包 UINT m_PacketLen; //本次数据帧长度(不包括包头大小,仅仅数据部分大小) UINT m_PacketId; //包序列号 UINT m_Check; //效验和 以上字段的相加 }UDP_HEAD; class CUdpNonBlockingInput { public: CUdpNonBlockingInput() { m_flag_status = 0; m_dwFileSize = 0; m_flagNonBlocking = 0; InitSocket(); } ~CUdpNonBlockingInput() { closesocket(m_socketServer); WSACleanup(); } //static DWORD WINAPI ThreadFun(LPVOID pM); void InitSocket(); void SendKeyPackage(char *eachBuf, UINT size); void RecvKeyPackage(char *eachBuf, UINT size); void StartRevDataes(); void SendLackPackageNum(std::vector<UINT> m_lackPackageNum); private: char RecvBuff[1024]; char SendBuff[1024]; sockaddr_in Server; sockaddr_in ServerAddr; SOCKET m_socketServer; SOCKET m_socketClient; int m_len; int m_flagNonBlocking; DWORD m_dwFileSize; UINT m_prePackageNum; UINT m_flag_status; HANDLE m_Mutex; static std::vector<UINT> m_lackPackageNum; }; #endif
#include "UdpNonBlockingInput.h"
#include <exception>
#include <string.h>

namespace
{
    // How long, in clock() ticks, to poll for an acknowledgement before resending.
    const clock_t kAckPollTicks = 5;
    // Number of timed-out waits before SendKeyPackage gives up.
    const int kAckRetries = 100;
    // Largest number of block numbers put into one retransmit request (the
    // sending program caps each request at 510 entries).
    const size_t kMaxLackPerRequest = 510;

    // Writes value into dst[0..3] in big-endian order.
    void EncodeUint32(char *dst, UINT value)
    {
        dst[0] = (char)((value >> 24) & 0xff);
        dst[1] = (char)((value >> 16) & 0xff);
        dst[2] = (char)((value >> 8) & 0xff);
        dst[3] = (char)(value & 0xff);
    }

    // Reads a big-endian 32-bit value from src[0..3].
    UINT DecodeUint32(const char *src)
    {
        return ((UINT)(UCHAR)src[0] << 24)
             | ((UINT)(UCHAR)src[1] << 16)
             | ((UINT)(UCHAR)src[2] << 8)
             | (UINT)(UCHAR)src[3];
    }
}

// Starts Winsock, creates the send and receive sockets, binds the receive
// socket to SERVER_PORT and prepares the destination address.
// Failures are reported with printf only; both sockets are left as
// INVALID_SOCKET in that case. WSACleanup is left to the destructor so that
// it is called once per WSAStartup.
void CUdpNonBlockingInput::InitSocket()
{
    WSADATA WSAData;
    if (WSAStartup(MAKEWORD(2, 2), &WSAData) != 0)
    {
        printf("sock init failed!\n");
        return;
    }

    m_socketServer = socket(AF_INET, SOCK_DGRAM, 0);
    m_socketClient = socket(AF_INET, SOCK_DGRAM, 0);
    // socket() reports failure with INVALID_SOCKET, and either call can fail.
    if (m_socketServer == INVALID_SOCKET || m_socketClient == INVALID_SOCKET)
    {
        printf("sock create failed\n");
        if (m_socketServer != INVALID_SOCKET)
            closesocket(m_socketServer);
        if (m_socketClient != INVALID_SOCKET)
            closesocket(m_socketClient);
        m_socketServer = INVALID_SOCKET;
        m_socketClient = INVALID_SOCKET;
        return;
    }

    memset(&ServerAddr, 0, sizeof(ServerAddr));
    ServerAddr.sin_family = AF_INET;
    ServerAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    ServerAddr.sin_port = htons(SERVER_PORT);
    if (bind(m_socketServer, (struct sockaddr*)&ServerAddr, sizeof(struct sockaddr_in)) == SOCKET_ERROR)
    {
        printf("sock bind error!\n");
        closesocket(m_socketServer);
        closesocket(m_socketClient);
        m_socketServer = INVALID_SOCKET;
        m_socketClient = INVALID_SOCKET;
        return;
    }

    memset(&Server, 0, sizeof(Server));
    Server.sin_family = AF_INET;
    Server.sin_addr.s_addr = inet_addr(DESTADDRESS);
    Server.sin_port = htons(CLIENT_PORT);
    m_len = sizeof(Server);
}

// Sends a "key" packet and waits for the peer to acknowledge it.
//
// The acknowledgement is a datagram whose first FILE_HEAD bytes equal the
// first FILE_HEAD bytes of the packet sent. The packet is resent after every
// kAckPollTicks without a match; after kAckRetries timed-out waits the
// function returns without reporting the failure.
//
// eachBuf: packet to send; must hold at least FILE_HEAD bytes.
// size:    number of bytes of eachBuf to send.
//
// NOTE(review): datagrams read here while waiting are consumed, so a data
// packet that overtakes the acknowledgement is lost and has to be requested
// again in a later retransmit round.
void CUdpNonBlockingInput::SendKeyPackage(char *eachBuf, UINT size)
{
    if (eachBuf == NULL || size < FILE_HEAD)
        return;

    char keyWord[FILE_HEAD];
    memcpy(keyWord, eachBuf, FILE_HEAD);

    sendto(m_socketClient, eachBuf, (int)size, 0, (SOCKADDR*)&Server, m_len);

    for (int attempt = 0; attempt < kAckRetries; ++attempt)
    {
        const clock_t startClock = clock();
        while (clock() - startClock <= kAckPollTicks)
        {
            char recvKeyWord[FILE_HEAD] = { 0 };
            // Local address storage: recvfrom overwrites the address and
            // length it is given, and m_len is still needed for sendto.
            sockaddr_in from;
            int fromLen = sizeof(from);
            const int got = recvfrom(m_socketServer, recvKeyWord, FILE_HEAD, 0,
                                     (struct sockaddr*)&from, &fromLen);
            // A negative result on the non-blocking socket only means that
            // nothing has arrived yet, so keep polling until the timeout.
            // The markers are raw bytes, not C strings: compare with memcmp.
            if (got >= FILE_HEAD && memcmp(recvKeyWord, keyWord, FILE_HEAD) == 0)
                return;
        }
        sendto(m_socketClient, eachBuf, (int)size, 0, (SOCKADDR*)&Server, m_len);
    }
}

// Receives one packet into eachBuf and echoes its first FILE_HEAD bytes back
// to the peer as the acknowledgement. If no packet is received no
// acknowledgement is sent. The elapsed time is only checked after each
// recvfrom call returns.
//
// eachBuf: destination buffer of at least `size` bytes.
// size:    capacity of eachBuf.
void CUdpNonBlockingInput::RecvKeyPackage(char *eachBuf, UINT size)
{
    char recvKeyWord[FILE_HEAD] = { 0 };
    bool received = false;
    clock_t startClock = clock();
    for (;;)
    {
        sockaddr_in from;
        int fromLen = sizeof(from);
        if (recvfrom(m_socketServer, eachBuf, (int)size, 0,
                     (struct sockaddr*)&from, &fromLen) >= FILE_HEAD)
        {
            memcpy(recvKeyWord, eachBuf, FILE_HEAD);
            received = true;
            break;
        }
        if (clock() - startClock > kAckPollTicks)
            break;
    }
    if (!received)
        return;  // acknowledging with an all-zero marker would be bogus

    startClock = clock();
    for (;;)
    {
        if (sendto(m_socketClient, recvKeyWord, sizeof(recvKeyWord), 0,
                   (struct sockaddr*)&Server, m_len) > 0)
            break;
        if (clock() - startClock > kAckPollTicks)
            break;
    }
}

std::vector<UINT> CUdpNonBlockingInput::m_lackPackageNum;

// Sends a retransmit request for the given block numbers and waits for its
// acknowledgement.
//
// Request layout (big-endian): CHECK_NUM(4) + count(4) + count * sequence(4).
// At most kMaxLackPerRequest numbers are sent per request; the remainder has
// to be requested in a later round. Nothing is sent for an empty list.
//
// m_lackPackageNum: block numbers to request (this parameter hides the
//                   static member of the same name inside this function).
void CUdpNonBlockingInput::SendLackPackageNum(std::vector<UINT> m_lackPackageNum)
{
    const clock_t startClock = clock();

    if (!m_lackPackageNum.empty())
    {
        size_t count = m_lackPackageNum.size();
        if (count > kMaxLackPerRequest)
            count = kMaxLackPerRequest;

        // Sized to exactly what is sent, so no uninitialised bytes go out.
        std::vector<char> request(FILE_HEAD + BLOCK_HEAD + count * BLOCK_HEAD, 0);
        EncodeUint32(&request[0], CHECK_NUM);
        EncodeUint32(&request[FILE_HEAD], (UINT)count);
        for (size_t i = 0; i < count; ++i)
        {
            EncodeUint32(&request[FILE_HEAD + BLOCK_HEAD + i * BLOCK_HEAD], m_lackPackageNum[i]);
        }
        SendKeyPackage(&request[0], (UINT)request.size());
    }

    const clock_t endClock = clock();
    std::cout << "重发耗时:" << endClock - startClock << "ms" << m_lackPackageNum.size() << std::endl;
}

//// Periodic checking thread
//DWORD WINAPI CUdpNonBlockingInput::ThreadFun(LPVOID lpParameter)
//{
//    printf("开启线程ID号为%d的子线程\n", GetCurrentThreadId());
//    CUdpNonBlockingInput *pObj = (CUdpNonBlockingInput *)lpParameter;
//    WaitForSingleObject(pObj->m_Mutex, INFINITE);
//    pObj->SendLackPackageNum(m_lackPackageNum);
//    ReleaseMutex(pObj->m_Mutex);
//    return 0;
//}

// Receives one file from the peer and writes it to FILENAME.
//
// Flow:
//   1. Wait for the BEGIN_NUM packet (acknowledged) that carries the size.
//   2. If the file fits in one block, its data is in that same packet.
//   3. Otherwise collect DATA_NUM blocks into memory. On END_NUM (which
//      carries the trailing bytes) and on every END_CHECK_NUM, work out which
//      blocks are still missing and either request them with
//      SendLackPackageNum or, when nothing is missing, write the file and
//      send END_ALL.
//
// Missing blocks are taken from a per-block "received" table, so losses at
// the very start or end of the stream, reordering and duplicates are all
// handled.
// NOTE(review): the receive loop polls a non-blocking socket and has no
// overall timeout, so it spins forever if the peer disappears.
void CUdpNonBlockingInput::StartRevDataes()
{
    // Threaded variant, kept for reference:
    //m_Mutex = CreateMutex(NULL, TRUE, NULL);
    //HANDLE ThreadHandle = CreateThread(NULL, 0, &CUdpNonBlockingInput::ThreadFun, (LPVOID)this, NULL, NULL);
    //CloseHandle(ThreadHandle);

    const int kHeaderSize = FILE_HEAD + BLOCK_HEAD;

    std::vector<char> packet(BLOCK_DATA_SIZE + 3 * FILE_HEAD, 0);  // receive buffer
    char *eachBuf = &packet[0];
    unsigned int RecvNum = 0;

    // Reset the per-transfer state so the function can be called again.
    m_prePackageNum = 0;
    m_flag_status = 0;
    m_dwFileSize = 0;
    m_lackPackageNum.clear();

    FILE *fp = fopen(FILENAME, "wb");
    if (fp == NULL)
    {
        printf("open output file failed\n");
        return;
    }

    // 1. Read the first packet (acknowledged): type marker + file size.
    RecvKeyPackage(eachBuf, BLOCK_DATA_SIZE + kHeaderSize);
    m_flag_status = DecodeUint32(eachBuf);
    m_dwFileSize = DecodeUint32(eachBuf + FILE_HEAD);

    const clock_t startClock = clock();
    if (m_flag_status != (UINT)BEGIN_NUM)
    {
        fclose(fp);
        return;
    }

    if (m_dwFileSize <= (DWORD)BLOCK_DATA_SIZE)
    {
        // 2. Whole file arrived with the start packet.
        if (m_dwFileSize > 0)
            fwrite(eachBuf + kHeaderSize, m_dwFileSize, 1, fp);
        fclose(fp);
    }
    else
    {
        // 3. Multi-packet transfer.
        const DWORD n = m_dwFileSize / BLOCK_DATA_SIZE;   // number of full blocks
        const DWORD yu = m_dwFileSize % BLOCK_DATA_SIZE;  // bytes carried by END_NUM

        std::vector<char> fileBuffer;  // whole file image
        std::vector<char> received;    // received[i] != 0 once block i is stored
        try
        {
            // The size comes from the network; fail cleanly if it cannot be
            // allocated.
            fileBuffer.resize(m_dwFileSize, 0);
            received.resize(n, 0);
        }
        catch (const std::exception &)
        {
            printf("alloc receive buffer failed\n");
            fclose(fp);
            return;
        }
        bool tailReceived = (yu == 0);

        unsigned long nonBlocking = 1;
        ioctlsocket(m_socketServer, FIONBIO, &nonBlocking);  // poll from here on

        bool receiving = true;
        while (receiving)
        {
            sockaddr_in from;
            int fromLen = sizeof(from);
            const int got = recvfrom(m_socketServer, eachBuf, BLOCK_DATA_SIZE + 2 * BLOCK_HEAD + 1, 0,
                                     (struct sockaddr*)&from, &fromLen);
            if (got < FILE_HEAD)
                continue;  // nothing new; never re-process the previous packet
            RecvNum++;

            m_flag_status = DecodeUint32(eachBuf);
            bool roundEnded = false;

            switch (m_flag_status)
            {
            case BEGIN_NUM:
                break;

            case DATA_NUM:
            {
                if (got < kHeaderSize + BLOCK_DATA_SIZE)
                    break;  // truncated data packet
                const UINT seq = DecodeUint32(eachBuf + FILE_HEAD);
                // Only full blocks 0..n-1 are valid; block n would run past
                // the end of fileBuffer.
                if (seq < n)
                {
                    memcpy(&fileBuffer[0] + (size_t)seq * BLOCK_DATA_SIZE,
                           eachBuf + kHeaderSize, BLOCK_DATA_SIZE);
                    received[seq] = 1;
                    m_prePackageNum = seq;
                }
                break;
            }

            case END_NUM:
            {
                std::cout << "END_NUM:" << RecvNum << std::endl;
                if ((DWORD)got < kHeaderSize + yu)
                    break;  // tail incomplete: no acknowledgement, so the peer resends
                if (yu > 0)
                    memcpy(&fileBuffer[0] + (size_t)n * BLOCK_DATA_SIZE, eachBuf + kHeaderSize, yu);
                tailReceived = true;
                // Acknowledge by echoing the type marker.
                sendto(m_socketClient, eachBuf, FILE_HEAD, 0, (struct sockaddr*)&Server, m_len);
                roundEnded = true;
                break;
            }

            case END_CHECK_NUM:
            {
                std::cout << "END_CHECK_NUM" << std::endl;
                roundEnded = true;
                break;
            }
            }

            if (!roundEnded)
                continue;

            // The peer finished a pass: list the blocks that are still missing.
            m_lackPackageNum.clear();
            for (DWORD i = 0; i < n; ++i)
            {
                if (!received[i])
                    m_lackPackageNum.push_back(i);
            }

            if (!m_lackPackageNum.empty())
            {
                SendLackPackageNum(m_lackPackageNum);
            }
            else if (tailReceived)
            {
                fwrite(&fileBuffer[0], m_dwFileSize, 1, fp);
                fclose(fp);
                fp = NULL;

                // Tell the peer that the transfer is complete (acknowledged).
                char SendCheckCountBuf[FILE_HEAD];
                EncodeUint32(SendCheckCountBuf, END_ALL);
                SendKeyPackage(SendCheckCountBuf, FILE_HEAD);

                std::cout << "SUCESS!!!" << std::endl;
                receiving = false;
            }
        }

        if (fp != NULL)
            fclose(fp);
    }

    const clock_t endClock = clock();
    std::cout << "收到数据:" << RecvNum << "-" << m_dwFileSize << "耗时:" << endClock - startClock << std::endl;
}
标签:
原文地址:http://blog.csdn.net/sty23122555/article/details/51506952