协程源码剖析(一)

简介: 协程源码剖析(一)

本文详细介绍了协程的核心组件,包括协程结构体(包含上下文、状态、调度器等)和调度器结构(管理协程、事件处理和文件描述符)。还讲解了如何创建协程和运行调度器的过程,以及它们在异步并发中的应用。

摘要由CSDN通过智能技术生成

概述

本节讲解内容如下

1.协程的核心组件:协程结构体、调度器结构体

2.对应函数:创建协程、运行调度器

适用对象:已经了解什么是协程、协程出现的原因、了解协程应用场景的朋友

如果是小白,请先学习协程入门篇

正文:

本文列举部分源码,协程全部源码https://github.com/wuj1nquan/bitco每一个函数都有详细注释

协程使用同步的编程方式实现异步的性能,实现了单线程中多个函数的并发

1.协程结构体

每一个协程可以抽象为一个函数的执行体,也就是说,每一个函数都可以变成一个协程,在同一个线程里,这些函数(协程)由调度器进行调度,实现并发,模拟了操作系统的并发

每一个协程都有对应的状态:执行、等待、睡眠,以及关联的文件描述符,所属调度器,以及其在队列、红黑树、链表中的节点,以及所关联的函数的上下文,用于恢复协程,协程的让出和恢复发生在其所属调度器和协程本身之间

协程结构体:

typedef struct _coroutine {
  ucontext_t ctx; // 协程的上下文信息
  proc_coroutine func; // 协程执行的函数
  void *arg; // 传递给协程执行函数的参数
  void *data; // 协程私有数据
  size_t stack_size; // 栈大小
  size_t last_stack_size; // 上次栈大小
  
  coroutine_status status; // 协程的状态
  schedule *sched; // 指向协程所属的调度器
  uint64_t birth; // 协程创建的事件戳
  uint64_t id; // 协程的唯一标识符
  int fd; // 与协程关联的文件描述符
  unsigned short events;  //POLL_EVENT
  char funcname[64]; //协程执行的函数名称
  struct _coroutine *co_join; 
  void **co_exit_ptr; 
  void *stack; // 栈空间
  void *ebp; //栈指针
  uint32_t ops; // 当前协程的操作码
  uint64_t sleep_usecs; //休眠时间
  RB_ENTRY(_coroutine) sleep_node; // 睡眠队列中的红黑树节点
  RB_ENTRY(_coroutine) wait_node; // 等待队列中的红黑树节点
  LIST_ENTRY(_coroutine) busy_next; // 忙碌协程链表中的下一个指针
  TAILQ_ENTRY(_coroutine) ready_next; // 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) defer_next; // 延迟队列中的下一个指针
  TAILQ_ENTRY(_coroutine) cond_next; // 条件变量队列中的下一个指针
  TAILQ_ENTRY(_coroutine) io_next; //  I/O 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) compute_next; // 计算就绪队列中的下一个指针
  struct { // 协程执行 I/O 操作所需的相关信息
    void *buf;
    size_t nbytes;
    int fd;
    int ret;
    int err;
  } io;
  struct coroutine_compute_sched *compute_sched; // 指向计算调度器的指针
  int ready_fds; // 就绪的文件描述符数量
  struct pollfd *pfds; // 指向 pollfd 结构体数组的指针
  nfds_t nfds; // pollfd 数组的大小
} coroutine;
2.调度器结构体

每一个线程拥有一个调度器,管理着这个线程所有的协程,每一个协程创建时都会加入到其所在线程的调度器中,调度器拥有红黑树、链表、就绪队列,用来管理不同状态的协程

此外还需要epoll来通知调度器,哪些协程中的文件描述符处于就绪状态,通过文件描述符查找红黑树对应的协程节点,调度器运行时将处于就绪状态的文件描述符所在的协程恢复运行

调度器在运行时会依次处理1.超时的协程 2. 就绪队列的协程 3.等待红黑树中的协程

如果一个调度器的等待队列、忙碌链表、睡眠红黑树和就绪队列都为空,则调度器完成了工作。

