write coroutine code using boost context
https://www.boost.org/doc/libs/1_84_0/libs/context/doc/html/context/overview.html
按照这个文档用boost context写了一个coroutine task简单示例,这个示例主要就是传入一堆tasks, 然后在各个task之间进行调度。task维护了一个 `continuation` fiber. 这样可以知道如果要切换出去的话,应该切换到什么地方:如果我理解没有错的话,每次应该是直接切换到 `schedule` 这个coroutine上的。
boost context coroutine是stackful的,相当于整个coroutine会保存自己的完整的堆栈。如果这个coroutine切换出去然后重新切换回来的话,那么之前函数调用栈还是可以保存的。与之对应的就是stackless, 目前C++20 coroutine就是stackless的,缺点非常明显就是如果yield/resume回来的话,之前的函数调用栈就完全丢失了,感觉stackless对于功能损害是非常严重的。
#include <boost/context/fiber.hpp> #include <cstdio> #include <sstream> #include <utility> #include <vector> namespace ctx = boost::context; struct Task; using TaskFn = std::function<void(Task*)>; struct Task { int id; // 这个task对应的function TaskFn fn; // 这个task对应的fiber ctx::fiber fiber; // 从什么fiber切换过来的 // 调用resume后返回suspend对象 ctx::fiber cont; std::string to_string() { std::stringstream ss; ss << "Task(id=" << id << ", fiber=" << fiber << ")"; return ss.str(); } }; struct Scheduler { std::vector<Task*> tasks; int active = 0; int index = 0; ctx::fiber schedule; void add(TaskFn&& fn) { Task* t = new Task(); t->fn = std::move(fn); t->fiber = ctx::fiber{[t, this](ctx::fiber&& cont) { t->cont = std::move(cont); t->fn(t); this->active--; return std::move(t->cont).resume(); }}; tasks.push_back(t); active++; } Scheduler() { schedule = ctx::fiber([&](ctx::fiber&& cont) { while (active) { Task* task = tasks[index]; // 这个task运行结束 if (!task->fiber) { continue; } printf("---> S: select w%d: %s run\n", index, task->to_string().c_str()); task->fiber = std::move(task->fiber).resume(); printf("<--- S: select w%d: %s \n", index, task->to_string().c_str()); index = (index + 1) % tasks.size(); } printf("S: ready to exit\n"); return std::move(cont).resume(); }); } void run() { std::move(schedule).resume(); } }; static void foo(Task* t) { for (int i = 0; i < 2; i++) { printf("W: w%d run foo with %d\n", t->id, i); t->cont = std::move(t->cont).resume(); } } int main() { Scheduler scheduler; const int n = 2; for (int i = 0; i < n; i++) { scheduler.add([i](Task* t) { t->id = i; for (int j = 0; j < 2; j++) { printf("W: w%d is running\n", t->id); foo(t); t->cont = std::move(t->cont).resume(); } }); } scheduler.run(); }
boost conext给了两种实现: fiber和continuation. 用continuation还有点不太习惯,因为一旦callcc就开始自动执行,而创建fiber的话可以在后续某个点触发执行。
#include <boost/context/continuation.hpp> #include <boost/context/continuation_fcontext.hpp> #include <boost/context/fiber.hpp> #include <cstdio> #include <sstream> #include <utility> #include <vector> namespace ctx = boost::context; struct Task; using TaskFn = std::function<void(Task*)>; struct Task { int id; // 这个task对应的function TaskFn fn; bool started = false; // 这个task对应的fiber ctx::continuation cc; // 从什么fiber切换过来的 // 调用resume后返回suspend对象 ctx::continuation cont; std::string to_string() { std::stringstream ss; ss << "Task(id=" << id << ", cc=" << cc << ")"; return ss.str(); } }; struct Scheduler { std::vector<Task*> tasks; int active = 0; int index = 0; void add(TaskFn&& fn) { Task* t = new Task(); t->fn = std::move(fn); tasks.push_back(t); active++; } void run() { ctx::continuation cc = ctx::callcc([&](ctx::continuation&& cont) { while (active) { Task* task = tasks[index]; // 这个task运行结束 if (task->started && !task->cc) { continue; } printf("---> S: select w%d: %s run\n", index, task->to_string().c_str()); if (!task->started) { task->started = true; task->cc = ctx::callcc([task, this](ctx::continuation&& cont) { task->cont = std::move(cont); task->fn(task); this->active--; return task->cont.resume(); }); } else { task->cc = task->cc.resume(); } printf("<--- S: select w%d: %s \n", index, task->to_string().c_str()); index = (index + 1) % tasks.size(); } printf("S: ready to exit\n"); return cont.resume(); }); } }; static void foo(Task* t) { for (int i = 0; i < 2; i++) { printf("W: w%d run foo with %d\n", t->id, i); t->cont = t->cont.resume(); } } int main() { Scheduler scheduler; const int n = 2; for (int i = 0; i < n; i++) { scheduler.add([i](Task* t) { t->id = i; for (int j = 0; j < 2; j++) { printf("W: w%d is running\n", t->id); foo(t); t->cont = t->cont.resume(); } }); } scheduler.run(); }