2.7 协程设计原理

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 2.7 协程设计原理

一、为什么要有协程?

以DNS请求为例子,客户端向服务器发送域名,服务器回复该域名对应得IP地址。

在Linux下,常使用IO多路复用器epoll来管理客户端连接,其主循环框架如下

while (1){
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);
    int i=0;
    for (i=0; i<nready; i++){
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd){
            int connfd = accept(listenfd, addr, &addr_len);
            setnonblock(connfd); //置为非阻塞
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD,connfd,&ev);
        }else{
            handel(sockfd); //进行读写操作
        }
    }
}

对于handel(sockfd)有两种处理方式:同步以及异步。

同步

handle(sockfd)函数内部对 sockfd 进行读写动作,并且handle 的 io 操作(send,recv)与 epoll_wait 是在同一个处理流程里面的,即IO 同步操作。

int handle(int sockfd) {
recv(sockfd, rbuffer, length, 0);
parser_proto(rbuffer, length);
send(sockfd, sbuffer, length, 0);
}

异步

handle(sockfd)函数内部将 sockfd 的操作,push 到线程池中。handle函数是将 sockfd 处理方式放到另一个已经其他的线程中运行,如此做法,将 io 操作(recv,send)与 epoll_wait 不在一个处理流程里面,使得 io操作(recv,send)与 epoll_wait 实现解耦,即 IO 异步操作。

int thread_cb(int sockfd) {
// 此函数是在线程池创建的线程中运行。
// 与 handle 不在一个线程上下文中运行
recv(sockfd, rbuffer, length, 0);
parser_proto(rbuffer, length);
send(sockfd, sbuffer, length, 0);
}
int handle(int sockfd) {
//此函数在主线程 main_thread 中运行
push_thread(sockfd, thread_cb); //将 sockfd 放到其他线程中运行。
}

对比


这样子,自然希望有一种方法,能有同步的代码逻辑,异步的性能,来方便编程人员对 IO 操作的。这就是一种轻量级的协程,在每次 send 或者 recv 之前进行切换,再由调度器来处理 epoll_wait 的流程。


二、协程的原语操作

协程的实现原理是在单个线程中创建多个协程,并通过“切换”的方式来实现协程之间的调度。协程之间的切换是由程序自己控制的,比线程切换更加快速和高效。协程通常不会阻塞整个线程,而只会阻塞当前的协程,因此可以大幅度降低线程的切换开销,提高程序的执行效率。


1、基本操作

协程包括三大部分:协程的创建、协程的调度器、协程的切换。


创建协程之后,将协程加入调度器,由调度器统一管理,决定执行顺序。

在遇到IO操作之前,协程让出(yield)执行权,交由调度器决定下一个恢复(resume)加载执行的协程。执行完成(或者未就绪)之后,协程再次让出(yield)执行权交给调度器。


061b47cdc27b53535022d8e7bdc4802e_26880412d15e45d1bf892d0047779e9c.png


整体感觉便是调度器将时间划分成不同的时间片,每个时间片交由不同协程完成具体操作。当操作完成或没有操作时,运行权移交给调度器控制。


因此,协程的核心原语操作有:创建(create)、让出(yield)、恢复(resume)

create:创建协程。

yield:由当前协程的上下文切换到调度器的上下文。

resume:由调度器获取下一个将要执行的协程的上下文,恢复协程的运行权。


2、让出(yield)和恢复(resume)

async_Recv(fd, buffer, length) {
  ret = poll(fd); //判断fd是否就绪
  //若未就绪,重新加入到epoll,并切换到下一个协程
  if (ret == notReady) {
    epoll_ctl(add);
    yield (next_fd);  //resume
  }
  else {
    recv;
  }
}
async_Send(fd, buffer, length) {
  ret = poll(fd); //判断fd是否就绪
  //若未就绪,重新加入到epoll,并切换到下一个协程
  if (ret == notReady) {
    epoll_ctl(add);
    yield(next_fd); //resume
  }
  else {
    send;
  }
}
while (1) {
  epool_wait();
  for () {
    async_Recv(fd, buffer, length);
    parse(buffer);  //解析
    async_Send(fd, buffer, length);
  }
}


三、协程的切换(switch)

协程的切换有三种方式:

1、汇编

2、ucontext

3、longjmp / setjmp


1、汇编

x86_64 的寄存器有 16 个 64 位寄存器,分别是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,%r13, %r14, %r15。

%rax:存储函数的返回值;

