网站首页> 文章专栏> callback 到 awaiter 的自动适配
callback 到 awaiter 的自动适配
编辑时间:2023-11-04 05:32:17 作者:loop 0条评论

callbackawaiter 的自动适配


本文中我们将尝试使用元编程技术对 callback 接口函数 进行自动适配,将其转化为 awaiter 对象。

  • 使用例:
namespace callback
{

    using sched = std::list<std::function<void()>>;

    void add(sched &_sched, int &&_a, const int &_b, double _c, double _d, std::function<void(int, double)> _f){
        _sched.emplace_back([_f, x = _a + _b, y = _c + _d]{_f(x, y);});
    }

    void mul(sched &_sched, std::function<void(std::size_t, std::size_t)> _f, int _a, int _b, std::size_t _c){
        _sched.emplace_back([_f, x = _a * _b, y = _c]{_f(x, y);});
    }

    void dup(sched &_sched, std::function<void(std::size_t)> _im, std::function<void(std::size_t)> _f, std::size_t _e){
        _im(_e * 2);
        _sched.emplace_back([_f, _e]{_f(_e * 3);});
    }

}

namespace coro
{

    struct promise;
    struct coroutine : std::coroutine_handle<promise>{using promise_type = struct promise;};
    struct promise{
        coroutine get_return_object(){return {coroutine::from_promise(*this)};}
        std::suspend_never initial_suspend() const noexcept{return {};}
        std::suspend_never final_suspend() const noexcept{return {};}
        void return_void() noexcept{}
        void unhandled_exception() const noexcept{}
    };

}

int main(int _argc, char* _argv[])
{
    // task queue. 
    callback::sched sched;
    static constexpr auto im = [](auto){std::cout << "[immediate]" << std::endl;};

    // tiny callback hell. 
    const int ref = 257;
    callback::add(
        sched, 257, ref, 57000.0f, 57000.0f,
        [&sched](auto _x, auto _y){
            std::cout << "callback<add>: " << _x + _y << std::endl;
            const int e = _x + _y;

            callback::mul(
                sched,
                [&sched, e](auto _x, auto _y){
                    std::cout << "callback<mul>: " << e + _x * _y << std::endl;

                    callback::dup(sched, im, [](auto _x){std::cout << "callback<dup>: " << _x << std::endl;}, 177);
                },
                2, 8, 112831
            );
        }
    );

    // coroutine. 
    [&sched, &ref]() -> coro::coroutine {
        const int e = std::apply(
            [](auto _x, auto _y){std::cout << "coroutine<add>: " << _x + _y << std::endl; return _x + _y;},
            co_await adaptor::make<callback::add>::awaitable(sched, 257, ref, 57000.0f, 57000.0f)
        );
        std::apply(
            [e](auto _x, auto _y){std::cout << "coroutine<mul>: " << e + _x * _y << std::endl;},
            co_await adaptor::make<callback::mul>::awaitable(sched, 2, 8, 112831)
        );
        std::apply(
            [](auto _x){std::cout << "coroutine<dup>: " << _x << std::endl;},
            co_await adaptor::make<callback::dup, adaptor::select<2>>::awaitable(sched, im, 177)
        );
    }();

    // invoke task queue. 
    while(sched.size())
        std::cout << "[queue.size() = " << sched.size() << "] ", sched.front()(), sched.pop_front();

    return 0;
}
  • output:
[queue.size() = 2] callback<add>: 114514
[queue.size() = 2] coroutine<add>: 114514
[queue.size() = 2] callback<mul>: 1919810
[immediate]
[queue.size() = 2] coroutine<mul>: 1919810
[immediate]
[queue.size() = 2] callback<dup>: 531
[queue.size() = 1] coroutine<dup>: 531

  • 术语约定
    • 接口 callback::add(...)callback::mul(...)callback::dup(...) 在本文中称为 回调接口
    • 其参数列表中类型为 std::function<...> 的参数 _f 在本文中称为 回调函数

需求分析

执行流分析

我们知道 callbackcoroutine 都涉及状态机停机与启动。

  • 回调接口调用co_await 运算符 对应,皆为状态机的停机和再启动点
    • 对于 回调接口调用 而言,其状态机前半为调用处之前的函数过程,后半为 回调函数
    • 对于 co_await 运算符 而言,其状态机前半为其之前的协程过程,后半为其后的协程过程
  • 对于状态机后半部分状态的执行流控制,二者的委托皆为 可调用实体(callable entity)
    • 对于 回调接口,其为 回调函数
    • 对于 coroutine,其为 await_suspend(_h) 通过参数接收到的 coroutine_handle

