标签:
本节介绍接受连接Acceptor,并给出实现;
(1)Acceptor用于接受连接,是TcpServer的成员,生命周期由TcpServer控制;
(2)当Acceptor使用accept接受连接时,回调TcpServer的函数,来通知TcpServer创建TcpConnection;
(3)Acceptor的socket fd是一个listening,也就是是一个服务器的socket,当Epoller会回调Acceptor的_handleRead,在_handleRead中会使用accept接收新连接;
class Event; class EventLoop; class Acceptor final { public: Acceptor(const Acceptor&) = delete; Acceptor& operator=(const Acceptor&) = delete; Acceptor(const InetAddress& listenAddr, EventLoop* loop); ~Acceptor(); void setNewConnectionCallback(const NewConnectionCallback& cb) { _newConnectionCallback = cb; } private: void _handleRead(); void _bind(); void _listen(); static const int backlog = 10; int _acceptfd; InetAddress _listenAddr; EventLoop* _loop; std::unique_ptr<Event> _acceptEvent; NewConnectionCallback _newConnectionCallback; };
说明几点:
(1)_handleRead()为accept时的回调函数用来执行接受新连接;_acceptfd为监听的描述符;
(2)_newConnectionCallback为TcpServer注册给Acceptor的回调函数,当Acceptor的_handleRead函数执行时,将回调_newConnectionCallback来创建TcpConnection;
Acceptor::Acceptor(const InetAddress& listenAddr, EventLoop* loop) : _acceptfd(sockets::createNonBlockingSocket()), _listenAddr(listenAddr), _loop(loop), _acceptEvent(new Event(_acceptfd, _loop)) { LOG_TRACE << "accept fd: " << _acceptfd; sockets::setReuseAddr(_acceptfd); _bind(); _listen(); _acceptEvent->setReadCallback(std::bind(&Acceptor::_handleRead, this)); _acceptEvent->enableReading(); } Acceptor::~Acceptor() { _acceptEvent->disableAll(); _acceptEvent->remove(); ::close(_acceptfd); } void Acceptor::_handleRead() { InetAddress clientAddr; int connfd = sockets::accept(_acceptfd, &clientAddr.address()); if (connfd < 0) return; if (_newConnectionCallback) { _newConnectionCallback(connfd, clientAddr); } else { ::close(connfd); } } void Acceptor::_bind() { sockets::bind(_acceptfd, _listenAddr.address()); } void Acceptor::_listen() { sockets::listen(_acceptfd, backlog); }说明几点:
(1)在构造函数中,创建非阻塞的socket的_acceptfd描述符,绑定_acceptfd到服务器地址,以及监听_acceptfd;对于sockets::createNonBlockingSocket()、sockets::bind、sockets::listen下文介绍;
(2)_handleRead()用来处理接受连接, sockets::accept接受一个新连接,然后执行TcpServer的回调_newConnectionCallback;
void TcpServer::_newConnection(int connfd, const InetAddress& peerAddr) { EventLoop* loop; if ( _loopPool) { loop = _loopPool->getNextLoop(); } else { loop = _loop; } LOG_INFO << "TcpServer::_newConnection [" << connfd << "] from " << peerAddr.hostNameString(); TcpConnectionPtr conn(new TcpConnection(connfd, loop, peerAddr, _serverAddr)); assert(_connectionMaps.find(connfd) == _connectionMaps.end()); _connectionMaps[connfd] = conn; conn->setMessageCallback(_messageCallback); conn->setConnectionCallback(_connectionCallback); conn->setCloseConnectionCallback(std::bind(&TcpServer::_removeConnection, this, std::placeholders::_1)); conn->connectEstablished(); }
说明几点:
(1)TcpServer的详细介绍,请见下一个博客;
(2)在_newConnection中,主要是创建 TcpConnection,并在最后 conn->connectEstablished();让管理TcpConnection的IO线程来处理TcpConnection上的回调函数;
void sockets::setNonBlockingFd(int fd) { int flags; if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { LOG_SYSERR << "fcntl system error: " << strError(); } flags |= O_NONBLOCK; if ((flags = fcntl(fd, F_SETFL, flags)) < 0) { LOG_SYSERR << "fcntl system error: " << strError(); } } int sockets::createNonBlockingSocket(void) { int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP); if (sockfd < 0) { LOG_SYSERR << "socket system error: " << strError(); abort(); } return sockfd; } void sockets::bind(int sockfd, const struct sockaddr_in& addr) { socklen_t len = sizeof(addr); if (::bind(sockfd, reinterpret_cast<const sockaddr*>(&addr), len) < 0) //can not use C++ static_cast { LOG_SYSERR << "bind system error: " << strError(); // abort(); } } void sockets::listen(int listenfd, int backlog) { if (::listen(listenfd, backlog) < 0) { LOG_SYSERR << "bind system error: " << strError(); abort(); } } int sockets::accept(int listenfd, struct sockaddr_in* clientAddr) { int connfd; socklen_t len; #if _GNU_SOURCE connfd = accept4(listenfd, reinterpret_cast<sockaddr*>(clientAddr), &len, SOCK_NONBLOCK | SOCK_CLOEXEC); #else confd = accept(listenfd, reinterpret_cast<sockaddr*>(clientAddr), &len); setNonBlockingFd(connfd); #endif if (connfd < 0) { int savederrno = errno; switch (savederrno) { case EAGAIN: //EWOULDBLOCK: case EINTR: case EBADF: case ECONNABORTED: errno = savederrno; LOG_SYSERR << "accept system expected error: " << strError(); break; case ENOTSOCK: case EOPNOTSUPP: case ENOBUFS: case EINVAL: case EFAULT: LOG_SYSERR << "accept system unexpected error: " << strError(); abort(); break; default: LOG_SYSERR << "accept system unknown error: " << strError(); abort(); break; } } return connfd; }说明几点:
(1)accept函数中可以使用accept4一次性得到非阻塞性的连接描述符;
使用InetAddress封装socket的IP地址和端口;方便取出IP的字符串名称等;
class InetAddress final { public: InetAddress() = default; InetAddress(const char* ip, uint16_t port); uint16_t port() const; std::string ipString() const; std::string hostNameString() const; struct sockaddr_in& address() { return _address; } private: struct sockaddr_in _address; };
InetAddress::InetAddress(const char* ip, uint16_t p) { ::memset(&_address, 0, sizeof(struct sockaddr_in)); sockets::stringToAddr(ip, &_address.sin_addr); _address.sin_port = endian::hostToNet16(p); _address.sin_family = AF_INET; //impotant } uint16_t InetAddress::port() const { return _address.sin_port; } std::string InetAddress::ipString() const { return sockets::addrToString(_address.sin_addr); } std::string InetAddress::hostNameString() const { char buf[32]; snprintf(buf, sizeof buf, ":%d", _address.sin_port); return sockets::addrToString(_address.sin_addr) + buf; }
void sockets::stringToAddr(const char* ip, struct in_addr* addr) { if (inet_aton(ip, addr) == 0) { LOG_ERROR << "invalid Ip address: " << strError(); } } std::string sockets::addrToString(const struct in_addr& addr) { return ::inet_ntoa(addr); }
标签:
原文地址:http://blog.csdn.net/skyuppour/article/details/44856759