1:背景介绍
简单的实现一个客户端对http服务器的请求后,思考如果同时多个请求,如何有效的对代码进行设计,关注每个请求的回复。
1.1:如果多个请求同时进行,异步实现
http的特定是无连接的,一个tcp连接对应一个请求。
===》基于这个特点,可以使用多线程,实现异步的架构
===》线程1实现对http服务器的请求发送
===》线程2实现关注服务器对请求的响应,如果不考虑长连接,可以考虑接收请求后断开连接。
1.2:如何实现异步方案
发送请求后,我们可以使用epoll对**可读事件(accept连接,recv可读取)**进行监听。
我们这里是客户端,主动连接http服务器(connect函数),所以只需要关注对端的回复事件(可读事件)即可。
在使用epoll的过程中,主要是通过struct epoll_event 结构体对事件进行识别的,从下面结构可以看出,我们可以灵活运用struct epoll_event结构体中 union的指针定义,增加自己的一些机制。
//在对事件进行EPOLL_CTL_ADD时,设置对应的epoll_event设置参数时,可以用ptr指向自己的指针结构(回调函数等自己定义),实现异步关联 typedef union epoll_data { void *ptr; //使用这个 int fd; uint32_t u32; uint64_t u64; } epoll_data_t; struct epoll_event { uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
主要的异步逻辑:
//线程1 //构造自己的事件相关结构 struct my_epoll_event * my_event = (struct my_epoll_event *)calloc(1, sizeof(struct my_epoll_event)); my_event->sockfd = connfd; //可作为参数可能需要回复 memcpy(my_event->hostname, hostname, strlen(hostname)); my_event->cb = recv_exec_callback; //利用struct epoll_event结构体中的ptr指针特性 struct epoll_event ev; ev.data.ptr = my_event; ev.events = EPOLLIN; //监听可读事件 epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, connfd, &ev) //线程2 //epoll_wait获取到已经触发的事件进行处理 nready = epoll_wait(epfd, event_wait, 1024, -1); if (nready < 0) { if (errno == EINTR || errno == EAGAIN) { continue; } else { break; } } else if (nready == 0) { continue; } //从线程1设置的指针中获取一些必要的信息,进行业务处理, //如可以设置回调函数,设置一些参数,可以保存自己的连接fd,方便接收到数据后识别fd进行消息回复 for(int i=0; i<nready; i++) { //从事件中获取到指针对象 获取回调函数和参数进行业务处理 struct my_epoll_event *event_data = (struct my_epoll_event*)event_wait[i].data.ptr; int recvfd = event_data->sockfd; //接收数据 这里只是打印演示 应该做处理确保一个数据包的完整接收 需要设计半包粘包问题 char buffer[BUFFER_SIZE] = {0}; int recv_len = recv(recvfd, buffer, BUFFER_SIZE, 0); //可以自行设计 这里做业务处理 event_data->cb(recvfd, event_data->hostname, buffer, recv_len); ...... }
2:测试代码
利用struct epoll_event结构体的ptr指针特性,可以定制设计一些自己的业务。
使用多线程+epoll,实现发送与接收的解耦。。。
这里是使用c语言实现的测试代码。。。
/******************************************************** info: 作为http的客户端,拉取一下远端服务器数据,练习一下 data: 2022/02/10 author: hlp 所谓异步 只是把发送和接收的逻辑分开 发送后,接收交给另一个线程去做处理 可以利用epoll_event结构中指向的指针,定义不同的回调函数进行业务区分 ********************************************************/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/time.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> /* close */ #include <netdb.h> #include <time.h> #include <fcntl.h> #include <errno.h> #include <sys/epoll.h> #include <pthread.h> //epoll是多个线程的关联 struct async_context { int epollfd; pthread_t thread_id; }; #define HOSTNAME_LENGTH 128 typedef void (*async_recv_cb)(int fd, const char *hostname, const char *result, int len); //每个客户端的连接 都有一个自己的poll_event设置自己的指针回调 struct my_epoll_event{ int sockfd; char hostname[HOSTNAME_LENGTH]; async_recv_cb cb; }; //创建资源 struct async_context * async_init_create_epoll(); //新线程 接收处理 int create_thread_and_recv_do_cycle(struct async_context *ctx); //主线程 发送 int send_http_request(struct async_context *ctx); //构造http报文 实际的发送 以及加入epoll int real_send_http_request(struct async_context *ctx, const char* hostname, const char * resource); int main(int argc, char* argv[]) { //使用异步多线程的方式 实现发送和接收的异步逻辑 //多线程 主线程进行发送 新建线程进行接收回调函数处理 //epfd是多个线程之间的关联,epoll_event是回调函数的关联 //构造epoll 申请资源 结构体只是存储了epfd+thread_id struct async_context *ctx; ctx = async_init_create_epoll(); if(ctx == NULL) { return -1; } //子线程负责接收,回调函数对epoll一直进行检测 int ret = create_thread_and_recv_do_cycle(ctx); //一请求一回复一终止关闭 if(ret != 0) { printf("create thread and do cycle error \n"); close(ctx->epollfd); free(ctx); ctx = NULL; return -1; } //主线程进行发送 if(send_http_request(ctx) != 0) { printf("\t\thttp request failed \n"); }else { printf("\t\thttp request success \n"); } //主线程应该等待 if(ctx->thread_id != 0) { pthread_join(ctx->thread_id, NULL); } close(ctx->epollfd); free(ctx); ctx = NULL; return 0; } //创建epoll 申请结构内存 struct async_context * async_init_create_epoll() { struct async_context * ctx = NULL; ctx = calloc(1,sizeof(struct async_context)); if(ctx == NULL) { return NULL; } int epfd = epoll_create(1); if(epfd < 0) { free(ctx); ctx = NULL; return ctx; } ctx->epollfd = epfd; ctx->thread_id = 0; return ctx; } #define BUFFER_SIZE 4096 //回调函数 监听epoll事件,对接收到的数据,执行对应的回调函数 static void *http_recv_callback(void * arg) { struct async_context *ctx = (struct async_context *)arg; printf("start thread epfd[%d][%lu] \n", ctx->epollfd, ctx->thread_id); int epfd = ctx->epollfd; struct epoll_event event_wait[1024] = {0}; int nready = -1; while(1) { //使用epoll_wait进行事件的查验 nready = epoll_wait(epfd, event_wait, 1024, -1); if (nready < 0) { if (errno == EINTR || errno == EAGAIN) { continue; } else { break; } } else if (nready == 0) { continue; } // struct my_epoll_event{ // int sockfd; // char hostname[HOSTNAME_LENGTH]; // async_recv_cb cb; // }; printf("get recv event [%d] start recv and exec msg...\n", nready); for(int i=0; i<nready; i++) { //从事件中获取到指针对象 获取回调函数和参数进行业务处理 struct my_epoll_event *event_data = (struct my_epoll_event*)event_wait[i].data.ptr; int recvfd = event_data->sockfd; //接收数据 这里只是打印演示 应该做处理确保一个数据包的完整接收 需要设计半包粘包问题 char buffer[BUFFER_SIZE] = {0}; int recv_len = recv(recvfd, buffer, BUFFER_SIZE, 0); //然后通过事件指针执行回调 这里可以根据业务自行处理 这里只是打印 //针对http的客户端请求业务场景 这里只是接收 就完成 并且不再使用fd event_data->cb(recvfd, event_data->hostname, buffer, recv_len); if(epoll_ctl(epfd, EPOLL_CTL_DEL, recvfd, NULL) == 0) { printf("epoll delete [%d] success \n", recvfd); }else { printf("epoll delete [%d] error \n", recvfd); } close(recvfd); //注意epoll_event自定义事件结构的释放 free(event_data); } } return NULL; } //创建线程 并启动 监听接收 int create_thread_and_recv_do_cycle(struct async_context *ctx) { if(pthread_create(&ctx->thread_id, NULL, http_recv_callback, ctx) != 0) { printf("create thread error \n"); return -1; } return 0; } struct http_request { char *hostname; char *resource; }; //根据全局依次获取hostnameh和resource进行构造http发送 并监听socket,设置接收时间的回调 int send_http_request(struct async_context *ctx) { struct http_request reqs[] = { {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=beijing&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=changsha&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=shenzhen&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=shanghai&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=tianjin&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=wuhan&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=hefei&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=hangzhou&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=nanjing&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=jinan&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=taiyuan&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=wuxi&language=zh-Hans&unit=c" }, {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=suzhou&language=zh-Hans&unit=c" }, }; int count = sizeof(reqs)/ sizeof(reqs[0]); int ret = 0; for(int i=0; i< count; i++) { //实际的发送函数 ret |= real_send_http_request(ctx, reqs[i].hostname, reqs[i].resource); } return ret; } char *get_ip_by_host(const char * hostname); int create_http_socket(const char * ip); int send_one_http_request(int fd, const char * hostname, const char* resource); void recv_exec_callback(int fd, const char *hostname, const char *result, int len); //创建socket,加入epoll,使用epoll_event中指针特性加入 int real_send_http_request(struct async_context *ctx, const char* hostname, const char * resource) { char * ip = get_ip_by_host(hostname); printf("get ip is [%s] \n", ip); //创建socket并连接,设置非阻塞 int connfd = create_http_socket(ip); if(connfd < 0) { printf("error create socket [%d]\n", connfd); return -1; } //构建http报文并发送,验证发送成功 send_one_http_request(connfd, hostname, resource); //构造自定义epoll_event,加入epoll // struct my_epoll_event{ // int sockfd; // char hostname[HOSTNAME_LENGTH]; // async_recv_cb cb; // }; struct my_epoll_event * my_event = (struct my_epoll_event *)calloc(1, sizeof(struct my_epoll_event)); if(my_event == NULL) { printf("add to epoll event error, malloc my_event error \n"); close(connfd); return -1; } //这里赋初值 供业务处理回调时使用 my_event->sockfd = connfd; //可作为参数可能需要回复 memcpy(my_event->hostname, hostname, strlen(hostname)); my_event->cb = recv_exec_callback; struct epoll_event ev; ev.data.ptr = my_event; ev.events = EPOLLIN; // struct async_context { // int epollfd; // pthread_t thread_id; // }; if(epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, connfd, &ev) != 0) { printf("add to epoll event error, epoll_ctl_add error \n"); close(connfd); return -1; } return 0; } char *get_ip_by_host(const char * hostname) { //通过域名获取ip struct hostent *host_entry = gethostbyname(hostname); if(host_entry == NULL) { return NULL; } //通过 struct hostent * 可以获取到域名 ip列表等信息 printf("host name is [%s] \n", host_entry->h_name); printf("the other name is :\n"); for(int i=0; host_entry->h_aliases[i]; ++i) { printf(" [i:%d][%s] \n", i, host_entry->h_aliases[i]); } printf("Address type: %s\n", (host_entry->h_addrtype==AF_INET) ? "AF_INET": "AF_INET6"); printf("address length: %d \n", host_entry->h_length); printf("ip list is :\n"); for(int i=0; host_entry->h_addr_list[i]; ++i) { // printf(" [i:%d][%s] \n", i, host_entry->h_addr_list[i]); printf(" [i:%d][%s] \n", i, inet_ntoa(*(struct in_addr*)host_entry->h_addr_list[i])); } return inet_ntoa(*(struct in_addr*)*host_entry->h_addr_list); } //创建socket 根据服务器ip和断开 进行连接 返回fd int create_http_socket(const char * ip) { //创建socket 根据Ip:port连接对端 int sockfd = socket(AF_INET, SOCK_STREAM, 0); if(sockfd < 0) { return -1; } struct sockaddr_in sin = {0}; sin.sin_addr.s_addr = inet_addr(ip); sin.sin_port = htons(80); sin.sin_family = AF_INET; //连接服务器 if (-1 == connect(sockfd, (struct sockaddr*)&sin, sizeof(struct sockaddr_in))) { printf("connect http server error \n"); return -1; } //设置fd非阻塞 fcntl(sockfd, F_SETFL, O_NONBLOCK); return sockfd; } #define HTTP_VERSION "HTTP/1.1" #define USER_AGENT "User-Agent: Mozilla/5.0 (Windows NT 5.1; rv:10.0.2) Gecko/20100101 Firefox/10.0.2\r\n" #define ENCODE_TYPE "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n" #define CONNECTION_TYPE "Connection: close\r\n" #define BUFFER_SIZE 4096 //发送http请求 返回发送成功还是失败 int send_one_http_request(int fd, const char * hostname, const char* resource) { char send_buff[BUFFER_SIZE] = {0}; //构造发送的请求报文 请求行 请求头部 空行 请求数据 // GET /teacher_6.jpg HTTP/1.1 // Host: www.0voice.com // User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.113 Safari/537.36 // Content-Type: application/x-www-form-urlencoded // Content-Length: 9 // lingsheng //只关注了必须的参数 测试一下 // GET %s %s\r\n resource HTTP_VERSION 请求行 // Host: %s\r\n hostname 请求头 // %s\r\n CONNECTION_TYPE 请求头 // \r\n int len = sprintf(send_buff, "GET %s %s\r\nHost: %s\r\n%s\r\n\r\n",\ resource, HTTP_VERSION, hostname, CONNECTION_TYPE); printf("send request buff is [%lu][%s] \n", strlen(send_buff), send_buff); int buff_len = strlen(send_buff); int sendlen = send(fd, send_buff, strlen(send_buff), 0); if(buff_len != sendlen) { printf("send buffer error \n"); return -1; } return 0; } void recv_exec_callback(int fd, const char *hostname, const char *result, int len) { printf("\t confd is [%d], \n\trecv buffer form [%s] [len:%d][data:%s] \n", fd, hostname, len, result); }
我在linux环境上,直接使用gcc进行编译,执行测试观察相关日志。。。
我开始试着积累一些常用代码:自己代码库中备用
我的知识储备更多来自这里,推荐你了解:Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习