%rbp:栈指针寄存器,指向栈底;

%rsp:栈指针寄存器,指向栈顶。

%rdi,%rsi,%rdx,%rcx,%r8,%r9:函数的六个参数,依次对应第 1 参数,第 2 参数。如果函数的参数超过六个,那么六个以后的参数会入栈。

%rbx, %rbp, %r12, %r13, %r14, %r15 用作数据存储,遵循调用者使用规则,换句话说,就是随便用。调用子函数之前要备份它,以防它被修改

%r10, %r11 用作数据存储,就是使用前要先保存原值。


上下文切换,就是将CPU的寄存器暂存,再将即将运行的协程的上下文寄存器分别mov到相应的寄存器上。


切换_switch 函数定义:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);

参数 1:即将运行协程的上下文

参数 2:正在运行协程的上下文

按照 x86_64 的寄存器定义,%rdi 保存第一个参数的值,即 new_ctx 的值,%rsi 保存第二

个参数的值,即保存 cur_ctx 的值。

__asm__ (
"    .text                                  \n"
"       .p2align 4,,15                                   \n"
".globl _switch                                          \n"
".globl __switch                                         \n"
"_switch:                                                \n"
"__switch:                                               \n"
"       movq %rsp, 0(%rsi)      # save stack_pointer     \n"
"       movq %rbp, 8(%rsi)      # save frame_pointer     \n"
"       movq (%rsp), %rax       # save insn_pointer      \n"
"       movq %rax, 16(%rsi)                              \n"
"       movq %rbx, 24(%rsi)     # save rbx,r12-r15       \n"
"       movq %r12, 32(%rsi)                              \n"
"       movq %r13, 40(%rsi)                              \n"
"       movq %r14, 48(%rsi)                              \n"
"       movq %r15, 56(%rsi)                              \n"
"       movq 56(%rdi), %r15                              \n"
"       movq 48(%rdi), %r14                              \n"
"       movq 40(%rdi), %r13     # restore rbx,r12-r15    \n"
"       movq 32(%rdi), %r12                              \n"
"       movq 24(%rdi), %rbx                              \n"
"       movq 8(%rdi), %rbp      # restore frame_pointer  \n"
"       movq 0(%rdi), %rsp      # restore stack_pointer  \n"
"       movq 16(%rdi), %rax     # restore insn_pointer   \n"
"       movq %rax, (%rsp)                                \n"
"       ret                                              \n"
);

按照 x86_64 的寄存器定义,%rdi 保存第一个参数的值,即 new_ctx 的值,%rsi 保存第二

个参数的值,即保存 cur_ctx 的值。X86_64 每个寄存器是 64bit,8byte。

Movq %rsp, 0(%rsi) 保存在栈指针到 cur_ctx 实例的 rsp 项

Movq %rbp, 8(%rsi)

Movq (%rsp), %rax #将栈顶地址里面的值存储到 rax 寄存器中。Ret 后出栈,执行栈顶

Movq %rbp, 8(%rsi) #后续的指令都是用来保存 CPU 的寄存器到 new_ctx 的每一项中

Movq 8(%rdi), %rbp #将 new_ctx 的值

Movq 16(%rdi), %rax #将指令指针 rip 的值存储到 rax 中

Movq %rax, (%rsp) # 将存储的 rip 值的 rax 寄存器赋值给栈指针的地址的值。

Ret # 出栈,回到栈指针,执行 rip 指向的指令。

上下文环境的切换完成。


2、ucontext

getcontext、makecontext、swapcontext是一组操作上下文(context)的函数,一般用于在用户层级(user space)线程中实现协程(coroutine)。


(1)getcontext(ucontext_t *ucp):该函数会获取当前线程的上下文,并将其保存到ucontext_t类型的结构体指针ucp中,以便后续使用。

(2)makecontext(ucontext_t *ucp, void (*func)(), int argc, ...):创建一个新的执行上下文,并将其与func函数关联,argc表示func函数的参数个数,后面的省略号表示具体的函数参数。通常情况下,我们需要先调用getcontext获取当前线程的上下文,然后再通过makecontext创建一个新的上下文,该上下文可以通过swapcontext函数来激活,并开始执行func函数。

(3)swapcontext(ucontext_t *oucp, const ucontext_t *ucp):用于切换两个不同上下文之间的控制流。oucp和ucp分别表示当前和目标上下文。当调用该函数时,程序会保存当前上下文并开始执行ucp所指定的上下文。

