知识巩固源码落实之5:http get异步请求数据demo(多线程+struct epoll_event的ptr)

简介: 知识巩固源码落实之5:http get异步请求数据demo(多线程+struct epoll_event的ptr)

1:背景介绍

简单的实现一个客户端对http服务器的请求后,思考如果同时多个请求,如何有效的对代码进行设计,关注每个请求的回复。

1.1:如果多个请求同时进行,异步实现

http的特定是无连接的,一个tcp连接对应一个请求。

===》基于这个特点,可以使用多线程,实现异步的架构

===》线程1实现对http服务器的请求发送

===》线程2实现关注服务器对请求的响应,如果不考虑长连接,可以考虑接收请求后断开连接。

1.2:如何实现异步方案

发送请求后,我们可以使用epoll对**可读事件(accept连接,recv可读取)**进行监听。

我们这里是客户端,主动连接http服务器(connect函数),所以只需要关注对端的回复事件(可读事件)即可。

在使用epoll的过程中,主要是通过struct epoll_event 结构体对事件进行识别的,从下面结构可以看出,我们可以灵活运用struct epoll_event结构体中 union的指针定义,增加自己的一些机制。

//在对事件进行EPOLL_CTL_ADD时,设置对应的epoll_event设置参数时,可以用ptr指向自己的指针结构(回调函数等自己定义),实现异步关联 
typedef union epoll_data {
               void    *ptr;  //使用这个
               int      fd;
               uint32_t u32;
               uint64_t u64;
           } epoll_data_t;
struct epoll_event {
               uint32_t     events;    /* Epoll events */
               epoll_data_t data;      /* User data variable */
           };

主要的异步逻辑:

//线程1   
  //构造自己的事件相关结构
  struct my_epoll_event * my_event = (struct my_epoll_event *)calloc(1, sizeof(struct my_epoll_event));
  my_event->sockfd = connfd; //可作为参数可能需要回复
  memcpy(my_event->hostname, hostname, strlen(hostname));
  my_event->cb = recv_exec_callback; 
  //利用struct epoll_event结构体中的ptr指针特性
  struct epoll_event ev;
    ev.data.ptr = my_event;
    ev.events = EPOLLIN; //监听可读事件
    epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, connfd, &ev)
//线程2
    //epoll_wait获取到已经触发的事件进行处理
    nready = epoll_wait(epfd, event_wait, 1024, -1);
  if (nready < 0) {
        if (errno == EINTR || errno == EAGAIN) {
            continue;
        } else {
            break;
        }
    } else if (nready == 0) {
        continue;
    }
  //从线程1设置的指针中获取一些必要的信息,进行业务处理,
    //如可以设置回调函数,设置一些参数,可以保存自己的连接fd,方便接收到数据后识别fd进行消息回复
    for(int i=0; i<nready; i++)
    {
      //从事件中获取到指针对象  获取回调函数和参数进行业务处理
      struct my_epoll_event *event_data = (struct my_epoll_event*)event_wait[i].data.ptr;
      int recvfd = event_data->sockfd;
      //接收数据 这里只是打印演示  应该做处理确保一个数据包的完整接收 需要设计半包粘包问题
      char buffer[BUFFER_SIZE] = {0};
      int recv_len = recv(recvfd, buffer, BUFFER_SIZE, 0);
      //可以自行设计  这里做业务处理
      event_data->cb(recvfd, event_data->hostname, buffer, recv_len);
    ......
    }

2:测试代码

利用struct epoll_event结构体的ptr指针特性,可以定制设计一些自己的业务。

使用多线程+epoll,实现发送与接收的解耦。。。

这里是使用c语言实现的测试代码。。。

