标签:
作者并未在 GitHub 上提供本书的代码,
因此将书中代码手动录入 VC(Visual C++)并调试运行。
// Client.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include <boost/thread.hpp> #include <thread> #include <string> #include "../Common/RWHandler.h" class Connector { public: Connector(io_service& ios, const string& strIp, short port) :m_ios(ios), m_socket(ios), m_serverAddr(tcp::endpoint(address::from_string(strIp), port)), m_isConnected(false), m_chkThread(nullptr) { CreateEventHandler(ios); } ~Connector(){} bool Start() { m_eventHandler->GetSocket().async_connect(m_serverAddr, [this](const boost::system::error_code& error) { if (error) { HandleConnectError(error); return; } cout << "connect ok" << endl; m_isConnected = true; m_eventHandler->HandleRead(); }); boost::this_thread::sleep(boost::posix_time::seconds(1)); return m_isConnected; } bool IsConnected()const { return m_isConnected; } void Send(char* data, int len) { if (!m_isConnected) return; m_eventHandler->HandleWrite(data,len); } void AsyncSend(char* data, int len) { if (!m_isConnected) return; //m_eventHandler->HandleAsyncWrite(data, len); m_eventHandler->HandleWrite(data, len); } private: void CreateEventHandler(io_service& ios) { m_eventHandler = std::make_shared<RWHandler>(ios); m_eventHandler->SetCallBackError([this](int connid) { HandleRWError(connid); }); } void CheckConnect() { if (m_chkThread != nullptr) return; m_chkThread = std::make_shared<std::thread>([this] { while (true) { if (!IsConnected()) Start(); boost::this_thread::sleep(boost::posix_time::seconds(1)); } }); } void HandleConnectError(const boost::system::error_code& error) { m_isConnected = false; cout << error.message() << endl; m_eventHandler->CloseSocket(); CheckConnect(); } void HandleRWError(int connid) { m_isConnected = false; CheckConnect(); } private: io_service& m_ios; tcp::socket m_socket; tcp::endpoint m_serverAddr; std::shared_ptr<RWHandler> m_eventHandler; bool m_isConnected; std::shared_ptr<std::thread> m_chkThread; }; int main() { io_service ios; boost::asio::io_service::work work(ios); boost::thread thd([&ios] 
{ios.run(); }); Connector conn(ios, "127.0.0.1", 9900); conn.Start(); std::string str; if (!conn.IsConnected()) { cin >> str; return -1; } const int len = 512; char line[len] = ""; while (cin >> str) { char header[HEAD_LEN] = {}; int totalLen = str.length() + 1 + HEAD_LEN; std::sprintf(header, "%d", totalLen); memcpy(line, header, HEAD_LEN); memcpy(line + HEAD_LEN, str.c_str(), str.length() + 1); conn.Send(line, totalLen); } return 0; }
// Server.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "../Common/RWHandler.h" #include "../Common/Message.h" #include <boost/asio/buffer.hpp> #include <unordered_map> #include <numeric> const int MaxConnectionNum = 65536; const int MaxRecvSize = 65536; class Server { public: Server(io_service& ios, short port) :m_ios(ios), m_acceptor(ios, tcp::endpoint(tcp::v4(), port)), m_connIdPool(MaxConnectionNum) { m_connIdPool.resize(MaxConnectionNum); std::iota(m_connIdPool.begin(), m_connIdPool.end(), 1); } ~Server(){} void Accept() { cout << "Start listening " << endl; std::shared_ptr<RWHandler> handler = CreateHandler(); m_acceptor.async_accept(handler->GetSocket(), [this, handler](const boost::system::error_code& error) { if (error) { cout << error.value() << " " << error.message() << endl; HandleAcpError(handler, error); return; } m_handlers.insert(std::make_pair(handler->GetConnId(),handler)); cout << "current connect count: " << m_handlers.size() << endl; handler->HandleRead(); Accept(); }); } private: void HandleAcpError(std::shared_ptr<RWHandler> eventHandler, const boost::system::error_code& error) { cout << "Error,error reason: " << error.value() << error.message() << endl; eventHandler->CloseSocket(); StopAccept(); } void StopAccept() { boost::system::error_code ec; m_acceptor.cancel(ec); m_acceptor.close(ec); m_ios.stop(); } std::shared_ptr<RWHandler> CreateHandler() { int connId = m_connIdPool.front(); m_connIdPool.pop_front(); std::shared_ptr<RWHandler> handler = std::make_shared<RWHandler>(m_ios); handler->SetConnId(connId); handler->SetCallBackError([this](int connId) { RecyclConnid(connId); }); return handler; } void RecyclConnid(int connId) { auto it = m_handlers.find(connId); if (it != m_handlers.end()) m_handlers.erase(it); //== cout << "current connect count: " << m_handlers.size() << endl; m_connIdPool.push_back(connId); } private: io_service& m_ios; tcp::acceptor m_acceptor; std::unordered_map<int, std::shared_ptr<RWHandler>> m_handlers; 
list<int> m_connIdPool; }; int main() { io_service ios; Server server(ios, 9900); server.Accept(); ios.run(); return 0; }
#pragma once

