一、为什么要有协程?
以DNS请求为例子,客户端向服务器发送域名,服务器回复该域名对应得IP地址。
在Linux下,常使用IO多路复用器epoll来管理客户端连接,其主循环框架如下
while (1){ int nready = epoll_wait(epfd, events, EVENT_SIZE, -1); int i=0; for (i=0; i<nready; i++){ int sockfd = events[i].data.fd; if (sockfd == listenfd){ int connfd = accept(listenfd, addr, &addr_len); setnonblock(connfd); //置为非阻塞 ev.events = EPOLLIN | EPOLLET; ev.data.fd = connfd; epoll_ctl(epfd, EPOLL_CTL_ADD,connfd,&ev); }else{ handel(sockfd); //进行读写操作 } } }
对于handel(sockfd)有两种处理方式:同步以及异步。
同步
handle(sockfd)函数内部对 sockfd 进行读写动作,并且handle 的 io 操作(send,recv)与 epoll_wait 是在同一个处理流程里面的,即IO 同步操作。
int handle(int sockfd) { recv(sockfd, rbuffer, length, 0); parser_proto(rbuffer, length); send(sockfd, sbuffer, length, 0); }
异步
handle(sockfd)函数内部将 sockfd 的操作,push 到线程池中。handle函数是将 sockfd 处理方式放到另一个已经其他的线程中运行,如此做法,将 io 操作(recv,send)与 epoll_wait 不在一个处理流程里面,使得 io操作(recv,send)与 epoll_wait 实现解耦,即 IO 异步操作。
int thread_cb(int sockfd) { // 此函数是在线程池创建的线程中运行。 // 与 handle 不在一个线程上下文中运行 recv(sockfd, rbuffer, length, 0); parser_proto(rbuffer, length); send(sockfd, sbuffer, length, 0); } int handle(int sockfd) { //此函数在主线程 main_thread 中运行 push_thread(sockfd, thread_cb); //将 sockfd 放到其他线程中运行。 }
对比
这样子,自然希望有一种方法,能有同步的代码逻辑,异步的性能,来方便编程人员对 IO 操作的。这就是一种轻量级的协程,在每次 send 或者 recv 之前进行切换,再由调度器来处理 epoll_wait 的流程。
二、协程的原语操作
协程的实现原理是在单个线程中创建多个协程,并通过“切换”的方式来实现协程之间的调度。协程之间的切换是由程序自己控制的,比线程切换更加快速和高效。协程通常不会阻塞整个线程,而只会阻塞当前的协程,因此可以大幅度降低线程的切换开销,提高程序的执行效率。
1、基本操作
协程包括三大部分:协程的创建、协程的调度器、协程的切换。
创建协程之后,将协程加入调度器,由调度器统一管理,决定执行顺序。
在遇到IO操作之前,协程让出(yield)执行权,交由调度器决定下一个恢复(resume)加载执行的协程。执行完成(或者未就绪)之后,协程再次让出(yield)执行权交给调度器。
整体感觉便是调度器将时间划分成不同的时间片,每个时间片交由不同协程完成具体操作。当操作完成或没有操作时,运行权移交给调度器控制。
因此,协程的核心原语操作有:创建(create)、让出(yield)、恢复(resume)
create:创建协程。
yield:由当前协程的上下文切换到调度器的上下文。
resume:由调度器获取下一个将要执行的协程的上下文,恢复协程的运行权。
2、让出(yield)和恢复(resume)
async_Recv(fd, buffer, length) { ret = poll(fd); //判断fd是否就绪 //若未就绪,重新加入到epoll,并切换到下一个协程 if (ret == notReady) { epoll_ctl(add); yield (next_fd); //resume } else { recv; } } async_Send(fd, buffer, length) { ret = poll(fd); //判断fd是否就绪 //若未就绪,重新加入到epoll,并切换到下一个协程 if (ret == notReady) { epoll_ctl(add); yield(next_fd); //resume } else { send; } } while (1) { epool_wait(); for () { async_Recv(fd, buffer, length); parse(buffer); //解析 async_Send(fd, buffer, length); } }
三、协程的切换(switch)
协程的切换有三种方式:
1、汇编
2、ucontext
3、longjmp / setjmp
1、汇编
x86_64 的寄存器有 16 个 64 位寄存器,分别是:%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,%r13, %r14, %r15。
%rax:存储函数的返回值;
%rbp:栈指针寄存器,指向栈底;
%rsp:栈指针寄存器,指向栈顶。
%rdi,%rsi,%rdx,%rcx,%r8,%r9:函数的六个参数,依次对应第 1 参数,第 2 参数。如果函数的参数超过六个,那么六个以后的参数会入栈。
%rbx, %rbp, %r12, %r13, %r14, %r15 用作数据存储,遵循调用者使用规则,换句话说,就是随便用。调用子函数之前要备份它,以防它被修改
%r10, %r11 用作数据存储,就是使用前要先保存原值。
上下文切换,就是将CPU的寄存器暂存,再将即将运行的协程的上下文寄存器分别mov到相应的寄存器上。
切换_switch 函数定义:
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
参数 1:即将运行协程的上下文
参数 2:正在运行协程的上下文
按照 x86_64 的寄存器定义,%rdi 保存第一个参数的值,即 new_ctx 的值,%rsi 保存第二
个参数的值,即保存 cur_ctx 的值。
__asm__ ( " .text \n" " .p2align 4,,15 \n" ".globl _switch \n" ".globl __switch \n" "_switch: \n" "__switch: \n" " movq %rsp, 0(%rsi) # save stack_pointer \n" " movq %rbp, 8(%rsi) # save frame_pointer \n" " movq (%rsp), %rax # save insn_pointer \n" " movq %rax, 16(%rsi) \n" " movq %rbx, 24(%rsi) # save rbx,r12-r15 \n" " movq %r12, 32(%rsi) \n" " movq %r13, 40(%rsi) \n" " movq %r14, 48(%rsi) \n" " movq %r15, 56(%rsi) \n" " movq 56(%rdi), %r15 \n" " movq 48(%rdi), %r14 \n" " movq 40(%rdi), %r13 # restore rbx,r12-r15 \n" " movq 32(%rdi), %r12 \n" " movq 24(%rdi), %rbx \n" " movq 8(%rdi), %rbp # restore frame_pointer \n" " movq 0(%rdi), %rsp # restore stack_pointer \n" " movq 16(%rdi), %rax # restore insn_pointer \n" " movq %rax, (%rsp) \n" " ret \n" );
按照 x86_64 的寄存器定义,%rdi 保存第一个参数的值,即 new_ctx 的值,%rsi 保存第二
个参数的值,即保存 cur_ctx 的值。X86_64 每个寄存器是 64bit,8byte。
Movq %rsp, 0(%rsi) 保存在栈指针到 cur_ctx 实例的 rsp 项
Movq %rbp, 8(%rsi)
Movq (%rsp), %rax #将栈顶地址里面的值存储到 rax 寄存器中。Ret 后出栈,执行栈顶
Movq %rbp, 8(%rsi) #后续的指令都是用来保存 CPU 的寄存器到 new_ctx 的每一项中
Movq 8(%rdi), %rbp #将 new_ctx 的值
Movq 16(%rdi), %rax #将指令指针 rip 的值存储到 rax 中
Movq %rax, (%rsp) # 将存储的 rip 值的 rax 寄存器赋值给栈指针的地址的值。
Ret # 出栈,回到栈指针,执行 rip 指向的指令。
上下文环境的切换完成。
2、ucontext
getcontext、makecontext、swapcontext是一组操作上下文(context)的函数,一般用于在用户层级(user space)线程中实现协程(coroutine)。
(1)getcontext(ucontext_t *ucp):该函数会获取当前线程的上下文,并将其保存到ucontext_t类型的结构体指针ucp中,以便后续使用。
(2)makecontext(ucontext_t *ucp, void (*func)(), int argc, ...):创建一个新的执行上下文,并将其与func函数关联,argc表示func函数的参数个数,后面的省略号表示具体的函数参数。通常情况下,我们需要先调用getcontext获取当前线程的上下文,然后再通过makecontext创建一个新的上下文,该上下文可以通过swapcontext函数来激活,并开始执行func函数。
(3)swapcontext(ucontext_t *oucp, const ucontext_t *ucp):用于切换两个不同上下文之间的控制流。oucp和ucp分别表示当前和目标上下文。当调用该函数时,程序会保存当前上下文并开始执行ucp所指定的上下文。
// getcontext(&context); // makecontext(&context, func, arg); // swapcontext(&curent_context, &next_context); #include <stdio.h> #include <ucontext.h> ucontext_t ctx[2]; ucontext_t main_ctx; int count = 0; void fun1(){ while (count++ < 50){ printf("1"); swapcontext(&ctx[0],&ctx[1]); printf("3"); } } void fun2(){ while (count++ < 50){ printf("2"); swapcontext(&ctx[1],&ctx[0]); printf("4"); } } int main(){ char stack1[2048] = {0}; char stack2[2048] = {0}; getcontext(&ctx[0]); //将用户上下文ctx[1]的栈空间(stack)指针指向了一个已预先分配的内存块stack1 ctx[0].uc_stack.ss_sp = stack1; //将栈空间的大小设置为stack1的大小,也就是在此处分配的内存块大小。 ctx[0].uc_stack.ss_size = sizeof(stack1); //将链接指针(link pointer)设置为main_ctx,也就是当执行完当前上下文时, //程序将会切换回main_ctx所指定的上下文。链接指针用于实现协程(coroutine)之间的切换。 ctx[0].uc_link = &main_ctx; makecontext(&ctx[0], fun1 ,0); getcontext(&ctx[1]); ctx[1].uc_stack.ss_sp = stack1; ctx[1].uc_stack.ss_size = sizeof(stack1); ctx[1].uc_link = &main_ctx; makecontext(&ctx[1], fun2 ,0); printf("swapcontext\n"); swapcontext(&main_ctx, &ctx[0]); printf("\n"); }
结果是
123142314231423142314231423142314231423142314231423142314231423142314231423142314231423142314231423
执行顺序是:
p19 -> p20 -> p21 -> p28 -> p29 -> p30 -> p21 -> p22 -> p19 -> p20 -> p21 -> p30 -> p31 -> p28 -> p29 -> p30 -> p21 -> p22 -> p19 -> p20 -> p21 -> p30 -> p31 -> ……
3、longjmp / setjmp
setjmp 和 longjmp 是两个用于非局部跳转(non-local jump)的 C 语言库函数。
(1)int setjmp(jmp_buf env);:保存当前程序上下文信息到env,并返回0;
(2)void longjmp(jmp_buf env, int val);:直接跳转到该跳转点env的位置,并且恢复该跳转点env的状态。还可以传递一个整数值val,用于作为 setjmp 函数的返回值。
#include <stdio.h> #include <setjmp.h> jmp_buf env; void func(int arg){ printf("func\n"); longjmp(env, ++arg); printf("longjmp complete\n"); //由于longjmp是非局部跳转操作,所以之前在func函数中的printf语句不会被执行。 } int main(){ int ret = setjmp(env); if (ret == 0){ printf("ret == 0\n"); func(ret); }else if (ret == 1){ printf("ret == 1\n"); func(ret); } printf("ret:%d\n",ret); }
执行结果是
ret == 0 func ret == 1 func ret:2
执行顺序
1、在main函数中使用setjmp函数保存当前程序状态。
2、在main函数中判断setjmp返回值ret,如果为0,则说明是第一次调用,执行func函数并将ret作为参数传递;否则,说明是通过longjmp跳转回来的,继续执行func函数。
3、在func函数中,先输出"func",然后通过longjmp跳转回到setjmp处,并将arg+1作为返回值。由于longjmp是非局部跳转操作,所以之前在func函数中的printf语句不会被执行。
4、最终,在main函数结束时输出ret的值
四、协程结构的定义
//协程的状态 typedef enum _co_status { CO_NEW, //新建 CO_READY, // 就绪 CO_WAIT, //等待 CO_SLEEP, //睡眠 CO_EXIT, //退出 } co_status_t; struct coroutine { int birth; //协程创建的时间 int coid; //协程的id struct context ctx; //上下文信息 struct scheduler* sched; //调度器 void* (*entry)(void*); //回调函数入口 void *arg; //回调函数的参数 void* stack; //独立栈(每个协程有自己的虚拟内存空间) 或 共享栈 size_t size; //栈的大小 co_status_t status; //协程的状态 queue_node(coroutine) readyq; rbtree_node(coroutine) sleept; rbtree_node(coroutine) waitt; queue__node(coroutine) exitq; };
五、协程调度器结构的定义
struct scheduler { struct coroutine* cur_co; //当前运行的协程 queue_head(coroutine) readyh; rbtree_root(coroutine) sleepr; rbtree_root(coroutine) waitr; queue_head(coroutine) exith; };
六、调度策略
调度策略:
1、IO密集型(更多强调IO等待)-- > 把wait放在最前面,把sleep放在最后面
2、计算密集型(更多强调计算结果) --> 把ready放在最前面
schedule(struct schedule* sched) { coroutine* co = NULL; while ((co = get_expired_node(sched->sleepr)) != NULL) { resume(co); } while ((co = get_wait_node(sched->waitq)) != NULL) { push_ready_queue(co); } whike((co = get_next_node(sched->readyq)) != NULL) { resume(co); } whike((co = get_next_node(sched->exitq)) != NULL) { destroy(co); } }
七、如何与posix api 兼容
利用hook技术,可以在实现协程的时候与本系统的API实现兼容。
主要函数是dlsym函数
dlsym函数是一个动态链接库函数,它的作用是在动态链接库中查找指定的符号,并返回符号对应的地址。
void *dlsym(void *handle, const char *symbol);
举个例子,连接mysql的时候,我们通过使用hook技术,使得实际执行的connect、recv、send是经过我们重定义后的。
zxm@ubuntu:~/share/opp$ gcc -o mysql mysql.c -lmysqlclient -ldl zxm@ubuntu:~/share/opp$ ./mysql connect recv send mysql_real_connect success
#define _GNU_SOURCE #include <dlfcn.h> #include <stdio.h> #include <sys/types.h> #include <sys/socket.h> #include <mysql/mysql.h> #define __INIT_HOOK__ init_hook(); #define MING_DB_IP "192.168.42.128" #define MING_DB_PORT 3306 #define MING_DB_USENAME "admin" #define MING_DB_PASSWORD "123456" #define MING_DB_DEFAULTDB "MING_DB" typedef int (*connect_t)(int sockfd, const struct sockaddr *addr,socklen_t addrlen); connect_t connect_f = NULL; typedef ssize_t (*recv_t)(int sockfd, void *buf, size_t len, int flags); recv_t recv_f = NULL; typedef ssize_t (*send_t)(int sockfd, const void *buf, size_t len, int flags); send_t send_f = NULL; int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen){ printf("connect\n"); return connect_f(sockfd, addr, addrlen); } ssize_t recv(int sockfd, void *buf, size_t len, int flags){ printf("recv\n"); return recv_f(sockfd,buf, len, flags ); } ssize_t send(int sockfd,const void *buf, size_t len, int flags){ printf("send\n"); return send_f(sockfd ,buf, len, flags ); } void init_hook(void){ if (!connect_f){ connect_f = dlsym(RTLD_NEXT,"connect"); } if (!recv_f){ recv_f = dlsym(RTLD_NEXT,"recv"); } if (!send_f){ send_f = dlsym(RTLD_NEXT,"send"); } } int main () { __INIT_HOOK__; MYSQL *mysql = mysql_init(NULL); // 初始化MYSQL结构体,返回一个指向MYSQL结构体的指针 if (!mysql){ printf("musql_init failed\n"); return 0; } if (!mysql_real_connect(mysql,MING_DB_IP,MING_DB_USENAME,MING_DB_PASSWORD,MING_DB_DEFAULTDB, MING_DB_PORT,NULL,CLIENT_FOUND_ROWS)){ printf("mysql_real_connect failed\n"); return 0; } printf("mysql_real_connect success\n"); }
八、协程多核模式
可将某个计算与某个cpu绑定黏合在一起,有利于计算密集型。
比如对于多进程/多线程与多核的黏合,可以为每个进程或者线程,分配一个调度器。
//线程绑定 Thread 3 is running on cpu Thread 2 is running on cpu Thread 1 is running on cpu Thread 0 is running on cpu //进程绑定 Process 31690 is running on cpu Process 31691 is running on cpu Process 31692 is running on cpu Process 31693 is running on cpu
#define _GNU_SOURCE #include <stdio.h> #include <pthread.h> #include <sched.h> #include <unistd.h> #include <sys/syscall.h> #define THREAD_COUNT 2 void *thread_func(void *arg){ int threadid = *(int *)arg; printf("Thread %d is running on cpu \n",threadid); while (1); } int process_bind(void) { int num = sysconf(_SC_NPROCESSORS_CONF); // //返回当前系统中可用的CPU核心数量 pid_t self_id = syscall(__NR_gettid); cpu_set_t mask; CPU_ZERO(&mask); CPU_SET(self_id % num, &mask); sched_setaffinity(0, sizeof(mask), &mask); printf("Process %d is running on cpu\n", self_id); while (1) ; } int main(){ #if 0 // 线程绑定 pthread_t threads[THREAD_COUNT]; int threadid[THREAD_COUNT]; //定义了一个 cpu_set_t 类型的变量 cpus,它是一个位图,每一位代表一个 CPU 核心 cpu_set_t cpus; //通过调用 CPU_ZERO(&cpus) 函数将其清零。 CPU_ZERO(&cpus); int i=0; for (i=0; i< THREAD_COUNT; i++){ //将下标 i 对应的 CPU 核心编号添加到 cpus 变量中 CPU_SET(i,&cpus); } for (i=0;i< THREAD_COUNT; i++){ threadid[i] = i; pthread_create(&threads[i], NULL , thread_func, &threadid[i]); // 将第 i 个线程绑定到第 i 个 CPU 上 pthread_setaffinity_np (threads[i], sizeof(cpu_set_t), &cpus); } for (i=0; i< THREAD_COUNT; i++){ pthread_join (threads[i],NULL); } #else //进程绑定 fork(); fork(); process_bind(); #endif return 0; }