typedef struct _coroutine {
  ucontext_t ctx; // 协程的上下文信息
  proc_coroutine func; // 协程执行的函数
  void *arg; // 传递给协程执行函数的参数
  void *data; // 协程私有数据
  size_t stack_size; // 栈大小
  size_t last_stack_size; // 上次栈大小
  
  coroutine_status status; // 协程的状态
  schedule *sched; // 指向协程所属的调度器
  uint64_t birth; // 协程创建的事件戳
  uint64_t id; // 协程的唯一标识符
  int fd; // 与协程关联的文件描述符
  unsigned short events;  //POLL_EVENT
  char funcname[64]; //协程执行的函数名称
  struct _coroutine *co_join; 
  void **co_exit_ptr; 
  void *stack; // 栈空间
  void *ebp; //栈指针
  uint32_t ops; // 当前协程的操作码
  uint64_t sleep_usecs; //休眠时间
  RB_ENTRY(_coroutine) sleep_node; // 睡眠队列中的红黑树节点
  RB_ENTRY(_coroutine) wait_node; // 等待队列中的红黑树节点
  LIST_ENTRY(_coroutine) busy_next; // 忙碌协程链表中的下一个指针
  TAILQ_ENTRY(_coroutine) ready_next; // 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) defer_next; // 延迟队列中的下一个指针
  TAILQ_ENTRY(_coroutine) cond_next; // 条件变量队列中的下一个指针
  TAILQ_ENTRY(_coroutine) io_next; //  I/O 就绪队列中的下一个指针
  TAILQ_ENTRY(_coroutine) compute_next; // 计算就绪队列中的下一个指针
  struct { // 协程执行 I/O 操作所需的相关信息
    void *buf;
    size_t nbytes;
    int fd;
    int ret;
    int err;
  } io;
  struct coroutine_compute_sched *compute_sched; // 指向计算调度器的指针
  int ready_fds; // 就绪的文件描述符数量
  struct pollfd *pfds; // 指向 pollfd 结构体数组的指针
  nfds_t nfds; // pollfd 数组的大小
} coroutine;
3.创建一个协程
int coroutine_create(coroutine **new_co, proc_coroutine func, void *arg) { // 创建一个新的协程,并将其添加到调度器的就绪队列
  assert(pthread_once(&sched_key_once, coroutine_sched_key_creator) == 0); // 保证调度器的键只会被创建一次: 确保 coroutine_sched_key_creator 函数只会被执行一次,用于创建线程局部存储的键保证调度器的键只会被创建一次
  schedule *sched = coroutine_get_sched(); // 获取当前线程的调度器
  if (sched == NULL) { // 当前线程尚未拥有调度器,需要先创建调度器:
    schedule_create(0); // 创建调度器 
    
    sched = coroutine_get_sched();
    if (sched == NULL) { // 创建调度器失败
      printf("Failed to create scheduler\n");
      return -1;
    }
  }
  coroutine *co = calloc(1, sizeof(coroutine)); // 为新的协程分配内存空间
  if (co == NULL) { // 分配失败
    printf("Failed to allocate memory for new coroutine\n");
    return -2;
  }
    // 初始化协程各个成员
  co->stack = NULL;
  co->stack_size = 0;
  co->sched = sched; // 所属调度器
  co->status = BIT(COROUTINE_STATUS_NEW); // 状态:新建
  co->id = sched->spawned_coroutines ++; // 协程的id,同时也是调度器中已创建的协程数量
  co->func = func; // 执行的函数
  co->fd_wait = -1;
  co->arg = arg; // 函数参数
  co->birth = coroutine_usec_now(); // 协程创建的时间戳
    
  *new_co = co; // 将新创建的协程指针赋给传入的参数
  TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next); // 将协程插入到调度器的就绪队列的尾部,以便后续调度器可以从就绪队列中选择协程执行
  return 0;
}
4.运行调度器
void schedule_run(void) {
  schedule *sched = coroutine_get_sched(); // 获取当前调度器
  if (sched == NULL) return ;
  while (!schedule_isdone(sched)) {
    // 1. expried coroutine in sleep rbtree
    // 获取超时的协程,并逐个执行这些协程的恢复操作
    coroutine *expired = NULL;
    while ((expired = schedule_expired(sched)) != NULL) {
      coroutine_resume(expired);
    }
    // 2. ready queue
        // 处理就绪队列中的协程:从就绪队列中依次取出协程,并执行它们的恢复操作
    coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _coroutine_queue);
    while (!TAILQ_EMPTY(&sched->ready)) {
      coroutine *co = TAILQ_FIRST(&sched->ready);
      TAILQ_REMOVE(&co->sched->ready, co, ready_next);
            
      if (co->status & BIT(COROUTINE_STATUS_FDEOF)) { // 表示文件描述符已关闭,这时释放该协程的资源,然后退出循环。
        coroutine_free(co);
        break;
      }
    
      coroutine_resume(co);
      if (co == last_co_ready) break; // 全部处理完即退出
    }
    // 3. wait rbtree
    schedule_epoll(sched); // 调用epoll_wait轮询调度器中epoll管理的文件描述符,并将就绪事件保存到调度器的 eventlist 中
    while (sched->num_new_events) { // 遍历所有就绪事件
            // 从 num_new_events 中获取事件数量,并将事件数量减一
      int idx = --sched->num_new_events;
            // 获取对应索引处的事件
      struct epoll_event *ev = sched->eventlist+idx;
      
      int fd = ev->data.fd;
            // 检查事件是否为对端关闭连接
      int is_eof = ev->events & EPOLLHUP;
            // 如果事件为对端关闭连接,则设置 errno 为 ECONNRESET
      if (is_eof) errno = ECONNRESET;
            // 根据文件描述符在等待红黑树中查找对应的协程
      coroutine *co = schedule_search_wait(fd);
            // 如果找到了对应的协程,则恢复该协程的执行
      if (co != NULL) {
        if (is_eof) { // 如果事件为对端关闭连接,则设置协程状态为已关闭文件描述符
          co->status |= BIT(COROUTINE_STATUS_FDEOF);
        }
                // 恢复协程的执行
        coroutine_resume(co);
      }
            // 将 is_eof 重置为 0,以便下次循环使用
      is_eof = 0;
    }
  }
  schedule_free(sched); // 所有任务都完成后,释放调度器的资源,并返回。
  
  return ;
}

