码迷,mamicode.com
首页 > 其他好文 > 详细

UDT协议实现分析——UDT Socket的创建

时间:2015-09-07 21:19:35      阅读:919      评论:0      收藏:0      [点我收藏+]

标签:

UDT API的用法

在分析 连接的建立过程 之前,先来看一下UDT API的用法。在UDT网络中,通常要有一个UDT Server监听在某台机器的某个UDP端口上,等待客户端的连接;有一个或多个客户端连接UDT Server;UDT Server接收到来自客户端的连接请求后,创建另外一个单独的UDT Socket用于与该客户端进行通信。

先来看一下UDT Server的简单的实现,UDT的开发者已经提供了一些demo程序可供参考,位于app/目录下。

#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>

#include <iostream>
#include <udt.h>

using namespace std;

void* recvdata(void*);

struct UDTUpDown {
    UDTUpDown() {
        // use this function to initialize the UDT library
        UDT::startup();
    }
    ~UDTUpDown() {
        // use this function to release the UDT library
        UDT::cleanup();
    }
};

int main(int argc, char* argv[]) {
    if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1])))) {
        cout << "usage: appserver [server_port]" << endl;
        return 0;
    }

    // Automatically start up and clean up UDT module.
    UDTUpDown _udt_;

    addrinfo hints;
    addrinfo* res;

    memset(&hints, 0, sizeof(struct addrinfo));

    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    //hints.ai_socktype = SOCK_DGRAM;

    string service("9000");
    if (2 == argc)
        service = argv[1];

    if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res)) {
        cout << "illegal port number or port is busy.\n" << endl;
        return 0;
    }

    UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);

    // UDT Options
    //UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
    //UDT::setsockopt(serv, 0, UDT_MSS, new int(9000), sizeof(int));
    //UDT::setsockopt(serv, 0, UDT_RCVBUF, new int(10000000), sizeof(int));
    //UDT::setsockopt(serv, 0, UDP_RCVBUF, new int(10000000), sizeof(int));

    if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen)) {
        cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
        return 0;
    }

    freeaddrinfo(res);

    cout << "server is ready at port: " << service << endl;

    if (UDT::ERROR == UDT::listen(serv, 10)) {
        cout << "listen: " << UDT::getlasterror().getErrorMessage() << endl;
        return 0;
    }

    sockaddr_storage clientaddr;
    int addrlen = sizeof(clientaddr);

    UDTSOCKET recver;

    while (true) {
        if (UDT::INVALID_SOCK == (recver = UDT::accept(serv, (sockaddr*) &clientaddr, &addrlen))) {
            cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl;
            return 0;
        }

        char clienthost[NI_MAXHOST];
        char clientservice[NI_MAXSERV];
        getnameinfo((sockaddr *) &clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice,
                    sizeof(clientservice), NI_NUMERICHOST | NI_NUMERICSERV);
        cout << "new connection: " << clienthost << ":" << clientservice << endl;

        pthread_t rcvthread;
        pthread_create(&rcvthread, NULL, recvdata, new UDTSOCKET(recver));
        pthread_detach(rcvthread);
    }

    UDT::close(serv);

    return 0;
}

void* recvdata(void* usocket) {
    UDTSOCKET recver = *(UDTSOCKET*) usocket;
    delete (UDTSOCKET*) usocket;

    char* data;
    int size = 100000;
    data = new char[size];

    while (true) {
        int rsize = 0;
        int rs;
        while (rsize < size) {
            int rcv_size;
            int var_size = sizeof(int);
            UDT::getsockopt(recver, 0, UDT_RCVDATA, &rcv_size, &var_size);
            if (UDT::ERROR == (rs = UDT::recv(recver, data + rsize, size - rsize, 0))) {
                cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;
                break;
            }

            rsize += rs;
        }

        if (rsize < size)
            break;
    }

    delete[] data;

    UDT::close(recver);

    return NULL;
}
1. 如在  UDT协议实现分析——UDT初始化和销毁  一文中提到的, 在调用任何UDT API之前,需要首先调用UDT::startup()来对库做一个初始化,并在结束之后执行UDT::cleanup()做最后的清理。在这个示例中,是创建了一个helper类UDTUpDown来帮助做这些事情的。利用编程语言本身提供的构造-析够机制来对UDT进行初始化和销毁要比手动调用这些函数要可靠得多。

2. 获得本地UDP端口的网络地址。

3. 调用UDT::socket()函数创建一个UDT Socket。这个Socket是UDT抽象出来的一个逻辑的Socket,我们并不能利用这个Socket本身来收发数据。

