标签:
本节介绍接受连接Acceptor,并给出实现;
(1)Acceptor用于接受连接,是TcpServer的成员,生命周期由TcpServer控制;
(2)当Acceptor使用accept接受连接时,回调TcpServer的函数,来通知TcpServer创建TcpConnection;
(3)Acceptor的socket fd是一个listening socket,也就是服务器端的监听socket;当它可读时,Epoller会回调Acceptor的_handleRead,在_handleRead中使用accept接受新连接;
class Event;
class EventLoop;
/// Wraps the listening socket of a server.
///
/// The constructor creates the listening descriptor, binds it, starts
/// listening and registers a read callback for it; every accepted
/// connection is handed to the callback installed through
/// setNewConnectionCallback().
class Acceptor final
{
public:
    /// @param listenAddr address the listening socket is bound to.
    /// @param loop       event loop the accept event is created with.
    Acceptor(const InetAddress& listenAddr, EventLoop* loop);
    ~Acceptor();

    // Not copyable: the object closes its descriptor in the destructor.
    Acceptor(const Acceptor&) = delete;
    Acceptor& operator=(const Acceptor&) = delete;

    /// Stores the callback that _handleRead() invokes with the accepted
    /// descriptor and the peer address.
    void setNewConnectionCallback(const NewConnectionCallback& cb)
    {
        _newConnectionCallback = cb;
    }

private:
    /// Read callback of the listening descriptor: accepts one connection.
    void _handleRead();
    /// Binds _acceptfd to _listenAddr.
    void _bind();
    /// Starts listening on _acceptfd with `backlog`.
    void _listen();

    /// Backlog value passed to sockets::listen().
    static const int backlog = 10;

