码迷,mamicode.com
首页 > 其他好文 > 详细

使用同步socket创建非阻塞socket server

时间:2014-12-26 10:58:59      阅读:192      评论:0      收藏:0      [点我收藏+]

标签:

 

 

这个socket server可以:

  1. 非阻塞的处理多个socket连接。
  2. 可以接收来自客户端的ping消息,并把5秒内无活动的客户端移除。
  3. 可以接收客户端的login请求,使用者可以按自己需求加入认证逻辑。

 

/*‘‘‘

Non-Blocking socket server using blocking API

Created on Dec 25, 2014 (merry christmas)

@author: ScottGu<150316990@qq.com, gu.kai.66@gmail.com>

performance tested:

environment: 64bit win7, i7-4800MQ, 8GB
‘‘‘*/

#pragma once
boost::recursive_mutex cs; // thread-safe access to clients array

struct talk_to_client
    : boost::enable_shared_from_this<talk_to_client>
{
    talk_to_client(boost::asio::io_service& io_service) 
        :sock_(io_service)
    {
    }

    std::string username() const
    {
        return username_;
    }

    void answer_to_client() {
        try {
            read_request();
            process_request();
        }
        catch (boost::system::system_error&) {
            stop();
        }
        if (timed_out())
            stop();
    }


    void read_request() {
        if (sock_.available())
        {
            already_read_ += sock_.read_some(
                boost::asio::buffer(buff_ + already_read_, max_msg - already_read_));
        }
    }

    void process_request() {
        bool found_enter = std::find(buff_, buff_ + already_read_, \n)< buff_ + already_read_;
        if (!found_enter)
            return; // message is not full
        // process the msg
        last_ping_ = boost::posix_time::microsec_clock::local_time();
        size_t pos = std::find(buff_, buff_ + already_read_, \n) -
            buff_;
        std::string msg(buff_, pos);
        
        std::copy(buff_ + pos, buff_ + already_read_, buff_);
        //std::copy(buff_ + already_read_, buff_ + max_msg, buff_);
        already_read_ -= pos + 1;

        if (msg.find("login ") == 0) on_login(msg);
        else if (msg.find("ping") == 0) on_ping();
        else if (msg.find("ask_clients") == 0) on_clients();
        else std::cerr << "invalid msg " << msg << std::endl;
    }

    void set_clients_changed() { clients_changed_ = true; }
    boost::asio::ip::tcp::socket & sock() { return sock_; }

    bool timed_out() const {
        boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
        long long ms = (now - last_ping_).total_milliseconds();
        return ms > 5000;
    }

    void stop() {
        boost::system::error_code err; sock_.close(err);
    }
    void on_login(const std::string & msg) {
        std::istringstream in(msg);
        in >> username_ >> username_;
        write("login ok\n");
        //update_clients_changed();
    }

    void on_ping() {
        write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
        clients_changed_ = false;
    }

    void on_clients();

    void write(const std::string & msg) { sock_.write_some(boost::asio::buffer(msg)); }


private:
    //   in Synchronous Client field are same
    bool clients_changed_;
    boost::posix_time::ptime last_ping_;
    boost::asio::ip::tcp::socket sock_;

    enum { max_msg = 6*1024 };
    int already_read_;
    char buff_[max_msg];
    bool started_;
    std::string username_;

};




typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;

void talk_to_client::on_clients() {
    std::string msg;
    boost::recursive_mutex::scoped_lock lk(cs);
    for (auto b = clients.begin(), e = clients.end(); b != e; ++b){
        msg += (*b)->username() + " ";
    }
    write("clients " + msg + "\n");
}


void accept_thread() {
    boost::asio::io_service io_service;
    boost::asio::ip::tcp::acceptor acceptr(io_service,
        boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 8001));

    while (true) {
        client_ptr new_(new talk_to_client(io_service));

        acceptr.accept(new_->sock());
        
        boost::recursive_mutex::scoped_lock lk(cs);
        clients.push_back(new_);
    }
}


void handle_clients_thread() {
    while (true) {
        boost::this_thread::sleep(boost::posix_time::millisec(1));
        boost::recursive_mutex::scoped_lock lk(cs);
        for (array::iterator b = clients.begin(), e = clients.end(); b != e; ++b){
            (*b)->answer_to_client();
        }
        
        // erase clients that timed out
        clients.erase(std::remove_if(clients.begin(), clients.end(),
            boost::bind(&talk_to_client::timed_out, _1)), clients.end());
    }
}


int run_sync_talk_server() {

    boost::thread_group threads;
    threads.create_thread(boost::bind(accept_thread));
    threads.create_thread(handle_clients_thread);
    threads.join_all();
}

 

使用同步socket创建非阻塞socket server

标签:

原文地址:http://www.cnblogs.com/scottgu/p/4186186.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!