由于篇幅,只能列举部分源码进行说明,协程全部源码
协程源码
每一个函数都有详细注释,学习首选

下一节继续讲解hook系统调用

推荐学习https://xxetb.xetslk.com/s/p5Ibb

目录
相关文章
|
存储 缓存 安全
10款轻量型的嵌入式GUI库分享
10款轻量型的嵌入式GUI库分享
1186 1
|
网络协议 Unix Linux
有了协程库,开发DPDK应用程序第一次可以这么简单
使用PhotonLibOS协程库,以多执行单元并发的代码模型代替原先的异步回调模型,简化DPDK应用程序的开发。同时使用echo server验证了 用户态TCP/IP协议栈+轮询模式驱动 对比 内核原生协议栈+中断模式驱动 的性能优势
10163 0
有了协程库,开发DPDK应用程序第一次可以这么简单
|
Shell Linux Android开发
【Linux】【编译相关】execvp: /bin/sh: Argument list too long问题处理小结
【Linux】【编译相关】execvp: /bin/sh: Argument list too long问题处理小结
2034 0
|
7月前
|
JavaScript 数据安全/隐私保护
Vue Amazing UI 组件库(Vue3+TypeScript+Vite 等最新技术栈开发)
Vue Amazing UI 是一个基于 Vue 3、TypeScript、Vite 等最新技术栈开发构建的现代化组件库,包含丰富的 UI 组件和常用工具函数,并且持续不断维护更新中。另外,组件库全量使用 TypeScript,支持自动按需引入和 Tree Shaking 等,能够显著提升开发效率,降低开发成本。
459 5
Vue Amazing UI 组件库(Vue3+TypeScript+Vite 等最新技术栈开发)
|
6月前
|
消息中间件 缓存 负载均衡
php怎么解决高并发的问题
在PHP中处理高并发问题需要多方面的优化,包括使用缓存技术、异步处理、数据库优化、负载均衡、选择合适的架构以及优化服务器配置。通过结合这些技术,可以显著提高PHP应用的并发处理能力,确保在高并发场景下依然能够提供稳定和高效的服务。
155 12
|
6月前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁主要依靠一个SETNX指令实现的 , 这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。 只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。 这个命令的返回值如下。 ● 命令在设置成功时返回1。 ● 命令在设置失败时返回0。 假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行S
|
6月前
|
消息中间件 缓存 负载均衡
php怎么解决高并发的问题
在实际应用中,应根据具体需求和应用场景,选择合适的优化方案,并进行持续监控和优化,确保系统的高效稳定运行。
333 6
|
存储 弹性计算 监控
几百T的视频、图片数据进行更有效地存储和管理
采用传统硬盘搭建存储方案,看起来成本低廉,但是再加上各种附加因素后却大幅攀升,而云存储厂商通常提供基于订阅的定价模型、一些免费服务和一定的折扣。现在,我们就来了解一下如何更省钱地使用云存储。
20382 45
几百T的视频、图片数据进行更有效地存储和管理
|
并行计算 Java 调度
C/C++协程编程:解锁并发编程新纪元
C/C++协程编程:解锁并发编程新纪元
289 0
|
存储 弹性计算 数据库
云计算概念和与云服务的区别
“云”在计算机科学和信息技术领域通常指“云计算”,即通过互联网提供计算资源(如服务器、存储、数据库、网络、软件、分析等)的模式。用户可以按需访问和使用这些资源,而无需管理和维护实际的硬件和软件。
1591 3

热门文章

最新文章