4. 调用UDT::bind()将创建的UDT Socket绑定到本地端口的网络地址。这一步将会把UDT的逻辑Socket与能够进行数据收发的系统UDP socket进行关联,自此我们就可以利用UDT Socket进行收发数据了。

5. 调用UDT::listen()告诉UDT,把这个UDT Socket做为这个端口上的Listening Socket。一个UDP端口可以被多个UDT Socket复用,在调用UDT::listen()之后,UDT就知道,在有其它节点连接这个UDP端口时,需要把相关的连接请求消息发送到哪个UDT Socket的接收缓冲区了。

对绑定到相同UDP端口的多个不同UDT Socket调用UDT::listen()时,UDT是如何处理的?我们知道,一个UDP端口上最多只能有一个listening Socket。

6. 调用UDT::accept()函数等待其它节点的连接。其它节点连接时,这个函数返回另外一个单独的UDT Socket,以用于与发起连接的节点进行通信。

UDT::accept()函数返回的UDT Socket会被绑定到另外的一个不同的UDP端口,还是会被绑定到listening Socket所绑定的UDP端口?

7. 使用UDT::accept()返回的UDT Socket,利用UDT::recv()与UDT::send()等函数同发起连接的节点进行数据传输。

8. 在不再需要UDT Socket时,调用UDT::close()函数关掉它,以释放资源。

然后再来看下UDT Client的实现:

#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>

#include <iostream>
#include <udt.h>

using namespace std;

void* monitor(void*);

struct UDTUpDown {
    UDTUpDown() {
        // use this function to initialize the UDT library
        UDT::startup();
    }
    ~UDTUpDown() {
        // use this function to release the UDT library
        UDT::cleanup();
    }
};

int main(int argc, char* argv[]) {
    if ((3 != argc) || (0 == atoi(argv[2]))) {
        cout << "usage: appclient server_ip server_port" << endl;
        return 0;
    }

    // Automatically start up and clean up UDT module.
    UDTUpDown _udt_;

    struct addrinfo hints, *local, *peer;

    memset(&hints, 0, sizeof(struct addrinfo));

    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    //hints.ai_socktype = SOCK_DGRAM;

    if (0 != getaddrinfo(NULL, "9000", &hints, &local)) {
        cout << "incorrect network address.\n" << endl;
        return 0;
    }

    UDTSOCKET client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol);

    // UDT Options
    //UDT::setsockopt(client, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
    //UDT::setsockopt(client, 0, UDT_MSS, new int(9000), sizeof(int));
    //UDT::setsockopt(client, 0, UDT_SNDBUF, new int(10000000), sizeof(int));
    //UDT::setsockopt(client, 0, UDP_SNDBUF, new int(10000000), sizeof(int));
    //UDT::setsockopt(client, 0, UDT_MAXBW, new int64_t(12500000), sizeof(int));

    // for rendezvous connection, enable the code below
    /*
     UDT::setsockopt(client, 0, UDT_RENDEZVOUS, new bool(true), sizeof(bool));
     if (UDT::ERROR == UDT::bind(client, local->ai_addr, local->ai_addrlen))
     {
     cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
     return 0;
     }
     */

    freeaddrinfo(local);

    if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer)) {
        cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl;
        return 0;
    }

    // connect to the server, implict bind
    if (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen)) {
        cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl;
        return 0;
    }

    freeaddrinfo(peer);

    int size = 100000;
    char* data = new char[size];

    pthread_create(new pthread_t, NULL, monitor, &client);

    for (int i = 0; i < 1000000; i++) {
        int ssize = 0;
        int ss;
        while (ssize < size) {
            if (UDT::ERROR == (ss = UDT::send(client, data + ssize, size - ssize, 0))) {
                cout << "send:" << UDT::getlasterror().getErrorMessage() << endl;
                break;
            }

            ssize += ss;
        }

        if (ssize < size)
            break;
    }

    UDT::close(client);
    delete[] data;
    return 0;
}

void* monitor(void* s) {
    UDTSOCKET u = *(UDTSOCKET*) s;
    UDT::TRACEINFO perf;
    cout << "SendRate(Mb/s)\tRTT(ms)\tCWnd\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl;

    while (true) {
        sleep(1);
        if (UDT::ERROR == UDT::perfmon(u, &perf)) {
            cout << "perfmon: " << UDT::getlasterror().getErrorMessage() << endl;
            break;
        }

        cout << perf.mbpsSendRate << "\t\t" << perf.msRTT << "\t" << perf.pktCongestionWindow << "\t"
             << perf.usPktSndPeriod << "\t\t\t" << perf.pktRecvACK << "\t" << perf.pktRecvNAK << endl;
    }

    return NULL;
}

