协程设计原理

简介: 协程设计原理

1、协程的背景

1.1、同步与异步

对于响应式服务器,所有的客户端的操作驱动都是来源 epoll_wait 循环的反馈结果

while (1) {
      int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);
      for (i = 0;i < nready; i++) {
          int sockfd = events[i].data.fd;
          if (sockfd == listenfd) {
          int connfd = accept(listenfd, xxx, xxxx);
          setnonblock(connfd);
          ev.events = EPOLLIN | EPOLLET;
          ev.data.fd = connfd;
          epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
      } else {
          handle(sockfd);
      }
  } }

I/O 有同步和异步两种处理方式。

同步 IO 与 异步 IO

方法一:io 同步操作

  • 发起请求:等待响应
  • 接受响应

对 io 的操作和 epoll_wait 放在同一流程里,需要等待 io 的响应。

int handle(int sockfd) {
     recv(sockfd, rbuffer, length, 0);
     parser_proto(rbuffer, length);
     send(sockfd, sbuffer, length, 0);
 }

优点:sockfd 管理方便,操作逻辑清晰缺点:依赖 io 响应速度,性能差

方法二:io 异步操作

handle 函数内部将 sockfd 的操作,push 到线程池中,在 io 数据拷贝阶段可以做其他事。

int thread_cb(int sockfd) {
     // 此函数是在线程池创建的线程中运行,与 handle 不在一个线程上下文中运行
     recv(sockfd, rbuffer, length, 0);
     parser_proto(rbuffer, length);
     send(sockfd, sbuffer, length, 0);
 }
 int handle(int sockfd) {
     //此函数在主线程 main_thread 中运行,在此处之前,确保线程池已经启动。
     push_thread(sockfd, thread_cb); //将 sockfd 放到其他线程中运行。
 }

优点:子模块性好规划,性能高。缺点:模块间管理 sockfd 复杂,代码逻辑复杂(回调函数多)

1.2、为什么使用协程

从性能方面来看,对于使用异步 io 的线程,存在三个问题:

  • 系统线程占用大量的内存空间
  • 线程切换占用大量的系统时间
  • 为了线程安全,线程间需要加锁保护资源,降低执行的效率

从编程角度来看,无论同步还是异步编程方式,都是基于事件驱动的。事件驱动流程包括注册事件,绑定回调,触发回调,提高了系统的并发。但是由于回调的多层嵌套,使得编程复杂,降低了代码的可维护性。

在资源有限的前提下,高性能服务需要解决的问题有:

  • 减少线程的重复高频创建:线程池
  • 尽量避免线程的阻塞
  • Reactor + 非阻塞回调:解决问题的能力有限
  • 响应式编程:容易陷入回调地狱,割裂业务逻辑
  • 协程:将同 io 转成异步 io


  • 提升代码的可维护与可理解性:减少回调函数,减少回调链深度

而协程的出现,可以很好地解决上述问题。

协程运行在线程之上。当一个协程调用阻塞 io,主动让出CPU( yield 原语) ,让另一个协程运行在当前线程之上( resume 原语)。协程没有增加线程数量,只是在线程的基础上通过分时复用的方式运行多个协程,降低了系统内存。而且协程的切换在用户态完成,减少了系统切换开销。

综上所述,协程的优势体现在:

  • 消耗系统资源和切换代价更小
  • 协程可以实现无锁编程
  • 简化了异步编程,可以达到以同步的编程方式实现异步的性能

1.3、协程的适用场景

协程适用于 I/O 密集型业务,线程切换频繁。其他情况,性能不会有太大的提升。

2、协程的原语操作

协程的原语操作

  • yield: 协程主动让出CPU给调度器。时机:业务提交 -> epoll_wait
  • resume: 调度器恢复协程的运行权。时机:epoll_wait -> 业务处理

resume 和 yield 是两个可逆的原子操作。

io 异步操作函数执行流程如下:

  • 将 sockfd 添加到 epoll 管理
  • 由协程上下文 yield 到调度器的上下文
  • 调度器获取下一个协程上下文,resume 新的协程

协程_原语操作