// getcontext(&context);
// makecontext(&context, func, arg);
// swapcontext(&curent_context, &next_context);
#include <stdio.h>
#include <ucontext.h>
ucontext_t ctx[2];
ucontext_t main_ctx;
int count = 0;
void fun1(){
    while (count++ < 50){
        printf("1");
        swapcontext(&ctx[0],&ctx[1]);
        printf("3");
    }
}
void fun2(){
    while (count++ < 50){
        printf("2");
        swapcontext(&ctx[1],&ctx[0]);
        printf("4");
    }
}
int main(){
    char stack1[2048] = {0};
    char stack2[2048] = {0};
    getcontext(&ctx[0]);
    //将用户上下文ctx[1]的栈空间(stack)指针指向了一个已预先分配的内存块stack1
    ctx[0].uc_stack.ss_sp = stack1;
    //将栈空间的大小设置为stack1的大小,也就是在此处分配的内存块大小。
    ctx[0].uc_stack.ss_size = sizeof(stack1);
    //将链接指针(link pointer)设置为main_ctx,也就是当执行完当前上下文时,
    //程序将会切换回main_ctx所指定的上下文。链接指针用于实现协程(coroutine)之间的切换。
    ctx[0].uc_link = &main_ctx;
    makecontext(&ctx[0], fun1 ,0);
    getcontext(&ctx[1]);
    ctx[1].uc_stack.ss_sp = stack1;
    ctx[1].uc_stack.ss_size = sizeof(stack1);
    ctx[1].uc_link = &main_ctx;
    makecontext(&ctx[1], fun2 ,0);
    printf("swapcontext\n");
    swapcontext(&main_ctx, &ctx[0]);
    printf("\n");
}

结果是

123142314231423142314231423142314231423142314231423142314231423142314231423142314231423142314231423

执行顺序是:


8f5bf547e0f1a8b134771195222f4333_c6cb86afbab94d5f957910ef7a6650ad.png


p19 -> p20 -> p21 -> p28 -> p29 -> p30 -> p21 -> p22 -> p19 -> p20 -> p21 -> p30 -> p31 -> p28 -> p29 -> p30 -> p21 -> p22 -> p19 -> p20 -> p21 -> p30 -> p31 -> ……


3、longjmp / setjmp

setjmp 和 longjmp 是两个用于非局部跳转(non-local jump)的 C 语言库函数。


(1)int setjmp(jmp_buf env);:保存当前程序上下文信息到env,并返回0;

(2)void longjmp(jmp_buf env, int val);:直接跳转到该跳转点env的位置,并且恢复该跳转点env的状态。还可以传递一个整数值val,用于作为 setjmp 函数的返回值。

#include <stdio.h>
#include <setjmp.h>
jmp_buf env;
void func(int arg){
    printf("func\n");
    longjmp(env, ++arg);
    printf("longjmp complete\n"); //由于longjmp是非局部跳转操作,所以之前在func函数中的printf语句不会被执行。
}
int main(){
    int ret = setjmp(env);
    if (ret == 0){
        printf("ret == 0\n");
        func(ret);
    }else if (ret == 1){
        printf("ret == 1\n");
        func(ret);
    }
    printf("ret:%d\n",ret);
}

执行结果是

ret == 0
func
ret == 1
func
ret:2

执行顺序

1、在main函数中使用setjmp函数保存当前程序状态。

2、在main函数中判断setjmp返回值ret,如果为0,则说明是第一次调用,执行func函数并将ret作为参数传递;否则,说明是通过longjmp跳转回来的,继续执行func函数。

3、在func函数中,先输出"func",然后通过longjmp跳转回到setjmp处,并将arg+1作为返回值。由于longjmp是非局部跳转操作,所以之前在func函数中的printf语句不会被执行。

4、最终,在main函数结束时输出ret的值


四、协程结构的定义

//协程的状态
typedef enum _co_status {
  CO_NEW,    //新建
  CO_READY, // 就绪
  CO_WAIT,  //等待
  CO_SLEEP, //睡眠
  CO_EXIT,  //退出
} co_status_t;
struct coroutine {
  int birth; //协程创建的时间
  int coid; //协程的id
  struct context ctx; //上下文信息
  struct scheduler* sched; //调度器
  void* (*entry)(void*); //回调函数入口
  void *arg; //回调函数的参数
  void* stack; //独立栈(每个协程有自己的虚拟内存空间) 或 共享栈  
  size_t size; //栈的大小
  co_status_t status; //协程的状态
  queue_node(coroutine) readyq;
  rbtree_node(coroutine) sleept;
  rbtree_node(coroutine) waitt;
  queue__node(coroutine) exitq;
};