/********************************************************
info: 作为http的客户端,拉取一下远端服务器数据,练习一下
data: 2022/02/10
author: hlp
所谓异步 只是把发送和接收的逻辑分开  发送后,接收交给另一个线程去做处理
可以利用epoll_event结构中指向的指针,定义不同的回调函数进行业务区分
********************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>       /* close */
#include <netdb.h> 
#include <time.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/epoll.h>
#include <pthread.h>
//epoll是多个线程的关联
struct async_context {
  int epollfd;
  pthread_t thread_id;
};
#define HOSTNAME_LENGTH 128
typedef void (*async_recv_cb)(int fd, const char *hostname, const char *result, int len);
//每个客户端的连接 都有一个自己的poll_event设置自己的指针回调
struct my_epoll_event{
  int sockfd;
  char hostname[HOSTNAME_LENGTH]; 
  async_recv_cb cb;
};
//创建资源
struct async_context * async_init_create_epoll();
//新线程 接收处理
int create_thread_and_recv_do_cycle(struct async_context *ctx);
//主线程 发送
int send_http_request(struct async_context *ctx);
//构造http报文 实际的发送 以及加入epoll
int real_send_http_request(struct async_context *ctx, const char* hostname, const char * resource);
int main(int argc, char* argv[])
{
  //使用异步多线程的方式  实现发送和接收的异步逻辑
  //多线程 主线程进行发送  新建线程进行接收回调函数处理
  //epfd是多个线程之间的关联,epoll_event是回调函数的关联
  //构造epoll  申请资源 结构体只是存储了epfd+thread_id
  struct async_context *ctx;
  ctx = async_init_create_epoll();
  if(ctx == NULL)
  {
    return -1;
  }
  //子线程负责接收,回调函数对epoll一直进行检测
  int ret = create_thread_and_recv_do_cycle(ctx); //一请求一回复一终止关闭
  if(ret != 0)
  {
    printf("create thread and do cycle error \n");
    close(ctx->epollfd);
    free(ctx);
    ctx = NULL;
    return -1;
  }
  //主线程进行发送 
  if(send_http_request(ctx) != 0)
  {
    printf("\t\thttp request failed \n");
  }else
  {
    printf("\t\thttp request success \n");
  }
  //主线程应该等待
  if(ctx->thread_id != 0)
  {
    pthread_join(ctx->thread_id, NULL);
  }
  close(ctx->epollfd);
  free(ctx);
  ctx = NULL;
  return 0;
}
//创建epoll 申请结构内存
struct async_context * async_init_create_epoll()
{
  struct async_context * ctx = NULL;
  ctx = calloc(1,sizeof(struct async_context));
  if(ctx == NULL)
  {
    return NULL;
  }
  int epfd = epoll_create(1);
  if(epfd  < 0)
  {
    free(ctx);
    ctx = NULL;
    return ctx;
  }
  ctx->epollfd = epfd;
  ctx->thread_id = 0;
  return ctx;
}
#define BUFFER_SIZE 4096
//回调函数  监听epoll事件,对接收到的数据,执行对应的回调函数
static void *http_recv_callback(void * arg)
{
  struct async_context *ctx = (struct async_context *)arg;
  printf("start thread epfd[%d][%lu] \n", ctx->epollfd, ctx->thread_id);
  int epfd = ctx->epollfd;
  struct epoll_event event_wait[1024] = {0};
  int nready = -1;
  while(1)
  {
    //使用epoll_wait进行事件的查验
    nready = epoll_wait(epfd, event_wait, 1024, -1);
    if (nready < 0) {
            if (errno == EINTR || errno == EAGAIN) {
                continue;
            } else {
                break;
            }
        } else if (nready == 0) {
            continue;
        }
// struct my_epoll_event{
//  int sockfd;
//  char hostname[HOSTNAME_LENGTH]; 
//  async_recv_cb cb;
// };
        printf("get recv event [%d] start recv and exec msg...\n", nready);
        for(int i=0; i<nready; i++)
        {
          //从事件中获取到指针对象  获取回调函数和参数进行业务处理
          struct my_epoll_event *event_data = (struct my_epoll_event*)event_wait[i].data.ptr;
          int recvfd = event_data->sockfd;
          //接收数据 这里只是打印演示  应该做处理确保一个数据包的完整接收 需要设计半包粘包问题
          char buffer[BUFFER_SIZE] = {0};
          int recv_len = recv(recvfd, buffer, BUFFER_SIZE, 0);
          //然后通过事件指针执行回调 这里可以根据业务自行处理 这里只是打印
          //针对http的客户端请求业务场景  这里只是接收 就完成 并且不再使用fd
          event_data->cb(recvfd, event_data->hostname, buffer, recv_len);
          if(epoll_ctl(epfd, EPOLL_CTL_DEL, recvfd, NULL) == 0)
          {
            printf("epoll delete [%d] success \n", recvfd);
          }else
          {
            printf("epoll delete [%d] error \n", recvfd);
          }
          close(recvfd);
          //注意epoll_event自定义事件结构的释放
          free(event_data);
        }
  }
  return NULL;
}
//创建线程  并启动 监听接收
int create_thread_and_recv_do_cycle(struct async_context *ctx)
{
  if(pthread_create(&ctx->thread_id, NULL, http_recv_callback, ctx) != 0)
  {
    printf("create thread error \n");
    return -1;
  }
  return 0;
}
struct http_request {
    char *hostname;
    char *resource;
};
//根据全局依次获取hostnameh和resource进行构造http发送   并监听socket,设置接收时间的回调
int send_http_request(struct async_context *ctx)
{
  struct http_request reqs[] = {
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=beijing&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=changsha&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=shenzhen&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=shanghai&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=tianjin&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=wuhan&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=hefei&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=hangzhou&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=nanjing&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=jinan&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=taiyuan&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=wuxi&language=zh-Hans&unit=c" },
      {"api.seniverse.com", "/v3/weather/now.json?key=0pyd8z7jouficcil&location=suzhou&language=zh-Hans&unit=c" },
  };
  int count = sizeof(reqs)/ sizeof(reqs[0]);
  int ret = 0;
  for(int i=0; i< count; i++)
  {
    //实际的发送函数
    ret |= real_send_http_request(ctx, reqs[i].hostname,  reqs[i].resource);
  }
  return ret;
}
char *get_ip_by_host(const char * hostname);
int create_http_socket(const char * ip);
int send_one_http_request(int fd, const char * hostname, const char* resource);
void recv_exec_callback(int fd, const char *hostname, const char *result, int len);
//创建socket,加入epoll,使用epoll_event中指针特性加入
int real_send_http_request(struct async_context *ctx, const char* hostname, const char * resource)
{
  char * ip = get_ip_by_host(hostname);
  printf("get ip is [%s] \n", ip);
  //创建socket并连接,设置非阻塞
  int connfd = create_http_socket(ip);
  if(connfd < 0)
  {
    printf("error create socket [%d]\n", connfd);
    return -1;
  }
  //构建http报文并发送,验证发送成功
  send_one_http_request(connfd, hostname, resource);
  //构造自定义epoll_event,加入epoll
// struct my_epoll_event{
//  int sockfd;
//  char hostname[HOSTNAME_LENGTH]; 
//  async_recv_cb cb;
// };
  struct my_epoll_event * my_event = (struct my_epoll_event *)calloc(1, sizeof(struct my_epoll_event));
  if(my_event == NULL)
  {
    printf("add to epoll event error, malloc my_event error \n");
    close(connfd);
    return -1;
  }
  //这里赋初值 供业务处理回调时使用
  my_event->sockfd = connfd; //可作为参数可能需要回复
  memcpy(my_event->hostname, hostname, strlen(hostname));
  my_event->cb = recv_exec_callback; 
  struct epoll_event ev;
    ev.data.ptr = my_event;
    ev.events = EPOLLIN;
// struct async_context {
//  int epollfd;
//  pthread_t thread_id;
// };
    if(epoll_ctl(ctx->epollfd, EPOLL_CTL_ADD, connfd, &ev) != 0)
    {
      printf("add to epoll event error, epoll_ctl_add error \n");
      close(connfd);
      return -1;
    }
    return 0;
}
char *get_ip_by_host(const char * hostname)
{
  //通过域名获取ip
  struct hostent *host_entry = gethostbyname(hostname);
  if(host_entry == NULL)
  {
    return NULL;
  }
  //通过 struct hostent * 可以获取到域名  ip列表等信息
  printf("host name is [%s] \n", host_entry->h_name);
  printf("the other name is :\n");
  for(int i=0; host_entry->h_aliases[i]; ++i)
  {
    printf("   [i:%d][%s] \n", i, host_entry->h_aliases[i]);
  }
  printf("Address type: %s\n", (host_entry->h_addrtype==AF_INET) ? "AF_INET": "AF_INET6");
  printf("address length: %d \n", host_entry->h_length);
  printf("ip list is :\n");
  for(int i=0; host_entry->h_addr_list[i]; ++i)
  {
    // printf("   [i:%d][%s] \n", i, host_entry->h_addr_list[i]);
    printf("   [i:%d][%s] \n", i, inet_ntoa(*(struct in_addr*)host_entry->h_addr_list[i]));
  }
  return inet_ntoa(*(struct in_addr*)*host_entry->h_addr_list);
}
//创建socket 根据服务器ip和断开 进行连接 返回fd
int create_http_socket(const char * ip)
{
  //创建socket 根据Ip:port连接对端
  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  if(sockfd < 0)
  {
    return -1;
  }
  struct sockaddr_in sin = {0};
    sin.sin_addr.s_addr = inet_addr(ip);
    sin.sin_port = htons(80);
    sin.sin_family = AF_INET;
    //连接服务器 
    if (-1 == connect(sockfd, (struct sockaddr*)&sin, sizeof(struct sockaddr_in))) {
      printf("connect http server error \n");
        return -1;
    }
    //设置fd非阻塞
  fcntl(sockfd, F_SETFL, O_NONBLOCK);    
  return sockfd;
}
#define HTTP_VERSION    "HTTP/1.1"
#define USER_AGENT      "User-Agent: Mozilla/5.0 (Windows NT 5.1; rv:10.0.2) Gecko/20100101 Firefox/10.0.2\r\n"
#define ENCODE_TYPE     "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n"
#define CONNECTION_TYPE "Connection: close\r\n"
#define BUFFER_SIZE 4096
//发送http请求 返回发送成功还是失败
int send_one_http_request(int fd, const char * hostname, const char* resource)
{
  char send_buff[BUFFER_SIZE] = {0};
  //构造发送的请求报文  请求行 请求头部 空行  请求数据
  // GET /teacher_6.jpg HTTP/1.1
  // Host: www.0voice.com
  // User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.113 Safari/537.36
  // Content-Type: application/x-www-form-urlencoded
  // Content-Length: 9
  // lingsheng
//只关注了必须的参数  测试一下
  // GET %s %s\r\n   resource  HTTP_VERSION   请求行
  // Host: %s\r\n    hostname         请求头
  // %s\r\n          CONNECTION_TYPE      请求头
  // \r\n
  int len = sprintf(send_buff, "GET %s %s\r\nHost: %s\r\n%s\r\n\r\n",\
    resource, HTTP_VERSION, hostname, CONNECTION_TYPE);
  printf("send request buff is [%lu][%s] \n", strlen(send_buff), send_buff);
  int buff_len = strlen(send_buff);
  int sendlen = send(fd, send_buff, strlen(send_buff), 0);
  if(buff_len != sendlen)
  {
    printf("send buffer error \n");
    return -1;
  }
  return 0;
}
void recv_exec_callback(int fd, const char *hostname, const char *result, int len)
{
  printf("\t confd is [%d], \n\trecv buffer form [%s] [len:%d][data:%s] \n", fd, hostname, len, result);
}