// yield 原语操作:协程主动让出CPU给调度器
 void nty_coroutine_yield(nty_coroutine *co) {
     co->ops = 0;
     // 保存cpu寄存器的值到该协程,加载协程调度器保存的上下文到cpu寄存器上运行
     _switch(&co->sched->ctx, &co->ctx);
 }
 // resume 原语操作:调度器恢复协程的运行权
 int nty_coroutine_resume(nty_coroutine *co) {
     if (co->status & BIT(NTY_COROUTINE_STATUS_NEW)) {
         nty_coroutine_init(co);
     }
     // 获取协程调度器
     nty_schedule *sched = nty_coroutine_get_sched();
     // 设置当前调度器指向的协程
     sched->curr_thread = co;
     // 保存cpu寄存器的值到调度器,加载该协程保存的上下文到cpu寄存器上运行
     _switch(&co->ctx, &co->sched->ctx);
     // 设置当前调度器指向的协程
     sched->curr_thread = NULL;
     nty_coroutine_madvise(co);
     // 释放协程占用的空间
     if (co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) {
         if (co->status & BIT(NTY_COROUTINE_STATUS_DETACH)) {
             printf("nty_coroutine_resume --> \n");
             nty_coroutine_free(co);
         }
         return -1;
     } 
     return 0;
 }

3、协程的切换

协程的上下文如何进行切换?现有的 C++ 协程库均基于两种方案

  • 汇编实现:libco,Boost.context
  • OS 提供的 API :
  • phxrpc:基于 ucontext / Boost.context 的上下文切换
  • libmill:基于 setjump/longjump 的协程切换


一般来说,基于汇编的上下文切换要比采用系统调用的切换更加高效

3.1、汇编实现

x86-64有16个64位寄存器,分别是:%rax,%rbx,%rcx,%rdx,%rdi,%rsi,%rbp,%rsp,%r8,%r9,%r10,%r11,%r12,%r13,%r14,%r15。

  • %rax: Return value, 作为函数返回值使用。
  • %rsp: Stack pointer,栈指针,栈顶指针,指向栈顶
  • %rbp: Base pointer,基址指针,栈桢(栈底)指针 Frame pointer,指向栈的底部
  • %rdi,%rsi,%rdx,%rcx,%r8,%r9: 用作函数参数,依次对应第1, ..., 6参数
  • %rbx,%r12,%r13,%r14,%r15: Caller saved,被调用者保护寄存器(易失性寄存器),程序调用过程中寄存器的值不需要保存。如果要保存,则调用者负责压栈。
  • %r10,%r11: Callee-owned,调用者保护寄存器(非易失性寄存器)。程序调用过程中,需要保存,不能覆盖。被调用寄存器先保存值然后再调用,调用结束后恢复调用前的值。
  • %rip: Instruction pointer, 相当于PC指针指向当前的指令地址,指向下一条要执行的指令

栈帧


协程上下文切换,就是先将 cpu 寄存器的值暂时保存到 cur_ctx,再将即将运行的协程的上下文 new_ctx 的值 mov 到相对应的 cpu 寄存器上,完成切换。

协程的切换


切换函数 _switch 的定义

/**
  * @brief switch实现协程切换,保存cpu寄存器的值到cur_ctx,加载new_ctx上下文到cpu寄存器
  * @param new_ctx: 对应寄存器rdi,即将运行协程的上下文,加载它的上下文到cpu寄存器
  * @param cur_ctx: 对应寄存器rsi,正在运行协程的上下文,保存cpu寄存器到它的上下文 
  * @return int 
  */
 int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
 // 
 __asm__ (
 "    .text                                  \n"
 "       .p2align 4,,15                                   \n"
 ".globl _switch                                          \n"
 ".globl __switch                                         \n"
 "_switch:                                                \n"
 "__switch:                                               \n"
 "       movq %rsp, 0(%rsi)      # save stack_pointer     \n"
 "       movq %rbp, 8(%rsi)      # save frame_pointer     \n"
 "       movq (%rsp), %rax       # save insn_pointer      \n"
 "       movq %rax, 16(%rsi)                              \n"
 "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       \n"
 "       movq %r12, 32(%rsi)                              \n"
 "       movq %r13, 40(%rsi)                              \n"
 "       movq %r14, 48(%rsi)                              \n"
 "       movq %r15, 56(%rsi)                              \n"
 "       movq 56(%rdi), %r15                              \n"
 "       movq 48(%rdi), %r14                              \n"
 "       movq 40(%rdi), %r13     # restore rbx,r12-r15    \n"
 "       movq 32(%rdi), %r12                              \n"
 "       movq 24(%rdi), %rbx                              \n"
 "       movq 8(%rdi), %rbp      # restore frame_pointer  \n"
 "       movq 0(%rdi), %rsp      # restore stack_pointer  \n"
 "       movq 16(%rdi), %rax     # restore insn_pointer   \n"
 "       movq %rax, (%rsp)                                \n"
 "       ret                                              \n"
 );

nty_cpu_ctx 结构体,存储寄存器的值

// x86 寄存器列表,每个寄存器8字节
 typedef struct _nty_cpu_ctx {
     void *esp; 
     void *ebp;
     void *eip;
     void *edi;
     void *esi;
     void *ebx;
     void *r1;
     void *r2;
     void *r3;
     void *r4;
     void *r5;
 } nty_cpu_ctx;