五、协程调度器结构的定义

struct scheduler {
  struct coroutine* cur_co; //当前运行的协程
  queue_head(coroutine) readyh;
  rbtree_root(coroutine) sleepr;
  rbtree_root(coroutine) waitr;
  queue_head(coroutine) exith;
};


六、调度策略

调度策略:

1、IO密集型(更多强调IO等待)-- > 把wait放在最前面,把sleep放在最后面

2、计算密集型(更多强调计算结果) --> 把ready放在最前面

schedule(struct schedule* sched) {
  coroutine* co = NULL;
  while ((co = get_expired_node(sched->sleepr)) != NULL) {
    resume(co); 
  }
  while ((co = get_wait_node(sched->waitq)) != NULL) {
    push_ready_queue(co); 
  }
  whike((co = get_next_node(sched->readyq)) != NULL) {
    resume(co);
  }
  whike((co = get_next_node(sched->exitq)) != NULL) {
    destroy(co);
  }
}


七、如何与posix api 兼容

利用hook技术,可以在实现协程的时候与本系统的API实现兼容。

主要函数是dlsym函数

dlsym函数是一个动态链接库函数,它的作用是在动态链接库中查找指定的符号,并返回符号对应的地址。

void *dlsym(void *handle, const char *symbol);


举个例子,连接mysql的时候,我们通过使用hook技术,使得实际执行的connect、recv、send是经过我们重定义后的。

zxm@ubuntu:~/share/opp$ gcc -o mysql mysql.c -lmysqlclient -ldl
zxm@ubuntu:~/share/opp$ ./mysql 
connect
recv
send
mysql_real_connect success
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <sys/types.h>   
#include <sys/socket.h>
#include <mysql/mysql.h>
#define __INIT_HOOK__ init_hook();
#define MING_DB_IP           "192.168.42.128"
#define MING_DB_PORT         3306
#define MING_DB_USENAME      "admin"
#define MING_DB_PASSWORD     "123456"
#define MING_DB_DEFAULTDB    "MING_DB"
typedef int (*connect_t)(int sockfd, const struct sockaddr *addr,socklen_t addrlen);
connect_t connect_f = NULL;
typedef ssize_t (*recv_t)(int sockfd, void *buf, size_t len, int flags);
recv_t recv_f = NULL;
typedef ssize_t (*send_t)(int sockfd, const void *buf, size_t len, int flags);
send_t send_f = NULL;
int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen){
    printf("connect\n");
    return connect_f(sockfd, addr, addrlen);
}
ssize_t recv(int sockfd, void *buf, size_t len, int flags){
    printf("recv\n");
    return recv_f(sockfd,buf, len, flags );
}
ssize_t send(int sockfd,const void *buf, size_t len, int flags){
    printf("send\n");
    return send_f(sockfd ,buf, len, flags );
}
void init_hook(void){
    if (!connect_f){
        connect_f = dlsym(RTLD_NEXT,"connect");
    }
    if (!recv_f){
        recv_f = dlsym(RTLD_NEXT,"recv");
    }
    if (!send_f){
        send_f = dlsym(RTLD_NEXT,"send");
    }
}
int main () {
    __INIT_HOOK__;
    MYSQL *mysql = mysql_init(NULL); // 初始化MYSQL结构体,返回一个指向MYSQL结构体的指针
    if (!mysql){
        printf("musql_init failed\n");
        return 0;
    }
    if (!mysql_real_connect(mysql,MING_DB_IP,MING_DB_USENAME,MING_DB_PASSWORD,MING_DB_DEFAULTDB,
    MING_DB_PORT,NULL,CLIENT_FOUND_ROWS)){
        printf("mysql_real_connect failed\n");
        return 0;
    }
    printf("mysql_real_connect success\n");
}


八、协程多核模式

可将某个计算与某个cpu绑定黏合在一起,有利于计算密集型。

比如对于多进程/多线程与多核的黏合,可以为每个进程或者线程,分配一个调度器。

