write coroutine code using boost context

https://www.boost.org/doc/libs/1_84_0/libs/context/doc/html/context/overview.html

按照这个文档用boost context写了一个coroutine task简单示例,这个示例主要就是传入一堆tasks, 然后在各个task之间进行调度。task维护了一个 `continuation` fiber. 这样可以知道如果要切换出去的话,应该切换到什么地方:如果我理解没有错的话,每次应该是直接切换到 `schedule` 这个coroutine上的。

boost context coroutine是stackful的,相当于整个coroutine会保存自己的完整的堆栈。如果这个coroutine切换出去然后重新切换回来的话,那么之前函数调用栈还是可以保存的。与之对应的就是stackless, 目前C++20 coroutine就是stackless的,缺点非常明显就是如果yield/resume回来的话,之前的函数调用栈就完全丢失了,感觉stackless对于功能损害是非常严重的。

#include <boost/context/fiber.hpp>
#include <cstdio>
#include <sstream>
#include <utility>
#include <vector>
namespace ctx = boost::context;

struct Task;
using TaskFn = std::function<void(Task*)>;

struct Task {
    int id;
    // 这个task对应的function
    TaskFn fn;
    // 这个task对应的fiber
    ctx::fiber fiber;
    // 从什么fiber切换过来的
    // 调用resume后返回suspend对象
    ctx::fiber cont;

    std::string to_string() {
        std::stringstream ss;
        ss << "Task(id=" << id << ", fiber=" << fiber << ")";
        return ss.str();
    }
};

struct Scheduler {
    std::vector<Task*> tasks;
    int active = 0;
    int index = 0;
    ctx::fiber schedule;

    void add(TaskFn&& fn) {
        Task* t = new Task();
        t->fn = std::move(fn);
        t->fiber = ctx::fiber{[t, this](ctx::fiber&& cont) {
            t->cont = std::move(cont);
            t->fn(t);
            this->active--;
            return std::move(t->cont).resume();
        }};
        tasks.push_back(t);
        active++;
    }

    Scheduler() {
        schedule = ctx::fiber([&](ctx::fiber&& cont) {
            while (active) {
                Task* task = tasks[index];
                // 这个task运行结束
                if (!task->fiber) {
                    continue;
                }
                printf("---> S: select w%d: %s run\n", index, task->to_string().c_str());
                task->fiber = std::move(task->fiber).resume();
                printf("<--- S: select w%d: %s \n", index, task->to_string().c_str());
                index = (index + 1) % tasks.size();
            }
            printf("S: ready to exit\n");
            return std::move(cont).resume();
        });
    }

    void run() { std::move(schedule).resume(); }
};

static void foo(Task* t) {
    for (int i = 0; i < 2; i++) {
        printf("W: w%d run foo with %d\n", t->id, i);
        t->cont = std::move(t->cont).resume();
    }
}
int main() {
    Scheduler scheduler;
    const int n = 2;
    for (int i = 0; i < n; i++) {
        scheduler.add([i](Task* t) {
            t->id = i;
            for (int j = 0; j < 2; j++) {
                printf("W: w%d is running\n", t->id);
                foo(t);
                t->cont = std::move(t->cont).resume();
            }
        });
    }
    scheduler.run();
}

boost conext给了两种实现: fiber和continuation. 用continuation还有点不太习惯,因为一旦callcc就开始自动执行,而创建fiber的话可以在后续某个点触发执行。

#include <boost/context/continuation.hpp>
#include <boost/context/continuation_fcontext.hpp>
#include <boost/context/fiber.hpp>
#include <cstdio>
#include <sstream>
#include <utility>
#include <vector>
namespace ctx = boost::context;

struct Task;
using TaskFn = std::function<void(Task*)>;

struct Task {
    int id;
    // 这个task对应的function
    TaskFn fn;
    bool started = false;
    // 这个task对应的fiber
    ctx::continuation cc;
    // 从什么fiber切换过来的
    // 调用resume后返回suspend对象
    ctx::continuation cont;

    std::string to_string() {
        std::stringstream ss;
        ss << "Task(id=" << id << ", cc=" << cc << ")";
        return ss.str();
    }
};

struct Scheduler {
    std::vector<Task*> tasks;
    int active = 0;
    int index = 0;

    void add(TaskFn&& fn) {
        Task* t = new Task();
        t->fn = std::move(fn);
        tasks.push_back(t);
        active++;
    }

    void run() {
        ctx::continuation cc = ctx::callcc([&](ctx::continuation&& cont) {
            while (active) {
                Task* task = tasks[index];
                // 这个task运行结束
                if (task->started && !task->cc) {
                    continue;
                }
                printf("---> S: select w%d: %s run\n", index, task->to_string().c_str());
                if (!task->started) {
                    task->started = true;
                    task->cc = ctx::callcc([task, this](ctx::continuation&& cont) {
                        task->cont = std::move(cont);
                        task->fn(task);
                        this->active--;
                        return task->cont.resume();
                    });
                } else {
                    task->cc = task->cc.resume();
                }
                printf("<--- S: select w%d: %s \n", index, task->to_string().c_str());
                index = (index + 1) % tasks.size();
            }
            printf("S: ready to exit\n");
            return cont.resume();
        });
    }
};

static void foo(Task* t) {
    for (int i = 0; i < 2; i++) {
        printf("W: w%d run foo with %d\n", t->id, i);
        t->cont = t->cont.resume();
    }
}
int main() {
    Scheduler scheduler;
    const int n = 2;
    for (int i = 0; i < n; i++) {
        scheduler.add([i](Task* t) {
            t->id = i;
            for (int j = 0; j < 2; j++) {
                printf("W: w%d is running\n", t->id);
                foo(t);
                t->cont = t->cont.resume();
            }
        });
    }
    scheduler.run();
}