网站首页> 文章专栏> callback 到 awaiter 的自动适配
callback
到 awaiter
的自动适配测试环境:
本文中我们将尝试使用元编程技术对 callback 接口函数
进行自动适配,将其转化为 awaiter
对象。
namespace callback
{
using sched = std::list<std::function<void()>>;
void add(sched &_sched, int &&_a, const int &_b, double _c, double _d, std::function<void(int, double)> _f){
_sched.emplace_back([_f, x = _a + _b, y = _c + _d]{_f(x, y);});
}
void mul(sched &_sched, std::function<void(std::size_t, std::size_t)> _f, int _a, int _b, std::size_t _c){
_sched.emplace_back([_f, x = _a * _b, y = _c]{_f(x, y);});
}
void dup(sched &_sched, std::function<void(std::size_t)> _im, std::function<void(std::size_t)> _f, std::size_t _e){
_im(_e * 2);
_sched.emplace_back([_f, _e]{_f(_e * 3);});
}
}
namespace coro
{
struct promise;
struct coroutine : std::coroutine_handle<promise>{using promise_type = struct promise;};
struct promise{
coroutine get_return_object(){return {coroutine::from_promise(*this)};}
std::suspend_never initial_suspend() const noexcept{return {};}
std::suspend_never final_suspend() const noexcept{return {};}
void return_void() noexcept{}
void unhandled_exception() const noexcept{}
};
}
int main(int _argc, char* _argv[])
{
// task queue.
callback::sched sched;
static constexpr auto im = [](auto){std::cout << "[immediate]" << std::endl;};
// tiny callback hell.
const int ref = 257;
callback::add(
sched, 257, ref, 57000.0f, 57000.0f,
[&sched](auto _x, auto _y){
std::cout << "callback<add>: " << _x + _y << std::endl;
const int e = _x + _y;
callback::mul(
sched,
[&sched, e](auto _x, auto _y){
std::cout << "callback<mul>: " << e + _x * _y << std::endl;
callback::dup(sched, im, [](auto _x){std::cout << "callback<dup>: " << _x << std::endl;}, 177);
},
2, 8, 112831
);
}
);
// coroutine.
[&sched, &ref]() -> coro::coroutine {
const int e = std::apply(
[](auto _x, auto _y){std::cout << "coroutine<add>: " << _x + _y << std::endl; return _x + _y;},
co_await adaptor::make<callback::add>::awaitable(sched, 257, ref, 57000.0f, 57000.0f)
);
std::apply(
[e](auto _x, auto _y){std::cout << "coroutine<mul>: " << e + _x * _y << std::endl;},
co_await adaptor::make<callback::mul>::awaitable(sched, 2, 8, 112831)
);
std::apply(
[](auto _x){std::cout << "coroutine<dup>: " << _x << std::endl;},
co_await adaptor::make<callback::dup, adaptor::select<2>>::awaitable(sched, im, 177)
);
}();
// invoke task queue.
while(sched.size())
std::cout << "[queue.size() = " << sched.size() << "] ", sched.front()(), sched.pop_front();
return 0;
}
[queue.size() = 2] callback<add>: 114514
[queue.size() = 2] coroutine<add>: 114514
[queue.size() = 2] callback<mul>: 1919810
[immediate]
[queue.size() = 2] coroutine<mul>: 1919810
[immediate]
[queue.size() = 2] callback<dup>: 531
[queue.size() = 1] coroutine<dup>: 531
callback::add(...)
,callback::mul(...)
与 callback::dup(...)
在本文中称为 回调接口
std::function<...>
的参数 _f
在本文中称为 回调函数
我们知道 callback
和 coroutine
都涉及状态机停机与启动。
回调接口调用
与 co_await 运算符
对应,皆为状态机的停机和再启动点回调接口调用
而言,其状态机前半为调用处之前的函数过程,后半为 回调函数
co_await 运算符
而言,其状态机前半为其之前的协程过程,后半为其后的协程过程可调用实体(callable entity)
回调接口
,其为 回调函数
coroutine
,其为 await_suspend(_h)
通过参数接收到的 coroutine_handle
回调执行流由 回调接口
提供,因此我们需要将 coroutine_handle
包装为 coroutine_handle 封装器
,对原先由 回调接口
接收的 回调函数
进行替换,将协程的后半部分状态机间接委托给 回调接口
。coroutine_handle 封装器
一般是一个简单的 lambda
表达式。同时我们需要将被替换掉的原先的 回调函数
置于协程内的 co_await 运算符
之后,以 co_await 表达式
的返回值作为参数对其进行调用,回调嵌套即可通过这一过程展开。
需要注意的是,对 callback
到 awaiter
的适配无法覆盖所有情况。
回调接口
可以产生 fork
语义,而 coroutine
则不行coroutine
表示链式调用的 callback
co_await 运算符
模拟并行的 回调接口
,其将无法并行执行回调接口
,需要构造额外的并发 awaiter
包装器,本文内容不包括这一技术fork
语义的 回调接口
调用做并发适配,协程也不得不对其进行额外的 join
协程
需要使得其自身先停机,此后其控制权才能被转移给 awaiter
。
由于 coroutine_handle 封装器
内需要封装 coroutine_handle
,而后者必须在协程状态机停机后才能够被 awaiter
获取,所以 awaiter
必须在此前暂存由协程传入的其他 回调接口
参数。此后,当 协程
完成停机,coroutine_handle
将被传入 await_suspend
,并被绑定为 coroutine_handle 封装器
。在 await_suspend
中,暂存的参数将与 coroutine_handle 封装器
进行拼接,拼接后的参数列表将用于调用 回调接口
。此种对参数的暂存操作等价于 柯里化
,如果协程对象本身需要对 awaiter
进行依赖反向注入 (如为其提供 调度器
),则也可以通过 柯里化
实现 (本文中没有演示,需要借助 await_transform
接口协议)。
除 timer
和 无参数事件通知
外的几乎所有 回调函数
都需要接收异步信息。
比如 receive
异步回调往往通过其参数接收操作系统已经获取到的网络数据流或错误代码。
本文中我们将这些被传递给回调函数的异步信息称为 回调结果
。
回调结果
将通过绑定后的 回调函数
(既 coroutine_handle 封装器
) 的参数通知 awaiter
对象。回调结果
将被暂存在 awaiter
对象内部,此后由 coroutine_handle 封装器
对 协程上下文
进行恢复,协程恢复后 awaiter
对象将立即把刚刚获取的 回调结果
通过 await_resume
传递给协程。
协程
→ 参数
→ 暂存参数
→ co_await
→ 协程停机
→ await_suspend
→ coroutine_handle
→ coroutine_handle 封装器
→ 拼接
→ 回调接口
回调函数
→ 回调结果
→ 暂存结果
→ 协程恢复
→ await_resume
→ 结果
→ 协程
协程内部只能观测到: 参数
→ 暂存参数
→ co_await
→ 结果
#include <iostream>
#include <tuple>
#include <list>
#include <functional>
#include <coroutine>
#include "functor_info.h"
#include "tuple.h"
#include "flat.h"
#include "view.h"
namespace adaptor
{
namespace detail
{
template<fake::functor_c auto &_functor, fake::mezz_c _Index, fake::tuple_c _Retn, typename... _Args>
struct awaiter{
using args_type = std::tuple<_Args...>;
using retn_type = std::remove_cvref_t<_Retn>;
awaiter(_Args ..._args): args{std::forward<_Args>(_args)...}{}
static constexpr bool await_ready() noexcept{return false;}
void await_suspend(auto _h){
[this, _h]<std::size_t... _index>(std::index_sequence<_index...>){
const auto each = [this, _h](fake::mezz_c auto _i) -> decltype(auto) {
if constexpr(_i.value < _Index::value)
return std::get<_i.value>(std::move(args));
if constexpr(_i.value == _Index::value)
return [this, _h](auto &&..._retn){retn = {std::forward<decltype(_retn)>(_retn)...}, _h();};
if constexpr(_i.value > _Index::value)
return std::get<_i.value - 1>(std::move(args));
};
std::apply(_functor, std::forward_as_tuple(each(fake::mezz_v<_index>)...));
}(std::make_index_sequence<sizeof...(_Args) + 1>());
}
retn_type await_resume(){return std::move(retn);}
private:
args_type args;
retn_type retn;
};
inline constexpr auto detector = [](fake::pack_c auto _pack){
return fake::functor_c<fake::take_t<decltype(_pack){}>>;
};
template<fake::functor_c auto &_functor, auto _detector>
struct impl{
static consteval auto analyse() noexcept{
using state = fake::query<
fake::mate<fake::view_t<"index">, std::tuple<>>,
fake::mate<fake::view_t<"retn">, std::tuple<>>,
fake::mate<fake::view_t<"args">, std::tuple<>>
>;
constexpr auto each = [](fake::pack_c auto _state, fake::mezz_c auto _index, fake::pack_c auto _type){
using state = fake::take_t<decltype(_state){}>;
using type = fake::take_t<decltype(_type){}>;
constexpr bool is_callback = [](fake::mezz_c auto _index, fake::pack_c auto _type){
if constexpr(requires{_detector(_type, _index);})
return _detector(_type, _index);
else
return _detector(_type);
}(_index, _type);
if constexpr(is_callback){
using index = std::remove_cvref_t<decltype(_index)>;
using results = typename fake::functor_info<std::remove_cvref_t<type>>::tuple;
using retn_t = fake::query_rebind_t<state, fake::view_v<"retn">, results>;
using next_t = fake::query_rebind_t<retn_t, fake::view_v<"index">, index>;
return fake::pack_v<next_t>;
}
else{
using args = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"args">])>;
using store = std::conditional_t<
std::is_rvalue_reference_v<type>,
std::remove_reference_t<type>,
type
>;
using arguments = fake::tuple::emplace_back_t<args, store>;
using args_t = fake::query_rebind_t<state, fake::view_v<"args">, arguments>;
return fake::pack_v<args_t>;
}
};
using callback_args = typename fake::functor_info<decltype(_functor)>::tuple;
using result = fake::tuple::for_each_t<callback_args, state, each>;
return fake::pack_v<result>;
}
static consteval auto make(fake::pack_c auto _pack) noexcept{
using state = fake::take_t<decltype(_pack){}>;
using index = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"index">])>;
using retn = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"retn">])>;
using args = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"args">])>;
using callback_args = typename fake::functor_info<decltype(_functor)>::tuple;
static_assert(std::tuple_size_v<callback_args> == std::tuple_size_v<args> + 1, "detector failed");
return []<typename... _Args>(fake::type_package<std::tuple<_Args...>>){
return fake::pack_v<awaiter<_functor, index, retn, _Args...>>;
}(fake::pack_v<args>);
}
using meta_info = fake::take_t<analyse()>;
using type = fake::take_t<make(fake::pack_v<meta_info>)>;
};
}
template<fake::functor_c auto &_functor, auto _detector = detail::detector>
struct make{using awaitable = typename detail::impl<_functor, _detector>::type;};
template<std::size_t _index>
inline constexpr auto select = [](auto, fake::mezz_c auto _i){return _i.value == _index;};
}
由于 回调接口
的参数包括 回调函数
与 其他参数
,而 awaiter
的参数只包括 其他参数
,所以我们需要使用元编程手段分析 回调接口
的所有参数类型,从中找出 回调函数
的位置,并通过 std::forward_as_tuple(...)
函数对所有参数进行重组,将 coroutine_handle 封装器
作为新的回调函数插入到 回调接口
的参数列表中,再通过 std::apply(_f, _t)
接口传递给 回调接口
。
示例中需要使用 fake
库中的 模板元编程
工具集。
awaiter
模板类(adaptor::detail::awaiter<_functor, _Index, _Retn, _Args...>
)所需的编译期信息如下回调接口
的引用 _functor
(其为编译期或连接期常量)回调接口
的参数列表中 回调函数
所在的位置 _Index
回调函数
的参数既 co_await 表达式
的返回值,其类型为 _Retn
回调接口
所需的除 回调函数
外的所有其他参数,其类型为 _Args...
对于 callable
的参数类型,我们可以通过 fake::functor_info<...>
进行解析
_functor
回调接口
,适配器的功能就是将其自动转换为 awaiter
_Index
回调接口
的参数列表,找出其中类型为 回调函数
的参数回调函数
类型的判别条件可由模板参数 _detector
来指定_detector
是一个封装了 constraint
的 lambda
表达式。其接收类型,返回 bool
_detector
选择器将任何 callable
类型视为 回调函数
fake::mezz_t
用于保存编译期数值常量,其功能与 std::integral_constant
类似_Retn
回调接口
参数列表的遍历过程中,一旦判别出 回调函数
,则记录后者的参数类型列表回调函数
通过参数接收 回调结果
,将其转为 std::tuple<...>
既 awaiter
的返回类型_Args...
fake::tuple::emplace_back_t<...>
工具对 回调接口
中的除 回调函数
以外的其他参数类型进行记录实例化 awaiter
模板类所需的编译期信息中,后三项需要在对 回调接口
的参数列表的遍历中进行解析和记录。由于模板元编程是无状态的,因此我们需要手动构造状态机,并通过递归来模拟迭代过程。递归实现被封装在 fake::tuple::for_each_t<...>
工具中,在示例中我们以类似于迭代的语法对 回调接口
的参数列表进行分析。回调接口
的各个参数通过 fake::type_package<_T> 类型包
传递给 lambda
表达式 each
。
在迭代过程中,我们使用 _state
参数来记录编译期信息,它是一个 类型包
,其中包装的类型是一个 fake::query<...>
模板实例。fake::query<...>
类型是一个编译期字典,在迭代过程中,我们通过修改其 value
的类型来进行编译期类型记录。对 fake::query
中 value
类型的替换是通过 fake::query_rebind_t<...>
工具实现的,其参数分别为原 fake::query
类型,key
类型和要设置的 value
类型。本示例中的 key
类型为 fake::view<_char...>
类型,它是一种编译期字符串类型,如果 fake::view
储存的字符串不同,其模板实例的类型也不同。其变长模板参数列表既为字符串本身。
_detector
参数通过对 constraint
进行函数对象化,使得我们可以从外部为适配器设置 回调函数选择器
,这一设计可以使适配器自动适应任何 回调接口
参数顺序。但在最糟糕的情况下,如果 回调接口
的参数列表中有其他参数类型与 回调函数
过于相似,以至于二者无法被 回调函数选择器
区分,则我们可以传入 adaptor::select<_index>
来强制指定 回调函数
在 回调接口
参数列表中的位置。
注意参数的引用类型,我们应当去除 右值引用
并在 awaiter
暂存参数时对其进行移动
awaiter
中的参数暂存与结果暂存awaiter
是一次性的,因此将全程对回调数据进行 std::move
我们已经在 回调接口
解析过程中通过元编程收集了用于暂存 回调参数
和 回调结果
的 std::tuple<...>
类型。
回调参数
暂存发生在 awaiter
对象的构造函数中,我们直接将其 std::forward
给 awaiter
的 args
成员。此后我们需要在 await_suspend
中对其与 coroutine_handle 封装器
进行拼接,这是由于 coroutine_handle
在此时才会被协程控制器传入。 回调接口
的全部参数包括 args
中暂存的参数和 回调函数
两部分。我们需要把原先的 回调函数
替换为 coroutine_handle 封装器
,用于在回调事件发生时将 回调结果
通知给 awaiter
,以及在此后恢复 coroutine
上下文。
coroutine_handle 封装器
:if constexpr(_i.value == _Index::value)
return [this, _h](auto &&..._retn){retn = {std::forward<decltype(_retn)>(_retn)...}, _h();};
回调结果
暂存则发生在 coroutine_handle 封装器
内部。实现中我们将 coroutine_handle 封装器
用作 回调函数
,其参数由回调事件传入,这些参数既 回调结果
。我们直接将 回调结果
std::forward
给 retn
成员。
此后 coroutine_handle 封装器
通过调用 coroutine_handle
恢复协程上下文,这会导致协程上下文自动调用 await_resume
接口。我们通过访问 awaiter
的 retn
成员,将之前暂存的 回调结果
传递给协程。
由于我们使用 std::tuple
传递 回调结果
,所以我们可以直接使用 std::apply(_f, _t)
调用适配前的原 回调函数
。这样我们就顺利地将嵌套的 回调接口调用
适配成了 co_await 表达式
。
泛型回调接口
,如接口为泛型则需要对其进行手动模板特化fork
语义的回调方式,这是由于协程本身只包含一个执行流地址: www.purecpp.cn
转载请注明出处!
purecpp
一个很酷的modern c++开源社区
purecpp社区自2015年创办以来,以“Newer is Better”为理念,相信新技术可以改变世界,一直致力于现代C++研究、应用和技术创新,期望通过现代C++的技术创新来提高企业生产力和效率。
社区坚持只发表原创技术文章,已经累计发表了一千多篇原创C++技术文章;
组织了十几场的C++沙龙和C++大会,有力地促进了国内外C++开发者之间的技术交流;
开源了十几个现代C++项目,被近百家公司所使用,有力地推动了现代C++在企业中的应用。
期待更多的C++爱好者能参与到社区C++社区的建设中来,一起为现代C++开源项目添砖加瓦,一起完善C++基础设施和生态圈。
微信公众号:purecpp, 社区邮箱: purecpp@163.com