4、协程的定义

协程数据结构设计分为两部分

  • 运行体 R:包含运行状态(就绪,睡眠,等待),运行体回调函数,回调参数,栈指针,栈大小,当前运行体
  • 调度器 S:包含执行集合(就绪,睡眠,等待)

4.1、多状态集合设计

新创建的协程,创建完成后,加入到就绪集合,等待调度器的调度;协程在运行完成后,进行 IO 操作,此时 IO 并未准备好,进入等待状态集合;IO 准备就绪,协程开始运行,后续进行 sleep 操作,此时进入到睡眠状态集合。

那么,运行体如何在多状态集合高效切换?三种集合如何设置合理的数据结构?

  • 就绪 (ready) 集合:不设置优先级,所有协程优先级一致,使用队列存储就绪的协程,简称就绪队列 (ready_queue)
  • 睡眠 (sleep) 集合:需要对睡眠时长排序,采用红黑树来存储,简称睡眠树 (sleep_tree)。key 为睡眠时长,value 为对应的协程结点。
  • 等待 (wait) 集合:需要对 IO 等待时间排序,采用红黑树来存储,简称等待树 (wait_tree)。

数据结构


运行体设计如下:

struct coroutine {
     nty_cpu_ctx ctx;                    //上下文环境,保存CPU寄存器
     proc_coroutine func;                // 子过程的回调函数
     void *arg;                         // 子过程回调函数的参数
     void *ret;                         // 子过程回调函数的返回值
     nty_coroutine_status status;         // 运行状态:ready, wait, sleep
     nty_schedule *sched;                // 调度器
     uint64_t birth;                     // 创建时间
     uint64_t id;                        // 协程 id
     void *stack;                        // 栈空间
     size_t stack_size;                   // 栈空间大小
     RB_ENTRY(_nty_coroutine) sleep_node;    // 睡眠 sleep 树
     RB_ENTRY(_nty_coroutine) wait_node;     // 等待 wait 树
     TAILQ_ENTRY(_nty_coroutine) ready_next; // 就绪 ready 队列
 };

4.2、调度器的定义

每一协程都需要使用的而且可能会不同属性的,就是协程属性(私有)。每一协程都需要的而且数据一致的,就是调度器的属性(公共)。调度器是管理所有协程运行的组件。

// 调度策略
 struct scheduler_op {
     remove_wait();
     remove_sleep();
 };
 // 调度器,用来管理所有的协程
 struct scheduler {
     int epfd;                   
     struct epoll_event events[];  
     struct coroutine *cur;   // 当前运行的协程
     queue_tail(, struct coroutine) ready;  // 指向就绪队列
     rbtree_root(, struct coroutine) wait;  // 指向等待树
     rbtree_root(, struct coroutine) sleep; // 指向睡眠树
     struct scheduler_op *sch_op; //调度策略
 };

调度器与协程(每个协程对应一个客户端)的运行关系

调度器与协程的关系

if (io 是否可写) {
     connect();
 }
 else {
     epoll_ctl(epfd, fd);
     yield();
 }

4.2、调度策略

协程如何被调度,有两种方案,生产者消费者模式和多状态运行。

生产者消费者模式


while (1) {
      //遍历睡眠集合,将满足条件的加入到 ready
      nty_coroutine *expired = NULL;
      while ((expired = sleep_tree_expired(sched)) != NULL) {
         TAILQ_ADD(&sched->ready, expired);
      }
      //遍历等待集合,将满足添加的加入到 ready
      nty_coroutine *wait = NULL;
      int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
      for (i = 0;i < nready;i ++) {
         wait = wait_tree_search(events[i].data.fd);
         TAILQ_ADD(&sched->ready, wait);
      }
      // 使用 resume 恢复 ready 的协程运行权
      while (!TAILQ_EMPTY(&sched->ready)) {
         nty_coroutine *ready = TAILQ_POP(sched->ready);
         resume(ready);
      }
 }

多状态运行


while (1) {
      //遍历睡眠集合,使用 resume 恢复 expired 的协程运行权
      nty_coroutine *expired = NULL;
      while ((expired = sleep_tree_expired(sched)) != NULL) {
          resume(expired);
      }
      //遍历等待集合,使用 resume 恢复 wait 的协程运行权
      nty_coroutine *wait = NULL;
      int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
      for (i = 0;i < nready;i ++) {
          wait = wait_tree_search(events[i].data.fd);
          resume(wait);
      }
      // 使用 resume 恢复 ready 的协程运行权
      while (!TAILQ_EMPTY(sched->ready)) {
          nty_coroutine *ready = TAILQ_POP(sched->ready);
          resume(ready);
     }
 }

