协程(coroutine)的原理和使用

简介: 协程(coroutine)的原理和使用

coroutine

一个程序要真正运行起来,需要两个因素:可执行代码段、数据。体现在 CPU 中,主要包含以下几个方面:

  • EIP 寄存器:用来存储 CPU 要读取指令的地址
  • ESP 寄存器:指向当前线程栈的栈顶位置
  • 其他通用寄存器的内容:包括代表函数参数的 rdi、rsi 等等。
  • 线程栈中的内存内容。

这些数据内容,我们一般将其称为 “上下文” 或者 “现场”。有栈协程的原理,就是从线程的上下文下手,如果把线程的上下文完全改变,即:改变 EIP 寄存的内容,指向其他指令地址;改变线程栈的内存内容等等。

这样的话,当前线程运行的程序也就完全改变了,是一个全新的程序。

协程的实现分为有栈和无栈的方式,有栈的实现方式又是可以分为“独立栈” 和 "共享栈"的实现方式。

共享栈的实现

仅以云风的coroutine实现为例。它的底层是使用 Linux下的 ucontext 函数簇实现的。

typedef struct ucontext_t {
   struct ucontext_t *uc_link;    // 要去的环境
   sigset_t          uc_sigmask;  // 屏蔽的信号
   stack_t           uc_stack;    // 当前这个环境要使用的栈内存
   mcontext_t        uc_mcontext;   
   ...
} ucontext_t;

每次在进行协程切换时,都是由先将当前上下文数据保存共享栈中,当切换回来的时候再从共享栈中恢复数据。设置的共享栈的大小是 1M。调度器的结构如下:

#define STACK_SIZE (1024*1024)    // 1M 
/// @brief: 协程调度器
struct schedule {
    char stack[STACK_SIZE];    // 运行时栈
    ucontext_t main;       // 主协程的上下文
    int nco;          // 当前存活的协程个数,用作协程 id
    int cap;          // 协程管理器的当前最大容量,如果不够了,则进行扩容
    int running;        // 正在运行的协程ID
    struct coroutine** co;     // 一个一维数组,用于存放协程 
};
 /// @brief: 协程
struct coroutine {
  coroutine_func func;    // 协程运行的函数
  void* ud;          // func 参数
  ucontext_t ctx;      // 协程上下文
  struct schedule* sch;     // 该协程所属的调度器
  ptrdiff_t cap;       // 已经分配的内存大小
  ptrdiff_t size;      // 当前协程运行时栈,保存起来后的大小
  int status;       // 协程当前的状态
  char* stack;      // 当前协程的保存起来的运行时栈
};
创建协程调度器

函数 coroutine_open 负责创建一个协程调度器,初始化协程调度器的数值。

#define DEFAULT_COROUTINE 16 // 初始化建立协程时大小
/// @brief: 创建一个协程调度器 
struct schedule *  coroutine_open(void) 
{
  struct schedule *S = malloc(sizeof(struct schedule)); // 这里做的主要就是分配内存,同时赋初值
  S->nco = 0;
  S->cap = DEFAULT_COROUTINE;
  S->running = -1;
  S->co = malloc(sizeof(struct coroutine *) * S->cap);
  memset(S->co, 0, sizeof(struct coroutine *) * S->cap);
  return S;
}
创建一个协程

函数 _co_new 初始化 coroutine 对象:分配内存以及设置初始化状态是 COROUTINE_READY 。coroutine_new 将 创建的协程与对应的协程调度器联系起来。如果当前的协程数已经超过协程调度器中的协程数最大值cap就会两倍扩容。最终还是会更新协程调度器的相关参数,返回协程的 id。

/// @brief: 创建协程
/// @param: @c S 是协程调度器
/// @param: @c func 协程运行函数
/// @param: @c ud 是函数 func 的参数
/// @param: co 新创建的对象
struct coroutine* 
_co_new(struct schedule* S , coroutine_func func, void *ud) {
  struct coroutine* co = malloc(sizeof(struct coroutine));
  co->func = func;  
  co->ud = ud;
  co->sch = S;
  co->cap = 0;
  co->size = 0;
  co->status = COROUTINE_READY; // 默认的最初状态都是 COROUTINE_READY
  co->stack = NULL;
  return co;
}
/// @brief: 创建一个协程对象
/// @param: S 该协程所属的调度器
/// @param: func 该协程函数执行体
/// @param: ud func 的参数
/// @return: 新建的协程的 ID
int 
coroutine_new(struct schedule* S, coroutine_func func, void *ud) {
  struct coroutine *co = _co_new(S, func , ud);
  // 如果目前协程的数量已经大于调度器的容量,那么进行扩容
    if (S->nco >= S->cap) 
    {
    int id = S->cap;     // 新的协程的id直接为当前容量的大小
    S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *));  // 2倍扩容
    memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap); // 初始化新的内存
    S->co[S->cap] = co;    // 将新创建的协程放入调度器中
    S->cap *= 2;      // 将容量参数更新为两倍
    ++S->nco;           // 协程数更新
    return id;
  } 
    else 
    {
    // 如果目前协程的数量小于调度器的容量,则取一个为NULL的位置,放入新的协程
    int i;
    for (i=0; i<S->cap; i++) { 
       // 为什么不 i % S->cap, 而是要从 nco+i 开始呢,因为前nco有很大概率都非NULL的,直接跳过去更好
      int id = (i + S->nco) % S->cap;
      if (S->co[id] == NULL) {
        S->co[id] = co;
        ++S->nco;
        return id;
      }
    }
  }
  assert(NULL);
  return -1;
}

