码迷,mamicode.com
首页 > 其他好文 > 详细

项目中的libevent

时间:2014-09-04 01:30:37      阅读:293      评论:0      收藏:0      [点我收藏+]

标签:style   blog   color   os   io   使用   ar   for   文件   

单线程libevent模式
项目里面是多线程版的,我先理解下单线程的。
//client
1.调用NGP::init()
bool NGP::init(NGPcontext context)
{
    _context = context;
    //_TcpLink = NEWSP(TcpLink);
    _TcpLink = NEWSP(TcpLinkEx);
    _TcpLink->Init(this);
    return true;
}

2.初始化Libevent
bool LibEvtServer::init(I_NetServerEvent* event, int start, int size)
{
    m_ids = new ChannelIDGenerator();
    m_ids->init(start, size);
    m_allChannels.resize(m_ids->getSize());

    m_event = event;

    //使用windows模式的线程和锁
    int hr = evthread_use_windows_threads();//当前是两个线程一个主线程一个派发线程
    
    //一个线程只有一个event_base,对应的是一个struct event_base结构体和相应的事件管理器,这个事件管理器管理跟这个event_base绑定的事件
    m_base = event_base_new();//创建event_base
    if (!m_base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return false;
    }

    return true;
}
3.监听,搞不懂客户端为什么要监听
bool LibEvtServer::listen(int* port)
{
    struct sockaddr_in sin;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    if(-1 == *port)
        sin.sin_port = htons(10000);
    else
        sin.sin_port = htons(*port);
    
    //如果libevent分配和绑定套接字就调用evconnlistener_new_bind,这个相当于原始的socket()和bind()和listen(),然后有连接会调用listener_cb
    //其中调用了evutil_make_socket_nonblocking,将socket设置成无阻塞的,bind绑定,evconnlistener_new创建监听器,然后返回监听器
    //连接监听器使用event_base来得知什么时候在监听套接字上有TCP连接,当有连接时会调用回调函数
    m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this,//evconnlistener_new_bind提供了监听和接受TCP的一个方法,这个是要libevent分配和绑定套接字
        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin,
        sizeof(sin));
    if (!m_listener) 
    {
        return false;
    }

    if( -1 == *port)
        *port = ntohs(sin.sin_port);

    if (!m_listener) {
        fprintf(stderr, "Could not create a listener!\n");
        return false;
    }
    m_spThread.reset(new std::thread([this]
    {
        //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
        //event_base_loop(m_base, EVLOOP_ONCE);
        //循环在epoll/kqueue系统调用上,当事件发生时自动调用绑定的事件,但这些事件需要绑定到这个event_base上面
        event_base_dispatch(m_base);//启动event_base的循环,event_base_dispatch内部就是一个while循环,项目直接开了一个线程取做这个
        if(WSAENOTSOCK == WSAGetLastError())
        {
            Plug::PlugMessageBox(L"操作无效套接字啊!");
        }
        
        Plug::PlugMessageBox(L"Libevent派发线程退出!");
    }));
    return true;
}

4.客户端连接
bool LibEvtServer::connet_to(int ip, int port)
{
    //创建基于socket的bufferevent,并将其托管给event_base
    //buffevent从单词可以看出是在event上的buffer,即在read/write两个事件上有input和output缓冲区
    //是文件描述符,决定哪个文件被写入写出,可以将socket看成文件描述符,但不允许设置成pipe(2),为安全起见可以设置成-1,然后 set it with bufferevent_setfd or bufferevent_socket_connect().
    auto bev = bufferevent_socket_new(m_base, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); //创建一个bufferevent,关联sockfd,托管给event_base
    if (!bev){  
        std::cout << "bufferevent_socket_new error!" << std::endl;  
        return false;  
    }  

    struct sockaddr_in sin;  
    sin.sin_family = AF_INET;  
    sin.sin_addr.s_addr = ntohl(ip);
    sin.sin_port = htons(port);  
    
    //连接
    //If the bufferevent does not already have a socket set, we allocate a new socket here and make it nonblocking before we begin.
    int hr = bufferevent_socket_connect(bev, (struct sockaddr*)&sin, sizeof(sin));
    if(0 != hr)
        return false;

    std::lock_guard<std::mutex> lock(m_offline_mtx);//跟申请释放channel id互锁
    auto c2 = CreateChannel(bev);
    
    //设置bufferevent上的回调
    bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, c2);//设置读写事件
    //启动bufferevent
    bufferevent_enable(bev, EV_READ | EV_WRITE);//启动读写事件,将读写事件放入监听队列poll中
    return true;
}
//-------------------------------------到此client方面就做好了libevent方面的准备-------------------------

