协程源码剖析(一)

简介: 协程源码剖析(一)

本文详细介绍了协程的核心组件,包括协程结构体(包含上下文、状态、调度器等)和调度器结构(管理协程、事件处理和文件描述符)。还讲解了如何创建协程和运行调度器的过程,以及它们在异步并发中的应用。

摘要由CSDN通过智能技术生成

概述

本节讲解内容如下

1.协程的核心组件:协程结构体、调度器结构体

2.对应函数:创建协程、运行调度器

适用对象:已经了解什么是协程、协程出现的原因、了解协程应用场景的朋友

如果是小白,请先学习协程入门篇

正文:

本文列举部分源码,协程全部源码https://github.com/wuj1nquan/bitco每一个函数都有详细注释

协程使用同步的编程方式实现异步的性能,实现了单线程中多个函数的并发

1.协程结构体

每一个协程可以抽象为一个函数的执行体,也就是说,每一个函数都可以变成一个协程,在同一个线程里,这些函数(协程)由调度器进行调度,实现并发,模拟了操作系统的并发

每一个协程都有对应的状态:执行、等待、睡眠,以及关联的文件描述符,所属调度器,以及其在队列、红黑树、链表中的节点,以及所关联的函数的上下文,用于恢复协程,协程的让出和恢复发生在其所属调度器和协程本身之间

协程结构体:

typedef struct _coroutine {
  ucontext_t ctx; // 协程的上下文信息
  proc_coroutine func; // 协程执行的函数
  void *arg; // 传递给协程执行函数的参数
  void *data; // 协程私有数据
  size_t stack_size; // 栈大小
  size_t last_stack_size; // 上次栈大小
  
  coroutine_status status; // 协程的状态
  schedule *sched; // 指向协程所属的调度器
  uint64_t birth; // 协程创建的事件戳
  uint64_t id; // 协程的唯一标识符
  int fd; // 与协程关联的文件描述符
  unsigned short events;  //POLL_EVENT
  char funcname[64]; //协程执行的函数名称
  struct _coroutine *co_join; 
  void **co_exit_ptr; 
  void *stack; // 栈空间
  void *ebp; //栈指针
  uint32_t ops; // 当前协程的操作码
  uint64_t sleep_usecs; //休眠时间
  RB_ENTRY(_coroutine) sleep_node; // 睡眠队列中的红黑树节点
  RB_ENTRY(_coroutine) wait_node; // 等待队列中的红黑树节点
  LIST_ENTRY(_coroutine) busy_next; // 忙碌协程链表中的下一个指针
  TAILQ_ENTRY(_coroutine) ready_next; // 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) defer_next; // 延迟队列中的下一个指针
  TAILQ_ENTRY(_coroutine) cond_next; // 条件变量队列中的下一个指针
  TAILQ_ENTRY(_coroutine) io_next; //  I/O 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) compute_next; // 计算就绪队列中的下一个指针
  struct { // 协程执行 I/O 操作所需的相关信息
    void *buf;
    size_t nbytes;
    int fd;
    int ret;
    int err;
  } io;
  struct coroutine_compute_sched *compute_sched; // 指向计算调度器的指针
  int ready_fds; // 就绪的文件描述符数量
  struct pollfd *pfds; // 指向 pollfd 结构体数组的指针
  nfds_t nfds; // pollfd 数组的大小
} coroutine;
2.调度器结构体

每一个线程拥有一个调度器,管理着这个线程所有的协程,每一个协程创建时都会加入到其所在线程的调度器中,调度器拥有红黑树、链表、就绪队列,用来管理不同状态的协程

此外还需要epoll来通知调度器,哪些协程中的文件描述符处于就绪状态,通过文件描述符查找红黑树对应的协程节点,调度器运行时将处于就绪状态的文件描述符所在的协程恢复运行

调度器在运行时会依次处理1.超时的协程 2. 就绪队列的协程 3.等待红黑树中的协程

如果一个调度器的等待队列、忙碌链表、睡眠红黑树和就绪队列都为空,则调度器完成了工作。