5、api 封装

5.1、hook 机制

通过 hook 系统的 socket 函数族来实现无需修改代码的异步化改造。简单来说,就是利用动态链接的原理来修改符号指向。OS 提供了运行时动态链接共享库的方式。通过共享库的方式让引用程序在下次运行时执行不同的代码,这也为应用程序 Hook 系统函数提供了基础。

Unix-like 系统提供了 dlopendlsym 系列函数来供程序在运行时操作外部的动态链接库,从而获取动态链接库中的函数或者功能调用。

例如:微信后台的 c/c++ 协程库 libco。 libco 使用动态链接 Hook 系统函数,最大的特点是将系统中的关于网络操作的阻塞函数全部进行相应的非侵入式改造,对于 readwrite 等阻塞函数,libco 均定义了自己的版本,然后通过 LD_PRELOAD 进行运行时地解析,从而来达到阻塞时自动让出协程,并在 IO 事件发生时唤醒协程的目的。

5.2、hook 函数原型

#define _GNU_SOURCE
 #include <dlfcn.h>
 void *dlsym(void *handle, const char *symbol);
 void *dlvsym(void *handle, char *symbol, char *version);
 link with -ldl

5.3、hook 的使用

  • typedef 原有的系统函数指针
  • 定义函数指针指
  • 改造原有的系统函数
  • init_hook
// gcc -ldl
 #include <dlfcn.h>
 #define _GNU_SOURCE
 // 1、typedef 原有的系统函数指针
 typedef ssize_t(*send_t)(int sockfd, const void *buf, size_t len, int flags);
 // 2、定义函数指针
 recv_t recv_f = NULL;
 // 3、改造原有的系统函数
 ssize_t recv(int fd, void *buf, size_t len, int flags) {
     printf("recv\n");
     int ret = recv_f(fd, buf, len, flags);
     return ret;
 }
 int init_hook(void) {
     recv_f = (recv_t)dlsym(RTLD_NEXT, "recv"); 
 }

6、多核模式

  • 一个线程一个调度器,简单,不需要加锁
  • 一个进程一个调度器,简单,不需要加锁
  • 多个线程共用一个调度器,复杂,入队需要加锁

7、性能测试

测试标准

  • 并发量:fd数量、协程的数量
  • 每秒接入量:fd -> coroutine_create
  • 断开连接:coroutine_destory

8、参考

相关文章
|
8月前
|
API 调度
2.3.1 协程设计原理与汇编实现
2.3.1 协程设计原理与汇编实现
|
5月前
|
存储 Linux 调度
协程(coroutine)的原理和使用
协程(coroutine)的原理和使用
|
2月前
|
存储 安全 测试技术
GoLang协程Goroutiney原理与GMP模型详解
本文详细介绍了Go语言中的Goroutine及其背后的GMP模型。Goroutine是Go语言中的一种轻量级线程,由Go运行时管理,支持高效的并发编程。文章讲解了Goroutine的创建、调度、上下文切换和栈管理等核心机制,并通过示例代码展示了如何使用Goroutine。GMP模型(Goroutine、Processor、Machine)是Go运行时调度Goroutine的基础,通过合理的调度策略,实现了高并发和高性能的程序执行。
155 29
|
2月前
|
负载均衡 算法 Go
GoLang协程Goroutiney原理与GMP模型详解
【11月更文挑战第4天】Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理,创建和销毁开销小,适合高并发场景。其调度采用非抢占式和协作式多任务处理结合的方式。GMP 模型包括 G(Goroutine)、M(系统线程)和 P(逻辑处理器),通过工作窃取算法实现负载均衡,确保高效利用系统资源。
|
4月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
在Python异步编程领域,协程与异步函数成为处理并发任务的关键工具。协程(微线程)比操作系统线程更轻量级,通过`async def`定义并在遇到`await`表达式时暂停执行。异步函数利用`await`实现任务间的切换。事件循环作为异步编程的核心,负责调度任务;`asyncio`库提供了事件循环的管理。Future对象则优雅地处理异步结果。掌握这些概念,可使代码更高效、简洁且易于维护。
41 1
|
3月前
|
数据采集 调度 Python
Python编程异步爬虫——协程的基本原理(一)
Python编程异步爬虫——协程的基本原理(一)
26 0
|
3月前
|
数据采集 Python
Python编程异步爬虫——协程的基本原理(二)
Python编程异步爬虫——协程的基本原理(二)
29 0
|
3月前
|
存储 前端开发 rax
协程设计与原理(二)
协程设计与原理(二)
24 0
|
3月前
|
Java Linux Go
协程的设计原理(一)
协程的设计原理(一)
44 0
|
6月前
|
调度 Python
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
70 2
下一篇
开通oss服务