可以看到UDT Client连接UDT Server并发送数据的过程大体如下:

1. 同样需要在调用任何UDT API之前,先调用UDT::startup()来对库做初始化,并在结束之后执行UDT::cleanup()做最后的清理。这里同样使用helper类UDTUpDown来帮助做这些事情。

2. 获得UDT Server的网络地址。

3. 调用UDT::socket()函数创建一个UDT Socket。这个Socket可以与特定的本地地址绑定,也可以不绑定。如果绑定,则发送数据时的出口地址就是该端口,如果不绑定,出口地址则是一个不确定的值。

4. 调用UDT::connect()连接UDT Server。

5. 使用UDT Socket,利用UDT::recv()与UDT::send()等函数同UDT Server进行数据传输。

6. 在不再需要UDT Socket时,调用UDT::close()函数关掉它,以释放资源。

UDT基本的收发数据的API的用法大体如上面所示。接着我们来看,这些函数是如何实现的。

UDT Socket的创建

无论是UDT Server要listening,还是UDT client要连接UDT Server,在调用UDT::startup()初始化UDT之后,首先要做的事情都是调用UDT::socket()创建UDT Socket了。我们就来看一下创建UDT Socket的过程:

UDTSOCKET CUDTUnited::newSocket(int af, int type) {
    if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))
        throw CUDTException(5, 3, 0);

    CUDTSocket* ns = NULL;

    try {
        ns = new CUDTSocket;
        ns->m_pUDT = new CUDT;
        if (AF_INET == af) {
            ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in);
            ((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0;
        } else {
            ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6);
            ((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0;
        }
    } catch (...) {
        delete ns;
        throw CUDTException(3, 2, 0);
    }

    CGuard::enterCS(m_IDLock);
    ns->m_SocketID = --m_SocketID;
    CGuard::leaveCS(m_IDLock);

    ns->m_Status = INIT;
    ns->m_ListenSocket = 0;
    ns->m_pUDT->m_SocketID = ns->m_SocketID;
    ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
    ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
    ns->m_pUDT->m_pCache = m_pCache;

    // protect the m_Sockets structure.
    CGuard::enterCS(m_ControlLock);
    try {
        m_Sockets[ns->m_SocketID] = ns;
    } catch (...) {
        //failure and rollback
        CGuard::leaveCS(m_ControlLock);
        delete ns;
        ns = NULL;
    }
    CGuard::leaveCS(m_ControlLock);

    if (NULL == ns)
        throw CUDTException(3, 2, 0);

    return ns->m_SocketID;
}


UDTSOCKET CUDT::socket(int af, int type, int) {
    if (!s_UDTUnited.m_bGCStatus)
        s_UDTUnited.startup();

    try {
        return s_UDTUnited.newSocket(af, type);
    } catch (CUDTException& e) {
        s_UDTUnited.setError(new CUDTException(e));
        return INVALID_SOCK;
    } catch (bad_alloc&) {
        s_UDTUnited.setError(new CUDTException(3, 2, 0));
        return INVALID_SOCK;
    } catch (...) {
        s_UDTUnited.setError(new CUDTException(-1, 0, 0));
        return INVALID_SOCK;
    }
}



UDTSOCKET socket(int af, int type, int protocol) {
    return CUDT::socket(af, type, protocol);
}

调用流程为,UDT::socket() -> CUDT::socket() -> CUDTUnited::newSocket()。

在CUDT::socket()中,我们看到,它会首先检查s_UDTUnited.m_bGCStatus,若发现UDT还没有初始化完成的话,则会调用s_UDTUnited.startup()进行初始化。这个地方对UDT状态s_UDTUnited.m_bGCStatus的检查没有问题,但在发现UDT没有初始化完成时调用s_UDTUnited.startup()似乎并不恰当。这个地方调用了s_UDTUnited.startup(),那与这次调用相对应的cleanup()又在哪调用了呢?显然在UDT内部是没有。若是调用cleanup()的职责总是在UDT的使用者,那倒不如在这个地方返回错误给调用者,完全让调用者来管理UDT的生命周期,以尽可能地避免资源泄漏。

创建UDT Socket的工作实际都在CUDTUnited::newSocket()函数中完成。可以看到,在这个函数中主要做了如下的这样一些事情:

1. 创建一个CUDTSocket对象ns,并创建一个CUDT对象被ns->m_pUDT引用。初始化ns对象的Self网络地址,端口会被设置为0。在UDT中使用CUDTSocket和CUDT共同来描述一个Socket。每个UDT Socket都会有其相应的CUDTSocket对象和CUDT对象。

可以看一下CUDTSocket类的定义,来了解它都描述了UDT Socket的哪些属性(src/api.h):

class CUDTSocket {
 public:
    CUDTSocket();
    ~CUDTSocket();

    UDTSTATUS m_Status;                       // current socket state

    uint64_t m_TimeStamp;                     // time when the socket is closed

    int m_iIPversion;                         // IP version
    sockaddr* m_pSelfAddr;                    // pointer to the local address of the socket
    sockaddr* m_pPeerAddr;                    // pointer to the peer address of the socket

    UDTSOCKET m_SocketID;                     // socket ID
    UDTSOCKET m_ListenSocket;                 // ID of the listener socket; 0 means this is an independent socket

    UDTSOCKET m_PeerID;                       // peer socket ID
    int32_t m_iISN;                      // initial sequence number, used to tell different connection from same IP:port

    CUDT* m_pUDT;                             // pointer to the UDT entity

    std::set<UDTSOCKET>* m_pQueuedSockets;    // set of connections waiting for accept()
    std::set<UDTSOCKET>* m_pAcceptSockets;    // set of accept()ed connections

    pthread_cond_t m_AcceptCond;              // used to block "accept" call
    pthread_mutex_t m_AcceptLock;             // mutex associated to m_AcceptCond

    unsigned int m_uiBackLog;                 // maximum number of connections in queue

    int m_iMuxID;                             // multiplexer ID

    pthread_mutex_t m_ControlLock;            // lock this socket exclusively for control APIs: bind/listen/connect

 private:
    CUDTSocket(const CUDTSocket&);
    CUDTSocket& operator=(const CUDTSocket&);
};

这个类只提供了构造和析构两个成员函数。还声明了私有的copy构造函数和赋值操作符函数,但没有定义它们,以避免类对象的复制。

可以再来看一下CUDTSocket的构造函数实现(src/api.h):

CUDTSocket::CUDTSocket()
        : m_Status(INIT),
          m_TimeStamp(0),
          m_iIPversion(0),
          m_pSelfAddr(NULL),
          m_pPeerAddr(NULL),
          m_SocketID(0),
          m_ListenSocket(0),
          m_PeerID(0),
          m_iISN(0),
          m_pUDT(NULL),
          m_pQueuedSockets(NULL),
          m_pAcceptSockets(NULL),
          m_AcceptCond(),
          m_AcceptLock(),
          m_uiBackLog(0),
          m_iMuxID(-1) {
#ifndef WIN32
    pthread_mutex_init(&m_AcceptLock, NULL);
    pthread_cond_init(&m_AcceptCond, NULL);
    pthread_mutex_init(&m_ControlLock, NULL);
#else
    m_AcceptLock = CreateMutex(NULL, false, NULL);
    m_AcceptCond = CreateEvent(NULL, false, false, NULL);
    m_ControlLock = CreateMutex(NULL, false, NULL);
#endif
}

CUDT类有两个主要的职责,一是描述UDT Socket,包括所有的非静态成员变量和非静态成员函数,定义UDT Socket的大部分属性和所能提供操作;二是提供API,包括绝大部分的static成员函数,这些函数将调用者与UDT内部的实现连接起来。CUDT类这样的设计,明显违背了OO的SRP单一职责原则,这多少还是给代码的阅读带来了一定的障碍。再来看一下CUDT的构造函数实现(src/core.cpp):

CUDT::CUDT() {
    m_pSndBuffer = NULL;
    m_pRcvBuffer = NULL;
    m_pSndLossList = NULL;
    m_pRcvLossList = NULL;
    m_pACKWindow = NULL;
    m_pSndTimeWindow = NULL;
    m_pRcvTimeWindow = NULL;

    m_pSndQueue = NULL;
    m_pRcvQueue = NULL;
    m_pPeerAddr = NULL;
    m_pSNode = NULL;
    m_pRNode = NULL;

    // Initilize mutex and condition variables
    initSynch();

    // Default UDT configurations
    m_iMSS = 1500;
    m_bSynSending = true;
    m_bSynRecving = true;
    m_iFlightFlagSize = 25600;
    m_iSndBufSize = 8192;
    m_iRcvBufSize = 8192;  //Rcv buffer MUST NOT be bigger than Flight Flag size
    m_Linger.l_onoff = 1;
    m_Linger.l_linger = 180;
    m_iUDPSndBufSize = 65536;
    m_iUDPRcvBufSize = m_iRcvBufSize * m_iMSS;
    m_iSockType = UDT_STREAM;
    m_iIPversion = AF_INET;
    m_bRendezvous = false;
    m_iSndTimeOut = -1;
    m_iRcvTimeOut = -1;
    m_bReuseAddr = true;
    m_llMaxBW = -1;

    m_pCCFactory = new CCCFactory<CUDTCC>;
    m_pCC = NULL;
    m_pCache = NULL;

    // Initial status
    m_bOpened = false;
    m_bListening = false;
    m_bConnecting = false;
    m_bConnected = false;
    m_bClosing = false;
    m_bShutdown = false;
    m_bBroken = false;
    m_bPeerHealth = true;
    m_ullLingerExpiration = 0;
}


void CUDT::initSynch() {
#ifndef WIN32
    pthread_mutex_init(&m_SendBlockLock, NULL);
    pthread_cond_init(&m_SendBlockCond, NULL);
    pthread_mutex_init(&m_RecvDataLock, NULL);
    pthread_cond_init(&m_RecvDataCond, NULL);
    pthread_mutex_init(&m_SendLock, NULL);
    pthread_mutex_init(&m_RecvLock, NULL);
    pthread_mutex_init(&m_AckLock, NULL);
    pthread_mutex_init(&m_ConnectionLock, NULL);
#else
    m_SendBlockLock = CreateMutex(NULL, false, NULL);
    m_SendBlockCond = CreateEvent(NULL, false, false, NULL);
    m_RecvDataLock = CreateMutex(NULL, false, NULL);
    m_RecvDataCond = CreateEvent(NULL, false, false, NULL);
    m_SendLock = CreateMutex(NULL, false, NULL);
    m_RecvLock = CreateMutex(NULL, false, NULL);
    m_AckLock = CreateMutex(NULL, false, NULL);
    m_ConnectionLock = CreateMutex(NULL, false, NULL);
#endif
}

都是成员变量的初始化,后续再来详细了解这些成员变量的作用。

2. 为Socket分配SocketID,其值为CUDTUnited的m_SocketID递减的结果。m_SocketIDCUDTUnited的构造函数中初始化:

CUDTUnited::CUDTUnited()
        : m_Sockets(),
          m_ControlLock(),
          m_IDLock(),
          m_SocketID(0),
          m_TLSError(),
          m_mMultiplexer(),
          m_MultiplexerLock(),
          m_pCache(NULL),
          m_bClosing(false),
          m_GCStopLock(),
          m_GCStopCond(),
          m_InitLock(),
          m_iInstanceCount(0),
          m_bGCStatus(false),
          m_GCThread(),
          m_ClosedSockets() {
    // Socket ID MUST start from a random value
    srand((unsigned int) CTimer::getTime());
    m_SocketID = 1 + (int) ((1 << 30) * (double(rand()) / RAND_MAX));

#ifndef WIN32
    pthread_mutex_init(&m_ControlLock, NULL);
    pthread_mutex_init(&m_IDLock, NULL);
    pthread_mutex_init(&m_InitLock, NULL);
#else
    m_ControlLock = CreateMutex(NULL, false, NULL);
    m_IDLock = CreateMutex(NULL, false, NULL);
    m_InitLock = CreateMutex(NULL, false, NULL);
#endif

#ifndef WIN32
    pthread_key_create(&m_TLSError, TLSDestroy);
#else
    m_TLSError = TlsAlloc();
    m_TLSLock = CreateMutex(NULL, false, NULL);
#endif

    m_pCache = new CCache<CInfoBlock>;
}

m_SocketID的初始值是一个随机数。

3. 初始化ns及它的CUDT对象的一些成员变量。

4. 将ns放在std::map<UDTSOCKET, CUDTSocket*> m_Sockets中。

5. 将UDT socket的SocketID返回给调用者。

这个接口直接返回一个表示UDT Socket的类对象,而不是一个handle,以便于调用者在调用UDT API时无需每次都输入UDT Socket handle参数,这样的API设计相对于UDT的这种API设计,有什么样的优缺点?

UDT的错误处理机制

在前面UDT Server和UDT Client的demo程序中,我们有看到,所有的UDT API都会在出错时返回一个错误码,比如UDT::bind()、UDT::bind2()、UDT::listen()和UDT::connect()等返回值类型为int的函数,在出错时返回UDT::ERROR,UDT::socket()和UDT::accept()等返回值类型为UDTSOCKET的函数在出错时返回UDT::INVALID_SOCK。

UDT API的调用者在检测到它们返回了错误值时,通过调用UDT::getlasterror()函数来获取关于异常更加详细的信息。这里我们就来看一下这套机制的实现。

注意看CUDT::socket()函数的实现。在这个函数中,会将实际创建UDT Socket的任务委托给s_UDTUnited.newSocket(),但这个调用会被包在一个try-catch块中。在s_UDTUnited.newSocket()函数执行的过程中发生任何的异常,都会先被CUDT::socket()捕获。CUDT::socket()函数在捕获这些异常之后,会根据捕获的异常的类型创建不同的CUDTException对象,并通过调用CUDTUnited::setError()函数将该CUDTException对象设置给s_UDTUnited。

我们来看一下CUDTUnited::setError()函数的实现(src/api.cpp):

void CUDTUnited::setError(CUDTException* e) {
#ifndef WIN32
    delete (CUDTException*) pthread_getspecific(m_TLSError);
    pthread_setspecific(m_TLSError, e);
#else
    CGuard tg(m_TLSLock);
    delete (CUDTException*)TlsGetValue(m_TLSError);
    TlsSetValue(m_TLSError, e);
    m_mTLSRecord[GetCurrentThreadId()] = e;
#endif
}

在这个函数中,会首先取出线程局部存储变量m_TLSError中保存的本线程上一次创建的 CUDTException对象,将其delete掉,并将本次创建的CUDTException对象设置进去。CUDTUnited类定义中m_TLSError的声明(src/api.h):

private:
    pthread_key_t m_TLSError;                         // thread local error record (last error)
#ifndef WIN32
    static void TLSDestroy(void* e) {
        if (NULL != e)
            delete (CUDTException*) e;
    }
#else
    std::map<DWORD, CUDTException*> m_mTLSRecord;
    void checkTLSValue();
    pthread_mutex_t m_TLSLock;
#endif

在CUDTUnited构造函数中也可以看到对这个对象的初始化。 CUDTUnited::TLSDestroy()函数是m_TLSError的析构函数,在m_TLSError最后被销毁时,这个函数被调用,以便于释放搜有还未释放的资源。

再来看UDT API的调用者获取上一次发生的异常的函数UDT::getlasterror():

CUDTException* CUDTUnited::getError() {
#ifndef WIN32
    if (NULL == pthread_getspecific(m_TLSError))
        pthread_setspecific(m_TLSError, new CUDTException);
    return (CUDTException*) pthread_getspecific(m_TLSError);
#else
    CGuard tg(m_TLSLock);
    if(NULL == TlsGetValue(m_TLSError))
    {
        CUDTException* e = new CUDTException;
        TlsSetValue(m_TLSError, e);
        m_mTLSRecord[GetCurrentThreadId()] = e;
    }
    return (CUDTException*)TlsGetValue(m_TLSError);
#endif
}



CUDTException& CUDT::getlasterror() {
    return *s_UDTUnited.getError();
}


ERRORINFO& getlasterror() {
    return CUDT::getlasterror();
}

调用过程为UDT::getlasterror() -> CUDT::getlasterror() -> CUDTUnited::getError()。

主要就是从s_UDTUnited的线程局部存储变量m_TLSError中取出前面设置的本线程上次创建的CUDTException对象返回给调用者。

总结一下,UDT的使用者在调用UDT API时,UDT API会直接调用CUDT类对应的static API函数,在CUDT类的这些static API函数中会将做实际事情的工作委托给s_UDTUnited的相应函数,但这个委托调用会被包在一个try-catch block中。s_UDTUnited的函数在遇到异常情况时抛出异常,CUDT类的static API函数捕获异常,根据捕获到的异常的具体类型,创建不同的CUDTException对象设置给s_UDTUnited的线程局部存储变量m_TLSError中并向UDT API调用者返回错误码,UDT API的调用者检测到错误码后,通过UDT::getlasterror()获取存储在m_TLSError中的异常。

此处可以看到,CUDT提供的这一层API,一个比较重要的作用大概就是做异常处理了。

在UDT中,使用CUDTException来描述所有出现的异常。可以看一下这个类的定义(src/udt.h):

class UDT_API CUDTException {
 public:
    CUDTException(int major = 0, int minor = 0, int err = -1);
    CUDTException(const CUDTException& e);
    virtual ~CUDTException();

    // Functionality:
    //    Get the description of the exception.
    // Parameters:
    //    None.
    // Returned value:
    //    Text message for the exception description.

    virtual const char* getErrorMessage();

    // Functionality:
    //    Get the system errno for the exception.
    // Parameters:
    //    None.
    // Returned value:
    //    errno.

    virtual int getErrorCode() const;

    // Functionality:
    //    Clear the error code.
    // Parameters:
    //    None.
    // Returned value:
    //    None.

    virtual void clear();

 private:
    int m_iMajor;        // major exception categories

// 0: correct condition
// 1: network setup exception
// 2: network connection broken
// 3: memory exception
// 4: file exception
// 5: method not supported
// 6+: undefined error

    int m_iMinor;		// for specific error reasons
    int m_iErrno;		// errno returned by the system if there is any
    std::string m_strMsg;	// text error message

    std::string m_strAPI;	// the name of UDT function that returns the error
    std::string m_strDebug;  // debug information, set to the original place that causes the error

 public:
    // Error Code
    static const int SUCCESS;
    static const int ECONNSETUP;
    static const int ENOSERVER;
    static const int ECONNREJ;
    static const int ESOCKFAIL;
    static const int ESECFAIL;
    static const int ECONNFAIL;
    static const int ECONNLOST;
    static const int ENOCONN;
    static const int ERESOURCE;
    static const int ETHREAD;
    static const int ENOBUF;
    static const int EFILE;
    static const int EINVRDOFF;
    static const int ERDPERM;
    static const int EINVWROFF;
    static const int EWRPERM;
    static const int EINVOP;
    static const int EBOUNDSOCK;
    static const int ECONNSOCK;
    static const int EINVPARAM;
    static const int EINVSOCK;
    static const int EUNBOUNDSOCK;
    static const int ENOLISTEN;
    static const int ERDVNOSERV;
    static const int ERDVUNBOUND;
    static const int ESTREAMILL;
    static const int EDGRAMILL;
    static const int EDUPLISTEN;
    static const int ELARGEMSG;
    static const int EINVPOLLID;
    static const int EASYNCFAIL;
    static const int EASYNCSND;
    static const int EASYNCRCV;
    static const int ETIMEOUT;
    static const int EPEERERR;
    static const int EUNKNOWN;
};

这个class主要通过Major错误码和Minor错误码来描述异常情况,如果是调用系统调用出错了,还会用Errno值。

具体看一下这个class的实现,特别是CUDTException::getErrorMessage()函数,来了解每一个错误码所代表的含义:

CUDTException::CUDTException(int major, int minor, int err)
        : m_iMajor(major),
          m_iMinor(minor) {
    if (-1 == err)
#ifndef WIN32
        m_iErrno = errno;
#else
        m_iErrno = GetLastError();
#endif
        else
        m_iErrno = err;
}

CUDTException::CUDTException(const CUDTException& e)
        : m_iMajor(e.m_iMajor),
          m_iMinor(e.m_iMinor),
          m_iErrno(e.m_iErrno),
          m_strMsg() {
}

CUDTException::~CUDTException() {
}

const char* CUDTException::getErrorMessage() {
    // translate "Major:Minor" code into text message.

    switch (m_iMajor) {
        case 0:
            m_strMsg = "Success";
            break;

        case 1:
            m_strMsg = "Connection setup failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": connection time out";
                    break;

                case 2:
                    m_strMsg += ": connection rejected";
                    break;

                case 3:
                    m_strMsg += ": unable to create/configure UDP socket";
                    break;

                case 4:
                    m_strMsg += ": abort for security reasons";
                    break;

                default:
                    break;
            }

            break;

        case 2:
            switch (m_iMinor) {
                case 1:
                    m_strMsg = "Connection was broken";
                    break;

                case 2:
                    m_strMsg = "Connection does not exist";
                    break;

                default:
                    break;
            }

            break;

        case 3:
            m_strMsg = "System resource failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": unable to create new threads";
                    break;

                case 2:
                    m_strMsg += ": unable to allocate buffers";
                    break;

                default:
                    break;
            }

            break;

        case 4:
            m_strMsg = "File system failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": cannot seek read position";
                    break;

                case 2:
                    m_strMsg += ": failure in read";
                    break;

                case 3:
                    m_strMsg += ": cannot seek write position";
                    break;

                case 4:
                    m_strMsg += ": failure in write";
                    break;

                default:
                    break;
            }

            break;

        case 5:
            m_strMsg = "Operation not supported";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": Cannot do this operation on a BOUND socket";
                    break;

                case 2:
                    m_strMsg += ": Cannot do this operation on a CONNECTED socket";
                    break;

                case 3:
                    m_strMsg += ": Bad parameters";
                    break;

                case 4:
                    m_strMsg += ": Invalid socket ID";
                    break;

                case 5:
                    m_strMsg += ": Cannot do this operation on an UNBOUND socket";
                    break;

                case 6:
                    m_strMsg += ": Socket is not in listening state";
                    break;

                case 7:
                    m_strMsg += ": Listen/accept is not supported in rendezous connection setup";
                    break;

                case 8:
                    m_strMsg += ": Cannot call connect on UNBOUND socket in rendezvous connection setup";
                    break;

                case 9:
                    m_strMsg += ": This operation is not supported in SOCK_STREAM mode";
                    break;

                case 10:
                    m_strMsg += ": This operation is not supported in SOCK_DGRAM mode";
                    break;

                case 11:
                    m_strMsg += ": Another socket is already listening on the same port";
                    break;

                case 12:
                    m_strMsg += ": Message is too large to send (it must be less than the UDT send buffer size)";
                    break;

                case 13:
                    m_strMsg += ": Invalid epoll ID";
                    break;

                default:
                    break;
            }

            break;

        case 6:
            m_strMsg = "Non-blocking call failure";

            switch (m_iMinor) {
                case 1:
                    m_strMsg += ": no buffer available for sending";
                    break;

                case 2:
                    m_strMsg += ": no data available for reading";
                    break;

                default:
                    break;
            }

            break;

        case 7:
            m_strMsg = "The peer side has signalled an error";

            break;

        default:
            m_strMsg = "Unknown error";
    }

    // Adding "errno" information
    if ((0 != m_iMajor) && (0 < m_iErrno)) {
        m_strMsg += ": ";
#ifndef WIN32
        char errmsg[1024];
        if (strerror_r(m_iErrno, errmsg, 1024) == 0)
            m_strMsg += errmsg;
#else
        LPVOID lpMsgBuf;
        FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, m_iErrno, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)&lpMsgBuf, 0, NULL);
        m_strMsg += (char*)lpMsgBuf;
        LocalFree(lpMsgBuf);