我在linux环境上,直接使用gcc进行编译,执行测试观察相关日志。。。

我开始试着积累一些常用代码:自己代码库中备用

我的知识储备更多来自这里,推荐你了解:Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习

目录
相关文章
|
3月前
|
消息中间件 监控 安全
服务Down机了,线程池中的数据如何保证不丢失?
在分布式系统与高并发应用开发中,服务的稳定性和数据的持久性是两个至关重要的考量点。当服务遭遇Down机时,如何确保线程池中处理的数据不丢失,是每一位开发者都需要深入思考的问题。以下,我将从几个关键方面分享如何在这种情况下保障数据的安全与完整性。
78 2
|
1月前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
119 62
|
2月前
|
缓存 安全 Java
使用 Java 内存模型解决多线程中的数据竞争问题
【10月更文挑战第11天】在 Java 多线程编程中,数据竞争是一个常见问题。通过使用 `synchronized` 关键字、`volatile` 关键字、原子类、显式锁、避免共享可变数据、合理设计数据结构、遵循线程安全原则和使用线程池等方法,可以有效解决数据竞争问题,确保程序的正确性和稳定性。
63 2
|
3月前
|
消息中间件 存储 Java
服务重启了,如何保证线程池中的数据不丢失?
【8月更文挑战第30天】为确保服务重启时线程池数据不丢失,可采用数据持久化(如数据库或文件存储)、使用可靠的任务队列(如消息队列或分布式任务队列系统)、状态监测与恢复机制,以及分布式锁等方式。这些方法能有效提高系统稳定性和可靠性,需根据具体需求选择合适方案并进行测试优化。
255 5
|
4月前
处理串口线程数据的函数
【8月更文挑战第4天】处理串口线程数据的函数。
32 4
|
4月前
|
数据处理 Python
解锁Python多线程编程魔法,告别漫长等待!让数据下载如飞,感受科技带来的速度与激情!
【8月更文挑战第22天】Python以简洁的语法和强大的库支持在多个领域大放异彩。尽管存在全局解释器锁(GIL),Python仍提供多线程支持,尤其适用于I/O密集型任务。通过一个多线程下载数据的例子,展示了如何使用`threading`模块创建多线程程序,并与单线程版本进行了性能对比。实验表明,多线程能显著减少总等待时间,但在CPU密集型任务上GIL可能会限制其性能提升。此案例帮助理解Python多线程的优势及其适用场景。
48 0
|
4月前
|
NoSQL Redis
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
Lettuce的特性和内部实现问题之在同步调用模式下,业务线程是如何拿到结果数据的
|
5月前
|
存储 缓存 NoSQL
架构设计篇问题之在数据割接过程中,多线程处理会导致数据错乱和重复问题如何解决
架构设计篇问题之在数据割接过程中,多线程处理会导致数据错乱和重复问题如何解决
|
5月前
|
网络协议 安全 Python
我们将使用Python的内置库`http.server`来创建一个简单的Web服务器。虽然这个示例相对简单,但我们可以围绕它展开许多讨论,包括HTTP协议、网络编程、异常处理、多线程等。
我们将使用Python的内置库`http.server`来创建一个简单的Web服务器。虽然这个示例相对简单,但我们可以围绕它展开许多讨论,包括HTTP协议、网络编程、异常处理、多线程等。
|
5月前
|
存储 安全 Java
Java面试题:如何在Java应用中实现有效的内存优化?在多线程环境下,如何确保数据的线程安全?如何设计并实现一个基于ExecutorService的任务处理流程?
Java面试题:如何在Java应用中实现有效的内存优化?在多线程环境下,如何确保数据的线程安全?如何设计并实现一个基于ExecutorService的任务处理流程?
50 0

热门文章

最新文章