    // Keep this declaration order: the constructor's initializer list builds
    // _acceptEvent from _acceptfd and _loop, so both must be initialized first.
    int _acceptfd;                                 // listening descriptor
    InetAddress _listenAddr;                       // address passed to bind
    EventLoop* _loop;                              // not owned
    std::unique_ptr<Event> _acceptEvent;           // event object for _acceptfd
    NewConnectionCallback _newConnectionCallback;  // may be empty
};
说明几点:
(1)_handleRead()是监听描述符_acceptfd可读时的回调函数,用来接受新连接;_acceptfd为监听的描述符;
(2)_newConnectionCallback为TcpServer注册给Acceptor的回调函数,当Acceptor的_handleRead函数执行时,将回调_newConnectionCallback来创建TcpConnection;
/// Creates the non-blocking listening socket, binds it to listenAddr, starts
/// listening and enables read notification with _handleRead() as the callback.
///
/// @param listenAddr address to bind the listening socket to.
/// @param loop       event loop handed to the Event created for the socket.
Acceptor::Acceptor(const InetAddress& listenAddr, EventLoop* loop)
    : _acceptfd(sockets::createNonBlockingSocket()),
      _listenAddr(listenAddr),
      _loop(loop),
      _acceptEvent(new Event(_acceptfd, _loop))
{
    LOG_TRACE << "accept fd: " << _acceptfd;

    // Socket setup: the reuse-address option is set before bind, then listen.
    sockets::setReuseAddr(_acceptfd);
    _bind();
    _listen();

    // Only after the socket is listening is the read callback installed and
    // reading enabled on the event.
    Event& acceptEvent = *_acceptEvent;
    acceptEvent.setReadCallback(std::bind(&Acceptor::_handleRead, this));
    acceptEvent.enableReading();
}
// Shuts the acceptor down: the event is disabled and removed first, and only
// then is the listening descriptor closed.
// NOTE(review): disableAll()/remove() presumably unregister the event from the
// loop's poller — confirm against the Event class.
Acceptor::~Acceptor()
{
_acceptEvent->disableAll();
_acceptEvent->remove();
// The descriptor is closed last, after the event no longer refers to it.
::close(_acceptfd);
}
void Acceptor::_handleRead()
{
InetAddress clientAddr;
int connfd = sockets::accept(_acceptfd, &clientAddr.address());
if (connfd < 0)
return;
if (_newConnectionCallback)
{
_newConnectionCallback(connfd, clientAddr);
}
else
{
::close(connfd);
}
}
// Binds the listening descriptor to the address this acceptor was constructed
// with. Nothing is returned; error handling is left to sockets::bind.
void Acceptor::_bind()
{
sockets::bind(_acceptfd, _listenAddr.address());
}
// Starts listening on the listening descriptor, using the class constant
// `backlog` as the backlog argument. Error handling is left to sockets::listen.
void Acceptor::_listen()
{
sockets::listen(_acceptfd, backlog);
}说明几点:
(1)在构造函数中,创建非阻塞的socket的_acceptfd描述符,绑定_acceptfd到服务器地址,以及监听_acceptfd;对于sockets::createNonBlockingSocket()、sockets::bind、sockets::listen下文介绍;
(2)_handleRead()用来处理接受连接, sockets::accept接受一个新连接,然后执行TcpServer的回调_newConnectionCallback;
void TcpServer::_newConnection(int connfd, const InetAddress& peerAddr)
{
EventLoop* loop;
if ( _loopPool)
{
loop = _loopPool->getNextLoop();
}
else
{
loop = _loop;
}
LOG_INFO << "TcpServer::_newConnection [" << connfd << "] from " << peerAddr.hostNameString();
TcpConnectionPtr conn(new TcpConnection(connfd, loop, peerAddr, _serverAddr));
assert(_connectionMaps.find(connfd) == _connectionMaps.end());
_connectionMaps[connfd] = conn;
conn->setMessageCallback(_messageCallback);
conn->setConnectionCallback(_connectionCallback);
conn->setCloseConnectionCallback(std::bind(&TcpServer::_removeConnection, this, std::placeholders::_1));
conn->connectEstablished();
}
说明几点:
(1)TcpServer的详细介绍,请见下一个博客;
(2)在_newConnection中,主要是创建 TcpConnection,并在最后 conn->connectEstablished();让管理TcpConnection的IO线程来处理TcpConnection上的回调函数;
/// Adds O_NONBLOCK to the file status flags of fd.
///
/// Failures are logged, not propagated. If the current flags cannot be read
/// the function returns without attempting to write them back, so a failed
/// F_GETFL can no longer result in F_SETFL being called with flags derived
/// from -1.
///
/// @param fd descriptor to switch to non-blocking mode.
void sockets::setNonBlockingFd(int fd)
{
    const int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags < 0)
    {
        LOG_SYSERR << "fcntl system error: " << strError();
        return;
    }

    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
    {
        LOG_SYSERR << "fcntl system error: " << strError();
    }
}
/// Creates an IPv4 TCP socket that is non-blocking and close-on-exec.
///
/// @return the new descriptor. The function never returns a negative value:
///         if the socket cannot be created the error is logged and the
///         process is aborted.
int sockets::createNonBlockingSocket(void)
{
    const int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
    if (fd >= 0)
    {
        return fd;
    }

    LOG_SYSERR << "socket system error: " << strError();
    abort();
}
/// Binds sockfd to the given IPv4 address.
///
/// A failure is only logged; the function returns normally in that case
/// (aborting here is intentionally left disabled).
///
/// @param sockfd socket descriptor to bind.
/// @param addr   IPv4 address and port to bind to.
void sockets::bind(int sockfd, const struct sockaddr_in& addr)
{
    // sockaddr_in and sockaddr are unrelated types, so static_cast is not
    // possible; reinterpret_cast is what the sockets API expects here.
    const sockaddr* genericAddr = reinterpret_cast<const sockaddr*>(&addr);
    const int rc = ::bind(sockfd, genericAddr, static_cast<socklen_t>(sizeof(addr)));
    if (rc < 0)
    {
        LOG_SYSERR << "bind system error: " << strError();
    }
}
void sockets::listen(int listenfd, int backlog)
{
if (::listen(listenfd, backlog) < 0)
{
LOG_SYSERR << "bind system error: " << strError();
abort();
}
}
/// Accepts one connection on listenfd and returns a non-blocking descriptor.
///
/// When _GNU_SOURCE is defined, accept4 is used so the descriptor is created
/// non-blocking and close-on-exec in one step; otherwise plain accept is
/// followed by setNonBlockingFd.
///
/// @param listenfd   listening socket descriptor.
/// @param clientAddr receives the peer address.
/// @return the connected descriptor, or a negative value when nothing was
///         accepted because of one of the tolerated errors (EAGAIN, EINTR,
///         EBADF, ECONNABORTED). Any other error aborts the process.
int sockets::accept(int listenfd, struct sockaddr_in* clientAddr)
{
    // The length argument is value-result: it must hold the size of the
    // address buffer on entry. Leaving it uninitialized is undefined behavior
    // and can truncate the peer address or make the call fail with EINVAL.
    socklen_t len = static_cast<socklen_t>(sizeof(*clientAddr));
    sockaddr* peer = reinterpret_cast<sockaddr*>(clientAddr);

    // The system calls are qualified with :: because an unqualified `accept`
    // inside this function would name sockets::accept itself.
#ifdef _GNU_SOURCE
    const int connfd = ::accept4(listenfd, peer, &len, SOCK_NONBLOCK | SOCK_CLOEXEC);
#else
    const int connfd = ::accept(listenfd, peer, &len);
    if (connfd >= 0)
    {
        // Only touch the descriptor when accept actually produced one.
        setNonBlockingFd(connfd);
    }
#endif

    if (connfd < 0)
    {
        const int savederrno = errno;
        switch (savederrno)
        {
            case EAGAIN: // same condition as EWOULDBLOCK
            case EINTR:
            case EBADF:
            case ECONNABORTED:
                // Tolerated: log and let the caller see the negative result.
                errno = savederrno;
                LOG_SYSERR << "accept system expected error: " << strError();
                break;
            case ENOTSOCK:
            case EOPNOTSUPP:
            case ENOBUFS:
            case EINVAL:
            case EFAULT:
                LOG_SYSERR << "accept system unexpected error: " << strError();
                abort();
            default:
                LOG_SYSERR << "accept system unknown error: " << strError();
                abort();
        }
    }
    return connfd;
}
说明几点:
(1)accept函数中可以使用accept4一次性得到非阻塞性的连接描述符;
使用InetAddress封装socket的IP地址和端口;方便取出IP的字符串名称等;
/// Value type holding one IPv4 socket address (struct sockaddr_in) with
/// helpers to render it as text.
class InetAddress final
{
public:
    /// Creates an all-zero address. The stored sockaddr_in is value-initialized
    /// so a default-constructed object never exposes indeterminate bytes,
    /// e.g. when it is used as the output buffer of accept.
    InetAddress() = default;

