标签:
不管是数据的发送还是数据的接收,大体的流程我们基本上是都理了一下,还分析了数据收发过程中用的数据结构,接下来就看一些UDT中数据收发更精细的一些控制。
来看一下UDT中数据收发的可靠性保障。
先来看一下CRcvLossList的定义:
class CRcvLossList { public: CRcvLossList(int size = 1024); ~CRcvLossList(); // Functionality: // Insert a series of loss seq. no. between "seqno1" and "seqno2" into the receiver‘s loss list. // Parameters: // 0) [in] seqno1: sequence number starts. // 1) [in] seqno2: seqeunce number ends. // Returned value: // None. void insert(int32_t seqno1, int32_t seqno2); // Functionality: // Remove a loss seq. no. from the receiver‘s loss list. // Parameters: // 0) [in] seqno: sequence number. // Returned value: // if the packet is removed (true) or no such lost packet is found (false). bool remove(int32_t seqno); // Functionality: // Remove all packets between seqno1 and seqno2. // Parameters: // 0) [in] seqno1: start sequence number. // 1) [in] seqno2: end sequence number. // Returned value: // if the packet is removed (true) or no such lost packet is found (false). bool remove(int32_t seqno1, int32_t seqno2); // Functionality: // Find if there is any lost packets whose sequence number falling seqno1 and seqno2. // Parameters: // 0) [in] seqno1: start sequence number. // 1) [in] seqno2: end sequence number. // Returned value: // True if found; otherwise false. bool find(int32_t seqno1, int32_t seqno2) const; // Functionality: // Read the loss length. // Parameters: // None. // Returned value: // the length of the list. int getLossLength() const; // Functionality: // Read the first (smallest) seq. no. in the list. // Parameters: // None. // Returned value: // the sequence number or -1 if the list is empty. int getFirstLostSeq() const; // Functionality: // Get a encoded loss array for NAK report. // Parameters: // 0) [out] array: the result list of seq. no. to be included in NAK. // 1) [out] physical length of the result array. // 2) [in] limit: maximum length of the array. // Returned value: // None. void getLossArray(int32_t* array, int& len, int limit); private: int32_t* m_piData1; // sequence number starts int32_t* m_piData2; // sequence number ends int* m_piNext; // next node in the list int* m_piPrior; // prior node in the list; int m_iHead; // first node in the list int m_iTail; // last node in the list; int m_iLength; // loss length int m_iSize; // size of the static array private: CRcvLossList(const CRcvLossList&); CRcvLossList& operator=(const CRcvLossList&); };
这个容器的定义,看起来与CSndLossList还是蛮相似的。
但这个数据结构更具体的内容则还是需要通过成员函数的定义来厘清。先来看构造函数和析构函数(src/list.cpp):
CRcvLossList::CRcvLossList(int size) : m_piData1(NULL), m_piData2(NULL), m_piNext(NULL), m_piPrior(NULL), m_iHead(-1), m_iTail(-1), m_iLength(0), m_iSize(size) { m_piData1 = new int32_t[m_iSize]; m_piData2 = new int32_t[m_iSize]; m_piNext = new int[m_iSize]; m_piPrior = new int[m_iSize]; // -1 means there is no data in the node for (int i = 0; i < size; ++i) { m_piData1[i] = -1; m_piData2[i] = -1; } } CRcvLossList::~CRcvLossList() { delete[] m_piData1; delete[] m_piData2; delete[] m_piNext; delete[] m_piPrior; }
构造函数和析构函数主要还是在分配内存与释放内存,仍然没有办法将这个数据结构看的太清晰。
接着来看CRcvLossList::insert():
void CRcvLossList::insert(int32_t seqno1, int32_t seqno2) { // Data to be inserted must be larger than all those in the list // guaranteed by the UDT receiver if (0 == m_iLength) { // insert data into an empty list m_iHead = 0; m_iTail = 0; m_piData1[m_iHead] = seqno1; if (seqno2 != seqno1) m_piData2[m_iHead] = seqno2; m_piNext[m_iHead] = -1; m_piPrior[m_iHead] = -1; m_iLength += CSeqNo::seqlen(seqno1, seqno2); return; } // otherwise searching for the position where the node should be int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno1); int loc = (m_iHead + offset) % m_iSize; if ((-1 != m_piData2[m_iTail]) && (CSeqNo::incseq(m_piData2[m_iTail]) == seqno1)) { // coalesce with prior node, e.g., [2, 5], [6, 7] becomes [2, 7] loc = m_iTail; m_piData2[loc] = seqno2; } else { // create new node m_piData1[loc] = seqno1; if (seqno2 != seqno1) m_piData2[loc] = seqno2; m_piNext[m_iTail] = loc; m_piPrior[loc] = m_iTail; m_piNext[loc] = -1; m_iTail = loc; } m_iLength += CSeqNo::seqlen(seqno1, seqno2); }1. 与 CSndLossList 的insert()有点类似,也是首先处理了 最简单的向空链表中插入元素的case。
这个结构同样是将丢失packet的区间组织成一个链表,但不是单向链表,而是双向链表,相同位置处的m_piData1,m_piData2,m_piNext和m_piPrior的元素共同描述了一个链表中的节点,其中m_piData1的元素描述了区间的起始SeqNo,它总是一个有效的SeqNo;m_piData2的元素描述了区间的结束SeqNo,当这个值为-1时,表示区间里只有一个元素,否则表示区间的结束SeqNo;m_piNext的元素描述了下一个节点的位置,m_piPrior的元素描述前一个的位置。
对于这种case的处理,主要就是插入一个节点,更新链表头指针m_iHead,尾指针m_iTail,和丢失包的个数m_iLength字段就退出。
2. 对于CRcvLossList中之前已经有了节点的情况下,插入丢失packet区间的处理,相比于CSndLossList的insert()中对应部分的处理还是要简单很多。
对于CSndLossList来说,要insert()的case较多,比如接收到了接收端发回来的NAK Loss Report,或者是一个packet超过应有的时间都没有ACK过来等,这样造成的后果就是插入的区间可能是无序的,比如,第一次插入了[8, 10],第二次要插入的区间可能是[3, 5],也可能是[13, 17]。
但在CRcvLossList中就只有在接收到一个数据packet,发现新收到的packet的SeqNo大于(之前接收到的最大的SeqNo + 1)时才去执行insert(),因而总是向链表尾部插入元素,就可以保证链表中的节点总是以区间的起始SeqNo的升序排列的。
在4个数组中节点之间位置的相对关系,同CSndLossList中的一样,节点之间的位置差值等于它们的起始SeqNo之间的差值。
这里的处理过程也正是向链表尾部插入节点的过程。向链表尾部插入时,又要分为两种情况来处理,一是新插入的这个区间起始SeqNo == (原来的尾部节点结束SeqNo + 1),即新插入的区间要与尾部节点合并的情况,此时主要是更新尾部节点的结束SeqNo字段为新插入的这个区间的结束SeqNo,原来的那段code貌似写成下面这样要更好一点吧:
if ((-1 != m_piData2[m_iTail]) && (CSeqNo::incseq(m_piData2[m_iTail]) == seqno1)) { // coalesce with prior node, e.g., [2, 5], [6, 7] becomes [2, 7] m_piData2[m_iTail] = seqno2; } else {
另一种情况就是要完全插入一个新节点的情况,此时则主要是创建一个新节点插入链表尾部,然后更新链表的尾指针m_iTail。
3. 更新丢失包的个数m_iLength字段然后退出。
然后来看用于移除元素的remove()函数。CRcvLossList提供了两个remove函数,一个用于移除一整个丢失接收packet区间,另一个用于移除单独的一个packet。这里先来看一下用于移除packet区间的remove()函数:
bool CRcvLossList::remove(int32_t seqno1, int32_t seqno2) { if (seqno1 <= seqno2) { for (int32_t i = seqno1; i <= seqno2; ++i) remove(i); } else { for (int32_t j = seqno1; j < CSeqNo::m_iMaxSeqNo; ++j) remove(j); for (int32_t k = 0; k <= seqno2; ++k) remove(k); } return true; }
在这个函数中,seqno1为要移除的packet区间的起始SeqNo,seqno2则为结束SeqNo。
在这里主要分成两种情况来处理,一是seqno1小于等于seqno2,此时认为这两个值都没有因为超出了SeqNo的最大限制值,而被绕回到SeqNo取值空间的开始位置处,则一个一个的移除这个区间中的没一个值;二是seqno1大于seqno2,此时认为seqno2是由于超出了SeqNo的最大限制值而被绕回到SeqNo取值空间的开始位置处了,则将要移除的拆分成两个来处理,一是[seqno1, m_iMaxSeqNo),二是[0, seqno2],然后分别移除这两个区间中每一个值。
最后返回true。
接着再来看一下移除单个packet的remove()函数:
bool CRcvLossList::remove(int32_t seqno) { if (0 == m_iLength) return false; // locate the position of "seqno" in the list int offset = CSeqNo::seqoff(m_piData1[m_iHead], seqno); if (offset < 0) return false; int loc = (m_iHead + offset) % m_iSize; if (seqno == m_piData1[loc]) { // This is a seq. no. that starts the loss sequence if (-1 == m_piData2[loc]) { // there is only 1 loss in the sequence, delete it from the node if (m_iHead == loc) { m_iHead = m_piNext[m_iHead]; if (-1 != m_iHead) m_piPrior[m_iHead] = -1; } else { m_piNext[m_piPrior[loc]] = m_piNext[loc]; if (-1 != m_piNext[loc]) m_piPrior[m_piNext[loc]] = m_piPrior[loc]; else m_iTail = m_piPrior[loc]; } m_piData1[loc] = -1; } else { // there are more than 1 loss in the sequence // move the node to the next and update the starter as the next loss inSeqNo(seqno) // find next node int i = (loc + 1) % m_iSize; // remove the "seqno" and change the starter as next seq. no. m_piData1[i] = CSeqNo::incseq(m_piData1[loc]); // process the sequence end if (CSeqNo::seqcmp(m_piData2[loc], CSeqNo::incseq(m_piData1[loc])) > 0) m_piData2[i] = m_piData2[loc]; // remove the current node m_piData1[loc] = -1; m_piData2[loc] = -1; // update list pointer m_piNext[i] = m_piNext[loc]; m_piPrior[i] = m_piPrior[loc]; if (m_iHead == loc) m_iHead = i; else m_piNext[m_piPrior[i]] = i; if (m_iTail == loc) m_iTail = i; else m_piPrior[m_piNext[i]] = i; } m_iLength--; return true; } // There is no loss sequence in the current position // the "seqno" may be contained in a previous node // searching previous node int i = (loc - 1 + m_iSize) % m_iSize; while (-1 == m_piData1[i]) i = (i - 1 + m_iSize) % m_iSize; // not contained in this node, return if ((-1 == m_piData2[i]) || (CSeqNo::seqcmp(seqno, m_piData2[i]) > 0)) return false; if (seqno == m_piData2[i]) { // it is the sequence end if (seqno == CSeqNo::incseq(m_piData1[i])) m_piData2[i] = -1; else m_piData2[i] = CSeqNo::decseq(seqno); } else { // split the sequence // construct the second sequence from CSeqNo::incseq(seqno) to the original sequence end // located at "loc + 1" loc = (loc + 1) % m_iSize; m_piData1[loc] = CSeqNo::incseq(seqno); if (CSeqNo::seqcmp(m_piData2[i], m_piData1[loc]) > 0) m_piData2[loc] = m_piData2[i]; // the first (original) sequence is between the original sequence start to CSeqNo::decseq(seqno) if (seqno == CSeqNo::incseq(m_piData1[i])) m_piData2[i] = -1; else m_piData2[i] = CSeqNo::decseq(seqno); // update the list pointer m_piNext[loc] = m_piNext[i]; m_piNext[i] = loc; m_piPrior[loc] = i; if (m_iTail == i) m_iTail = loc; else m_piPrior[m_piNext[loc]] = loc; } m_iLength--; return true; }
在这个函数中执行的过程如下:
1. 检查List是否为空,若为空,直接返回false推出,否则继续执行。
2. 计算头节点所表示的丢失packet范围的起始SeqNo与参数seqno的offset,若offset小于0,表明要移除的SeqNo已经不在CRcvLossList中了,则直接退出,否则继续执行。
3. 计算可能以seqno作为起始SeqNo字段的节点的位置loc。
4.处理传入参数seqno为某个节点的起始SeqNo值的case,这个case下又分了多个子case来处理:
对于这些case中每种的具体处理,此处不再赘述。
处理完之后就返回true。
5. 传入参数seqno不是某个节点的起始SeqNo值的情况,则需要先找到可能包含了参数seqno的节点。这里是从loc开始,通过一个循环,向loc逐渐减少的方向,依次检查m_piData1数组值来查找的,找到位置小于loc,且距离loc最近的节点。
这种方法不同于CSndLossList::remove()中采用的,从链表的头部开始查找的过程。两种方法自然是各有优劣,这里的方法对于节点非常多,每个节点描述的区间比较小的情况比较高效,而实际上由于接收到的data packet顺序的随机性,CRcvLossList形成这样的链表的可能性还是蛮大的。而CSndLossList::remove()中采用的方法,则对于那种节点数不多,每个节点表示的区间比较长,节点之间距离较远的情况比较高效。
6. 检查找到的节点是否包含参数seqno,若不包含说明seqno不在丢失packet列表CRcvLossList中,则直接返回false退出。
7. 找到的节点表示的范围包含了传入的参数seqno,又可以分为3中case来处理。
case 1:传入的参数seqno是找到的节点的结束SeqNo值,且找到的节点表示的范围中只包含了2个SeqNo,此时需要将该节点的结束SeqNo值置为-1。
case 2:传入的参数seqno是找到的节点的结束SeqNo值,且找到的节点表示的范围中只包含了超过3个的SeqNo,此时则需将该节点的结束SeqNo值置为CSeqNo::decseq(seqno)。
case 3:传入的参数seqno是找到的节点表示的区间中间的某个值,此时则需要将原来的节点一分为2。
8. 递减m_iLength。
9. 返回true。
接收端只有在接收到DropMsg消息时才会执行移除丢失packet区间的remove(int32_t seqno1, int32_t seqno2)。每次接收到一个数据packet时,若其SeqNo不大于m_iRcvCurrSeqNo,都会尝试通过移除单个packet的remove(int32_t seqno)将相应的packet从m_pRcvLossList中移除。
最后再来看几个getter函数:
int CRcvLossList::getLossLength() const { return m_iLength; } int CRcvLossList::getFirstLostSeq() const { if (0 == m_iLength) return -1; return m_piData1[m_iHead]; } void CRcvLossList::getLossArray(int32_t* array, int& len, int limit) { len = 0; int i = m_iHead; while ((len < limit - 1) && (-1 != i)) { array[len] = m_piData1[i]; if (-1 != m_piData2[i]) { // there are more than 1 loss in the sequence array[len] |= 0x80000000; ++len; array[len] = m_piData2[i]; } ++len; i = m_piNext[i]; } }
getLossLength()用于返回CRcvLossList中包含的丢失packet的总个数。
getFirstLostSeq()用于返回CRcvLossList中最小的SeqNo值。
getLossArray()则返回一个用于NAK消息的丢失packet列表。这里可以一窥NAK消息中,丢失packet是怎么表示的。在丢失packet列表中,一个最高位为1,其余位表示一个合法SeqNo的值,及其后紧紧跟着的一个合法SeqNo值表示一个丢失packet区间,一个单独的SeqNo值项则表示一个单独的丢失的packet。
对于CRcvLossList的分析就到这里。
如我们前面在CUDT::processData()中看到的,在发现接收到的packet的SeqNo大于CSeqNo::incseq(m_iRcvCurrSeqNo)时,会在将[CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo)]区间插入m_pRcvLossList之后,就立即向数据的发送端发送一个NAK report。
这里先来看一下NAK消息的发送,在CUDT::sendCtrl(int pkttype, void* lparam, void* rparam, int size)中(src/core.cpp):
void CUDT::sendCtrl(int pkttype, void* lparam, void* rparam, int size) { CPacket ctrlpkt; switch (pkttype) { case 2: //010 - Acknowledgement { 。。。。。。 case 3: //011 - Loss Report { if (NULL != rparam) { if (1 == size) { // only 1 loss packet ctrlpkt.pack(pkttype, NULL, (int32_t *) rparam + 1, 4); } else { // more than 1 loss packets ctrlpkt.pack(pkttype, NULL, rparam, 8); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++m_iSentNAK; ++m_iSentNAKTotal; } else if (m_pRcvLossList->getLossLength() > 0) { // this is periodically NAK report; make sure NAK cannot be sent back too often // read loss list from the local receiver loss list int32_t* data = new int32_t[m_iPayloadSize / 4]; int losslen; m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4); if (0 < losslen) { ctrlpkt.pack(pkttype, NULL, data, losslen * 4); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++m_iSentNAK; ++m_iSentNAKTotal; } delete[] data; } // update next NAK time, which should wait enough time for the retansmission, but not too long m_ullNAKInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency; int rcv_speed = m_pRcvTimeWindow->getPktRcvSpeed(); if (rcv_speed > 0) m_ullNAKInt += (m_pRcvLossList->getLossLength() * 1000000ULL / rcv_speed) * m_ullCPUFrequency; if (m_ullNAKInt < m_ullMinNakInt) m_ullNAKInt = m_ullMinNakInt; break; }
发送NAK消息的过程大体为:
1. 要发送的NAK消息可分为两种类型。一种如我们前面在CUDT::processData()中看到的,针对单个packet或单个packet区间的NAK消息;第二种是在定时器中发送,包含了CRcvLossList中尽可能多的丢失packet或区间的NAK。
(1). 对于第一种类型,如我们前面在CUDT::processData()中所见,CUDT::sendCtrl()的rparam参数是一个两元素的int32_t数组,其中第一个元素为丢失packet的SeqNo区间的首个SeqNo,且最高位会被置为1,第二个元素为区间的最后一个SeqNo,参数size用来描述区间中是有1个SeqNo还是有多个。通过前面对CRcvLossList::getLossArray()的分析,实在不难理解为什么rparam的第一个int32_t元素的最高位会被置1,应该说这个是UDT协议消息规范的一部分。
在这里会首先根据size参数的值,将SeqNo或SeqNo区间pack进packet。对于单个packet的情况,不难理解rparam中的两个int32_t中的SeqNo部分都是一样的,只是第一个int32_t的最高位被置为了1。接着是设置packet的目标SocketID为PeerID,之后便是通过发送队列m_pSndQueue发送packet,然后还会更新用于统计的字段m_iSentNAK和m_iSentNAKTotal。
(2). 对于第二种类型,则会分配一块接近m_iPayloadSize大小的数据缓冲区data,接着通过CRcvLossList::getLossArray()将尽可能多的丢失packet或packet区间读进缓冲区data,然后在实际读到了丢失packet或packet区间的情况下,将data pack进packet,并通过发送队列m_pSndQueue发送packet,然后还会更新用于统计的字段m_iSentNAK和m_iSentNAKTotal。最后delete掉前面分配的数据缓冲区data。
在CUDT::checkTimers()中可以看到下面的这段被注释掉的code:
// we are not sending back repeated NAK anymore and rely on the sender‘s EXP for retransmission //if ((m_pRcvLossList->getLossLength() > 0) && (currtime > m_ullNextNAKTime)) //{ // // NAK timer expired, and there is loss to be reported. // sendCtrl(3); // // CTimer::rdtsc(currtime); // m_ullNextNAKTime = currtime + m_ullNAKInt; //}
可以看到,UDT的开发者认为,不再需要在定时器中发送重复的NAK消息了,在CUDT::processData()中首次发现有丢包时发送针对单个packet或单个packet区间的NAK消息就足够了。
因而前面我们讨论的第二种类型的NAK消息的发送实际是不work的,因而相关的部分也都变成了历史遗留问题了。
2. 在实际发送NAK消息之后,还会更新m_ullNAKInt。这个变量原本主要用于控制定时器中对于NAK消息的发送,但目前也是成为了历史遗留问题,而不再起实际作用了。
NAK消息本身的发送大体如此。
接着再来看一下数据发送端在接到NAK消息的处理。NAK消息如同数据消息一样,也是在建立之后才能收发的。因而其dispatch可以参考数据的接收部分的分析,只是作为一种控制消息,它会被dispatch给CUDT::processCtrl(),这里就来看一下数据发送端对于NAK消息的处理:
void CUDT::processCtrl(CPacket& ctrlpkt) { // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; uint64_t currtime; CTimer::rdtsc(currtime); m_ullLastRspTime = currtime; switch (ctrlpkt.getType()) { case 2: //010 - Acknowledgement { 。。。。。。 case 3: //011 - Loss Report { int32_t* losslist = (int32_t *) (ctrlpkt.m_pcData); m_pCC->onLoss(losslist, ctrlpkt.getLength() / 4); CCUpdate(); bool secure = true; // decode loss list message and insert loss into the sender loss list for (int i = 0, n = (int) (ctrlpkt.getLength() / 4); i < n; ++i) { if (0 != (losslist[i] & 0x80000000)) { if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], m_iSndCurrSeqNo) > 0)) { // seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seq secure = false; break; } int num = 0; if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, m_iSndLastAck) >= 0) num = m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]); else if (CSeqNo::seqcmp(losslist[i + 1], m_iSndLastAck) >= 0) num = m_pSndLossList->insert(m_iSndLastAck, losslist[i + 1]); m_iTraceSndLoss += num; m_iSndLossTotal += num; ++i; } else if (CSeqNo::seqcmp(losslist[i], m_iSndLastAck) >= 0) { if (CSeqNo::seqcmp(losslist[i], m_iSndCurrSeqNo) > 0) { //seq_a must not be greater than the most recent sent seq secure = false; break; } int num = m_pSndLossList->insert(losslist[i], losslist[i]); m_iTraceSndLoss += num; m_iSndLossTotal += num; } } if (!secure) { //this should not happen: attack or bug m_bBroken = true; m_iBrokenCounter = 0; break; } // the lost packet (retransmission) should be sent out immediately m_pSndQueue->m_pSndUList->update(this); ++m_iRecvNAK; ++m_iRecvNAKTotal; break; }
可以看到处理过程为:
1. 在CUDT::processCtrl()开始处,会统一地复位超时计数器m_iEXPCount,并将当前时间读进m_ullLastRspTime。
2. 从packet中获取丢失packet或packet区间列表losslist。
3. 执行拥塞控制器的onLoss()回调。
4. 执行CCUpdate()。这个操作主要用于控制发送窗口的大小,及发包的频率。后面在研究拥塞控制时再来研究它。
5. 通过一个循环从loss list中解码出丢失的packet或packet区间列表。
(1). 循环体中首先是处理的packet区间的情况。
对于这种情况,会先对SeqNo的有效性进行检查。区间的起始SeqNo必须小于等于结束SeqNo值,且结束SeqNo必须小于等于m_iSndCurrSeqNo才是合法的。若发现出现了不合法的SeqNo区间,则认为这是一个攻击packet,会将secure置为false,并直接跳出对于loss list的解码。否则继续执行。
在丢失packet区间的起始SeqNo大于等于m_iSndLastAck时,将解码到的整个区间插入发送丢失列表中。
在在丢失packet区间的起始SeqNo小于m_iSndLastAck,而结束SeqNo大于m_iSndLastAck时,将区间[m_iSndLastAck, losslist[i + 1]]插入发送丢失列表中。
对于其它情况,也即丢失packet区间的结束SeqNo小于m_iSndLastAck的情况,则表明这个丢失packet区间中的packet都已经滑出了发送窗口了,因而不再向发送丢失列表中插入元素。
更新m_iTraceSndLoss,m_iSndLossTotal这两个用于做统计的字段。
递增循环计数i,以跳过丢失packet区间的结束SeqNo。
(2). 处理单个packet的情况。
只有SeqNo大于m_iSndLastAck,NAK消息才有处理的必要,这里也主要针对这种情况。
同样先对SeqNo的有效性进行检查,SeqNo小于等于m_iSndCurrSeqNo才是合法的。若发现出现了不合法的SeqNo,则认为这是一个攻击packet,会将secure置为false,并直接跳出对于loss list的解码。否则继续执行。
向发送丢失列表中插入SeqNo。
更新m_iTraceSndLoss,m_iSndLossTotal这两个用于做统计的字段。
6. 若发现收到了不secure的控制消息,则将m_bBroken置为true,复位m_iBrokenCounter,并退出对于NAK消息的处理。否则继续执行。
7. 执行m_pSndQueue->m_pSndUList->update(this),以便于能够尽可能快地重新发送丢失的包。
8. 更新m_iRecvNAK和m_iRecvNAKTotal这两个用于统计的字段。
由此可见发送数据的端在收到NAK消息之后,不会Ack这个NAK,因而NAK消息本身的传输不是可靠的。
NAK的发送处理过程,大体如此。
接着来看一下UDT中的ACK机制。
UDT中的ACK消息,其packet type为2,在UDT中,只有CUDT::checkTimers()在发送这一类型的控制消息。这里先来看一下在CUDT::checkTimers()中对于ACK消息的发送(src/core.cpp):
void CUDT::checkTimers() { // update CC parameters CCUpdate(); //uint64_t minint = (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPktSndInt() * 0.9); //if (m_ullInterval < minint) // m_ullInterval = minint; uint64_t currtime; CTimer::rdtsc(currtime); if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount))) { // ACK timer expired or ACK interval is reached sendCtrl(2); CTimer::rdtsc(currtime); if (m_pCC->m_iACKPeriod > 0) m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency; else m_ullNextACKTime = currtime + m_ullACKInt; m_iPktCount = 0; m_iLightACKCount = 1; } else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount) { //send a "light" ACK sendCtrl(2, NULL, NULL, 4); ++m_iLightACKCount; }
可以看到,ACK消息分为两种,一种是常规ACK,在ACK定时器超时或ACK间距到来时发送;另一种是“light” ACK,在(m_iSelfClockInterval * m_iLightACKCount)小于等于m_iPktCount时发送。
m_ullNextACKTime描述ACK timer的超时时间,用于控制ACK发送在时间上的频率。在CUDT::open()中这个值初次被设置为一个有效值:
const int CUDT::m_iSYNInterval = 10000; m_ullCPUFrequency = CTimer::getCPUFrequency(); // set up the timers m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency; uint64_t currtime; CTimer::rdtsc(currtime); m_ullNextACKTime = currtime + m_ullSYNInt;
在CUDT::open()之后,m_ullSYNInt同样为一个常量值。
在接收数据packet的CUDT::processData()中,若发现接收到的是一个Msg的结束packet,会将当前时间读进m_ullNextACKTime,以便于CUDT::checkTimers()在下次执行时,就能立即发送ACK消息给发送端。
m_iPktCount描述自上次常规ACK消息发送之后,已经接收到的数据packet的个数。
m_pCC->m_iACKInterval用于控制每接收多少个数据packet要发送一个ACK消息,在默认的拥塞控制器CUDTCC中这个变量值为0,它主要用于自定义拥塞控制器的场景。
m_iSelfClockInterval为内部时钟的ACK发送间隔,是一个常量值(src/core.cpp):
const int CUDT::m_iSelfClockInterval = 64;
m_iLightACKCount为“light” ACK自上次常规ACK消息发送以来的发送次数。
回到CUDT::checkTimers()中ACK消息的发送过程:
1. 首先检查ACK定时器超时时间是否到达或ACK发送间距是否到来,若到来则进入常规ACK发送的过程,否则继续执行。常规ACK的发送过程为:
(1). 调用sendCtrl(2)发送常规ACK消息给调用者。
(2). 获取当前时间currtime。
(3). 检查m_pCC->m_iACKPeriod是否为大于0的有效值,若是则基于m_pCC->m_iACKPeriod更新m_ullNextACKTime。若不是则基于m_ullACKInt更新m_ullNextACKTime。
m_pCC->m_iACKPeriod为拥塞控制器中用来控制ACK消息发送时间周期的成员,可以看下这个变量在默认拥塞控制器中的初始化(src/ccc.cpp):
const int CUDT::m_iSYNInterval = 10000; 。。。。。。 CCC::CCC() : m_iSYNInterval(CUDT::m_iSYNInterval), m_dPktSndPeriod(1.0), 。。。。。。 void CUDTCC::init() { m_iRCInterval = m_iSYNInterval; m_LastRCTime = CTimer::getTime(); setACKTimer(m_iRCInterval); 。。。。。。 void CCC::setACKTimer(int msINT) { m_iACKPeriod = msINT > m_iSYNInterval ? m_iSYNInterval : msINT; }
m_pCC->m_iACKPeriod为一个常量值。
m_ullACKInt为CUDT内部控制ACK消息发送时间频率的成员。同样在CUDT::open()中设置:
const int CUDT::m_iSYNInterval = 10000; 。。。。。。 m_ullCPUFrequency = CTimer::getCPUFrequency(); 。。。。。。 // set up the timers m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency; 。。。。。。 m_ullACKInt = m_ullSYNInt;
这里可以看到,就ACK消息发送的时间频率控制而言,拥塞控制器的控制优先级要高于CUDT内部的控制。
(4). 复位m_iPktCount为0,m_iLightACKCount为1。
2. 检查(m_iSelfClockInterval * m_iLightACKCount)是否小于等于m_iPktCount,若条件成立,则表明发送“light” ACK消息的时间到了,于是发送一个“light” ACK消息,并递增“light” ACK消息发送计数m_iLightACKCount。在没有常规ACK消息发送的情况下,大概每接收到64个数据packet会发送一次ACK消息。由此可见“light” ACK大概应用于带宽比较高的情况,或者发送端发送数据packet速度过快的情况。
总结一下在CUDT::checkTimers()中对于ACK消息发送频率的控制。可以看到主要通过m_ullNextACKTime,拥塞控制器m_pCC的m_iACKInterval来控制常规ACK消息的发送。m_iSelfClockInterval用来控制“light” ACK的发送频率。
接着再来看一下在CUDT::sendCtrl()中ACK消息发送的具体过程:
void CUDT::sendCtrl(int pkttype, void* lparam, void* rparam, int size) { CPacket ctrlpkt; switch (pkttype) { case 2: //010 - Acknowledgement { int32_t ack; // If there is no loss, the ACK is the current largest sequence number plus 1; // Otherwise it is the smallest sequence number in the receiver loss list. if (0 == m_pRcvLossList->getLossLength()) ack = CSeqNo::incseq(m_iRcvCurrSeqNo); else ack = m_pRcvLossList->getFirstLostSeq(); if (ack == m_iRcvLastAckAck) break; // send out a lite ACK // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number if (4 == size) { ctrlpkt.pack(pkttype, NULL, &ack, size); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; } uint64_t currtime; CTimer::rdtsc(currtime); // There are new received packets to acknowledge, update related information. if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0) { int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack); m_iRcvLastAck = ack; m_pRcvBuffer->ackData(acksize); // signal a waiting "recv" call if there is any data available #ifndef WIN32 pthread_mutex_lock(&m_RecvDataLock); if (m_bSynRecving) pthread_cond_signal(&m_RecvDataCond); pthread_mutex_unlock(&m_RecvDataLock); #else if (m_bSynRecving) SetEvent(m_RecvDataCond); #endif // acknowledge any waiting epolls to read s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true); } else if (ack == m_iRcvLastAck) { if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency)) break; } else break; // Send out the ACK only if has not been received by the sender before if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0) { int32_t data[6]; m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo); data[0] = m_iRcvLastAck; data[1] = m_iRTT; data[2] = m_iRTTVar; data[3] = m_pRcvBuffer->getAvailBufSize(); // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock if (data[3] < 2) data[3] = 2; if (currtime - m_ullLastAckTime > m_ullSYNInt) { data[4] = m_pRcvTimeWindow->getPktRcvSpeed(); data[5] = m_pRcvTimeWindow->getBandwidth(); ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 24); CTimer::rdtsc(m_ullLastAckTime); } else { ctrlpkt.pack(pkttype, &m_iAckSeqNo, data, 16); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck); ++m_iSentACK; ++m_iSentACKTotal; } break; }
1. 首先要做的就是计算到底要ACK哪些数据packet,记为ack。
如果接收丢失数据packet列表m_pRcvLossList为空,则ACK CSeqNo::incseq(m_iRcvCurrSeqNo),若不为空则ACK首个丢失的数据packet。
2. 检查ACK的目标SeqNo是否已经被ACK过,且确定数据发送端已经收到了该ACK消息,若已经被ack过,且确定数据发送端已经收到了该ACK消息,则后面发送ACK消息的过程都没有必要了,而需要直接退出。否则继续执行。
检查方法就是比较前一步计算的ack与m_iRcvLastAckAck是否想等。若相等,则要ACK的SeqNo已经被ack过,且确定数据发送端已经收到了该ACK消息。
3. size == 4表示要发送的是“light” ACK,则发送“light” ACK消息,并退出,否则继续执行。
可以看到“light” ACK消息的数据部分只包含了要ACK的SeqNo。
4. 获取当前时间currtime。
5. 根据第一步计算的ACK目标SeqNo,分为3中情况来进行的发送ACK消息的前奏曲:
case 1:在上次常规ACK发送之后,有新的连续的数据包被接收,即CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0。此时会计算要新ACK的数据packet的个数acksize,更新m_iRcvLastAck为ack,ACK接收缓冲区m_pRcvBuffer中的acksize个数据packet,然后唤醒在m_RecvDataCond上等待读取接收到的数据的线程程。
case 2:上次发送的ACK消息的超时重传。可见超时周期为((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency),若超时时间未到,则退出执行。
case 3:CSeqNo::seqcmp(ack, m_iRcvLastAck) < 0的情况,一种本不应该出现的异常的情况。退出执行。
6. 发送常规ACK消息。
先来看一下经过了前面的那些步骤走到这一步时,m_iRcvLastAck,m_iRcvLastAckAck和ack之间的关系:
(ack > m_iRcvLastAckAck && ack == m_iRcvLastAck)
这里做的检查提供了更强的保证。
(1). 构造一个常规的ACK packet。一个常规的ACK packet会包含m_iRcvLastAck,也就是ACK的目标SeqNo,m_iRTT,m_iRTTVar,和接收缓冲区的可用大小。如果距离上次AckTime超过了m_ullSYNInt,还会携带接收时间窗口的packet接收速率和带宽等信息。
ACK消息有一套自己的SeqNo系统,m_iAckSeqNo用于记录最近发送的ACK消息的SeqNo。
(2). 发送常规ACK packet。
(3). 将m_iAckSeqNo和m_iRcvLastAck保存进ACK Window。
(4). 更新用于做数据统计的m_iSentACK和m_iSentACKTotal。
接着来看在数据发送端接收到ACK消息的处理:
void CUDT::processCtrl(CPacket& ctrlpkt) { // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; uint64_t currtime; CTimer::rdtsc(currtime); m_ullLastRspTime = currtime; switch (ctrlpkt.getType()) { case 2: //010 - Acknowledgement { int32_t ack; // process a lite ACK if (4 == ctrlpkt.getLength()) { ack = *(int32_t *) ctrlpkt.m_pcData; if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) { m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack); m_iSndLastAck = ack; } break; } // read ACK seq. no. ack = ctrlpkt.getAckSeqNo(); // send ACK acknowledgement // number of ACK2 can be much less than number of ACK uint64_t now = CTimer::getTime(); if ((currtime - m_ullSndLastAck2Time > (uint64_t) m_iSYNInterval) || (ack == m_iSndLastAck2)) { sendCtrl(6, &ack); m_iSndLastAck2 = ack; m_ullSndLastAck2Time = now; } // Got data ACK ack = *(int32_t *) ctrlpkt.m_pcData; // check the validation of the ack if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) { //this should not happen: attack or bug m_bBroken = true; m_iBrokenCounter = 0; break; } if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) { // Update Flow Window Size, must update before and together with m_iSndLastAck m_iFlowWindowSize = *((int32_t *) ctrlpkt.m_pcData + 3); m_iSndLastAck = ack; } // protect packet retransmission CGuard::enterCS(m_AckLock); cout << "Receive ack " << ack << endl; int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack); if (offset <= 0) { // discard it if it is a repeated ACK CGuard::leaveCS(m_AckLock); break; } // acknowledge the sending buffer m_pSndBuffer->ackData(offset); // record total time used for sending m_llSndDuration += currtime - m_llSndDurationCounter; m_llSndDurationTotal += currtime - m_llSndDurationCounter; m_llSndDurationCounter = currtime; // update sending variables m_iSndLastDataAck = ack; m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); CGuard::leaveCS(m_AckLock); #ifndef WIN32 pthread_mutex_lock(&m_SendBlockLock); if (m_bSynSending) pthread_cond_signal(&m_SendBlockCond); pthread_mutex_unlock(&m_SendBlockLock); #else if (m_bSynSending) SetEvent(m_SendBlockCond); #endif // acknowledde any waiting epolls to write s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true); // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, false); // Update RTT //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1); //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2); int rtt = *((int32_t *) ctrlpkt.m_pcData + 1); m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; m_iRTT = (m_iRTT * 7 + rtt) >> 3; m_pCC->setRTT(m_iRTT); if (ctrlpkt.getLength() > 16) { // Update Estimated Bandwidth and packet delivery rate if (*((int32_t *) ctrlpkt.m_pcData + 4) > 0) m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *) ctrlpkt.m_pcData + 4)) >> 3; if (*((int32_t *) ctrlpkt.m_pcData + 5) > 0) m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *) ctrlpkt.m_pcData + 5)) >> 3; m_pCC->setRcvRate(m_iDeliveryRate); m_pCC->setBandwidth(m_iBandwidth); } m_pCC->onACK(ack); CCUpdate(); ++m_iRecvACK; ++m_iRecvACKTotal; break; }
在这里,可以看到处理过程为:
1. 处理“light” ACK packet,然后退出。
处理方式就是在ACK的目标SeqNo ack满足CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0时,减小m_iFlowWindowSize,并更新m_iSndLastAck。
这里貌似CSeqNo::seqcmp(ack, m_iSndLastAck) == 0时也是什么都不需要做的。
2. 开始处理常规ACK。先是获取ACK消息的SeqNo,然后读取当前时间。
3. 发送ACK acknowledgement,也就是ACK2消息。可以看到,ACK2消息的发送个数可以少于ACK的消息个数。m_iSYNInterval为ACK2消息发送的最小时间间隔,即自上一次发送ACK2消息以来,经过的时间超过了m_iSYNInterval,在接到ACK消息时会发送ACK2消息。
另一种情况就是ack == m_iSndLastAck2,这可能表示上次发送的ACK2,数据接收端还没有收到。
这里在发送ACK2消息之后还会更新m_iSndLastAck2和m_ullSndLastAck2Time。
顺便来看一下CUDT::sendCtrl()中对于ACK2消息的发送:
case 6: //110 - Acknowledgement of Acknowledgement ctrlpkt.pack(pkttype, lparam); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break;
4. 获取ACK消息中要ACK的目标数据packet的SeqNo存于ack。
5. 检查CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo))是否大于0,若是则表明接收到的这个ACK消息在ACK从未发送过的数据packet,则说明这个消息可能是一个攻击或bug,于是m_bBroken置为true,复位m_iBrokenCounter并退出。
6. CSeqNo::seqcmp(ack, m_iSndLastAck) > 0时表明这个消息要ACK一些之前没有被ACK过的数据packet,从ACK packet读取信息更新Flow Window Size,及m_iSndLastAck。
7. 计算m_iSndLastDataAck到ack的偏移值offset,若offset小于等于0,表明接收到的这个ACK消息要ACK的目标SeqNo已经被其它ACK消息给ACK过了,因而无需再执行其它动作,退出执行。
8. 前面计算的m_iSndLastDataAck到ack的偏移值offset大于0,则ACK发送缓冲区m_pSndBuffer中的offset个packet。
9. 更新m_llSndDuration,m_llSndDurationTotal和m_llSndDurationCounter。m_llSndDuration,m_llSndDurationTotal用于统计SeqNo在ack之前的所有packet发送并得到ACK所用的时间。
10. 更新发送变量m_iSndLastDataAck为ack,并将SeqNo在m_iSndLastDataAck之前的所有SeqNo从发送丢失列表中移除出去。
11. 唤醒在等待发送数据的线程。ACK会使发送窗口向前移动。
12. 如果本socket不在snd 列表的话,将本socket插入snd列表。
13. 如我们前面在ACK的发送过程中所看到的,常规ACK消息还会携带许多用于同步的信息。这里正是将消息中的这些信息解出,并修改本地的一些用于控制数据收发的字段。
14. 执行拥塞控制器的回调m_pCC->onACK(ack),执行CCUpdate()以更新用于控制发送缓冲区及数据packet发送频率的一些字段。
15. 更新用于统计的字段m_iRecvACK和m_iRecvACKTotal。
接着来看下数据接收端在收到了ACK2消息之后会如何处理,在CUDT::processCtrl()中:
case 6: //110 - Acknowledgement of Acknowledgement { int32_t ack; int rtt = -1; // update RTT rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack); if (rtt <= 0) break; //if increasing delay detected... // sendCtrl(4); // RTT EWMA m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; m_iRTT = (m_iRTT * 7 + rtt) >> 3; m_pCC->setRTT(m_iRTT); // update last ACK that has been received by the sender if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0) m_iRcvLastAckAck = ack; break; }
1. 在这里会向ACKWindow acknowledge前面发送的ACK消息,获得那个ACK消息对应的目标数据packet SeqNo ack,并计算rtt值。
2. 根据前一步计算的rtt值,计算m_iRTTVar值与m_iRTT值。
3. 将m_iRTT值设置给拥塞控制器。
4. 在CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0时更新m_iRcvLastAckAck。
ACK2消息的处理比较简单,大体如此。
再来看一下UDT中的超时重传机制,在CUDT::checkTimers()中:
uint64_t next_exp_time; if (m_pCC->m_bUserDefinedRTO) next_exp_time = m_ullLastRspTime + m_pCC->m_iRTO * m_ullCPUFrequency; else { uint64_t exp_int = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency; if (exp_int < m_iEXPCount * m_ullMinExpInt) exp_int = m_iEXPCount * m_ullMinExpInt; next_exp_time = m_ullLastRspTime + exp_int; } if (currtime > next_exp_time) { // Haven‘t receive any information from the peer, is it dead?! // timeout: at least 16 expirations and must be greater than 10 seconds if ((m_iEXPCount > 16) && (currtime - m_ullLastRspTime > 5000000 * m_ullCPUFrequency)) { // // Connection is broken. // UDT does not signal any information about this instead of to stop quietly. // Application will detect this when it calls any UDT methods next time. // m_bClosing = true; m_bBroken = true; m_iBrokenCounter = 30; // update snd U list to remove this socket m_pSndQueue->m_pSndUList->update(this); releaseSynch(); // app can call any UDT API to learn the connection_broken error s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR, true); CTimer::triggerEvent(); return; } // sender: Insert all the packets sent after last received acknowledgement into the sender loss list. // recver: Send out a keep-alive packet if (m_pSndBuffer->getCurrBufSize() > 0) { if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossList->getLossLength() == 0)) { // resend all unacknowledged packets on timeout, but only if there is no packet in the loss list int32_t csn = m_iSndCurrSeqNo; int num = m_pSndLossList->insert(m_iSndLastAck, csn); m_iTraceSndLoss += num; m_iSndLossTotal += num; } m_pCC->onTimeout(); CCUpdate(); // immediately restart transmission m_pSndQueue->m_pSndUList->update(this); } else { sendCtrl(1); } ++m_iEXPCount; // Reset last response time since we just sent a heart-beat. m_ullLastRspTime = currtime; }
1. 在这里首先是计算超时时间,拥塞控制器对于超时时间控制的优先级要高于CUDT内部的控制。
2. 与Peer端没有进行任何消息与数据的通信的时间过长时,会认为连接已经断开了。此时会将m_bClosing和m_bBroken置为true。释放所有的锁,然后退出。
3. 数据发送端和接收端有不同的处理。对于发送端,将最后一次接收到的确认消息之后发送的所有的packets都加进发送者的丢失packet列表中。对于接收者,则发送一个keep-alive消息。
4. 更新m_iEXPCount和m_ullLastRspTime。
发送端在将packet加如它的丢失packet列表之后,则在下次执行发送动作时,被认为丢失的packets会被重新发送。
这里来看一下发送窗口及发送速率的控制方法。
接收端的反馈
拥塞控制
标签:
原文地址:http://my.oschina.net/wolfcs/blog/510375