typedef struct _coroutine {
  ucontext_t ctx; // 协程的上下文信息
  proc_coroutine func; // 协程执行的函数
  void *arg; // 传递给协程执行函数的参数
  void *data; // 协程私有数据
  size_t stack_size; // 栈大小
  size_t last_stack_size; // 上次栈大小
  
  coroutine_status status; // 协程的状态
  schedule *sched; // 指向协程所属的调度器
  uint64_t birth; // 协程创建的事件戳
  uint64_t id; // 协程的唯一标识符
  int fd; // 与协程关联的文件描述符
  unsigned short events;  //POLL_EVENT
  char funcname[64]; //协程执行的函数名称
  struct _coroutine *co_join; 
  void **co_exit_ptr; 
  void *stack; // 栈空间
  void *ebp; //栈指针
  uint32_t ops; // 当前协程的操作码
  uint64_t sleep_usecs; //休眠时间
  RB_ENTRY(_coroutine) sleep_node; // 睡眠队列中的红黑树节点
  RB_ENTRY(_coroutine) wait_node; // 等待队列中的红黑树节点
  LIST_ENTRY(_coroutine) busy_next; // 忙碌协程链表中的下一个指针
  TAILQ_ENTRY(_coroutine) ready_next; // 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) defer_next; // 延迟队列中的下一个指针
  TAILQ_ENTRY(_coroutine) cond_next; // 条件变量队列中的下一个指针
  TAILQ_ENTRY(_coroutine) io_next; //  I/O 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) compute_next; // 计算就绪队列中的下一个指针
  struct { // 协程执行 I/O 操作所需的相关信息
    void *buf;
    size_t nbytes;
    int fd;
    int ret;
    int err;
  } io;
  struct coroutine_compute_sched *compute_sched; // 指向计算调度器的指针
  int ready_fds; // 就绪的文件描述符数量
  struct pollfd *pfds; // 指向 pollfd 结构体数组的指针
  nfds_t nfds; // pollfd 数组的大小
} coroutine;
3.创建一个协程
int coroutine_create(coroutine **new_co, proc_coroutine func, void *arg) { // 创建一个新的协程,并将其添加到调度器的就绪队列
  assert(pthread_once(&sched_key_once, coroutine_sched_key_creator) == 0); // 保证调度器的键只会被创建一次: 确保 coroutine_sched_key_creator 函数只会被执行一次,用于创建线程局部存储的键保证调度器的键只会被创建一次
  schedule *sched = coroutine_get_sched(); // 获取当前线程的调度器
  if (sched == NULL) { // 当前线程尚未拥有调度器,需要先创建调度器:
    schedule_create(0); // 创建调度器 
    
    sched = coroutine_get_sched();
    if (sched == NULL) { // 创建调度器失败
      printf("Failed to create scheduler\n");
      return -1;
    }
  }
  coroutine *co = calloc(1, sizeof(coroutine)); // 为新的协程分配内存空间
  if (co == NULL) { // 分配失败
    printf("Failed to allocate memory for new coroutine\n");
    return -2;
  }
    // 初始化协程各个成员
  co->stack = NULL;
  co->stack_size = 0;
  co->sched = sched; // 所属调度器
  co->status = BIT(COROUTINE_STATUS_NEW); // 状态:新建
  co->id = sched->spawned_coroutines ++; // 协程的id,同时也是调度器中已创建的协程数量
  co->func = func; // 执行的函数
  co->fd_wait = -1;
  co->arg = arg; // 函数参数
  co->birth = coroutine_usec_now(); // 协程创建的时间戳
    
  *new_co = co; // 将新创建的协程指针赋给传入的参数
  TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next); // 将协程插入到调度器的就绪队列的尾部,以便后续调度器可以从就绪队列中选择协程执行
  return 0;
}
4.运行调度器
void schedule_run(void) {
  schedule *sched = coroutine_get_sched(); // 获取当前调度器
  if (sched == NULL) return ;
  while (!schedule_isdone(sched)) {
    // 1. expried coroutine in sleep rbtree
    // 获取超时的协程,并逐个执行这些协程的恢复操作
    coroutine *expired = NULL;
    while ((expired = schedule_expired(sched)) != NULL) {
      coroutine_resume(expired);
    }
    // 2. ready queue
        // 处理就绪队列中的协程:从就绪队列中依次取出协程,并执行它们的恢复操作
    coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _coroutine_queue);
    while (!TAILQ_EMPTY(&sched->ready)) {
      coroutine *co = TAILQ_FIRST(&sched->ready);
      TAILQ_REMOVE(&co->sched->ready, co, ready_next);
            
      if (co->status & BIT(COROUTINE_STATUS_FDEOF)) { // 表示文件描述符已关闭,这时释放该协程的资源,然后退出循环。
        coroutine_free(co);
        break;
      }
    
      coroutine_resume(co);
      if (co == last_co_ready) break; // 全部处理完即退出
    }
    // 3. wait rbtree
    schedule_epoll(sched); // 调用epoll_wait轮询调度器中epoll管理的文件描述符,并将就绪事件保存到调度器的 eventlist 中
    while (sched->num_new_events) { // 遍历所有就绪事件
            // 从 num_new_events 中获取事件数量,并将事件数量减一
      int idx = --sched->num_new_events;
            // 获取对应索引处的事件
      struct epoll_event *ev = sched->eventlist+idx;
      
      int fd = ev->data.fd;
            // 检查事件是否为对端关闭连接
      int is_eof = ev->events & EPOLLHUP;
            // 如果事件为对端关闭连接,则设置 errno 为 ECONNRESET
      if (is_eof) errno = ECONNRESET;
            // 根据文件描述符在等待红黑树中查找对应的协程
      coroutine *co = schedule_search_wait(fd);
            // 如果找到了对应的协程,则恢复该协程的执行
      if (co != NULL) {
        if (is_eof) { // 如果事件为对端关闭连接,则设置协程状态为已关闭文件描述符
          co->status |= BIT(COROUTINE_STATUS_FDEOF);
        }
                // 恢复协程的执行
        coroutine_resume(co);
      }
            // 将 is_eof 重置为 0,以便下次循环使用
      is_eof = 0;
    }
  }
  schedule_free(sched); // 所有任务都完成后,释放调度器的资源,并返回。
  
  return ;
}

