本文详细介绍了协程的核心组件,包括协程结构体(包含上下文、状态、调度器等)和调度器结构(管理协程、事件处理和文件描述符)。还讲解了如何创建协程和运行调度器的过程,以及它们在异步并发中的应用。
摘要由CSDN通过智能技术生成
概述
本节讲解内容如下:
1.协程的核心组件:协程结构体、调度器结构体
2.对应函数:创建协程、运行调度器
适用对象:已经了解什么是协程、协程出现的原因、了解协程应用场景的朋友
如果是小白,请先学习协程入门篇
正文:
本文列举部分源码,协程全部源码https://github.com/wuj1nquan/bitco 对每一个函数都有详细注释
协程使用同步的编程方式实现异步的性能,实现了单线程中多个函数的并发
1.协程结构体
每一个协程可以抽象为一个函数的执行体,也就是说,每一个函数都可以变成一个协程,在同一个线程里,这些函数(协程)由调度器进行调度,实现并发,模拟了操作系统的并发
每一个协程都有对应的状态:执行、等待、睡眠,以及关联的文件描述符,所属调度器,以及其在队列、红黑树、链表中的节点,以及所关联的函数的上下文,用于恢复协程,协程的让出和恢复发生在其所属调度器和协程本身之间
协程结构体:
typedef struct _coroutine { ucontext_t ctx; // 协程的上下文信息 proc_coroutine func; // 协程执行的函数 void *arg; // 传递给协程执行函数的参数 void *data; // 协程私有数据 size_t stack_size; // 栈大小 size_t last_stack_size; // 上次栈大小 coroutine_status status; // 协程的状态 schedule *sched; // 指向协程所属的调度器 uint64_t birth; // 协程创建的事件戳 uint64_t id; // 协程的唯一标识符 int fd; // 与协程关联的文件描述符 unsigned short events; //POLL_EVENT char funcname[64]; //协程执行的函数名称 struct _coroutine *co_join; void **co_exit_ptr; void *stack; // 栈空间 void *ebp; //栈指针 uint32_t ops; // 当前协程的操作码 uint64_t sleep_usecs; //休眠时间 RB_ENTRY(_coroutine) sleep_node; // 睡眠队列中的红黑树节点 RB_ENTRY(_coroutine) wait_node; // 等待队列中的红黑树节点 LIST_ENTRY(_coroutine) busy_next; // 忙碌协程链表中的下一个指针 TAILQ_ENTRY(_coroutine) ready_next; // 就绪队列中的下一个指针 TAILQ_ENTRY(_coroutine) defer_next; // 延迟队列中的下一个指针 TAILQ_ENTRY(_coroutine) cond_next; // 条件变量队列中的下一个指针 TAILQ_ENTRY(_coroutine) io_next; // I/O 就绪队列中的下一个指针 TAILQ_ENTRY(_coroutine) compute_next; // 计算就绪队列中的下一个指针 struct { // 协程执行 I/O 操作所需的相关信息 void *buf; size_t nbytes; int fd; int ret; int err; } io; struct coroutine_compute_sched *compute_sched; // 指向计算调度器的指针 int ready_fds; // 就绪的文件描述符数量 struct pollfd *pfds; // 指向 pollfd 结构体数组的指针 nfds_t nfds; // pollfd 数组的大小 } coroutine;
2.调度器结构体
每一个线程拥有一个调度器,管理着这个线程所有的协程,每一个协程创建时都会加入到其所在线程的调度器中,调度器拥有红黑树、链表、就绪队列,用来管理不同状态的协程
此外还需要epoll来通知调度器,哪些协程中的文件描述符处于就绪状态,通过文件描述符查找红黑树对应的协程节点,调度器运行时将处于就绪状态的文件描述符所在的协程恢复运行
调度器在运行时会依次处理1.超时的协程 2. 就绪队列的协程 3.等待红黑树中的协程
如果一个调度器的等待队列、忙碌链表、睡眠红黑树和就绪队列都为空,则调度器完成了工作。
typedef struct _coroutine { ucontext_t ctx; // 协程的上下文信息 proc_coroutine func; // 协程执行的函数 void *arg; // 传递给协程执行函数的参数 void *data; // 协程私有数据 size_t stack_size; // 栈大小 size_t last_stack_size; // 上次栈大小 coroutine_status status; // 协程的状态 schedule *sched; // 指向协程所属的调度器 uint64_t birth; // 协程创建的事件戳 uint64_t id; // 协程的唯一标识符 int fd; // 与协程关联的文件描述符 unsigned short events; //POLL_EVENT char funcname[64]; //协程执行的函数名称 struct _coroutine *co_join; void **co_exit_ptr; void *stack; // 栈空间 void *ebp; //栈指针 uint32_t ops; // 当前协程的操作码 uint64_t sleep_usecs; //休眠时间 RB_ENTRY(_coroutine) sleep_node; // 睡眠队列中的红黑树节点 RB_ENTRY(_coroutine) wait_node; // 等待队列中的红黑树节点 LIST_ENTRY(_coroutine) busy_next; // 忙碌协程链表中的下一个指针 TAILQ_ENTRY(_coroutine) ready_next; // 就绪队列中的下一个指针 TAILQ_ENTRY(_coroutine) defer_next; // 延迟队列中的下一个指针 TAILQ_ENTRY(_coroutine) cond_next; // 条件变量队列中的下一个指针 TAILQ_ENTRY(_coroutine) io_next; // I/O 就绪队列中的下一个指针 TAILQ_ENTRY(_coroutine) compute_next; // 计算就绪队列中的下一个指针 struct { // 协程执行 I/O 操作所需的相关信息 void *buf; size_t nbytes; int fd; int ret; int err; } io; struct coroutine_compute_sched *compute_sched; // 指向计算调度器的指针 int ready_fds; // 就绪的文件描述符数量 struct pollfd *pfds; // 指向 pollfd 结构体数组的指针 nfds_t nfds; // pollfd 数组的大小 } coroutine;
3.创建一个协程
int coroutine_create(coroutine **new_co, proc_coroutine func, void *arg) { // 创建一个新的协程,并将其添加到调度器的就绪队列 assert(pthread_once(&sched_key_once, coroutine_sched_key_creator) == 0); // 保证调度器的键只会被创建一次: 确保 coroutine_sched_key_creator 函数只会被执行一次,用于创建线程局部存储的键保证调度器的键只会被创建一次 schedule *sched = coroutine_get_sched(); // 获取当前线程的调度器 if (sched == NULL) { // 当前线程尚未拥有调度器,需要先创建调度器: schedule_create(0); // 创建调度器 sched = coroutine_get_sched(); if (sched == NULL) { // 创建调度器失败 printf("Failed to create scheduler\n"); return -1; } } coroutine *co = calloc(1, sizeof(coroutine)); // 为新的协程分配内存空间 if (co == NULL) { // 分配失败 printf("Failed to allocate memory for new coroutine\n"); return -2; } // 初始化协程各个成员 co->stack = NULL; co->stack_size = 0; co->sched = sched; // 所属调度器 co->status = BIT(COROUTINE_STATUS_NEW); // 状态:新建 co->id = sched->spawned_coroutines ++; // 协程的id,同时也是调度器中已创建的协程数量 co->func = func; // 执行的函数 co->fd_wait = -1; co->arg = arg; // 函数参数 co->birth = coroutine_usec_now(); // 协程创建的时间戳 *new_co = co; // 将新创建的协程指针赋给传入的参数 TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next); // 将协程插入到调度器的就绪队列的尾部,以便后续调度器可以从就绪队列中选择协程执行 return 0; }
4.运行调度器
void schedule_run(void) { schedule *sched = coroutine_get_sched(); // 获取当前调度器 if (sched == NULL) return ; while (!schedule_isdone(sched)) { // 1. expried coroutine in sleep rbtree // 获取超时的协程,并逐个执行这些协程的恢复操作 coroutine *expired = NULL; while ((expired = schedule_expired(sched)) != NULL) { coroutine_resume(expired); } // 2. ready queue // 处理就绪队列中的协程:从就绪队列中依次取出协程,并执行它们的恢复操作 coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _coroutine_queue); while (!TAILQ_EMPTY(&sched->ready)) { coroutine *co = TAILQ_FIRST(&sched->ready); TAILQ_REMOVE(&co->sched->ready, co, ready_next); if (co->status & BIT(COROUTINE_STATUS_FDEOF)) { // 表示文件描述符已关闭,这时释放该协程的资源,然后退出循环。 coroutine_free(co); break; } coroutine_resume(co); if (co == last_co_ready) break; // 全部处理完即退出 } // 3. wait rbtree schedule_epoll(sched); // 调用epoll_wait轮询调度器中epoll管理的文件描述符,并将就绪事件保存到调度器的 eventlist 中 while (sched->num_new_events) { // 遍历所有就绪事件 // 从 num_new_events 中获取事件数量,并将事件数量减一 int idx = --sched->num_new_events; // 获取对应索引处的事件 struct epoll_event *ev = sched->eventlist+idx; int fd = ev->data.fd; // 检查事件是否为对端关闭连接 int is_eof = ev->events & EPOLLHUP; // 如果事件为对端关闭连接,则设置 errno 为 ECONNRESET if (is_eof) errno = ECONNRESET; // 根据文件描述符在等待红黑树中查找对应的协程 coroutine *co = schedule_search_wait(fd); // 如果找到了对应的协程,则恢复该协程的执行 if (co != NULL) { if (is_eof) { // 如果事件为对端关闭连接,则设置协程状态为已关闭文件描述符 co->status |= BIT(COROUTINE_STATUS_FDEOF); } // 恢复协程的执行 coroutine_resume(co); } // 将 is_eof 重置为 0,以便下次循环使用 is_eof = 0; } } schedule_free(sched); // 所有任务都完成后,释放调度器的资源,并返回。 return ; }
由于篇幅,只能列举部分源码进行说明,协程全部源码
协程源码 对每一个函数都有详细注释,学习首选
下一节继续讲解hook系统调用