标签:des style blog http ar io color os 使用
1.主线程定义回调对象
2.调用io object的操作
3.io object会另开线程,定义opertion op来执行操作,同时将回调对象加到op的do_complete上。进行操作
4.完成操作加入完成队列
5.io_service线程循环从完成队列取事件,调用其事件对应的回调函数
还记得前面我们在分析resolver的实现的时候,挖了一个关于operation的坑?为了不让自己陷进去,现在来填吧;接下来我们就来看看asio中的各种operation。
和前面提到过的service的类似,这里的operation也分为两大系:IOCP Enable和Disable系列。这里我们重点关注下图中红色部分表示的IOCP Enable系列operation。
从上图可以看到,所有IOCP Enable的operation,其基类都是struct OVERLAPPED结构,该结构是Win32进行交叠IO一个非常重要的结构,用以异步执行过程中的参数传递。所有的operation直接从该结构继承的结果,就是所有operation对象,可以直接作为OVERLAPPED结构在异步函数中进行传递。
例如在win_iocp_socket_service_base中,为了启动一个receive的异步操作, start_receive_op函数就直接把传递进来的operation指针作为OVERLAPPED结构传递给::WSARecv函数,从而发起一个异步服务请求。
void win_iocp_socket_service_base::start_receive_op( win_iocp_socket_service_base::base_implementation_type& impl, WSABUF* buffers, std::size_t buffer_count, socket_base::message_flags flags, bool noop,operation* op) { update_cancellation_thread_id(impl); iocp_service_.work_started();
if (noop) iocp_service_.on_completion(op); else if (!is_open(impl)) iocp_service_.on_completion(op, boost::asio::error::bad_descriptor); else { DWORD bytes_transferred = 0; DWORD recv_flags = flags; int result = ::WSARecv(impl.socket_, buffers, static_cast<DWORD>(buffer_count), &bytes_transferred, &recv_flags, op, 0); DWORD last_error = ::WSAGetLastError(); if (last_error == ERROR_NETNAME_DELETED) last_error = WSAECONNRESET; else if (last_error == ERROR_PORT_UNREACHABLE) last_error = WSAECONNREFUSED; if (result != 0 && last_error != WSA_IO_PENDING) iocp_service_.on_completion(op, last_error, bytes_transferred); else iocp_service_.on_pending(op); } } |
关于operation对象的创建、传递,以及完成handler的执行序列等,使用下图可以清晰的描述。
下表反映了Windows环境下,部分的异步请求所对应的服务、win函数、operation等信息:
异步请求 |
服务 |
start op |
Win32函数 |
对应operation |
ip::tcp::socket::async_connect |
win_iocp_socket_service |
start_connect_op() |
::connect |
reactive_socket_connect_op |
ip::tcp::socket::async_read_some |
start_receive_op() |
::WSARecv |
win_iocp_socket_recv_op |
|
ip::tcp::socket::async_receive |
start_receive_op() |
::WSARecv |
win_iocp_socket_recv_op |
|
ip::tcp::socket::async_write_some |
start_send_op() |
::WSASend |
win_iocp_socket_send_op |
|
ip::tcp::socket::async_send |
start_send_op() |
::WSASend |
win_iocp_socket_send_op |
|
ip::tcp::acceptor::async_accept |
start_accept_op() |
::AcceptEx |
win_iocp_socket_accept_op |
|
|
|
|
|
|
ip::tcp::resolver::async_resolve |
resolver_service |
start_resolve_op() |
::getaddrinfo |
resolve_op |
不知你是否注意到,在operation的类图中,所有从operation继承的子类,都定义了一个do_complete()函数,然而该函数声明为static,这又是为何呢?
我们以win_iocp_socket_recv_op为例来进行说明。该类中的do_complete是这样声明的:
staticvoid do_complete(io_service_impl* owner,
operation* base,
const boost::system::error_code& result_ec,
std::size_t bytes_transferred)
该类的构造函数,又把此函数地址传递给父类win_iocp_operation去初始化父类成员,这两个类的构造函数分别如下,请注意加粗代码:
win_iocp_socket_recv_op ::
win_iocp_socket_recv_op(socket_ops::state_type state,
socket_ops::weak_cancel_token_type cancel_token,
const MutableBufferSequence& buffers, Handler& handler)
:operation(&win_iocp_socket_recv_op::do_complete),
state_(state),
cancel_token_(cancel_token),
buffers_(buffers),
handler_(BOOST_ASIO_MOVE_CAST(Handler)(handler))
{
}
win_iocp_operation ::win_iocp_operation(func_type func)
: next_(0),
func_(func)
{
reset();
}
至此,我们明白,将do_complete声明为static,可以方便获取函数指针,并在父类中进行回调。那么,不仅要问,既然两个类存在继承关系,那么为何不将do_complete声明为虚函数呢?
再回头看看这些类的最顶层基类,就会明白的。最顶层的OVERLAPPED基类,使得将operation对象作为OVERLAPPED对象在异步函数中进行传递成为可能;如果将do_complete声明为虚函数,则多数编译器会在对象起始位置放置vptr,这样就改变了内存布局,从而不能再把operation对象直接作为OVERLAPPED对象进行传递了。
当然,一定要用虚函数的话,也不是不可能,只是在传递对象的时候,就需要考虑到vptr的存在,这会有两个方面的问题:一是进行多态类型转换时,效率上的损失;二是各家编译器对vtpr的实现各不相同,跨平台的asio就需要进行多种适配,这无疑又过于烦躁了。于是作者就采取了最为简单有效的方式——用static函数来进行回调——简单,就美。
在Windows NT环境下(IOCP Enabled),win_iocp_io_service代表着io_service,是整个asio的运转核心。本节开始来分析该类的实现。
从类的命名也可以看出,IOCP是该实现的核心。IOCP(IO Completion Port, IOCP)在windows上,可以说是效率最高的异步IO模型了,他使用有限的线程,处理尽可能多的并发IO请求。该模型虽说可以应用于各种IO处理,但目前应用较多的还是网络IO方面。
我们都知道,在Window是环境下使用IOCP,基本上需要这样几个步骤:
所有这些线程调用GetQueuedCompletionStatus()函数等待一个完成端口事件的到来;
那么,这些步骤,是如何分散到asio中的呢?来吧,先从完成端口创建开始。
如上所述,完成端口的创建,需要调用CreateIoCompletionPort()函数,在win_iocp_io_service的构造函数中,就有这样的操作:
win_iocp_io_service::win_iocp_io_service( boost::asio::io_service& io_service, size_tconcurrency_hint) : boost::asio::detail::service_base<win_iocp_io_service>(io_service), iocp_(), outstanding_work_(0), stopped_(0), stop_event_posted_(0), shutdown_(0), dispatch_required_(0) { BOOST_ASIO_HANDLER_TRACKING_INIT;
iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, static_cast<DWORD>((std::min<size_t>)(concurrency_hint, DWORD(~0)))); if (!iocp_.handle) { DWORD last_error = ::GetLastError(); boost::system::error_code ec(last_error, boost::asio::error::get_system_category()); boost::asio::detail::throw_error(ec, "iocp"); } } |
win_iocp_io_service的构造函数,负责创建一个完成端口,并把此完成端口对象的句柄交给一个auto_handle进行管理——auto_handle的唯一用途,就是在对象析构时,调用::CloseHandle()把windows句柄资源关闭,从而保证不会资源泄露。
我们在windows环境下,声明一个boost::asio::io_service对象,其内部就创建了一个win_iocp_io_service的实例;因此,一个io_service对象就对应着一个完成端口对象——这也就可以解释,为什么所有的IO Object都需要一个io_service参数了——这样,大家就好公用外面定义好的完成端口对象。
除了io_service对象会创建一个完成端口对象,事实上,在asio中,另外一个service也会创建一个,这就是boost::asio::ip::resolver_service。该类对应的detail实现boost::asio::detail::resolver_service中,有一个数据成员是: io_service,这样就同样创建了一个完成端口对象:
namespace boost {
namespace asio {
namespace detail {
class resolver_service_base
{
...
protected:
// Private io_service used for performing asynchronous host resolution.
scoped_ptr<boost::asio::io_service> work_io_service_;
...
至于该完成端口的用途如何,我们在后续部分再来说明——搽,又开始挖坑了。
在创建了io对象后,例如socket,就需要将此对象和完成端口对象绑定起来,以指示操作系统将该io对象上后续所有的完成事件发送到某个完成端口上,该操作同样是由CreateIoCompletionPort()函数完成,只是所使用的参数不同。
在win_iocp_io_service中,这个操作由下面的代码完成——请注意参数的差别:
boost::system::error_code win_iocp_io_service::register_handle( HANDLE handle, boost::system::error_code& ec) { if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0)== 0) { DWORD last_error = ::GetLastError(); ec = boost::system::error_code(last_error, boost::asio::error::get_system_category()); } else { ec = boost::system::error_code(); } return ec; } |
通过代码搜索,我们发现函数win_iocp_socket_service_base::do_open()内部调用了register_handle();该函数的作用是打开一个socket(其中调用了socket函数socket()去创建一个socket),也就是说,在打开一个socket后,就把该socket绑定到指定的完成端口上,这样,后续的事件就会发送到完成端口了。
此外还有另外的和assign相关的两个函数也调用了register_handle(),不再贴出其代码了。
IOCP要求至少有一个线程进行服务,也可以有一堆线程;io_service早就为这些线程准备好了服务例程,即io_service::run()函数。
void server::run() { // Create a pool of threads to run all of the io_services. std::vector<boost::shared_ptr<boost::thread> > threads; for (std::size_t i = 0; i < thread_pool_size_; ++i) { boost::shared_ptr<boost::thread> thread( new boost::thread( boost::bind(&boost::asio::io_service::run, &io_service_) ) ); threads.push_back(thread); }
// Wait for all threads in the pool to exit. for (std::size_t i = 0; i < threads.size(); ++i) threads[i]->join(); } |
由于io_service::run()又是委托win_iocp_io_service::run()来实现的,我们来看看后者的实现:
size_t win_iocp_io_service::run(boost::system::error_code& ec) { if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0) { stop(); ec = boost::system::error_code(); return 0; }
win_iocp_thread_info this_thread; thread_call_stack::context ctx(this, this_thread);
size_t n = 0; while (do_one(true, ec)) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; } |
run()首先检查是否有需要处理的操作,如果没有,函数退出;win_iocp_io_service使用outstanding_work_来记录当前需要处理的任务数。如果该数值不为0,则委托do_one函数继续处理——asio中,所有的脏活累活都在这里处理了。
win_iocp_io_service::do_one函数较长,我们只贴出核心代码
size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec) { for (;;) { // Try to acquire responsibility for dispatching timers and completed ops. if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)?#1 { mutex::scoped_lock lock(dispatch_mutex_);
// Dispatch pending timers and operations. op_queue<win_iocp_operation> ops; ops.push(completed_ops_); timer_queues_.get_ready_timers(ops); post_deferred_completions(ops);?#2 update_timeout(); }
// Get the next operation from the queue. DWORD bytes_transferred = 0; dword_ptr_t completion_key = 0; LPOVERLAPPED overlapped = 0; ::SetLastError(0); BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, &completion_key, &overlapped, block ? gqcs_timeout : 0);?#3 DWORD last_error = ::GetLastError();
if (overlapped) { win_iocp_operation* op =static_cast<win_iocp_operation*>(overlapped);?#4 boost::system::error_code result_ec(last_error, boost::asio::error::get_system_category());
// We may have been passed the last_error and bytes_transferred in the // OVERLAPPED structure itself. if (completion_key == overlapped_contains_result) { result_ec = boost::system::error_code(static_cast<int>(op->Offset), *reinterpret_cast<boost::system::error_category*>(op->Internal)); bytes_transferred = op->OffsetHigh; }
// Otherwise ensure any result has been saved into the OVERLAPPED // structure. else { op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category()); op->Offset = result_ec.value(); op->OffsetHigh = bytes_transferred; }
// Dispatch the operation only if ready. The operation may not be ready // if the initiating function (e.g. a call to WSARecv) has not yet // returned. This is because the initiating function still wants access // to the operation‘s OVERLAPPED structure. if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1) { // Ensure the count of outstanding work is decremented on block exit. work_finished_on_block_exit on_exit = { this };?#5 (void)on_exit;?#6
op->complete(*this, result_ec, bytes_transferred);?#7 ec = boost::system::error_code(); return 1; } } else if (!ok) { ... |
做一下简要说明:
- #1:变量dispatch_required_记录了由于资源忙,而没有成功投递的操作数;所有这些操作都记录在队列completed_ops_中;
- #2:将所有需要投递的操作,投递出去;至于什么样的操作需要投递,何时投递,以及为先前会投递失败,失败后如何处理等,我们后续说明——再次挖坑了。
- #3:IOCP的核心操作函数GetQueuedCompletionStatus()出现了。该函数导致线程在完成端口上进行等待,直到超时或者某个完成端口数据包到来。
- #4:注意这里将 OVERLAPPED结构直接转换为 operation对象。相关内容在前面的operation:OVERLAPPED基类部分已经有说明。
- #5:该变量保证在操作完成,return之后,win_iocp_io_service对象所记录的任务数outstanding_work_会自动减1——是啊,辛辛苦苦做的事儿,能不记录下来嘛!
- #6:这一行从功能上讲没有什么特别的用途;不过有了这一行,可以抑制有些编译器针对 #5所声明的变量没有被使用的编译器警告;
- #7:调用operation对象的complete()函数,从而调用到异步操作所设定的回调函数。具体流程参考operation:执行流程。
上述的线程函数,会在GetQueuedCompletionStatus()函数上进行等待,直到超时或者有完成端口数据包到来;
完成端口数据包,有两个来源:一个是用户所请求的异步操作完成,异步服务执行者(这里是操作系统)向该完成端口投递完成端口数据包;另外一种情况是,用户自己使用IOCP的另外一个核心函数PostQueuedCompletionStatus()向完成端口投递数据包;
一般的异步操作请求,是不需要用户自己主动向完成端口投递数据的,例如async_read, asyn_write等操作;
有另外一些操作,由于没有对应或者作者并没有采用支持OVERLAPPED IO操作的Win32函数,就需要实现者自己管理完成事件,并进行完成端口数据包的投递,比如:
另外还有一些io_service提供的操作,例如请求io_service执行代为执行指定handler的操作:
所有这些需要自己投递完成端口数据包的操作,基本上都是这样一个投递流程:
OK,至此,基本分析完了operation的投递,总数填了一个前面挖下的坑。
前面说过,Resolver自己会创建一个IOCP,为什么会这样呢?由于Win32下面没有提供对应于地址解析的overlapped版本的函数,为了实现async_resolve操作,作者自己实现了这样一个异步服务。在resolver_service内部,有一个io_service数据成员,该数据成员创建了一个IOCP;除此之外,该service内部还启动一个工作线程来执行io_service::run(),使用此线程来模拟异步服务。
使用resolver进行async_resolve的详细过程如下:
Main Thread (IOCP#1) |
Resolver Thread (IOCP #2) |
|
|
|
|
1.构建主io_service对象, IOCP#1被创建 |
|
|
|
|
|
2.构建 resolver对象, IOCP#2被创建,同时该resolver持有主io_service的引用 |
|
|
|
|
|
3.发起异步调用:resolver.async_resolve() |
|
|
|
|
|
4. resolve_op被创建 |
|
|
|
|
|
5. Resolver线程启动,主线程开始等待 |
|
|
|
|
|
|
|
6.开始运行,激活等待事件,并在 IOCP#2上开始等待 |
|
|
|
7.线程恢复执行;将op投递到 IOCP#2 |
|
|
|
|
|
|
|
8.执行op->do_complete()操作,地址解析完成后,将op再回投给IOCP#1 |
|
|
|
9. do_one()得到从Resolver线程投递回来的op,开始执行op->do_complete()操作,此时回调async_resolve所设置的handler |
|
|
|
|
|
10.结束 |
|
|
请注意step8和 step9,执行同样一个op->do_complete()函数,为什么操作不一样呢?看其实现就知道,该函数内部,会判断执行此函数时的owner,如果owner是主io_service对象,则说明是在主线程中执行,此时进行handler的回调;否则就说明在工作线程中,就去执行真正的地址解析工作;
针对socket上提交的异步请求,可以使用cancel()函数来取消掉该socket上所有没执行的异步请求。
使用该函数,在Windows Vista(不含)以前的版本,有几点需要注意:
针对这些问题,另外的替代方案是:
在windows vista及后续版本中,cancel()函数在内部调用Win32函数 CancelIoEx(),该函数可以取消来自任何线程的异步操作请求,不存在上述问题。
需要注意的是,即使异步请求被取消了,所指定的handler也会被执行,只是传递的error code 为:boost::asio::error::operation_aborted。
该service提供了windows下所有socket相关的功能,是asio在windows环境中一个非常重要的角色,他所提供的函数主要分下面两类:
不过关于该类的实现前面已经做了较多的涉及,不再单独详述了。
现在我们已经把Windows环境下所涉及到的关键部件都涉及到了,此刻我们再回过头来,从高层俯瞰一下asio的架构,看看是否会有不一样的感受呢?事实上,asio的文档用下面的图来说明asio的高层架构——前摄器模式,我们也从这个图开始:
呵呵,其实这张图,从一开始就是为了表达Proactor(前摄器)模式的,基本上它和asio没半毛钱关系,只不过asio既支持同步IO,又支持异步IO,在异步IO部分,是参照Proactor模式来实现的。下面我们来分别说说asio的前摄器模式中的各个组件:
仅仅从asio使用者的角度看,高层的stream_socket_service类就是一个这样的处理器,因为从tcp::socket发送的异步操作都是由其完成处理的。但是从真正实现的角度看,这样的异步操作在Windwos上,大部分由操作系统负责完成的,另外一部分由asio自己负责处理,如resolver_service,因此Windows操作系统和asio一起组成了异步操作处理器。
在Windows平台上,io_service类通过win_iocp_io_service类的do_one()函数把每个异步操作所设定的completion handler调用起来。
在Windows上,asio的完成事件队列由操作系统负责管理;只不过该队列中的数据有两个来源,一个是Windows内部,另外一个是asio中自己PostQueuedCompletionStatus()所提交的事件。
在Windows上,这一功能也是由操作系统完成的,具体来说,我认为是由GetQueuedCompletionStatus完成的,而该函数时由do_one()调用的,因此,从高层的角度来看,这个分离器,也是由io_service负责的。
基于上述信息,我们重绘practor模式架构图如下:
标签:des style blog http ar io color os 使用
原文地址:http://www.cnblogs.com/3chimenea/p/4170238.html