标签:注意 代码分析 c++ 数据接口 ace osi socket tst 处理
跟据以下几种类型来判断为什么类型的包,来进行相应的处理:
enum QuicFrameType {
PADDING_FRAME = 0,
RST_STREAM_FRAME = 1,
CONNECTION_CLOSE_FRAME = 2,
GOAWAY_FRAME = 3,
WINDOW_UPDATE_FRAME = 4,
BLOCKED_FRAME = 5,
STOP_WAITING_FRAME = 6,
PING_FRAME = 7,
STREAM_FRAME,
ACK_FRAME,
MTU_DISCOVERY_FRAME,
NUM_FRAME_TYPES
};
PADDING_FRAME:为填充字节帧,接收到这个包时会将包剩余部分填充字节。
RST_STREAM_FRAME:当由流的创建者发送时,表示创建者希望关闭流,当由接收者发送时,表示发生错误或者不想接收流,因此流应该被关闭。
CONNECTION_CLOSE_FRAME:连接关闭。
GOAWAY_FRAME:表示流应该被停止使用,因为之后将会被关闭,在使用的流将被继续处理,但是发送者不会在接收流。
WINDOW_UPDATE_FRAME:用于通知对端流量控制端口接收窗口大小的更新。
BLOCKED_FRAME:表示已经准备好发送数据且有数据要发送,但是被阻塞了。
STOP_WAITING_FRAME:通知对端不需要等待包号小于特定值的包。
PING_FRAME:用来验证对端是否保持活跃,且连接是否正常。
STREAM_FRAME:用于发送数据。
ACK_FRAME:通知对端哪些包被接收到了。
quic实现的C/S端代码位于proto-quic\src\net\tools\quic
根据quic C/S端代码,其服务端和客户端代码的main()文件为server_bin.cc和client_bin.cc等文件。
client.main()函数基本实现步骤:
创建QuicClient类client,调用client.Initialize()进行初始化。
Initialize()实现:
{
定义流窗口大小
定义session窗口大小
设置epoll_server超时时间
调用CreateUDPSocket()创建UDP套接字
注册epoll时间回调函数
}
之后调用client.Connect()进行对话的连接
Connect()实现
{
写数据类PacketWrite类创建
创建session类
初始化session,InitializeSession()
WaitForEvent
}
调用client.CreateClientStream()与session中创建一个stream类流,用于发送数据。
调用stream->WriteStringPiece()来进行数据的发送。
调用client.WaitForEvents()等待事件。
调用stream->CloseConnection(net::QUIC_NO_ERROR);来关闭连接。
调用client.Disconnect()关闭client。
服务端与client端基本类似。
最外层的发送数据接口为调用stream流的WriteOrBufferData(body, fin, nullptr)方法其中body是要发的数据,fin是标识是否是改流的最后一个数据。之后会在流中进行相应的判断和处理,如流上是否有足够的空间来发送这个数据,发送窗口大小是否合适,是否阻塞等。如果判断可以进行发送之后便会调用session类的方法WritevData()。
在session类会调用connection类的SendStreamData方法发送数据,并根据实际发送的数据更新相应stream流的数据消费的数值。
在connection类会调用PacketGenerator类的ConsumeData方法来发送数据。其中会根据包来进行ack的绑定。
之后会返回connection类,根据消息队列情况调用WritePacket()进行socket上包的写入,该方法实现于PacketWriter类。
当Server端创建好之后循环调用StartReading(),进行接收包,根据synchronous_read_count_ 来判断是否是CHLO包。
void QuicSimpleServer::StartReading() {
if (synchronous_read_count_ == 0) {
// Only process buffered packets once per message loop.
dispatcher_->ProcessBufferedChlos(kNumSessionsToCreatePerSocketEvent);
}
...
int result = socket_->RecvFrom(
read_buffer_.get(), read_buffer_->size(), &client_address_,
base::Bind(&QuicSimpleServer::OnReadComplete, base::Unretained(this)));
...
OnReadComplete(result);
}
OnReadComplete()中会调用dispatcher的处理包方法
void QuicSimpleServer::OnReadComplete(int result) {
...
dispatcher_->ProcessPacket(
QuicSocketAddress(QuicSocketAddressImpl(server_address_)),
QuicSocketAddress(QuicSocketAddressImpl(client_address_)), packet);
StartReading();
}
void QuicDispatcher::ProcessPacket(const QuicSocketAddress& server_address,
const QuicSocketAddress& client_address,
const QuicReceivedPacket& packet) {
...
framer_.ProcessPacket(packet);
...
}
跳转到Framer类的处理方法
bool QuicFramer::ProcessPacket(const QuicEncryptedPacket& packet) {
...
if (!visitor_->OnUnauthenticatedPublicHeader(public_header)) {
// The visitor suppresses further processing of the packet.
return true;
}
...
}
visitor_指向dispatch类,跳转到
QuicDispatcher::OnUnauthenticatedPublicHeader(){
...
QuicConnectionId connection_id = header.connection_id;
SessionMap::iterator it = session_map_.find(connection_id);
if (it != session_map_.end()) {
DCHECK(!buffered_packets_.HasBufferedPackets(connection_id));
it->second->ProcessUdpPacket(current_server_address_,
current_client_address_, *current_packet_);
return false;
}
...
}
当包头的connection_id 能在session_map里找到时,直接调用connection的ProcessUdpPacket处理,server端的session_map维护在dispatch类里,创建session类都会记录下来。
之后经过处理跳转到Framer类的ProcessFrameData()方法里,其中对stream Framer和ACK Framer分别进行了处理,
如果是stream包,则对其进行解析后会调用OnStreamFrame()抛到上层。
if (!ProcessStreamFrame(reader, frame_type, &frame)) {
return RaiseError(QUIC_INVALID_STREAM_DATA);
}
if (!visitor_->OnStreamFrame(frame)) {
QUIC_DVLOG(1) << ENDPOINT
<< "Visitor asked to stop further processing.";
// Returning true since there was no parsing error.
return true;
}
}
visitor_在Framer类里,由创建connection类时初始化,指向connection类,在到connection类里调用visitor_->OnStreamFrame(),visitor_指向session类,在由session类抛到stream类的OnDataAvailable()将数据进行处理,注意基础stream类里没有实现OnDataAvailable()的方法,需要编写,下面是官方tools文件里的处理。
OnDataAvailable() {
while (HasBytesToRead()) {
struct iovec iov;
if (GetReadableRegions(&iov, 1) == 0) {
// No more data to read.
break;
}
QUIC_DVLOG(1) << "Stream " << id() << " processed " << iov.iov_len
<< " bytes.";
body_.append(static_cast<char*>(iov.iov_base), iov.iov_len);
if (content_length_ >= 0 &&
body_.size() > static_cast<uint64_t>(content_length_)) {
QUIC_DVLOG(1) << "Body size (" << body_.size() << ") > content length ("
<< content_length_ << ").";
SendErrorResponse();
return;
}
MarkConsumed(iov.iov_len);
}
if (!sequencer()->IsClosed()) {
sequencer()->SetUnblocked();
return;
}
// If the sequencer is closed, then all the body, including the fin, has been
// consumed.
OnFinRead();
if (write_side_closed() || fin_buffered()) {
return;
}
}
如果是ACK包,则会对ack进行处理,并进行拥塞算法的运算。
if (!ProcessAckFrame(reader, frame_type, &frame)) {
return RaiseError(QUIC_INVALID_ACK_DATA);
}
if (!visitor_->OnAckFrame(frame)) {
QUIC_DVLOG(1) << ENDPOINT
<< "Visitor asked to stop further processing.";
// Returning true since there was no parsing error.
return true;
}
OnAckFrame()跳转到connection类的OnAckFrame,其中调用QuicSentPacketManager类OnIncomingAck()方法,其中进行了rtt和带宽的更新,并对丢包进行判断。
(gdb) bt #0 posix_quic::QuicConnectionVisitor::OnAckFrame (this=0x83b238, frame=...) at /root/posix_quic/src/connection_visitor.cpp:109 #1 0x000000000043331c in net::QuicConnection::OnAckFrame(net::QuicAckFrame const&) () #2 0x000000000043fd08 in net::QuicFramer::ProcessFrameData(net::QuicDataReader*, net::QuicPacketHeader const&) [clone .part.162] [clone .constprop.172] () #3 0x0000000000440258 in net::QuicFramer::ProcessDataPacket(net::QuicDataReader*, net::QuicPacketHeader*, net::QuicEncryptedPacket const&, char*, unsigned long) [clone .part.163] [clone .constprop.166] () #4 0x0000000000440684 in net::QuicFramer::ProcessPacket(net::QuicEncryptedPacket const&) () #5 0x0000000000434958 in net::QuicConnection::ProcessUdpPacket(net::QuicSocketAddress const&, net::QuicSocketAddress const&, net::QuicReceivedPacket const&) () #6 0x00000000004171f8 in posix_quic::QuicSocketEntry::ProcessUdpPacket (this=this@entry=0x83b080, self_address=..., peer_address=..., packet=...) at /root/posix_quic/src/socket_entry.cpp:415 #7 0x0000000000406ff0 in posix_quic::QuicEpollerEntry::Wait (this=this@entry=0x837950, events=0x461790 <net::QuicSocketAddressImpl::QuicSocketAddressImpl(sockaddr_storage const&)+48>, events@entry=0xffffffff9128, maxevents=65535, maxevents@entry=1024, timeout=timeout@entry=6000) at /root/posix_quic/src/epoller_entry.cpp:376 #8 0x00000000004157cc in posix_quic::QuicEpollWait (epfd=3, epfd@entry=0, events=events@entry=0xffffffff9128, maxevents=maxevents@entry=1024, timeout=timeout@entry=6000) at /root/posix_quic/src/quic_socket.cpp:494 #9 0x0000000000401dfc in doLoop (ep=0, ep@entry=3) at /root/posix_quic/test/client/src/client.cpp:37 #10 0x00000000004009c8 in main () at /root/posix_quic/test/client/src/client.cpp:143 (gdb)
标签:注意 代码分析 c++ 数据接口 ace osi socket tst 处理
原文地址:https://www.cnblogs.com/dream397/p/14605040.html