切出

coroutine_resume 函数会切入到指定协程中执行。当前正在执行的协程的上下文会被保存起来,同时上下文替换成新的协程,该协程的状态将被置为 RUNNING。

/// @biref: 切换到对应协程中执行
/// @param: S 协程调度器
/// @param: id 协程ID
void 
coroutine_resume(struct schedule * S, int id) {
  assert(S->running == -1);
  assert(0 <= id && id < S->cap);
  struct coroutine* C = S->co[id];  // 取出协程
  if (C == NULL) return;
  int status = C->status;
  switch(status) {
  case COROUTINE_READY:
    getcontext(&C->ctx);  // 初始化 ucontext_t 结构体,将当前的上下文放到 C->ctx 里面
    // 将当前协程的运行时栈的栈顶设置为 S->stack
    // 每个协程都这么设置,这就是所谓的共享栈。(注意,这里是栈顶)
    C->ctx.uc_stack.ss_sp = S->stack; 
    C->ctx.uc_stack.ss_size = STACK_SIZE;
    C->ctx.uc_link = &S->main; // 如果协程执行完,将切换到主协程中执行
    S->running = id;
    C->status = COROUTINE_RUNNING;
    // 设置执行 C->ctx 函数, 并将 S 作为参数传进去
    uintptr_t ptr = (uintptr_t)S;
    makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32));
    // 将当前的上下文放入S->main中,并将C->ctx的上下文替换到当前上下文
    swapcontext(&S->main, &C->ctx);
    break;
  case COROUTINE_SUSPEND:
      // 将协程所保存的栈的内容,拷贝到当前运行时栈中
    // 其中C->size在yield时有保存
    memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
    S->running = id;
    C->status = COROUTINE_RUNNING;
    swapcontext(&S->main, &C->ctx);
    break;
  default:
    assert(0);
  }
}
yield
/// @brief: 将当前正在运行的协程让出,切换到主协程上
/// @param S 协程调度器
void
coroutine_yield(struct schedule * S) {
  // 取出当前正在运行的协程
  int id = S->running;
  assert(id >= 0);
  struct coroutine * C = S->co[id];
  assert((char *)&C > S->stack);
  // 将当前运行的协程的栈内容保存起来
  _save_stack(C,S->stack + STACK_SIZE);
  
  // 将当前栈的状态改为 挂起
  C->status = COROUTINE_SUSPEND;
  S->running = -1;
  // 所以这里可以看到,只能从协程切换到主协程中
  swapcontext(&C->ctx , &S->main);
}
static void _save_stack(struct coroutine *C, char *top) {
  char dummy = 0;
  assert(top - &dummy <= STACK_SIZE);
    // 如果已分配内存小于当前栈的大小,则释放内存重新分配
  if (C->cap < top - &dummy) {
    free(C->stack);
    C->cap = top-&dummy;
    C->stack = malloc(C->cap);
  }
  C->size = top - &dummy;
    // 从 dummy 拷贝 size 内存到 C->stack
  memcpy(C->stack, &dummy, C->size);
}
相关文章
|
7月前
|
API 调度
2.3.1 协程设计原理与汇编实现
2.3.1 协程设计原理与汇编实现
|
1月前
|
存储 安全 测试技术
GoLang协程Goroutiney原理与GMP模型详解
本文详细介绍了Go语言中的Goroutine及其背后的GMP模型。Goroutine是Go语言中的一种轻量级线程,由Go运行时管理,支持高效的并发编程。文章讲解了Goroutine的创建、调度、上下文切换和栈管理等核心机制,并通过示例代码展示了如何使用Goroutine。GMP模型(Goroutine、Processor、Machine)是Go运行时调度Goroutine的基础,通过合理的调度策略,实现了高并发和高性能的程序执行。
121 29
|
1月前
|
负载均衡 算法 Go
GoLang协程Goroutiney原理与GMP模型详解
【11月更文挑战第4天】Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理,创建和销毁开销小,适合高并发场景。其调度采用非抢占式和协作式多任务处理结合的方式。GMP 模型包括 G(Goroutine)、M(系统线程)和 P(逻辑处理器),通过工作窃取算法实现负载均衡,确保高效利用系统资源。
|
3月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
在Python异步编程领域,协程与异步函数成为处理并发任务的关键工具。协程(微线程)比操作系统线程更轻量级,通过`async def`定义并在遇到`await`表达式时暂停执行。异步函数利用`await`实现任务间的切换。事件循环作为异步编程的核心,负责调度任务;`asyncio`库提供了事件循环的管理。Future对象则优雅地处理异步结果。掌握这些概念,可使代码更高效、简洁且易于维护。
37 1
|
2月前
|
数据采集 调度 Python
Python编程异步爬虫——协程的基本原理(一)
Python编程异步爬虫——协程的基本原理(一)
23 0
|
2月前
|
数据采集 Python
Python编程异步爬虫——协程的基本原理(二)
Python编程异步爬虫——协程的基本原理(二)
26 0
|
2月前
|
存储 前端开发 rax
协程设计与原理(二)
协程设计与原理(二)
20 0
|
2月前
|
Java Linux Go
协程的设计原理(一)
协程的设计原理(一)
34 0
|
6月前
|
PHP 调度 容器
Swoole 源码分析之 Coroutine 协程模块
协程又称轻量级线程,但与线程不同的是;协程是用户级线程,不需要操作系统参与。由用户显式控制,可以在需要的时候挂起、或恢复执行。
85 1
Swoole 源码分析之 Coroutine 协程模块
|
5月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
65 2