//线程绑定
Thread 3 is running on cpu 
Thread 2 is running on cpu 
Thread 1 is running on cpu 
Thread 0 is running on cpu 
//进程绑定
Process 31690 is running on cpu
Process 31691 is running on cpu
Process 31692 is running on cpu
Process 31693 is running on cpu
#define _GNU_SOURCE
#include <stdio.h>
#include <pthread.h>
#include <sched.h>
#include <unistd.h>
#include <sys/syscall.h>
#define     THREAD_COUNT  2
void *thread_func(void *arg){
    int threadid = *(int *)arg;
    printf("Thread %d is running on cpu \n",threadid);
    while (1);
}
int process_bind(void) {
  int num = sysconf(_SC_NPROCESSORS_CONF); // //返回当前系统中可用的CPU核心数量
  pid_t self_id = syscall(__NR_gettid);
  cpu_set_t mask;
  CPU_ZERO(&mask);
  CPU_SET(self_id % num, &mask);
  sched_setaffinity(0, sizeof(mask), &mask);
  printf("Process %d is running on cpu\n", self_id);
  while (1) ;
}
int main(){
#if 0  // 线程绑定
    pthread_t threads[THREAD_COUNT];
    int threadid[THREAD_COUNT];
    //定义了一个 cpu_set_t 类型的变量 cpus,它是一个位图,每一位代表一个 CPU 核心
    cpu_set_t cpus;
    //通过调用 CPU_ZERO(&cpus) 函数将其清零。
    CPU_ZERO(&cpus);
    int i=0;
    for (i=0; i< THREAD_COUNT; i++){
        //将下标 i 对应的 CPU 核心编号添加到 cpus 变量中
        CPU_SET(i,&cpus);
    }
    for (i=0;i< THREAD_COUNT; i++){
        threadid[i] = i;
        pthread_create(&threads[i], NULL , thread_func, &threadid[i]);
        // 将第 i 个线程绑定到第 i 个 CPU 上
        pthread_setaffinity_np (threads[i], sizeof(cpu_set_t), &cpus);
    }
    for (i=0; i< THREAD_COUNT; i++){
        pthread_join (threads[i],NULL);
    }
#else  //进程绑定
    fork();
    fork();
    process_bind();
#endif
    return 0;
}
相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
0
0
0
6
分享
相关文章
|
8月前
|
2.3.1 协程设计原理与汇编实现
2.3.1 协程设计原理与汇编实现
协程(coroutine)的原理和使用
协程(coroutine)的原理和使用
GoLang协程Goroutiney原理与GMP模型详解
本文详细介绍了Go语言中的Goroutine及其背后的GMP模型。Goroutine是Go语言中的一种轻量级线程,由Go运行时管理,支持高效的并发编程。文章讲解了Goroutine的创建、调度、上下文切换和栈管理等核心机制,并通过示例代码展示了如何使用Goroutine。GMP模型(Goroutine、Processor、Machine)是Go运行时调度Goroutine的基础,通过合理的调度策略,实现了高并发和高性能的程序执行。
192 29
GoLang协程Goroutiney原理与GMP模型详解
【11月更文挑战第4天】Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理,创建和销毁开销小,适合高并发场景。其调度采用非抢占式和协作式多任务处理结合的方式。GMP 模型包括 G(Goroutine)、M(系统线程)和 P(逻辑处理器),通过工作窃取算法实现负载均衡,确保高效利用系统资源。
|
4月前
|
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
在Python异步编程领域,协程与异步函数成为处理并发任务的关键工具。协程(微线程)比操作系统线程更轻量级,通过`async def`定义并在遇到`await`表达式时暂停执行。异步函数利用`await`实现任务间的切换。事件循环作为异步编程的核心,负责调度任务;`asyncio`库提供了事件循环的管理。Future对象则优雅地处理异步结果。掌握这些概念,可使代码更高效、简洁且易于维护。
45 1
Python编程异步爬虫——协程的基本原理(一)
Python编程异步爬虫——协程的基本原理(一)
28 0
Python编程异步爬虫——协程的基本原理(二)
Python编程异步爬虫——协程的基本原理(二)
30 0
协程设计与原理(二)
协程设计与原理(二)
25 0
|
3月前
|
协程的设计原理(一)
协程的设计原理(一)
49 0
|
6月前
|
揭秘Python并发编程核心:深入理解协程与异步函数的工作原理
【7月更文挑战第15天】Python异步编程借助协程和async/await提升并发性能,减少资源消耗。协程(async def)轻量级、用户态,便于控制。事件循环,如`asyncio.get_event_loop()`,调度任务执行。异步函数内的await关键词用于协程间切换。回调和Future对象简化异步结果处理。理解这些概念能写出高效、易维护的异步代码。
74 2
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等