//server
/* 成员函数 */
bool LibEvtServer::init(I_NetServerEvent* event, int start, int size)
{
    m_ids = new ChannelIDGenerator();
    m_ids->init(start, size);
    m_allChannels.resize(m_ids->getSize());

    m_event = event;

    //event支持多线程的初始化函数
    int hr = evthread_use_windows_threads();

    m_base = event_base_new();
    if (!m_base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return false;
    }

    return true;
}


//侦听端口,-1表示向系统申请一个任意可用端口
bool LibEvtServer::listen(int* port)
{
    struct sockaddr_in sin;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    if(-1 == *port)
        sin.sin_port = htons(10000);
    else
        sin.sin_port = htons(*port);

    m_listener = evconnlistener_new_bind(m_base, ::listener_cb, (void*)this,
        LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1,
        (struct sockaddr*)&sin,
        sizeof(sin));
    if (!m_listener) 
    {
        return false;
    }

    if( -1 == *port)
        *port = ntohs(sin.sin_port);

    if (!m_listener) {
        fprintf(stderr, "Could not create a listener!\n");
        return false;
    }
    m_spThread.reset(new std::thread([this]
    {
        //SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
        //event_base_loop(m_base, EVLOOP_ONCE);
        event_base_dispatch(m_base);
        if(WSAENOTSOCK == WSAGetLastError())
        {
            Plug::PlugMessageBox(L"操作无效套接字啊!");
        }
        
        Plug::PlugMessageBox(L"Libevent派发线程退出!");
    }));
    return true;
}

//然后在这个监听器里面的回调函数里面
/* 新连接事件处理 */
typedef void (*evconnlistener_cb)(struct evconnlistener *, evutil_socket_t, struct sockaddr *, int socklen, void *);//这个是回调的原型
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
    auto ser = (LibEvtServer*)user_data;
    ser->listener_cb(listener, fd, sa, socklen, user_data);
}


void LibEvtServer::listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
    struct sockaddr *sa, int socklen, void *user_data)
{
    //获取listener相关的event_base
    auto base = evconnlistener_get_base(listener);

    //建立socket的bufferevent
    auto bev = bufferevent_socket_new(base, fd, BEV_OPT_THREADSAFE);//BEV_OPT_CLOSE_ON_FREE | 
    if (!bev) 
    {
        fprintf(stderr, "Error constructing bufferevent!");
        event_base_loopbreak(base);
        return ;
    }

    auto c2 = CreateChannel(bev);
    
    //设置回调
    bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, c2);
    //启动bufferevent
    bufferevent_enable(bev, EV_READ | EV_WRITE );
}

//看来服务器过程和客户端差不多,
//libevent过程,基于event_base的socket bufferevent,在event_base_dispath里面不知道用的是select还是iocp,然后发现socket状态发生变化可读,可写,或者
//错误都会调用相应的注册时间,建立的socket都是不阻塞的。还有个这个读写回调到底是什么时候调用的,我还不太明白,和那个读写缓冲区到底什么区别.

 

项目中的libevent

标签:style   blog   color   os   io   使用   ar   for   文件   

原文地址:http://www.cnblogs.com/zzyoucan/p/3955109.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!