回调执行流由 回调接口 提供,因此我们需要将 coroutine_handle 包装为 coroutine_handle 封装器,对原先由 回调接口 接收的 回调函数 进行替换,将协程的后半部分状态机间接委托给 回调接口coroutine_handle 封装器 一般是一个简单的 lambda 表达式。同时我们需要将被替换掉的原先的 回调函数 置于协程内的 co_await 运算符 之后,以 co_await 表达式 的返回值作为参数对其进行调用,回调嵌套即可通过这一过程展开。

需要注意的是,对 callbackawaiter 的适配无法覆盖所有情况。

  • 回调接口 可以产生 fork 语义,而 coroutine 则不行
    • 因此我们只可能用 coroutine 表示链式调用的 callback
    • 如果使用串行的 co_await 运算符 模拟并行的 回调接口 ,其将无法并行执行
    • 对于并行 回调接口,需要构造额外的并发 awaiter 包装器,本文内容不包括这一技术
    • 即使对具有 fork 语义的 回调接口 调用做并发适配,协程也不得不对其进行额外的 join

数据流分析

协程 需要使得其自身先停机,此后其控制权才能被转移给 awaiter

由于 coroutine_handle 封装器 内需要封装 coroutine_handle,而后者必须在协程状态机停机后才能够被 awaiter 获取,所以 awaiter 必须在此前暂存由协程传入的其他 回调接口 参数。此后,当 协程 完成停机,coroutine_handle 将被传入 await_suspend,并被绑定为 coroutine_handle 封装器。在 await_suspend 中,暂存的参数将与 coroutine_handle 封装器 进行拼接,拼接后的参数列表将用于调用 回调接口。此种对参数的暂存操作等价于 柯里化,如果协程对象本身需要对 awaiter 进行依赖反向注入 (如为其提供 调度器),则也可以通过 柯里化 实现 (本文中没有演示,需要借助 await_transform 接口协议)。

timer无参数事件通知 外的几乎所有 回调函数 都需要接收异步信息。

比如 receive 异步回调往往通过其参数接收操作系统已经获取到的网络数据流或错误代码。

本文中我们将这些被传递给回调函数的异步信息称为 回调结果

回调结果 将通过绑定后的 回调函数 (既 coroutine_handle 封装器) 的参数通知 awaiter 对象。回调结果 将被暂存在 awaiter 对象内部,此后由 coroutine_handle 封装器协程上下文 进行恢复,协程恢复后 awaiter 对象将立即把刚刚获取的 回调结果 通过 await_resume 传递给协程。

  • 执行流挂起前
    • 协程参数暂存参数co_await协程停机await_suspendcoroutine_handlecoroutine_handle 封装器拼接回调接口
  • 执行流恢复后
    • 回调函数回调结果暂存结果协程恢复await_resume结果协程

协程内部只能观测到: 参数暂存参数co_await结果


适配器实现

#include <iostream>
#include <tuple>
#include <list>
#include <functional>
#include <coroutine>

#include "functor_info.h"
#include "tuple.h"
#include "flat.h"
#include "view.h"

namespace adaptor
{

    namespace detail
    {

        template<fake::functor_c auto &_functor, fake::mezz_c _Index, fake::tuple_c _Retn, typename... _Args>
        struct awaiter{
            using args_type = std::tuple<_Args...>;
            using retn_type = std::remove_cvref_t<_Retn>;
            awaiter(_Args ..._args): args{std::forward<_Args>(_args)...}{}
            static constexpr bool await_ready() noexcept{return false;}
            void await_suspend(auto _h){
                [this, _h]<std::size_t... _index>(std::index_sequence<_index...>){
                    const auto each = [this, _h](fake::mezz_c auto _i) -> decltype(auto) {
                        if constexpr(_i.value < _Index::value)
                            return std::get<_i.value>(std::move(args));
                        if constexpr(_i.value == _Index::value)
                            return [this, _h](auto &&..._retn){retn = {std::forward<decltype(_retn)>(_retn)...}, _h();};
                        if constexpr(_i.value > _Index::value)
                            return std::get<_i.value - 1>(std::move(args));
                    };

                    std::apply(_functor, std::forward_as_tuple(each(fake::mezz_v<_index>)...));
                }(std::make_index_sequence<sizeof...(_Args) + 1>());
            }
            retn_type await_resume(){return std::move(retn);}

        private:
            args_type args;
            retn_type retn;
        };

        inline constexpr auto detector = [](fake::pack_c auto _pack){
            return fake::functor_c<fake::take_t<decltype(_pack){}>>;
        };