#endif
    }

    // period
#ifndef WIN32
    m_strMsg += ".";
#endif

    return m_strMsg.c_str();
}

int CUDTException::getErrorCode() const {
    return m_iMajor * 1000 + m_iMinor;
}

void CUDTException::clear() {
    m_iMajor = 0;
    m_iMinor = 0;
    m_iErrno = 0;
}

const int CUDTException::SUCCESS = 0;
const int CUDTException::ECONNSETUP = 1000;
const int CUDTException::ENOSERVER = 1001;
const int CUDTException::ECONNREJ = 1002;
const int CUDTException::ESOCKFAIL = 1003;
const int CUDTException::ESECFAIL = 1004;
const int CUDTException::ECONNFAIL = 2000;
const int CUDTException::ECONNLOST = 2001;
const int CUDTException::ENOCONN = 2002;
const int CUDTException::ERESOURCE = 3000;
const int CUDTException::ETHREAD = 3001;
const int CUDTException::ENOBUF = 3002;
const int CUDTException::EFILE = 4000;
const int CUDTException::EINVRDOFF = 4001;
const int CUDTException::ERDPERM = 4002;
const int CUDTException::EINVWROFF = 4003;
const int CUDTException::EWRPERM = 4004;
const int CUDTException::EINVOP = 5000;
const int CUDTException::EBOUNDSOCK = 5001;
const int CUDTException::ECONNSOCK = 5002;
const int CUDTException::EINVPARAM = 5003;
const int CUDTException::EINVSOCK = 5004;
const int CUDTException::EUNBOUNDSOCK = 5005;
const int CUDTException::ENOLISTEN = 5006;
const int CUDTException::ERDVNOSERV = 5007;
const int CUDTException::ERDVUNBOUND = 5008;
const int CUDTException::ESTREAMILL = 5009;
const int CUDTException::EDGRAMILL = 5010;
const int CUDTException::EDUPLISTEN = 5011;
const int CUDTException::ELARGEMSG = 5012;
const int CUDTException::EINVPOLLID = 5013;
const int CUDTException::EASYNCFAIL = 6000;
const int CUDTException::EASYNCSND = 6001;
const int CUDTException::EASYNCRCV = 6002;
const int CUDTException::ETIMEOUT = 6003;
const int CUDTException::EPEERERR = 7000;
const int CUDTException::EUNKNOWN = -1;

Done。

UDT协议实现分析——UDT Socket的创建

标签:

原文地址:http://my.oschina.net/wolfcs/blog/502509

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!