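Before we start, let's fix a bug from the previous article. In SharedState::add_finish_callback, post_all_callbacks should only be called after checking settled. Otherwise a callback added before the result has been set would still be posted immediately.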
template<class T>
class SharedState : public SharedStateBase {
    // ...
    // private
    void add_finish_callback(std::function<void(T&)> callback) {
        finish_callbacks.push_back(std::move(callback));
        if (settled) {
            post_all_callbacks();
        }
    }
};
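To make the effect of the fix concrete, here is a stripped-down, single-threaded model of the shared state. `MiniState`, its `posts` counter and `posts_before_and_after_set` are hypothetical names invented for this sketch (the real class posts itself to the Schedular instead of counting); only the `if (settled)` guard mirrors the code above.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for SharedState<int>.
// `posts` counts how many times the state would be queued on the scheduler.
struct MiniState {
    bool settled = false;
    bool callback_posted = false;
    int value = 0;
    int posts = 0;
    std::vector<std::function<void(int&)>> finish_callbacks;

    void post_all_callbacks() {
        if (callback_posted) return;  // already queued for this round
        callback_posted = true;
        ++posts;  // the real code calls schedular->post_call_state(...) here
    }

    void add_finish_callback(std::function<void(int&)> callback) {
        finish_callbacks.push_back(std::move(callback));
        if (settled) {  // the fix: only post once a result exists
            post_all_callbacks();
        }
    }

    void set(int v) {
        if (settled) return;
        settled = true;
        value = v;
        post_all_callbacks();
    }
};

// Returns the number of posts observed before and after the result is set.
std::pair<int, int> posts_before_and_after_set() {
    MiniState state;
    state.add_finish_callback([](int&) {});
    int before = state.posts;  // no result yet, so nothing has been posted
    state.set(7);
    int after = state.posts;   // setting the result posts exactly once
    return {before, after};
}
```

Without the guard, the first `add_finish_callback` call would already bump `posts`, which is exactly the premature post described above.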
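What we are going to implement today includes:
If you are reading this line, this article has been copied without permission (or you are selecting text). Please visit cnblogs.com/pointer-smq to support the original author, thanks.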
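First comes timer support in Schedular. We use a simple priority queue to manage all timers. In poll, after the current frame's pending_states have been processed, we sleep until the nearest timer expires where that is appropriate, and then process every timer that has expired.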
class Schedular {
    // ...
    // public
    using timer_callback = std::function<void()>;
    // (repeat, delay in seconds, expiry time, callback)
    using timer_item = std::tuple<bool, float, chrono::time_point<chrono::steady_clock, chrono::duration<float>>, timer_callback>;
    using timer = std::chrono::steady_clock;
    // "greater than" on the expiry time makes the heap a min-heap
    struct timer_item_cmp {
        bool operator()(const timer_item& a, const timer_item& b) const {
            return std::get<2>(a) > std::get<2>(b);
        }
    };
    // ...
    // public
    void poll() {
        size_t sz = pending_states.size();
        for (size_t i = 0; i != sz; i++) {
            auto state = std::move(pending_states[i]);
            state->invoke_all_callback();
        }
        pending_states.erase(pending_states.begin(), pending_states.begin() + sz);
        if (timer_queue.empty()) {
            return;
        }
        if (pending_states.empty()) { // nothing is pending, so we can afford a longer sleep until the first timer is due
            std::this_thread::sleep_until(std::get<2>(timer_queue.front()));
            auto now = timer::now();
            do {
                deal_one_timer();
            } while (!timer_queue.empty() && std::get<2>(timer_queue.front()) <= now);
        } else { // otherwise only handle timers that have already expired in this frame; do not sleep, return promptly so the caller can poll again and process the remaining pending_states
            auto now = timer::now();
            while (!timer_queue.empty() && std::get<2>(timer_queue.front()) <= now) {
                deal_one_timer();
            }
        }
    }
    void add_timer(bool repeat, float delay, timer_callback callback) {
        auto cur_time = chrono::time_point_cast<chrono::duration<float>>(timer::now());
        auto timeout = cur_time + chrono::duration<float>(delay);
        timer_queue.emplace_back(repeat, delay, timeout, std::move(callback));
        std::push_heap(timer_queue.begin(), timer_queue.end(), timer_item_cmp{});
    }
    // ...
    // private
    void deal_one_timer() {
        std::pop_heap(timer_queue.begin(), timer_queue.end(), timer_item_cmp{});
        auto item = std::move(timer_queue.back());
        timer_queue.pop_back();
        std::get<3>(item)();
        if (std::get<0>(item)) { // repeating timer: re-arm it with the same delay
            add_timer(true, std::get<1>(item), std::move(std::get<3>(item)));
        }
    }
    std::deque<timer_item> timer_queue;
};
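The heap handling is easy to get backwards, so here is a minimal sketch of the same idea in isolation: a "greater than" comparator turns std::push_heap and std::pop_heap into a min-heap whose front() is always the earliest deadline. The names `demo_timer`, `demo_timer_cmp`, `push_timer`, `pop_earliest` and `fire_expired` are hypothetical, and deadlines are plain doubles rather than time_points to keep the example deterministic.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical timer record: a deadline in seconds plus a label.
using demo_timer = std::pair<double, std::string>;

// "Greater than" on the deadline makes the standard heap functions build a min-heap.
struct demo_timer_cmp {
    bool operator()(const demo_timer& a, const demo_timer& b) const {
        return a.first > b.first;
    }
};

void push_timer(std::vector<demo_timer>& heap, double deadline, std::string label) {
    heap.emplace_back(deadline, std::move(label));
    std::push_heap(heap.begin(), heap.end(), demo_timer_cmp{});
}

demo_timer pop_earliest(std::vector<demo_timer>& heap) {
    // pop_heap moves the earliest timer to the back; then we take it off.
    std::pop_heap(heap.begin(), heap.end(), demo_timer_cmp{});
    demo_timer item = std::move(heap.back());
    heap.pop_back();
    return item;
}

// Pops every timer whose deadline is <= now, in deadline order.
std::vector<std::string> fire_expired(std::vector<demo_timer>& heap, double now) {
    std::vector<std::string> fired;
    while (!heap.empty() && heap.front().first <= now) {
        fired.push_back(pop_earliest(heap).second);
    }
    return fired;
}
```

Pushing deadlines 3.0, 1.0 and 2.0 and then calling `fire_expired(heap, 2.0)` yields the labels of the 1.0 and 2.0 timers in that order and leaves the 3.0 timer in the heap, which is the behaviour the non-sleeping branch of poll relies on.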
With that in place, a delay function based on the current scheduler can be written.
class Schedular {
    // ...
    // public
    Future<float> delay(float second) {
        auto promise = Promise<float>(*this);
        add_timer(false, second, [=]() mutable {
            promise.set_result(second);
        });
        return promise.get_future();
    }
    // ...
};
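Note that although this delay function returns a Future, it is not a coroutine. A function is a coroutine if and only if its body uses co_await, co_yield or co_return; the return type has nothing to do with it.
This function also shows how to wrap a callback-style API in a Future: pass Promise.set_result to the API as the callback and return Promise.get_future. The user then simply waits on the Future side.
With these pieces in place, we can write the test code for this article.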
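The wrapping pattern can be sketched independently of the scheduler. Everything in the block below is hypothetical and invented for illustration: `DemoPromise` and `DemoFuture` are tiny stand-ins for the article's Promise and Future (no callbacks, no scheduler), and `CallbackApi::async_add` plays the role of an arbitrary callback-style API.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

template <class T>
struct DemoState {
    bool settled = false;
    T value{};
};

template <class T>
class DemoFuture {
public:
    explicit DemoFuture(std::shared_ptr<DemoState<T>> state) : state_(std::move(state)) {}
    bool ready() const { return state_->settled; }
    const T& get() const { return state_->value; }
private:
    std::shared_ptr<DemoState<T>> state_;
};

template <class T>
class DemoPromise {
public:
    DemoPromise() : state_(std::make_shared<DemoState<T>>()) {}
    void set_result(T v) {
        state_->value = std::move(v);
        state_->settled = true;
    }
    DemoFuture<T> get_future() const { return DemoFuture<T>(state_); }
private:
    std::shared_ptr<DemoState<T>> state_;
};

// Hypothetical callback-style API: it stores the work and runs it later.
struct CallbackApi {
    std::vector<std::function<void()>> pending;

    void async_add(int a, int b, std::function<void(int)> on_done) {
        pending.push_back([a, b, on_done = std::move(on_done)] { on_done(a + b); });
    }
    void run_pending() {
        std::vector<std::function<void()>> jobs;
        jobs.swap(pending);
        for (auto& job : jobs) job();
    }
};

// The wrapper: hand "set the result" to the API as its callback, return the future.
DemoFuture<int> add_as_future(CallbackApi& api, int a, int b) {
    DemoPromise<int> promise;
    api.async_add(a, b, [promise](int result) mutable { promise.set_result(result); });
    return promise.get_future();
}
```

The promise is copied into the callback, and because copies share one state through the shared_ptr, the future returned to the caller observes the result once the API eventually invokes the callback. This is the same reason the `[=]` capture in delay works.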
Future<float> func(Schedular& schedular) {
    std::cout << "start sleep\n";
    auto r = co_await schedular.delay(1.2);
    co_return r;
}
Future<int> func2(Schedular& schedular) {
    auto r = co_await func(schedular);
    std::cout << "slept for " << r << "s\n";
    co_return 42;
}
For Future to work with coroutines, a number of things still have to be added to the code. They are listed below.
template<class T>
class Future {
    // ...
    // public
    // coroutine interface
    using promise_type = Promise<T>;
    bool await_ready() { return _state->settled; }
    void await_suspend(exp::coroutine_handle<> handle) {
        add_finish_callback([=](T&) mutable { handle.resume(); });
    }
    T await_resume() { return _state->value; }
    // end of coroutine interface
    // ...
};
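The additions needed for Promise to work with coroutines are below. Note that Promise has no default constructor: when the promise type has a constructor that accepts the coroutine's arguments, the compiler passes them to it, so Promise(Schedular&) receives the schedular argument of func and func2.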
// ...
// at the very top
// if your compiler no longer needs std::experimental, drop this line and use std instead of exp from here on
namespace exp = std::experimental;
template<class T>
class Promise {
    // ...
    // public
    // coroutine interface
    Future<T> get_return_object();
    exp::suspend_never initial_suspend() { return {}; }
    exp::suspend_never final_suspend() noexcept { return {}; }
    void return_value(T v) { set_result(v); }
    void unhandled_exception() { std::terminate(); }
    // end of coroutine interface
    // ...
};
// ...
// after the definition of Future
template<class T>
Future<T> Promise<T>::get_return_object() {
    return get_future();
}
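To see why Future needs exactly these three await_ methods, it can help to write out by hand, roughly, what a co_await expression does with them. The block below is a simplified model under loud assumptions: `ManualAwaitable` and `await_manually` are hypothetical names, std::function<void()> stands in for coroutine_handle<>, the "rest of the coroutine" is passed explicitly as a continuation, and waiters are resumed directly in set_result instead of being posted to a scheduler as in this article.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical awaitable with the same three-method shape as Future<T>.
struct ManualAwaitable {
    bool settled = false;
    int value = 0;
    std::vector<std::function<void()>> waiters;

    bool await_ready() const { return settled; }
    void await_suspend(std::function<void()> resume) {
        waiters.push_back(std::move(resume));  // remember how to continue later
    }
    int await_resume() const { return value; }

    void set_result(int v) {
        value = v;
        settled = true;
        std::vector<std::function<void()>> ready;
        ready.swap(waiters);
        for (auto& resume : ready) resume();  // simplification: resume immediately
    }
};

// Roughly what `int r = co_await a; rest(r);` boils down to.
// The awaitable must outlive the suspended continuation.
void await_manually(ManualAwaitable& a, std::function<void(int)> rest) {
    if (a.await_ready()) {
        rest(a.await_resume());  // result already there, no suspension
    } else {
        a.await_suspend([&a, rest = std::move(rest)] {
            rest(a.await_resume());  // runs once the awaitable resumes us
        });
    }
}
```

A real coroutine does not need the explicit `rest` parameter, because the compiler stores the remainder of the function in the coroutine frame and hands await_suspend a handle to it.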
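With all of this in place, compilation should no longer produce errors. My compile command is clang++-9 test.cpp -stdlib=libc++ -std=c++2a
For convenience, following Python's example, we add a run_until_complete method to Schedular.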
class Schedular {
    // ...
    // public
    template<class F, class... Args>
    auto run_until_complete(F&& fn, Args&&... args) -> typename std::invoke_result_t<F&&, Args&&...>::result_type {
        auto future = std::forward<F>(fn)(std::forward<Args>(args)...);
        while (!future.await_ready()) {
            poll();
        }
        return future.await_resume();
    }
};
int main() {
    Schedular schedular;
    auto r = schedular.run_until_complete(func2, schedular);
    std::cout << "run complete with " << r << "\n";
}
Running the program prints:
start sleep
slept for 1.2s
run complete with 42
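The trailing return type of run_until_complete is the least obvious part of that method: it asks what type the call fn(args...) produces and then takes that type's result_type member. Here is a small sketch of just that mechanism. `ReadyFuture`, `make_answer` and `run_ready` are hypothetical names, and the future is always ready so that no scheduler is needed.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Hypothetical future-like type that exposes result_type, as Future<T> does.
template <class T>
struct ReadyFuture {
    using result_type = T;
    T value;
    bool await_ready() const { return true; }
    T await_resume() const { return value; }
};

ReadyFuture<int> make_answer(int base) { return ReadyFuture<int>{base + 2}; }

// Same return-type computation as run_until_complete:
// invoke_result_t gives ReadyFuture<int>, ::result_type unwraps it to int.
template <class F, class... Args>
auto run_ready(F&& fn, Args&&... args)
    -> typename std::invoke_result_t<F&&, Args&&...>::result_type {
    auto future = std::forward<F>(fn)(std::forward<Args>(args)...);
    return future.await_resume();
}

// Checked at compile time: the call yields a plain int, not a future.
static_assert(std::is_same_v<decltype(run_ready(make_answer, 40)), int>,
              "run_ready unwraps ReadyFuture<int> to int");
```

This is also why the complete listing adds `using result_type = T;` to Future: without that member, the return type of run_until_complete could not be named this way.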
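The complete code:
#include <vector>
#include <deque>
#include <memory>
#include <iostream>
#include <functional>
#include <chrono>
#include <thread>
#include <algorithm>
#include <exception>   // std::terminate
#include <stdexcept>   // std::invalid_argument
#include <tuple>       // std::tuple, std::get
#include <type_traits> // std::invoke_result_t
#include <experimental/coroutine>
namespace exp = std::experimental;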
template<class T>
class Future;
template<class T>
class Promise;
class Schedular;
class SharedStateBase : public std::enable_shared_from_this<SharedStateBase> {
    friend class Schedular;
public:
    virtual ~SharedStateBase() = default;
private:
    virtual void invoke_all_callback() = 0;
};
template<class T>
class SharedState : public SharedStateBase {
    friend class Future<T>;
    friend class Promise<T>;
public:
    SharedState(Schedular& schedular)
        : schedular(&schedular)
    {}
    SharedState(const SharedState&) = delete;
    SharedState(SharedState&&) = delete;
    SharedState& operator=(const SharedState&) = delete;
    SharedState& operator=(SharedState&&) = delete;
private:
    template<class U>
    void set(U&& v) {
        if (settled) {
            return;
        }
        settled = true;
        value = std::forward<U>(v);
        post_all_callbacks();
    }
    T& get() { return value; }
    void add_finish_callback(std::function<void(T&)> callback) {
        finish_callbacks.push_back(std::move(callback));
        if (settled) {
            post_all_callbacks();
        }
    }
    void post_all_callbacks();
    virtual void invoke_all_callback() override {
        callback_posted = false;
        size_t sz = finish_callbacks.size();
        for (size_t i = 0; i != sz; i++) {
            auto v = std::move(finish_callbacks[i]);
            v(value);
        }
        finish_callbacks.erase(finish_callbacks.begin(), finish_callbacks.begin()+sz);
    }
    bool settled = false;
    bool callback_posted = false;
    Schedular* schedular = nullptr;
    T value;
    std::vector<std::function<void(T&)>> finish_callbacks;
};
template<class T>
class Promise {
public:
    Promise(Schedular& schedular)
        : _schedular(&schedular)
        , _state(std::make_shared<SharedState<T>>(*_schedular))
    {}
    Future<T> get_future();
    // coroutine interface
    Future<T> get_return_object();
    exp::suspend_never initial_suspend() { return {}; }
    exp::suspend_never final_suspend() noexcept { return {}; }
    void return_value(T v) { set_result(v); }
    void unhandled_exception() { std::terminate(); }
    // end of coroutine interface
    template<class U>
    void set_result(U&& value) {
        if (_state->settled) {
            throw std::invalid_argument("already set result");
        }
        _state->set(std::forward<U>(value));
    }
private:
    Schedular* _schedular;
    std::shared_ptr<SharedState<T>> _state;
};
template<class T>
class Future {
public:
    using result_type = T;
    using promise_type = Promise<T>;
    friend class Promise<T>;
private:
    Future(std::shared_ptr<SharedState<T>> state)
        : _state(std::move(state))
    {
    }
public:
    // coroutine interface
    bool await_ready() { return _state->settled; }
    void await_suspend(exp::coroutine_handle<> handle) {
        add_finish_callback([=](T&) mutable { handle.resume(); });
    }
    T await_resume() { return _state->value; }
    // end of coroutine interface
    void add_finish_callback(std::function<void(T&)> callback) {
        _state->add_finish_callback(std::move(callback));
    }
private:
    std::shared_ptr<SharedState<T>> _state;
};
template<class T>
Future<T> Promise<T>::get_future() {
    return Future<T>(_state);
}
template<class T>
Future<T> Promise<T>::get_return_object() {
    return get_future();
}
namespace chrono = std::chrono;
class Schedular {
    template<class T>
    friend class SharedState;
public:
    using timer_callback = std::function<void()>;
    // (repeat, delay in seconds, expiry time, callback)
    using timer_item = std::tuple<bool, float, chrono::time_point<chrono::steady_clock, chrono::duration<float>>, timer_callback>;
    using timer = std::chrono::steady_clock;
    // "greater than" on the expiry time makes the heap a min-heap
    struct timer_item_cmp {
        bool operator()(const timer_item& a, const timer_item& b) const {
            return std::get<2>(a) > std::get<2>(b);
        }
    };
    Schedular() = default;
    Schedular(Schedular&&) = delete;
    Schedular(const Schedular&) = delete;
    Schedular& operator=(Schedular&&) = delete;
    Schedular& operator=(const Schedular&) = delete;
    void poll() {
        size_t sz = pending_states.size();
        for (size_t i = 0; i != sz; i++) {
            auto state = std::move(pending_states[i]);
            state->invoke_all_callback();
        }
        pending_states.erase(pending_states.begin(), pending_states.begin() + sz);
        if (timer_queue.empty()) {
            return;
        }
        if (pending_states.empty()) { // nothing is pending, so we can afford a longer sleep until the first timer is due
            std::this_thread::sleep_until(std::get<2>(timer_queue.front()));
            auto now = timer::now();
            do {
                deal_one_timer();
            } while (!timer_queue.empty() && std::get<2>(timer_queue.front()) <= now);
        } else { // otherwise only handle timers that have already expired in this frame; do not sleep, return promptly so the caller can poll again and process the remaining pending_states
            auto now = timer::now();
            while (!timer_queue.empty() && std::get<2>(timer_queue.front()) <= now) {
                deal_one_timer();
            }
        }
    }
    template<class F, class... Args>
    auto run_until_complete(F&& fn, Args&&... args) -> typename std::invoke_result_t<F&&, Args&&...>::result_type {
        auto future = std::forward<F>(fn)(std::forward<Args>(args)...);
        while (!future.await_ready()) {
            poll();
        }
        return future.await_resume();
    }
    void add_timer(bool repeat, float delay, timer_callback callback) {
        auto cur_time = chrono::time_point_cast<chrono::duration<float>>(timer::now());
        auto timeout = cur_time + chrono::duration<float>(delay);
        timer_queue.emplace_back(repeat, delay, timeout, std::move(callback));
        std::push_heap(timer_queue.begin(), timer_queue.end(), timer_item_cmp{});
    }
    Future<float> delay(float second) {
        auto promise = Promise<float>(*this);
        add_timer(false, second, [=]() mutable {
            promise.set_result(second);
        });
        return promise.get_future();
    }
private:
    void deal_one_timer() {
        std::pop_heap(timer_queue.begin(), timer_queue.end(), timer_item_cmp{});
        auto item = std::move(timer_queue.back());
        timer_queue.pop_back();
        std::get<3>(item)();
        if (std::get<0>(item)) { // repeating timer: re-arm it with the same delay
            add_timer(true, std::get<1>(item), std::move(std::get<3>(item)));
        }
    }
    void post_call_state(std::shared_ptr<SharedStateBase> state) {
        pending_states.push_back(std::move(state));
    }
    std::vector<std::shared_ptr<SharedStateBase>> pending_states;
    std::deque<timer_item> timer_queue;
};
template<class T>
void SharedState<T>::post_all_callbacks() {
    if (callback_posted) {
        return;
    }
    callback_posted = true;
    schedular->post_call_state(shared_from_this());
}
Future<float> func(Schedular& schedular) {
    std::cout << "start sleep\n";
    auto r = co_await schedular.delay(1.2);
    co_return r;
}
Future<int> func2(Schedular& schedular) {
    auto r = co_await func(schedular);
    std::cout << "slept for " << r << "s\n";
    co_return 42;
}
int main() {
    Schedular schedular;
    auto r = schedular.run_until_complete(func2, schedular);
    std::cout << "run complete with " << r << "\n";
}
C++20 Coroutines Desugared - Implementing Coroutines by Hand, Part 2 - Implementing co_await and co_return
Original article: https://www.cnblogs.com/pointer-smq/p/12940360.html