        template<fake::functor_c auto &_functor, auto _detector>
        struct impl{
            static consteval auto analyse() noexcept{
                using state = fake::query<
                    fake::mate<fake::view_t<"index">, std::tuple<>>,
                    fake::mate<fake::view_t<"retn">, std::tuple<>>,
                    fake::mate<fake::view_t<"args">, std::tuple<>>
                >;
                constexpr auto each = [](fake::pack_c auto _state, fake::mezz_c auto _index, fake::pack_c auto _type){
                    using state = fake::take_t<decltype(_state){}>;
                    using type = fake::take_t<decltype(_type){}>;

                    constexpr bool is_callback = [](fake::mezz_c auto _index, fake::pack_c auto _type){
                        if constexpr(requires{_detector(_type, _index);})
                            return _detector(_type, _index);
                        else
                            return _detector(_type);
                    }(_index, _type);

                    if constexpr(is_callback){
                        using index = std::remove_cvref_t<decltype(_index)>;
                        using results = typename fake::functor_info<std::remove_cvref_t<type>>::tuple;
                        using retn_t = fake::query_rebind_t<state, fake::view_v<"retn">, results>;
                        using next_t = fake::query_rebind_t<retn_t, fake::view_v<"index">, index>;

                        return fake::pack_v<next_t>;
                    }
                    else{
                        using args = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"args">])>;
                        using store = std::conditional_t<
                            std::is_rvalue_reference_v<type>,
                            std::remove_reference_t<type>,
                            type
                        >;
                        using arguments = fake::tuple::emplace_back_t<args, store>;
                        using args_t = fake::query_rebind_t<state, fake::view_v<"args">, arguments>;

                        return fake::pack_v<args_t>;
                    }
                };
                using callback_args = typename fake::functor_info<decltype(_functor)>::tuple;
                using result = fake::tuple::for_each_t<callback_args, state, each>;

                return fake::pack_v<result>;
            }

            static consteval auto make(fake::pack_c auto _pack) noexcept{
                using state = fake::take_t<decltype(_pack){}>;
                using index = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"index">])>;
                using retn = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"retn">])>;
                using args = std::remove_cvref_t<decltype(std::declval<state>()[fake::view_v<"args">])>;

                using callback_args = typename fake::functor_info<decltype(_functor)>::tuple;
                static_assert(std::tuple_size_v<callback_args> == std::tuple_size_v<args> + 1, "detector failed");

                return []<typename... _Args>(fake::type_package<std::tuple<_Args...>>){
                    return fake::pack_v<awaiter<_functor, index, retn, _Args...>>;
                }(fake::pack_v<args>);
            }

            using meta_info = fake::take_t<analyse()>;
            using type = fake::take_t<make(fake::pack_v<meta_info>)>;
        };

    }

    template<fake::functor_c auto &_functor, auto _detector = detail::detector>
    struct make{using awaitable = typename detail::impl<_functor, _detector>::type;};

    template<std::size_t _index>
    inline constexpr auto select = [](auto, fake::mezz_c auto _i){return _i.value == _index;};

}

通过元编程对回调接口进行解析

由于 回调接口 的参数包括 回调函数其他参数,而 awaiter 的参数只包括 其他参数,所以我们需要使用元编程手段分析 回调接口 的所有参数类型,从中找出 回调函数 的位置,并通过 std::forward_as_tuple(...) 函数对所有参数进行重组,将 coroutine_handle 封装器 作为新的回调函数插入到 回调接口 的参数列表中,再通过 std::apply(_f, _t) 接口传递给 回调接口

示例中需要使用 fake 库中的 模板元编程 工具集。

  • 实例化 awaiter 模板类(adaptor::detail::awaiter<_functor, _Index, _Retn, _Args...>)所需的编译期信息如下
    • 回调接口 的引用 _functor (其为编译期或连接期常量)
    • 回调接口 的参数列表中 回调函数 所在的位置 _Index
    • 回调函数 的参数既 co_await 表达式 的返回值,其类型为 _Retn
    • 回调接口 所需的除 回调函数 外的所有其他参数,其类型为 _Args...

对于 callable 的参数类型,我们可以通过 fake::functor_info<...> 进行解析

  • _functor

    • 此参数由用户直接传递,既 回调接口,适配器的功能就是将其自动转换为 awaiter
  • _Index

    • 通过遍历 回调接口 的参数列表,找出其中类型为 回调函数 的参数
    • 回调函数 类型的判别条件可由模板参数 _detector 来指定
    • _detector 是一个封装了 constraintlambda 表达式。其接收类型,返回 bool
    • 默认的 _detector 选择器将任何 callable 类型视为 回调函数
    • fake::mezz_t 用于保存编译期数值常量,其功能与 std::integral_constant 类似
  • _Retn

    • 回调接口 参数列表的遍历过程中,一旦判别出 回调函数,则记录后者的参数类型列表
    • 回调函数 通过参数接收 回调结果,将其转为 std::tuple<...>awaiter 的返回类型
  • _Args...

    • 通过 fake::tuple::emplace_back_t<...> 工具对 回调接口 中的除 回调函数 以外的其他参数类型进行记录