#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>

/// Fixed-capacity message buffer: a header_length-byte ASCII decimal header
/// followed by up to max_body_length bytes of body.
///
/// The header stores the TOTAL message length (header + body). That is what
/// decode_header() expects; encode_header() writes the same quantity so that
/// an encoded message decodes back to the original body length.
class Message
{
public:
    enum { header_length = 4 };
    enum { max_body_length = 512 };

    /// Creates an empty message; the buffer is zero-filled.
    Message() : data_(), body_length_(0) {}

    /// Start of the raw buffer (header followed by body).
    const char* data() const { return data_; }
    char* data() { return data_; }

    /// Number of meaningful bytes in the buffer: header plus current body.
    std::size_t length() const { return header_length + body_length_; }

    /// Start of the body, directly after the header.
    const char* body() const { return data_ + header_length; }
    char* body() { return data_ + header_length; }

    /// Current body length in bytes.
    std::size_t body_length() const { return body_length_; }

    /// Sets the body length, clamped to max_body_length.
    void body_length(std::size_t new_length)
    {
        body_length_ = new_length;
        if (body_length_ > max_body_length)
            body_length_ = max_body_length;
    }

    /// Parses the header bytes as a decimal total length and derives the body
    /// length from it.
    /// @return false (and resets the body length to 0) when the value is
    ///         smaller than the header itself or the body would exceed
    ///         max_body_length; true otherwise.
    bool decode_header()
    {
        char header[header_length + 1] = "";
        std::memcpy(header, data_, header_length);  // header[header_length] stays '\0'

        const int total = std::atoi(header);
        const int minTotal = static_cast<int>(header_length);
        const int maxTotal = minTotal + static_cast<int>(max_body_length);
        if (total < minTotal || total > maxTotal)
        {
            body_length_ = 0;
            return false;
        }

        body_length_ = static_cast<std::size_t>(total - minTotal);
        return true;
    }

    /// Writes the total length (header + body) into the header as a
    /// right-aligned, space-padded decimal number.
    void encode_header()
    {
        char header[header_length + 1] = "";
        // body_length_ never exceeds max_body_length, so the value has at most
        // three digits and fits the four-character field.
        const int total = static_cast<int>(header_length) + static_cast<int>(body_length_);
        std::snprintf(header, sizeof(header), "%4d", total);
        std::memcpy(data_, header, header_length);
    }

private:
    char data_[header_length + max_body_length];
    std::size_t body_length_;
};
#pragma once #include <array> #include <functional> #include <iostream> using namespace std; #include <boost/asio.hpp> using namespace boost::asio; using namespace boost::asio::ip; using namespace boost; const int MAX_IP_PACK_SIZE = 65536; const int HEAD_LEN = 4; class RWHandler { public: RWHandler(io_service& ios) :m_sock(ios) {} ~RWHandler(){} void HandleRead() { async_read(m_sock, buffer(m_buff), transfer_at_least(HEAD_LEN), [this](const boost::system::error_code& ec, size_t size) { if (ec != nullptr) { HandleError(ec); return; } cout << m_buff.data() + HEAD_LEN << endl; HandleRead(); }); } void HandleWrite(char* data, int len) { boost::system::error_code ec; write(m_sock, buffer(data, len), ec); if (ec != nullptr) HandleError(ec); } tcp::socket& GetSocket() { return m_sock; } void CloseSocket() { boost::system::error_code ec; m_sock.shutdown(tcp::socket::shutdown_send, ec); m_sock.close(ec); } void SetConnId(int connId) { m_connId = connId; } int GetConnId()const { return m_connId; } template<typename F>void SetCallBackError(F f) { m_callbackError = f; } private: void HandleError(const boost::system::error_code& ec) { CloseSocket(); cout << ec.message() << endl; if (m_callbackError) m_callbackError(m_connId); } private: tcp::socket m_sock; std::array<char, MAX_IP_PACK_SIZE> m_buff; int m_connId; std::function<void(int)> m_callbackError; };
标签:
原文地址:http://www.cnblogs.com/itdef/p/5248823.html