    /// Builds the address from a dotted-quad string and a port number.
    InetAddress(const char* ip, uint16_t port);

    /// Returns the stored port field.
    uint16_t port() const;
    /// Returns the IP address as text.
    std::string ipString() const;
    /// Returns the address and port as text.
    std::string hostNameString() const;

    /// Mutable access to the underlying structure, for calls that fill it in.
    struct sockaddr_in& address()
    {
        return _address;
    }

    /// Read-only access to the underlying structure, usable on const objects.
    const struct sockaddr_in& address() const
    {
        return _address;
    }

private:
    struct sockaddr_in _address{};
};
/// Builds an IPv4 address from a dotted-quad string and a port.
///
/// @param ip textual IPv4 address, parsed by sockets::stringToAddr.
/// @param p  port number; stored after conversion with endian::hostToNet16.
InetAddress::InetAddress(const char* ip, uint16_t p)
{
    // Start from an all-zero structure so unused fields are well defined.
    ::memset(&_address, 0, sizeof(_address));

    // Important: the family must be set for the address to be usable.
    _address.sin_family = AF_INET;
    _address.sin_port = endian::hostToNet16(p);
    sockets::stringToAddr(ip, &_address.sin_addr);
}
// Returns the sin_port field exactly as stored; no byte-order conversion is
// applied here.
// NOTE(review): the (ip, port) constructor stores endian::hostToNet16(p), so
// this presumably yields the port in network byte order — confirm that
// callers expect that, or convert before use.
uint16_t InetAddress::port() const
{
return _address.sin_port;
}
// Returns the stored IPv4 address as text, as produced by
// sockets::addrToString. The port is not included.
std::string InetAddress::ipString() const
{
return sockets::addrToString(_address.sin_addr);
}
/// Returns the address formatted as "<ip>:<port>".
///
/// sin_port is kept in network byte order (the constructor stores it through
/// endian::hostToNet16, and the socket API fills it in the same order), so it
/// is converted back to host order before printing. Printing the raw field
/// shows a byte-swapped port on little-endian machines.
std::string InetAddress::hostNameString() const
{
    char portSuffix[32];
    const unsigned hostOrderPort = ntohs(_address.sin_port);
    snprintf(portSuffix, sizeof portSuffix, ":%u", hostOrderPort);
    return sockets::addrToString(_address.sin_addr) + portSuffix;
}
/// Parses a dotted-quad IPv4 string into *addr.
///
/// On failure *addr is left untouched and an error is logged. inet_aton
/// reports invalid input through its return value only and does not set
/// errno, so the log shows the rejected text instead of an unrelated
/// errno description.
///
/// @param ip   textual IPv4 address; a null pointer is reported as invalid.
/// @param addr destination for the parsed address.
void sockets::stringToAddr(const char* ip, struct in_addr* addr)
{
    if (ip == nullptr)
    {
        LOG_ERROR << "invalid Ip address: " << "(null)";
        return;
    }

    if (::inet_aton(ip, addr) == 0)
    {
        LOG_ERROR << "invalid Ip address: " << ip;
    }
}
/// Formats an IPv4 address as a dotted-quad string.
///
/// Uses inet_ntop with a local buffer. inet_ntoa returns a pointer to a
/// statically allocated buffer shared by all callers, which is unsafe when
/// several threads format addresses at the same time.
///
/// @param addr IPv4 address in network byte order.
/// @return the textual address, or an empty string if conversion fails.
std::string sockets::addrToString(const struct in_addr& addr)
{
    char text[INET_ADDRSTRLEN];
    if (::inet_ntop(AF_INET, &addr, text, sizeof text) == nullptr)
    {
        return std::string();
    }
    return text;
}
标签:
原文地址:http://blog.csdn.net/skyuppour/article/details/44856759