实例化 awaiter 模板类所需的编译期信息中,后三项需要在对 回调接口 的参数列表的遍历中进行解析和记录。由于模板元编程是无状态的,因此我们需要手动构造状态机,并通过递归来模拟迭代过程。递归实现被封装在 fake::tuple::for_each_t<...> 工具中,在示例中我们以类似于迭代的语法对 回调接口 的参数列表进行分析。回调接口 的各个参数通过 fake::type_package<_T> 类型包 传递给 lambda 表达式 each

在迭代过程中,我们使用 _state 参数来记录编译期信息,它是一个 类型包,其中包装的类型是一个 fake::query<...> 模板实例。fake::query<...> 类型是一个编译期字典,在迭代过程中,我们通过修改其 value 的类型来进行编译期类型记录。对 fake::queryvalue 类型的替换是通过 fake::query_rebind_t<...> 工具实现的,其参数分别为原 fake::query 类型,key 类型和要设置的 value 类型。本示例中的 key 类型为 fake::view<_char...> 类型,它是一种编译期字符串类型,如果 fake::view 储存的字符串不同,其模板实例的类型也不同。其变长模板参数列表既为字符串本身。

_detector 参数通过对 constraint 进行函数对象化,使得我们可以从外部为适配器设置 回调函数选择器,这一设计可以使适配器自动适应任何 回调接口 参数顺序。但在最糟糕的情况下,如果 回调接口 的参数列表中有其他参数类型与 回调函数 过于相似,以至于二者无法被 回调函数选择器 区分,则我们可以传入 adaptor::select<_index> 来强制指定 回调函数回调接口 参数列表中的位置。

注意参数的引用类型,我们应当去除 右值引用 并在 awaiter 暂存参数时对其进行移动

awaiter 中的参数暂存与结果暂存

  • 本示例中我们假设 awaiter 是一次性的,因此将全程对回调数据进行 std::move

我们已经在 回调接口 解析过程中通过元编程收集了用于暂存 回调参数回调结果std::tuple<...> 类型。

回调参数 暂存发生在 awaiter 对象的构造函数中,我们直接将其 std::forwardawaiterargs 成员。此后我们需要在 await_suspend 中对其与 coroutine_handle 封装器 进行拼接,这是由于 coroutine_handle 在此时才会被协程控制器传入。 回调接口 的全部参数包括 args 中暂存的参数和 回调函数 两部分。我们需要把原先的 回调函数 替换为 coroutine_handle 封装器,用于在回调事件发生时将 回调结果 通知给 awaiter,以及在此后恢复 coroutine 上下文。

  • coroutine_handle 封装器:
if constexpr(_i.value == _Index::value)
    return [this, _h](auto &&..._retn){retn = {std::forward<decltype(_retn)>(_retn)...}, _h();};

回调结果 暂存则发生在 coroutine_handle 封装器 内部。实现中我们将 coroutine_handle 封装器 用作 回调函数,其参数由回调事件传入,这些参数既 回调结果。我们直接将 回调结果 std::forwardretn 成员。

此后 coroutine_handle 封装器 通过调用 coroutine_handle 恢复协程上下文,这会导致协程上下文自动调用 await_resume 接口。我们通过访问 awaiterretn 成员,将之前暂存的 回调结果 传递给协程。

由于我们使用 std::tuple 传递 回调结果,所以我们可以直接使用 std::apply(_f, _t) 调用适配前的原 回调函数。这样我们就顺利地将嵌套的 回调接口调用 适配成了 co_await 表达式


技术缺陷

  • 无法优雅地适配 泛型回调接口,如接口为泛型则需要对其进行手动模板特化
  • 无法适配具有 fork 语义的回调方式,这是由于协程本身只包含一个执行流


    出自:purecpp.cn

    地址: www.purecpp.cn

    转载请注明出处!


来说两句吧
登录才能发表评论。
最新评论
Absolutely

purecpp

一个很酷的modern c++开源社区


[社区开源项目列表,点击前往]


purecpp社区自2015年创办以来,以“Newer is Better”为理念,相信新技术可以改变世界,一直致力于现代C++研究、应用和技术创新,期望通过现代C++的技术创新来提高企业生产力和效率。


社区坚持只发表原创技术文章,已经累计发表了一千多篇原创C++技术文章;


组织了十几场的C++沙龙和C++大会,有力地促进了国内外C++开发者之间的技术交流;


开源了十几个现代C++项目,被近百家公司所使用,有力地推动了现代C++在企业中的应用。


期待更多的C++爱好者能参与到社区C++社区的建设中来,一起为现代C++开源项目添砖加瓦,一起完善C++基础设施和生态圈。


微信公众号:purecpp, 社区邮箱: purecpp@163.com


友情链接