coroutine
一个程序要真正运行起来,需要两个因素:可执行代码段、数据。体现在 CPU 中,主要包含以下几个方面:
- EIP 寄存器:用来存储 CPU 要读取指令的地址
- ESP 寄存器:指向当前线程栈的栈顶位置
- 其他通用寄存器的内容:包括代表函数参数的 rdi、rsi 等等。
- 线程栈中的内存内容。
这些数据内容,我们一般将其称为 “上下文” 或者 “现场”。有栈协程的原理,就是从线程的上下文下手,如果把线程的上下文完全改变,即:改变 EIP 寄存的内容,指向其他指令地址;改变线程栈的内存内容等等。
这样的话,当前线程运行的程序也就完全改变了,是一个全新的程序。
协程的实现分为有栈和无栈的方式,有栈的实现方式又是可以分为“独立栈” 和 "共享栈"的实现方式。
共享栈的实现
仅以云风的coroutine实现为例。它的底层是使用 Linux下的 ucontext 函数簇实现的。
typedef struct ucontext_t { struct ucontext_t *uc_link; // 要去的环境 sigset_t uc_sigmask; // 屏蔽的信号 stack_t uc_stack; // 当前这个环境要使用的栈内存 mcontext_t uc_mcontext; ... } ucontext_t;
每次在进行协程切换时,都是由先将当前上下文数据保存共享栈中,当切换回来的时候再从共享栈中恢复数据。设置的共享栈的大小是 1M。调度器的结构如下:
#define STACK_SIZE (1024*1024) // 1M /// @brief: 协程调度器 struct schedule { char stack[STACK_SIZE]; // 运行时栈 ucontext_t main; // 主协程的上下文 int nco; // 当前存活的协程个数,用作协程 id int cap; // 协程管理器的当前最大容量,如果不够了,则进行扩容 int running; // 正在运行的协程ID struct coroutine** co; // 一个一维数组,用于存放协程 }; /// @brief: 协程 struct coroutine { coroutine_func func; // 协程运行的函数 void* ud; // func 参数 ucontext_t ctx; // 协程上下文 struct schedule* sch; // 该协程所属的调度器 ptrdiff_t cap; // 已经分配的内存大小 ptrdiff_t size; // 当前协程运行时栈,保存起来后的大小 int status; // 协程当前的状态 char* stack; // 当前协程的保存起来的运行时栈 };
创建协程调度器
函数 coroutine_open 负责创建一个协程调度器,初始化协程调度器的数值。
#define DEFAULT_COROUTINE 16 // 初始化建立协程时大小 /// @brief: 创建一个协程调度器 struct schedule * coroutine_open(void) { struct schedule *S = malloc(sizeof(struct schedule)); // 这里做的主要就是分配内存,同时赋初值 S->nco = 0; S->cap = DEFAULT_COROUTINE; S->running = -1; S->co = malloc(sizeof(struct coroutine *) * S->cap); memset(S->co, 0, sizeof(struct coroutine *) * S->cap); return S; }
创建一个协程
函数 _co_new 初始化 coroutine 对象:分配内存以及设置初始化状态是 COROUTINE_READY 。coroutine_new 将 创建的协程与对应的协程调度器联系起来。如果当前的协程数已经超过协程调度器中的协程数最大值cap就会两倍扩容。最终还是会更新协程调度器的相关参数,返回协程的 id。
/// @brief: 创建协程 /// @param: @c S 是协程调度器 /// @param: @c func 协程运行函数 /// @param: @c ud 是函数 func 的参数 /// @param: co 新创建的对象 struct coroutine* _co_new(struct schedule* S , coroutine_func func, void *ud) { struct coroutine* co = malloc(sizeof(struct coroutine)); co->func = func; co->ud = ud; co->sch = S; co->cap = 0; co->size = 0; co->status = COROUTINE_READY; // 默认的最初状态都是 COROUTINE_READY co->stack = NULL; return co; }
/// @brief: 创建一个协程对象 /// @param: S 该协程所属的调度器 /// @param: func 该协程函数执行体 /// @param: ud func 的参数 /// @return: 新建的协程的 ID int coroutine_new(struct schedule* S, coroutine_func func, void *ud) { struct coroutine *co = _co_new(S, func , ud); // 如果目前协程的数量已经大于调度器的容量,那么进行扩容 if (S->nco >= S->cap) { int id = S->cap; // 新的协程的id直接为当前容量的大小 S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *)); // 2倍扩容 memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap); // 初始化新的内存 S->co[S->cap] = co; // 将新创建的协程放入调度器中 S->cap *= 2; // 将容量参数更新为两倍 ++S->nco; // 协程数更新 return id; } else { // 如果目前协程的数量小于调度器的容量,则取一个为NULL的位置,放入新的协程 int i; for (i=0; i<S->cap; i++) { // 为什么不 i % S->cap, 而是要从 nco+i 开始呢,因为前nco有很大概率都非NULL的,直接跳过去更好 int id = (i + S->nco) % S->cap; if (S->co[id] == NULL) { S->co[id] = co; ++S->nco; return id; } } } assert(NULL); return -1; }
切出
coroutine_resume 函数会切入到指定协程中执行。当前正在执行的协程的上下文会被保存起来,同时上下文替换成新的协程,该协程的状态将被置为 RUNNING。
/// @biref: 切换到对应协程中执行 /// @param: S 协程调度器 /// @param: id 协程ID void coroutine_resume(struct schedule * S, int id) { assert(S->running == -1); assert(0 <= id && id < S->cap); struct coroutine* C = S->co[id]; // 取出协程 if (C == NULL) return; int status = C->status; switch(status) { case COROUTINE_READY: getcontext(&C->ctx); // 初始化 ucontext_t 结构体,将当前的上下文放到 C->ctx 里面 // 将当前协程的运行时栈的栈顶设置为 S->stack // 每个协程都这么设置,这就是所谓的共享栈。(注意,这里是栈顶) C->ctx.uc_stack.ss_sp = S->stack; C->ctx.uc_stack.ss_size = STACK_SIZE; C->ctx.uc_link = &S->main; // 如果协程执行完,将切换到主协程中执行 S->running = id; C->status = COROUTINE_RUNNING; // 设置执行 C->ctx 函数, 并将 S 作为参数传进去 uintptr_t ptr = (uintptr_t)S; makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32)); // 将当前的上下文放入S->main中,并将C->ctx的上下文替换到当前上下文 swapcontext(&S->main, &C->ctx); break; case COROUTINE_SUSPEND: // 将协程所保存的栈的内容,拷贝到当前运行时栈中 // 其中C->size在yield时有保存 memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size); S->running = id; C->status = COROUTINE_RUNNING; swapcontext(&S->main, &C->ctx); break; default: assert(0); } }
yield
/// @brief: 将当前正在运行的协程让出,切换到主协程上 /// @param S 协程调度器 void coroutine_yield(struct schedule * S) { // 取出当前正在运行的协程 int id = S->running; assert(id >= 0); struct coroutine * C = S->co[id]; assert((char *)&C > S->stack); // 将当前运行的协程的栈内容保存起来 _save_stack(C,S->stack + STACK_SIZE); // 将当前栈的状态改为 挂起 C->status = COROUTINE_SUSPEND; S->running = -1; // 所以这里可以看到,只能从协程切换到主协程中 swapcontext(&C->ctx , &S->main); } static void _save_stack(struct coroutine *C, char *top) { char dummy = 0; assert(top - &dummy <= STACK_SIZE); // 如果已分配内存小于当前栈的大小,则释放内存重新分配 if (C->cap < top - &dummy) { free(C->stack); C->cap = top-&dummy; C->stack = malloc(C->cap); } C->size = top - &dummy; // 从 dummy 拷贝 size 内存到 C->stack memcpy(C->stack, &dummy, C->size); }