启动摄像头输出线程
server_thread
服务器线程函数,用于接受客户端的连接请求并创建子线程处理每个客户端连接。下面是代码的主要步骤:
初始化变量和数据结构。
获取服务器地址信息,包括IP地址和端口号。
创建套接字,并设置套接字选项。
绑定套接字到服务器地址。
监听套接字,等待客户端连接。
循环等待客户端连接请求,使用
select函数等待可读套接字。
当有客户端连接时,创建一个子线程来处理连接。
子线程通过调用accept函数接受客户端连接,并传递连接套接字和上下文信息给子线程。
在子线程中处理客户端请求和响应。
主线程继续等待下一个客户端连接。
当收到停止信号时,退出循环,关闭套接字,并执行清理函数。
// 服务器线程函数 void *server_thread(void *arg) { int on; pthread_t client; struct addrinfo *aip, *aip2; struct addrinfo hints; struct sockaddr_storage client_addr; socklen_t addr_len = sizeof(struct sockaddr_storage); fd_set selectfds; int max_fds = 0; char name[NI_MAXHOST]; int err; int i; context *pcontext = arg; pglobal = pcontext->pglobal; /* set cleanup handler to cleanup ressources */ pthread_cleanup_push(server_cleanup, pcontext); // 初始化hints结构体 bzero(&hints, sizeof(hints)); hints.ai_family = PF_UNSPEC; // 支持IPv4和IPv6 hints.ai_flags = AI_PASSIVE; // 用于bind函数,表示返回的套接字地址结构体中的IP地址是通配地址 hints.ai_socktype = SOCK_STREAM; // TCP协议 // 获取地址信息 snprintf(name, sizeof(name), "%d", ntohs(pcontext->conf.port)); // 将端口号转换为字符串 if((err = getaddrinfo(NULL, name, &hints, &aip)) != 0) { // 获取地址信息 perror(gai_strerror(err)); // 输出错误信息 exit(EXIT_FAILURE); // 退出程序 } // 初始化所有套接字为-1 for(i = 0; i < MAX_SD_LEN; i++) pcontext->sd[i] = -1; /* open sockets for server (1 socket / address family) */ i = 0; for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) { if((pcontext->sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) { // 创建套接字 continue; } /* ignore "socket already in use" errors */ on = 1; if(setsockopt(pcontext->sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { // 设置套接字选项 perror("setsockopt(SO_REUSEADDR) failed"); } /* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */ on = 1; if(aip2->ai_family == AF_INET6 && setsockopt(pcontext->sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on , sizeof(on)) < 0) { // 设置套接字选项 perror("setsockopt(IPV6_V6ONLY) failed"); } /* perhaps we will use this keep-alive feature oneday */ /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */ if(bind(pcontext->sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) { // 绑定套接字 perror("bind"); pcontext->sd[i] = -1; continue; } if(listen(pcontext->sd[i], 10) < 0) { // 监听套接字 perror("listen"); pcontext->sd[i] = -1; } else { i++; if(i >= MAX_SD_LEN) { OPRINT("%s(): maximum number of server sockets exceeded", __FUNCTION__); i--; break; } } } pcontext->sd_len = i; if(pcontext->sd_len < 1) { // 如果没有套接字绑定成功,程序退出 OPRINT("%s(): bind(%d) failed", __FUNCTION__, htons(pcontext->conf.port)); closelog(); exit(EXIT_FAILURE); } /* create a child for every client that connects */ while(!pglobal->stop) { // 循环等待客户端连接 //int *pfd = (int *)malloc(sizeof(int)); cfd *pcfd = malloc(sizeof(cfd)); // 分配内存 if(pcfd == NULL) { // 如果分配内存失败,程序退出 fprintf(stderr, "failed to allocate (a very small amount of) memory\n"); exit(EXIT_FAILURE); } DBG("waiting for clients to connect\n"); // 输出调试信息 do { // 循环等待客户端连接 FD_ZERO(&selectfds); // 清空文件描述符集合 for(i = 0; i < MAX_SD_LEN; i++) { // 将所有套接字加入文件描述符集合 if(pcontext->sd[i] != -1) { FD_SET(pcontext->sd[i], &selectfds); if(pcontext->sd[i] > max_fds) max_fds = pcontext->sd[i]; } } err = select(max_fds + 1, &selectfds, NULL, NULL, NULL); // 等待客户端连接 if(err < 0 && errno != EINTR) { // 如果出错,程序退出 perror("select"); exit(EXIT_FAILURE); } } while(err <= 0); // 如果没有客户端连接,继续等待 for(i = 0; i < max_fds + 1; i++) { if(pcontext->sd[i] != -1 && FD_ISSET(pcontext->sd[i], &selectfds)) { pcfd->fd = accept(pcontext->sd[i], (struct sockaddr *)&client_addr, &addr_len); pcfd->pc = pcontext; /* start new thread that will handle this TCP connected client */ DBG("create thread to handle client that just established a connection\n"); #if 0 /* commented out as it fills up syslog with many redundant entries */ if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) { syslog(LOG_INFO, "serving client: %s\n", name); } #endif if(pthread_create(&client, NULL, &client_thread, pcfd) != 0) { // 创建线程处理客户端连接 DBG("could not launch another client thread\n"); close(pcfd->fd); free(pcfd); continue; } pthread_detach(client); // 分离线程 } } } DBG("leaving server thread, calling cleanup function now\n"); pthread_cleanup_pop(1); // 弹出清理函数 return NULL; }
设置 SO_REUSEADDR 选项。
该选项允许在套接字关闭后立即重新使用相同的地址。
通过设置 SO_REUSEADDR 选项,可以在套接字关闭后,立即重新使用相同的地址,而不需要等待操作系统释放该地址的等待时间。这在处理服务器应用程序时很常见,因为服务器通常会频繁地启动和关闭,并在相同的地址上监听连接。
/* ignore "socket already in use" errors */ on = 1; if(setsockopt(pcontext->sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { // 设置套接字选项 perror("setsockopt(SO_REUSEADDR) failed"); }
设置 IPV6_V6ONLY 选项。
该选项用于在 IPv6 套接字上限制仅接受 IPv6 连接,以避免 IPv4 连接进入 IPv6 套接字。
通过设置 IPV6_V6ONLY 选项,可以确保 IPv6 套接字仅接受 IPv6 连接,防止 IPv4 连接进入该套接字。这对于处理同时支持 IPv4 和 IPv6 的服务器应用程序非常重要,以确保连接按照正确的协议进行处理。
/* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */ on = 1; if(aip2->ai_family == AF_INET6 && setsockopt(pcontext->sd[i], IPPROTO_IPV6, IPV6_V6ONLY, (const void *)&on , sizeof(on)) < 0) { // 设置套接字选项 perror("setsockopt(IPV6_V6ONLY) failed"); }
绑定地址/开始监听
通过 bind 函数将套接字与指定地址绑定,并使用 listen 函数开始监听连接请求。
/* perhaps we will use this keep-alive feature oneday */ /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */ if(bind(pcontext->sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) { // 绑定套接字 perror("bind"); pcontext->sd[i] = -1; continue; } if(listen(pcontext->sd[i], 10) < 0) { // 监听套接字 perror("listen"); pcontext->sd[i] = -1; } else { i++; if(i >= MAX_SD_LEN) { OPRINT("%s(): maximum number of server sockets exceeded", __FUNCTION__); i--; break; } } /* create a child for every client that connects */ while(!pglobal->stop) { // 循环等待客户端连接 //int *pfd = (int *)malloc(sizeof(int)); cfd *pcfd = malloc(sizeof(cfd)); // 分配内存 if(pcfd == NULL) { // 如果分配内存失败,程序退出 fprintf(stderr, "failed to allocate (a very small amount of) memory\n"); exit(EXIT_FAILURE); } DBG("waiting for clients to connect\n"); // 输出调试信息 do { // 循环等待客户端连接 FD_ZERO(&selectfds); // 清空文件描述符集合 for(i = 0; i < MAX_SD_LEN; i++) { // 将所有套接字加入文件描述符集合 if(pcontext->sd[i] != -1) { FD_SET(pcontext->sd[i], &selectfds); if(pcontext->sd[i] > max_fds) max_fds = pcontext->sd[i]; } } err = select(max_fds + 1, &selectfds, NULL, NULL, NULL); // 等待客户端连接 if(err < 0 && errno != EINTR) { // 如果出错,程序退出 perror("select"); exit(EXIT_FAILURE); } } while(err <= 0); // 如果没有客户端连接,继续等待
等待客户端连接
当有客户端连接请求到达时,将会创建一个子进程来处理该连接。这段代码使用 select 函数来实现非阻塞的等待,并通过文件描述符集合 selectfds 来管理待监听的套接字。
do { // 循环等待客户端连接 FD_ZERO(&selectfds); // 清空文件描述符集合 for(i = 0; i < MAX_SD_LEN; i++) { // 将所有套接字加入文件描述符集合 if(pcontext->sd[i] != -1) { FD_SET(pcontext->sd[i], &selectfds); if(pcontext->sd[i] > max_fds) max_fds = pcontext->sd[i]; } } err = select(max_fds + 1, &selectfds, NULL, NULL, NULL); // 等待客户端连接 if(err < 0 && errno != EINTR) { // 如果出错,程序退出 perror("select"); exit(EXIT_FAILURE); } } while(err <= 0); // 如果没有客户端连接,继续等待
处理与客户端建立的连接。
对于每个就绪的套接字,它会创建一个新线程来处理客户端连接,并将相关的套接字和上下文信息传递给线程函数。线程函数 client_thread 负责实际处理客户端连接的逻辑。主线程继续循环等待并处理更多的连接请求。
for(i = 0; i < max_fds + 1; i++) { if(pcontext->sd[i] != -1 && FD_ISSET(pcontext->sd[i], &selectfds)) { pcfd->fd = accept(pcontext->sd[i], (struct sockaddr *)&client_addr, &addr_len); pcfd->pc = pcontext; /* start new thread that will handle this TCP connected client */ DBG("create thread to handle client that just established a connection\n"); #if 0 /* commented out as it fills up syslog with many redundant entries */ if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) { syslog(LOG_INFO, "serving client: %s\n", name); } #endif if(pthread_create(&client, NULL, &client_thread, pcfd) != 0) { // 创建线程处理客户端连接 DBG("could not launch another client thread\n"); close(pcfd->fd); free(pcfd); continue; } pthread_detach(client); // 分离线程 } } }
client_thread
这段代码是一个HTTP客户端线程的函数实现。它接收一个指向cfd结构的指针作为参数,然后进行一系列的操作来处理客户端的请求。
以下是代码的主要流程:
初始化变量和数据结构。
读取客户端的请求行。
根据请求行确定请求的类型,并设置相应的标记。
解析HTTP请求的其余部分,包括请求头和可选的用户名密码验证信息。
检查用户名和密码是否匹配配置文件中的设置。
根据请求类型处理请求,可能涉及发送快照、发送流、执行命令、发送插件描述符JSON文件或发送文件等操作。
关闭文件描述符,释放请求相关的内存。
需要注意的是,代码中的部分逻辑可能与具体的应用程序有关,例如根据配置文件限制命令的执行或检查输入编号的范围等。
/* thread for clients that connected to this server */ void *client_thread(void *arg) { int cnt; char input_suffixed = 0; int input_number = 0; char buffer[BUFFER_SIZE] = {0}, *pb = buffer; iobuffer iobuf; request req; cfd lcfd; /* local-connected-file-descriptor */ /* we really need the fildescriptor and it must be freeable by us */ // 我们确实需要文件描述符,并且它必须由我们释放 if(arg != NULL) { memcpy(&lcfd, arg, sizeof(cfd)); free(arg); } else return NULL; /* initializes the structures */ // 初始化结构体 init_iobuffer(&iobuf); init_request(&req); /* What does the client want to receive? Read the request. */ // 客户端想要接收什么?读取请求 memset(buffer, 0, sizeof(buffer)); if((cnt = _readline(lcfd.fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) { // 读取请求行 close(lcfd.fd); return NULL; } /* 确定要提供什么 */ if(strstr(buffer, "GET /?action=snapshot") != NULL) { // 如果请求是获取快照 req.type = A_SNAPSHOT; // 设置请求类型为获取快照 #ifdef WXP_COMPAT } else if((strstr(buffer, "GET /cam") != NULL) && (strstr(buffer, ".jpg") != NULL)) { // 如果请求是获取jpg格式的快照 req.type = A_SNAPSHOT; // 设置请求类型为获取快照 #endif input_suffixed = 255; // 标记请求中是否包含插件编号 } else if(strstr(buffer, "GET /?action=stream") != NULL) { // 如果请求是获取流 input_suffixed = 255; // 标记请求中是否包含插件编号 req.type = A_STREAM; // 设置请求类型为获取流 #ifdef WXP_COMPAT } else if((strstr(buffer, "GET /cam") != NULL) && (strstr(buffer, ".mjpg") != NULL)) { // 如果请求是获取mjpg格式的流 req.type = A_STREAM; // 设置请求类型为获取流 #endif input_suffixed = 255; // 标记请求中是否包含插件编号 } else if((strstr(buffer, "GET /input") != NULL) && (strstr(buffer, ".json") != NULL)) { // 如果请求是获取输入插件的json格式数据 req.type = A_INPUT_JSON; // 设置请求类型为获取输入插件的json格式数据 input_suffixed = 255; // 标记请求中是否包含插件编号 } else if((strstr(buffer, "GET /output") != NULL) && (strstr(buffer, ".json") != NULL)) { // 如果请求是获取输出插件的json格式数据 req.type = A_OUTPUT_JSON; // 设置请求类型为获取输出插件的json格式数据 input_suffixed = 255; // 标记请求中是否包含插件编号 } else if(strstr(buffer, "GET /program.json") != NULL) { // 如果请求是获取程序的json格式数据 req.type = A_PROGRAM_JSON; // 设置请求类型为获取程序的json格式数据 input_suffixed = 255; // 标记请求中是否包含插件编号 } else if(strstr(buffer, "GET /?action=command") != NULL) { int len; req.type = A_COMMAND; /* advance by the length of known string */ if((pb = strstr(buffer, "GET /?action=command")) == NULL) { // 如果请求不是获取命令 DBG("HTTP request seems to be malformed\n"); // 输出调试信息 send_error(lcfd.fd, 400, "Malformed HTTP request"); // 发送错误信息 close(lcfd.fd); // 关闭文件描述符 return NULL; // 返回空指针 } pb += strlen("GET /?action=command"); // a pb points to thestring after the first & after command /* only accept certain characters */ // 只接受特定字符 len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-=&1234567890%./"), 0), 100); // 计算参数长度 req.parameter = malloc(len + 1); // 分配参数内存 if(req.parameter == NULL) { // 如果分配失败 exit(EXIT_FAILURE); // 退出程序 } memset(req.parameter, 0, len + 1); // 清空参数内存 strncpy(req.parameter, pb, len); // 复制参数 if(unescape(req.parameter) == -1) { // 如果解码失败 free(req.parameter); // 释放参数内存 send_error(lcfd.fd, 500, "could not properly unescape command parameter string"); // 发送错误信息 LOG("could not properly unescape command parameter string\n"); // 输出调试信息 close(lcfd.fd); // 关闭文件描述符 return NULL; // 返回空指针 } DBG("command parameter (len: %d): \"%s\"\n", len, req.parameter); // 输出调试信息 } else { int len; DBG("try to serve a file\n"); // 输出调试信息 req.type = A_FILE; // 设置请求类型为获取文件 if((pb = strstr(buffer, "GET /")) == NULL) { // 如果请求不是获取文件 DBG("HTTP request seems to be malformed\n"); // 输出调试信息 send_error(lcfd.fd, 400, "Malformed HTTP request"); // 发送错误信息 close(lcfd.fd); // 关闭文件描述符 return NULL; // 返回空指针 } pb += strlen("GET /"); // 跳过"GET /" len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._-1234567890"), 0), 100); // 计算参数长度 req.parameter = malloc(len + 1); // 分配参数内存 if(req.parameter == NULL) { // 如果分配失败 exit(EXIT_FAILURE); // 退出程序 } memset(req.parameter, 0, len + 1); // 清空参数内存 strncpy(req.parameter, pb, len); // 复制参数 DBG("parameter (len: %d): \"%s\"\n", len, req.parameter); // 输出调试信息 } /* * 当我们使用多个输入插件时,有些url可能会有一个_[插件编号后缀] * 为了兼容性,可以在这种情况下保留,输出将从第0个输入插件生成 */ if(input_suffixed) { char *sch = strchr(buffer, '_'); if(sch != NULL) { // 如果url中有_,则输入编号应该存在 DBG("sch %s\n", sch + 1); // FIXME 如果添加了超过10个输入插件 char numStr[3]; memset(numStr, 0, 3); strncpy(numStr, sch + 1, 1); input_number = atoi(numStr); } DBG("input plugin_no: %d\n", input_number); } /* * 解析HTTP请求的其余部分 * 请求头的结尾由一个单独的空行"\r\n"标记 */ do { memset(buffer, 0, sizeof(buffer)); if((cnt = _readline(lcfd.fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) { free_request(&req); close(lcfd.fd); return NULL; } if(strstr(buffer, "User-Agent: ") != NULL) { req.client = strdup(buffer + strlen("User-Agent: ")); } else if(strstr(buffer, "Authorization: Basic ") != NULL) { req.credentials = strdup(buffer + strlen("Authorization: Basic ")); decodeBase64(req.credentials); DBG("username:password: %s\n", req.credentials); } } while(cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n')); /* 如果给出了参数-c,则检查用户名和密码 */ if(lcfd.pc->conf.credentials != NULL) { if(req.credentials == NULL || strcmp(lcfd.pc->conf.credentials, req.credentials) != 0) { DBG("access denied\n"); send_error(lcfd.fd, 401, "username and password do not match to configuration"); close(lcfd.fd); if(req.parameter != NULL) free(req.parameter); if(req.client != NULL) free(req.client); if(req.credentials != NULL) free(req.credentials); return NULL; } DBG("access granted\n"); } /* 现在是回应请求的时候 */ if(!(input_number < pglobal->incnt)) { // 如果输入编号超出范围 DBG("Input number: %d out of range (valid: 0..%d)\n", input_number, pglobal->incnt-1); // 输出调试信息 send_error(lcfd.fd, 404, "Invalid input plugin number"); // 发送错误信息 req.type = A_UNKNOWN; // 设置请求类型为未知 } switch(req.type) { // 根据请求类型进行处理 case A_SNAPSHOT: // 请求快照 DBG("Request for snapshot from input: %d\n", input_number); // 输出调试信息 send_snapshot(lcfd.fd, input_number); // 发送快照 break; case A_STREAM: // 请求流 DBG("Request for stream from input: %d\n", input_number); // 输出调试信息 send_stream(lcfd.fd, input_number); // 发送流 break; case A_COMMAND: // 请求命令 if(lcfd.pc->conf.nocommands) { // 如果不允许命令 send_error(lcfd.fd, 501, "this server is configured to not accept commands"); // 发送错误信息 break; } command(lcfd.pc->id, lcfd.fd, req.parameter); // 执行命令 break; case A_INPUT_JSON: // 请求输入插件描述符JSON文件 DBG("Request for the Input plugin descriptor JSON file\n"); // 输出调试信息 send_Input_JSON(lcfd.fd, input_number); // 发送输入插件描述符JSON文件 break; case A_OUTPUT_JSON: // 请求输出插件描述符JSON文件 DBG("Request for the Output plugin descriptor JSON file\n"); // 输出调试信息 send_Output_JSON(lcfd.fd, input_number); // 发送输出插件描述符JSON文件 break; case A_PROGRAM_JSON: // 请求程序描述符JSON文件 DBG("Request for the program descriptor JSON file\n"); // 输出调试信息 send_Program_JSON(lcfd.fd); // 发送程序描述符JSON文件 break; case A_FILE: // 请求文件 if(lcfd.pc->conf.www_folder == NULL) // 如果没有配置www文件夹 send_error(lcfd.fd, 501, "no www-folder configured"); // 发送错误信息 else send_file(lcfd.pc->id, lcfd.fd, req.parameter); // 发送文件 break; default: // 未知请求 DBG("unknown request\n"); // 输出调试信息 } close(lcfd.fd); // 关闭文件描述符 free_request(&req); // 释放请求内存 DBG("leaving HTTP client thread\n"); // 输出调试信息 return NULL; }
_readline
函数_read用于从文件描述符中读取数据,并将读取的数据存储到缓冲区中。下面是函数的主要步骤:
初始化变量和数据结构。
循环读取数据,直到满足读取长度的要求。
使用select函数等待数据到达或超时。
调用read函数从文件描述符中读取数据。
将读取的数据存储到缓冲区中,并更新相关计数器。
如果读取的字节数小于缓冲区大小,将数据移动到缓冲区末尾。
返回已读取的字节数。
函数_readline是基于_read函数的封装,用于从文件描述符中读取一行数据,直到遇到换行符或达到最大长度。它调用_read函数逐个读取字符,并将字符存储到指定的缓冲区中,直到满足结束条件。返回读取的字符数或-1(表示超时或出错)。
这两个函数可能是某个网络或文件处理程序中的一部分,用于读取和处理输入数据。它们在循环读取和处理数据时提供了超时机制,以防止程序永久阻塞。
int _read(int fd, iobuffer *iobuf, void *buffer, size_t len, int timeout) { int copied = 0, rc, i; fd_set fds; struct timeval tv; memset(buffer, 0, len); // 将buffer清零 while((copied < len)) { // 循环读取数据 i = MIN(iobuf->level, len - copied); // 计算需要读取的字节数 memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i); // 将读取到的数据存入buffer中 iobuf->level -= i; // 更新iobuf中的level copied += i; // 更新已读取的字节数 if(copied >= len) // 如果已读取的字节数等于需要读取的字节数,返回已读取的字节数 return copied; /* select将在超时或有新数据到达时返回 */ tv.tv_sec = timeout; tv.tv_usec = 0; FD_ZERO(&fds); FD_SET(fd, &fds); if((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0) { // 调用select函数等待数据到达或超时 if(rc < 0) // 如果返回值小于0,说明出错 exit(EXIT_FAILURE); /* 这里一定是超时 */ return copied; // 返回已读取的字节数 } init_iobuffer(iobuf); // 初始化iobuf /* * 由于select函数已经返回,所以这里应该至少有一个字节可读 * 但是,由于在select和read之间,远程socket可能会关闭,所以不能保证一定有数据可读 */ if((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0) { // 调用read函数读取数据 /* 出错了 */ return -1; // 返回-1 } /* 如果读取的字节数小于IO_BUFFER,将数据移动到缓冲区末尾 */ memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level); } return 0; } /****************************************************************************** Description.: Read a single line from the provided fildescriptor. This funtion will return under two conditions: * line end was reached * timeout occured Input Value.: * fd.....: fildescriptor to read from * iobuf..: iobuffer that allows to use this functions from multiple threads because the complete context is the iobuffer. * buffer.: The buffer to store values at, will be set to zero before storing values. * len....: the length of buffer * timeout: seconds to wait for an answer Return Value: * buffer.: will become filled with bytes read * iobuf..: May get altered to save the context for future calls. * func().: bytes copied to buffer or -1 in case of error ******************************************************************************/ /* read just a single line or timeout */ int _readline(int fd, iobuffer *iobuf, void *buffer, size_t len, int timeout) { char c = '\0', *out = buffer; // 定义字符变量c和指向buffer的指针out int i; memset(buffer, 0, len); // 将buffer清零 for(i = 0; i < len && c != '\n'; i++) { // 循环读取每个字符,直到读取到换行符或达到最大长度 if(_read(fd, iobuf, &c, 1, timeout) <= 0) { // 调用_read函数读取一个字符,如果返回值小于等于0,说明超时或出错 /* timeout or error occured */ // 超时或出错 return -1; // 返回-1 } *out++ = c; // 将读取到的字符存入buffer中 } return i; // 返回读取到的字符数 }
send_snapshot
函数send_snapshot用于将快照数据发送给客户端。下面是对函数的概括:
等待获取新的一帧数据。
锁定输入缓冲区的互斥锁,并等待输入缓冲区的更新条件变量。
读取输入缓冲区的帧大小。
为当前帧分配内存空间。
将输入缓冲区的时间戳复制到用户空间。
将输入缓冲区的帧数据复制到分配的内存空间中。
解锁输入缓冲区的互斥锁。
构建响应头部信息,包括HTTP状态行、标准头部和图片类型等。
将响应头部发送到客户端。
将帧数据发送到客户端。
释放帧数据的内存空间。
/* 发送快照给客户端 */ void send_snapshot(int fd, int input_number) { unsigned char *frame = NULL; int frame_size = 0; char buffer[BUFFER_SIZE] = {0}; struct timeval timestamp; /* 等待获取新的一帧 */ pthread_mutex_lock(&pglobal->in[input_number].db); pthread_cond_wait(&pglobal->in[input_number].db_update, &pglobal->in[input_number].db); /* 读取缓冲区 */ frame_size = pglobal->in[input_number].size; /* 为这一帧分配一个缓冲区 */ if((frame = malloc(frame_size + 1)) == NULL) { free(frame); pthread_mutex_unlock(&pglobal->in[input_number].db); send_error(fd, 500, "not enough memory"); return; } /* 将 v4l2_buffer 的时间戳复制到用户空间 */ timestamp = pglobal->in[input_number].timestamp; memcpy(frame, pglobal->in[input_number].buf, frame_size); DBG("got frame (size: %d kB)\n", frame_size / 1024); pthread_mutex_unlock(&pglobal->in[input_number].db); /* 写入响应 */ sprintf(buffer, "HTTP/1.0 200 OK\r\n" \ STD_HEADER \ "Content-type: image/jpeg\r\n" \ "X-Timestamp: %d.%06d\r\n" \ "\r\n", (int) timestamp.tv_sec, (int) timestamp.tv_usec); /* 现在发送头和图像 */ if(write(fd, buffer, strlen(buffer)) < 0 || \ write(fd, frame, frame_size) < 0) { free(frame); return; } free(frame); }
send_stream
函数send_stream用于发送视频流给客户端。下面是对函数的概括:
准备HTTP响应头部信息,包括状态行、标准头部和多部分数据流的Content-Type等。
将响应头部发送给客户端。
在循环中,等待获取新的一帧数据。
锁定输入缓冲区的互斥锁,并等待输入缓冲区的更新条件变量。
读取输入缓冲区的帧大小。
检查帧缓冲区的大小是否足够,如果不够则增加缓冲区的大小。
将输入缓冲区的时间戳复制到用户空间。
将输入缓冲区的帧数据复制到帧缓冲区中。
解锁输入缓冲区的互斥锁。
构建帧的响应头部信息,包括Content-Type、Content-Length和时间戳等。
将帧的响应头部发送给客户端。
将帧数据发送给客户端。
发送分隔符boundary。
重复步骤3-13直到停止条件满足。
释放帧缓冲区的内存空间。
void send_stream(int fd, int input_number) { unsigned char *frame = NULL, *tmp = NULL; int frame_size = 0, max_frame_size = 0; char buffer[BUFFER_SIZE] = {0}; struct timeval timestamp; DBG("preparing header\n"); sprintf(buffer, "HTTP/1.0 200 OK\r\n" \ STD_HEADER \ "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY "\r\n" \ "\r\n" \ "--" BOUNDARY "\r\n"); if(write(fd, buffer, strlen(buffer)) < 0) { free(frame); return; } DBG("Headers send, sending stream now\n"); while(!pglobal->stop) { /* 等待获取新的一帧 */ pthread_mutex_lock(&pglobal->in[input_number].db); pthread_cond_wait(&pglobal->in[input_number].db_update, &pglobal->in[input_number].db); /* 读取缓冲区 */ frame_size = pglobal->in[input_number].size; /* 检查帧缓冲区是否足够大,如果不够大则增加缓冲区大小 */ if(frame_size > max_frame_size) { DBG("增加缓冲区大小到 %d\n", frame_size); max_frame_size = frame_size + TEN_K; if((tmp = realloc(frame, max_frame_size)) == NULL) { free(frame); pthread_mutex_unlock(&pglobal->in[input_number].db); send_error(fd, 500, "内存不足"); return; } frame = tmp; } /* 将 v4l2_buffer 的时间戳复制到用户空间 */ timestamp = pglobal->in[input_number].timestamp; memcpy(frame, pglobal->in[input_number].buf, frame_size); DBG("got frame (size: %d kB)\n", frame_size / 1024); pthread_mutex_unlock(&pglobal->in[input_number].db); /* * 打印单个 mimetype 和长度 * 发送内容长度可以修复在 firefox 中观察到的随机流中断 */ sprintf(buffer, "Content-Type: image/jpeg\r\n" \ "Content-Length: %d\r\n" \ "X-Timestamp: %d.%06d\r\n" \ "\r\n", frame_size, (int)timestamp.tv_sec, (int)timestamp.tv_usec); DBG("sending intemdiate header\n"); if(write(fd, buffer, strlen(buffer)) < 0) break; DBG("sending frame\n"); if(write(fd, frame, frame_size) < 0) break; DBG("sending boundary\n"); sprintf(buffer, "\r\n--" BOUNDARY "\r\n"); if(write(fd, buffer, strlen(buffer)) < 0) break; } free(frame); }