由于篇幅,只能列举部分源码进行说明,协程全部源码
协程源码
每一个函数都有详细注释,学习首选

下一节继续讲解hook系统调用

推荐学习https://xxetb.xetslk.com/s/p5Ibb

目录
相关文章
|
4月前
|
API 调度
协程源码剖析(二)
协程源码剖析(二)
24 1
|
4月前
|
监控 程序员 调度
协程实现单线程并发(入门)
协程实现单线程并发(入门)
48 1
|
5月前
|
网络协议 NoSQL API
协程知识点总结
协程知识点总结
|
5月前
|
调度 Python
Python多线程、多进程与协程面试题解析
【4月更文挑战第14天】Python并发编程涉及多线程、多进程和协程。面试中,对这些概念的理解和应用是评估候选人的重要标准。本文介绍了它们的基础知识、常见问题和应对策略。多线程在同一进程中并发执行,多进程通过进程间通信实现并发,协程则使用`asyncio`进行轻量级线程控制。面试常遇到的问题包括并发并行混淆、GIL影响多线程性能、进程间通信不当和协程异步IO理解不清。要掌握并发模型,需明确其适用场景,理解GIL、进程间通信和协程调度机制。
130 0
|
5月前
|
调度
解释一下为什么协程比线程更轻量级。
解释一下为什么协程比线程更轻量级。
244 1
|
5月前
|
Java 测试技术 程序员
多线程(初阶九:线程池)
多线程(初阶九:线程池)
65 0
|
11月前
|
存储 Ubuntu 调度
协程框架NtyCo的实现
协程框架NtyCo的实现
116 1
|
11月前
|
存储 安全 Java
并发编程系列教程(07) - 线程池原理分析(一)
并发编程系列教程(07) - 线程池原理分析(一)
29 0
|
11月前
|
存储 缓存 监控
并发编程系列教程(08) - 线程池原理分析(二)
并发编程系列教程(08) - 线程池原理分析(二)
38 0
|
11月前
|
存储 关系型数据库 MySQL
协程框架nty_co
协程框架nty_co