mjpg-streamer实现细节分析(下)

简介: mjpg-streamer实现细节分析(下)

启动摄像头输出线程

server_thread

服务器线程函数,用于接受客户端的连接请求并创建子线程处理每个客户端连接。下面是代码的主要步骤:

初始化变量和数据结构。

获取服务器地址信息,包括IP地址和端口号。

创建套接字,并设置套接字选项。

绑定套接字到服务器地址。

监听套接字,等待客户端连接。

循环等待客户端连接请求,使用

select函数等待可读套接字。

当有客户端连接时,创建一个子线程来处理连接。

子线程通过调用accept函数接受客户端连接,并传递连接套接字和上下文信息给子线程。

在子线程中处理客户端请求和响应。

主线程继续等待下一个客户端连接。

当收到停止信号时,退出循环,关闭套接字,并执行清理函数。

// 服务器线程函数
void *server_thread(void *arg)
{
    int on;
    pthread_t client;
    struct addrinfo *aip, *aip2;
    struct addrinfo hints;
    struct sockaddr_storage client_addr;
    socklen_t addr_len = sizeof(struct sockaddr_storage);
    fd_set selectfds;
    int max_fds = 0;
    char name[NI_MAXHOST];
    int err;
    int i;
    context *pcontext = arg;
    pglobal = pcontext->pglobal;
    /* set cleanup handler to cleanup ressources */
    pthread_cleanup_push(server_cleanup, pcontext);
    // 初始化hints结构体
    bzero(&hints, sizeof(hints));
    hints.ai_family = PF_UNSPEC; // 支持IPv4和IPv6
    hints.ai_flags = AI_PASSIVE; // 用于bind函数,表示返回的套接字地址结构体中的IP地址是通配地址
    hints.ai_socktype = SOCK_STREAM; // TCP协议
    // 获取地址信息
    snprintf(name, sizeof(name), "%d", ntohs(pcontext->conf.port)); // 将端口号转换为字符串
    if((err = getaddrinfo(NULL, name, &hints, &aip)) != 0) { // 获取地址信息
        perror(gai_strerror(err)); // 输出错误信息
        exit(EXIT_FAILURE); // 退出程序
    }
    // 初始化所有套接字为-1
    for(i = 0; i < MAX_SD_LEN; i++)
        pcontext->sd[i] = -1;
    /* open sockets for server (1 socket / address family) */
    i = 0;
    for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) {
        if((pcontext->sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) { // 创建套接字
            continue;
        }
        /* ignore "socket already in use" errors */
        on = 1;
        if(setsockopt(pcontext->sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { // 设置套接字选项
            perror("setsockopt(SO_REUSEADDR) failed");
        }
        /* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */
        on = 1;
        if(aip2->ai_family == AF_INET6 && setsockopt(pcontext->sd[i], IPPROTO_IPV6, IPV6_V6ONLY,
                (const void *)&on , sizeof(on)) < 0) { // 设置套接字选项
            perror("setsockopt(IPV6_V6ONLY) failed");
        }
        /* perhaps we will use this keep-alive feature oneday */
        /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */
        if(bind(pcontext->sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) { // 绑定套接字
            perror("bind");
            pcontext->sd[i] = -1;
            continue;
        }
        if(listen(pcontext->sd[i], 10) < 0) { // 监听套接字
            perror("listen");
            pcontext->sd[i] = -1;
        } else {
            i++;
            if(i >= MAX_SD_LEN) {
                OPRINT("%s(): maximum number of server sockets exceeded", __FUNCTION__);
                i--;
                break;
            }
        }
    }
    pcontext->sd_len = i;
    if(pcontext->sd_len < 1) { // 如果没有套接字绑定成功,程序退出
        OPRINT("%s(): bind(%d) failed", __FUNCTION__, htons(pcontext->conf.port));
        closelog();
        exit(EXIT_FAILURE);
    }
    /* create a child for every client that connects */
    while(!pglobal->stop) { // 循环等待客户端连接
        //int *pfd = (int *)malloc(sizeof(int));
        cfd *pcfd = malloc(sizeof(cfd)); // 分配内存
        if(pcfd == NULL) { // 如果分配内存失败,程序退出
            fprintf(stderr, "failed to allocate (a very small amount of) memory\n");
            exit(EXIT_FAILURE);
        }
        DBG("waiting for clients to connect\n"); // 输出调试信息
        do { // 循环等待客户端连接
            FD_ZERO(&selectfds); // 清空文件描述符集合
            for(i = 0; i < MAX_SD_LEN; i++) { // 将所有套接字加入文件描述符集合
                if(pcontext->sd[i] != -1) {
                    FD_SET(pcontext->sd[i], &selectfds);
                    if(pcontext->sd[i] > max_fds)
                        max_fds = pcontext->sd[i];
                }
            }
            err = select(max_fds + 1, &selectfds, NULL, NULL, NULL); // 等待客户端连接
            if(err < 0 && errno != EINTR) { // 如果出错,程序退出
                perror("select");
                exit(EXIT_FAILURE);
            }
        } while(err <= 0); // 如果没有客户端连接,继续等待
        for(i = 0; i < max_fds + 1; i++) {
            if(pcontext->sd[i] != -1 && FD_ISSET(pcontext->sd[i], &selectfds)) {
                pcfd->fd = accept(pcontext->sd[i], (struct sockaddr *)&client_addr, &addr_len);
                pcfd->pc = pcontext;
                /* start new thread that will handle this TCP connected client */
                DBG("create thread to handle client that just established a connection\n");
#if 0
/* commented out as it fills up syslog with many redundant entries */
                if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) {
                    syslog(LOG_INFO, "serving client: %s\n", name);
                }
#endif
                if(pthread_create(&client, NULL, &client_thread, pcfd) != 0) { // 创建线程处理客户端连接
                    DBG("could not launch another client thread\n");
                    close(pcfd->fd);
                    free(pcfd);
                    continue;
                }
                pthread_detach(client); // 分离线程
            }
        }
    }
    DBG("leaving server thread, calling cleanup function now\n");
    pthread_cleanup_pop(1); // 弹出清理函数
    return NULL;
}

设置 SO_REUSEADDR 选项。

该选项允许在套接字关闭后立即重新使用相同的地址。

通过设置 SO_REUSEADDR 选项,可以在套接字关闭后,立即重新使用相同的地址,而不需要等待操作系统释放该地址的等待时间。这在处理服务器应用程序时很常见,因为服务器通常会频繁地启动和关闭,并在相同的地址上监听连接。

/* ignore "socket already in use" errors */
on = 1;
if(setsockopt(pcontext->sd[i], SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { // 设置套接字选项
    perror("setsockopt(SO_REUSEADDR) failed");
}

设置 IPV6_V6ONLY 选项。

该选项用于在 IPv6 套接字上限制仅接受 IPv6 连接,以避免 IPv4 连接进入 IPv6 套接字。

通过设置 IPV6_V6ONLY 选项,可以确保 IPv6 套接字仅接受 IPv6 连接,防止 IPv4 连接进入该套接字。这对于处理同时支持 IPv4 和 IPv6 的服务器应用程序非常重要,以确保连接按照正确的协议进行处理。

/* IPv6 socket should listen to IPv6 only, otherwise we will get "socket already in use" */
on = 1;
if(aip2->ai_family == AF_INET6 && setsockopt(pcontext->sd[i], IPPROTO_IPV6, IPV6_V6ONLY,
        (const void *)&on , sizeof(on)) < 0) { // 设置套接字选项
    perror("setsockopt(IPV6_V6ONLY) failed");
}

绑定地址/开始监听

通过 bind 函数将套接字与指定地址绑定,并使用 listen 函数开始监听连接请求。

/* perhaps we will use this keep-alive feature oneday */
    /* setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)); */
    if(bind(pcontext->sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) { // 绑定套接字
        perror("bind");
        pcontext->sd[i] = -1;
        continue;
    }
    if(listen(pcontext->sd[i], 10) < 0) { // 监听套接字
        perror("listen");
        pcontext->sd[i] = -1;
    } else {
        i++;
        if(i >= MAX_SD_LEN) {
            OPRINT("%s(): maximum number of server sockets exceeded", __FUNCTION__);
            i--;
            break;
        }
    }
/* create a child for every client that connects */
while(!pglobal->stop) { // 循环等待客户端连接
    //int *pfd = (int *)malloc(sizeof(int));
    cfd *pcfd = malloc(sizeof(cfd)); // 分配内存
    if(pcfd == NULL) { // 如果分配内存失败,程序退出
        fprintf(stderr, "failed to allocate (a very small amount of) memory\n");
        exit(EXIT_FAILURE);
    }
    DBG("waiting for clients to connect\n"); // 输出调试信息
    do { // 循环等待客户端连接
        FD_ZERO(&selectfds); // 清空文件描述符集合
        for(i = 0; i < MAX_SD_LEN; i++) { // 将所有套接字加入文件描述符集合
            if(pcontext->sd[i] != -1) {
                FD_SET(pcontext->sd[i], &selectfds);
                if(pcontext->sd[i] > max_fds)
                    max_fds = pcontext->sd[i];
            }
        }
        err = select(max_fds + 1, &selectfds, NULL, NULL, NULL); // 等待客户端连接
        if(err < 0 && errno != EINTR) { // 如果出错,程序退出
            perror("select");
            exit(EXIT_FAILURE);
        }
    } while(err <= 0); // 如果没有客户端连接,继续等待

等待客户端连接

当有客户端连接请求到达时,将会创建一个子进程来处理该连接。这段代码使用 select 函数来实现非阻塞的等待,并通过文件描述符集合 selectfds 来管理待监听的套接字。

do { // 循环等待客户端连接
        FD_ZERO(&selectfds); // 清空文件描述符集合
        for(i = 0; i < MAX_SD_LEN; i++) { // 将所有套接字加入文件描述符集合
            if(pcontext->sd[i] != -1) {
                FD_SET(pcontext->sd[i], &selectfds);
                if(pcontext->sd[i] > max_fds)
                    max_fds = pcontext->sd[i];
            }
        }
        err = select(max_fds + 1, &selectfds, NULL, NULL, NULL); // 等待客户端连接
        if(err < 0 && errno != EINTR) { // 如果出错,程序退出
            perror("select");
            exit(EXIT_FAILURE);
        }
        } while(err <= 0); // 如果没有客户端连接,继续等待

处理与客户端建立的连接。

对于每个就绪的套接字,它会创建一个新线程来处理客户端连接,并将相关的套接字和上下文信息传递给线程函数。线程函数 client_thread 负责实际处理客户端连接的逻辑。主线程继续循环等待并处理更多的连接请求。

for(i = 0; i < max_fds + 1; i++) {
            if(pcontext->sd[i] != -1 && FD_ISSET(pcontext->sd[i], &selectfds)) {
                pcfd->fd = accept(pcontext->sd[i], (struct sockaddr *)&client_addr, &addr_len);
                pcfd->pc = pcontext;
                /* start new thread that will handle this TCP connected client */
                DBG("create thread to handle client that just established a connection\n");
#if 0
/* commented out as it fills up syslog with many redundant entries */
                if(getnameinfo((struct sockaddr *)&client_addr, addr_len, name, sizeof(name), NULL, 0, NI_NUMERICHOST) == 0) {
                    syslog(LOG_INFO, "serving client: %s\n", name);
                }
#endif
                if(pthread_create(&client, NULL, &client_thread, pcfd) != 0) { // 创建线程处理客户端连接
                    DBG("could not launch another client thread\n");
                    close(pcfd->fd);
                    free(pcfd);
                    continue;
                }
                pthread_detach(client); // 分离线程
            }
        }
    }

client_thread

这段代码是一个HTTP客户端线程的函数实现。它接收一个指向cfd结构的指针作为参数,然后进行一系列的操作来处理客户端的请求。

以下是代码的主要流程:

初始化变量和数据结构。

读取客户端的请求行。

根据请求行确定请求的类型,并设置相应的标记。

解析HTTP请求的其余部分,包括请求头和可选的用户名密码验证信息。

检查用户名和密码是否匹配配置文件中的设置。

根据请求类型处理请求,可能涉及发送快照、发送流、执行命令、发送插件描述符JSON文件或发送文件等操作。

关闭文件描述符,释放请求相关的内存。

需要注意的是,代码中的部分逻辑可能与具体的应用程序有关,例如根据配置文件限制命令的执行或检查输入编号的范围等。

/* thread for clients that connected to this server */
void *client_thread(void *arg)
{
    int cnt;
    char input_suffixed = 0;
    int input_number = 0;
    char buffer[BUFFER_SIZE] = {0}, *pb = buffer;
    iobuffer iobuf;
    request req;
    cfd lcfd; /* local-connected-file-descriptor */
    /* we really need the fildescriptor and it must be freeable by us */ // 我们确实需要文件描述符,并且它必须由我们释放
    if(arg != NULL) {
        memcpy(&lcfd, arg, sizeof(cfd));
        free(arg);
    } else
        return NULL;
    /* initializes the structures */ // 初始化结构体
    init_iobuffer(&iobuf);
    init_request(&req);
    /* What does the client want to receive? Read the request. */ // 客户端想要接收什么?读取请求
    memset(buffer, 0, sizeof(buffer));
    if((cnt = _readline(lcfd.fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) { // 读取请求行
        close(lcfd.fd);
        return NULL;
    }
    /* 确定要提供什么 */
    if(strstr(buffer, "GET /?action=snapshot") != NULL) { // 如果请求是获取快照
        req.type = A_SNAPSHOT; // 设置请求类型为获取快照
#ifdef WXP_COMPAT
    } else if((strstr(buffer, "GET /cam") != NULL) && (strstr(buffer, ".jpg") != NULL)) { // 如果请求是获取jpg格式的快照
        req.type = A_SNAPSHOT; // 设置请求类型为获取快照
#endif
        input_suffixed = 255; // 标记请求中是否包含插件编号
    } else if(strstr(buffer, "GET /?action=stream") != NULL) { // 如果请求是获取流
        input_suffixed = 255; // 标记请求中是否包含插件编号
        req.type = A_STREAM; // 设置请求类型为获取流
#ifdef WXP_COMPAT
    } else if((strstr(buffer, "GET /cam") != NULL) && (strstr(buffer, ".mjpg") != NULL)) { // 如果请求是获取mjpg格式的流
        req.type = A_STREAM; // 设置请求类型为获取流
#endif
        input_suffixed = 255; // 标记请求中是否包含插件编号
    } else if((strstr(buffer, "GET /input") != NULL) && (strstr(buffer, ".json") != NULL)) { // 如果请求是获取输入插件的json格式数据
        req.type = A_INPUT_JSON; // 设置请求类型为获取输入插件的json格式数据
        input_suffixed = 255; // 标记请求中是否包含插件编号
    } else if((strstr(buffer, "GET /output") != NULL) && (strstr(buffer, ".json") != NULL)) { // 如果请求是获取输出插件的json格式数据
        req.type = A_OUTPUT_JSON; // 设置请求类型为获取输出插件的json格式数据
        input_suffixed = 255; // 标记请求中是否包含插件编号
    } else if(strstr(buffer, "GET /program.json") != NULL) { // 如果请求是获取程序的json格式数据
        req.type = A_PROGRAM_JSON; // 设置请求类型为获取程序的json格式数据
        input_suffixed = 255; // 标记请求中是否包含插件编号
    } else if(strstr(buffer, "GET /?action=command") != NULL) {
        int len;
        req.type = A_COMMAND;
        /* advance by the length of known string */
        if((pb = strstr(buffer, "GET /?action=command")) == NULL) { // 如果请求不是获取命令
            DBG("HTTP request seems to be malformed\n"); // 输出调试信息
            send_error(lcfd.fd, 400, "Malformed HTTP request"); // 发送错误信息
            close(lcfd.fd); // 关闭文件描述符
            return NULL; // 返回空指针
        }
        pb += strlen("GET /?action=command"); // a pb points to thestring after the first & after command
        /* only accept certain characters */ // 只接受特定字符
        len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-=&1234567890%./"), 0), 100); // 计算参数长度
        req.parameter = malloc(len + 1); // 分配参数内存
        if(req.parameter == NULL) { // 如果分配失败
            exit(EXIT_FAILURE); // 退出程序
        }
        memset(req.parameter, 0, len + 1); // 清空参数内存
        strncpy(req.parameter, pb, len); // 复制参数
        if(unescape(req.parameter) == -1) { // 如果解码失败
            free(req.parameter); // 释放参数内存
            send_error(lcfd.fd, 500, "could not properly unescape command parameter string"); // 发送错误信息
            LOG("could not properly unescape command parameter string\n"); // 输出调试信息
            close(lcfd.fd); // 关闭文件描述符
            return NULL; // 返回空指针
        }
        DBG("command parameter (len: %d): \"%s\"\n", len, req.parameter); // 输出调试信息
    } else {
        int len;
        DBG("try to serve a file\n"); // 输出调试信息
        req.type = A_FILE; // 设置请求类型为获取文件
        if((pb = strstr(buffer, "GET /")) == NULL) { // 如果请求不是获取文件
            DBG("HTTP request seems to be malformed\n"); // 输出调试信息
            send_error(lcfd.fd, 400, "Malformed HTTP request"); // 发送错误信息
            close(lcfd.fd); // 关闭文件描述符
            return NULL; // 返回空指针
        }
        pb += strlen("GET /"); // 跳过"GET /"
        len = MIN(MAX(strspn(pb, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ._-1234567890"), 0), 100); // 计算参数长度
        req.parameter = malloc(len + 1); // 分配参数内存
        if(req.parameter == NULL) { // 如果分配失败
            exit(EXIT_FAILURE); // 退出程序
        }
        memset(req.parameter, 0, len + 1); // 清空参数内存
        strncpy(req.parameter, pb, len); // 复制参数
        DBG("parameter (len: %d): \"%s\"\n", len, req.parameter); // 输出调试信息
    }
    /*
     * 当我们使用多个输入插件时,有些url可能会有一个_[插件编号后缀]
     * 为了兼容性,可以在这种情况下保留,输出将从第0个输入插件生成
     */
    if(input_suffixed) {
        char *sch = strchr(buffer, '_');
        if(sch != NULL) {  // 如果url中有_,则输入编号应该存在
            DBG("sch %s\n", sch + 1); // FIXME 如果添加了超过10个输入插件
            char numStr[3];
            memset(numStr, 0, 3);
            strncpy(numStr, sch + 1, 1);
            input_number = atoi(numStr);
        }
        DBG("input plugin_no: %d\n", input_number);
    }
    /*
     * 解析HTTP请求的其余部分
     * 请求头的结尾由一个单独的空行"\r\n"标记
     */
    do {
        memset(buffer, 0, sizeof(buffer));
        if((cnt = _readline(lcfd.fd, &iobuf, buffer, sizeof(buffer) - 1, 5)) == -1) {
            free_request(&req);
            close(lcfd.fd);
            return NULL;
        }
        if(strstr(buffer, "User-Agent: ") != NULL) {
            req.client = strdup(buffer + strlen("User-Agent: "));
        } else if(strstr(buffer, "Authorization: Basic ") != NULL) {
            req.credentials = strdup(buffer + strlen("Authorization: Basic "));
            decodeBase64(req.credentials);
            DBG("username:password: %s\n", req.credentials);
        }
    } while(cnt > 2 && !(buffer[0] == '\r' && buffer[1] == '\n'));
    /* 如果给出了参数-c,则检查用户名和密码 */
    if(lcfd.pc->conf.credentials != NULL) {
        if(req.credentials == NULL || strcmp(lcfd.pc->conf.credentials, req.credentials) != 0) {
            DBG("access denied\n");
            send_error(lcfd.fd, 401, "username and password do not match to configuration");
            close(lcfd.fd);
            if(req.parameter != NULL) free(req.parameter);
            if(req.client != NULL) free(req.client);
            if(req.credentials != NULL) free(req.credentials);
            return NULL;
        }
        DBG("access granted\n");
    }
    /* 现在是回应请求的时候 */
    if(!(input_number < pglobal->incnt)) { // 如果输入编号超出范围
        DBG("Input number: %d out of range (valid: 0..%d)\n", input_number, pglobal->incnt-1); // 输出调试信息
        send_error(lcfd.fd, 404, "Invalid input plugin number"); // 发送错误信息
        req.type = A_UNKNOWN; // 设置请求类型为未知
    }
    switch(req.type) { // 根据请求类型进行处理
    case A_SNAPSHOT: // 请求快照
        DBG("Request for snapshot from input: %d\n", input_number); // 输出调试信息
        send_snapshot(lcfd.fd, input_number); // 发送快照
        break;
    case A_STREAM: // 请求流
        DBG("Request for stream from input: %d\n", input_number); // 输出调试信息
        send_stream(lcfd.fd, input_number); // 发送流
        break;
    case A_COMMAND: // 请求命令
        if(lcfd.pc->conf.nocommands) { // 如果不允许命令
            send_error(lcfd.fd, 501, "this server is configured to not accept commands"); // 发送错误信息
            break;
        }
        command(lcfd.pc->id, lcfd.fd, req.parameter); // 执行命令
        break;
    case A_INPUT_JSON: // 请求输入插件描述符JSON文件
        DBG("Request for the Input plugin descriptor JSON file\n"); // 输出调试信息
        send_Input_JSON(lcfd.fd, input_number); // 发送输入插件描述符JSON文件
        break;
    case A_OUTPUT_JSON: // 请求输出插件描述符JSON文件
        DBG("Request for the Output plugin descriptor JSON file\n"); // 输出调试信息
        send_Output_JSON(lcfd.fd, input_number); // 发送输出插件描述符JSON文件
        break;
    case A_PROGRAM_JSON: // 请求程序描述符JSON文件
        DBG("Request for the program descriptor JSON file\n"); // 输出调试信息
        send_Program_JSON(lcfd.fd); // 发送程序描述符JSON文件
        break;
    case A_FILE: // 请求文件
        if(lcfd.pc->conf.www_folder == NULL) // 如果没有配置www文件夹
            send_error(lcfd.fd, 501, "no www-folder configured"); // 发送错误信息
        else
            send_file(lcfd.pc->id, lcfd.fd, req.parameter); // 发送文件
        break;
    default: // 未知请求
        DBG("unknown request\n"); // 输出调试信息
    }
    close(lcfd.fd); // 关闭文件描述符
    free_request(&req); // 释放请求内存
    DBG("leaving HTTP client thread\n"); // 输出调试信息
    return NULL;
}

_readline

函数_read用于从文件描述符中读取数据,并将读取的数据存储到缓冲区中。下面是函数的主要步骤:

初始化变量和数据结构。

循环读取数据,直到满足读取长度的要求。

使用select函数等待数据到达或超时。

调用read函数从文件描述符中读取数据。

将读取的数据存储到缓冲区中,并更新相关计数器。

如果读取的字节数小于缓冲区大小,将数据移动到缓冲区末尾。

返回已读取的字节数。

函数_readline是基于_read函数的封装,用于从文件描述符中读取一行数据,直到遇到换行符或达到最大长度。它调用_read函数逐个读取字符,并将字符存储到指定的缓冲区中,直到满足结束条件。返回读取的字符数或-1(表示超时或出错)。

这两个函数可能是某个网络或文件处理程序中的一部分,用于读取和处理输入数据。它们在循环读取和处理数据时提供了超时机制,以防止程序永久阻塞。

int _read(int fd, iobuffer *iobuf, void *buffer, size_t len, int timeout)
{
    int copied = 0, rc, i;
    fd_set fds;
    struct timeval tv;
    memset(buffer, 0, len); // 将buffer清零
    while((copied < len)) { // 循环读取数据
        i = MIN(iobuf->level, len - copied); // 计算需要读取的字节数
        memcpy(buffer + copied, iobuf->buffer + IO_BUFFER - iobuf->level, i); // 将读取到的数据存入buffer中
        iobuf->level -= i; // 更新iobuf中的level
        copied += i; // 更新已读取的字节数
        if(copied >= len) // 如果已读取的字节数等于需要读取的字节数,返回已读取的字节数
            return copied;
        /* select将在超时或有新数据到达时返回 */
        tv.tv_sec = timeout;
        tv.tv_usec = 0;
        FD_ZERO(&fds);
        FD_SET(fd, &fds);
        if((rc = select(fd + 1, &fds, NULL, NULL, &tv)) <= 0) { // 调用select函数等待数据到达或超时
            if(rc < 0) // 如果返回值小于0,说明出错
                exit(EXIT_FAILURE);
            /* 这里一定是超时 */
            return copied; // 返回已读取的字节数
        }
        init_iobuffer(iobuf); // 初始化iobuf
        /*
         * 由于select函数已经返回,所以这里应该至少有一个字节可读
         * 但是,由于在select和read之间,远程socket可能会关闭,所以不能保证一定有数据可读
         */
        if((iobuf->level = read(fd, &iobuf->buffer, IO_BUFFER)) <= 0) { // 调用read函数读取数据
            /* 出错了 */
            return -1; // 返回-1
        }
        /* 如果读取的字节数小于IO_BUFFER,将数据移动到缓冲区末尾 */
        memmove(iobuf->buffer + (IO_BUFFER - iobuf->level), iobuf->buffer, iobuf->level);
    }
    return 0;
}
/******************************************************************************
Description.: Read a single line from the provided fildescriptor.
              This funtion will return under two conditions:
              * line end was reached
              * timeout occured
Input Value.: * fd.....: fildescriptor to read from
              * iobuf..: iobuffer that allows to use this functions from multiple
                         threads because the complete context is the iobuffer.
              * buffer.: The buffer to store values at, will be set to zero
                         before storing values.
              * len....: the length of buffer
              * timeout: seconds to wait for an answer
Return Value: * buffer.: will become filled with bytes read
              * iobuf..: May get altered to save the context for future calls.
              * func().: bytes copied to buffer or -1 in case of error
******************************************************************************/
/* read just a single line or timeout */
int _readline(int fd, iobuffer *iobuf, void *buffer, size_t len, int timeout)
{
    char c = '\0', *out = buffer; // 定义字符变量c和指向buffer的指针out
    int i;
    memset(buffer, 0, len); // 将buffer清零
    for(i = 0; i < len && c != '\n'; i++) { // 循环读取每个字符,直到读取到换行符或达到最大长度
        if(_read(fd, iobuf, &c, 1, timeout) <= 0) { // 调用_read函数读取一个字符,如果返回值小于等于0,说明超时或出错
            /* timeout or error occured */ // 超时或出错
            return -1; // 返回-1
        }
        *out++ = c; // 将读取到的字符存入buffer中
    }
    return i; // 返回读取到的字符数
}

send_snapshot

函数send_snapshot用于将快照数据发送给客户端。下面是对函数的概括:

等待获取新的一帧数据。

锁定输入缓冲区的互斥锁,并等待输入缓冲区的更新条件变量。

读取输入缓冲区的帧大小。

为当前帧分配内存空间。

将输入缓冲区的时间戳复制到用户空间。

将输入缓冲区的帧数据复制到分配的内存空间中。

解锁输入缓冲区的互斥锁。

构建响应头部信息,包括HTTP状态行、标准头部和图片类型等。

将响应头部发送到客户端。

将帧数据发送到客户端。

释放帧数据的内存空间。

/* 发送快照给客户端 */
void send_snapshot(int fd, int input_number)
{
    unsigned char *frame = NULL;
    int frame_size = 0;
    char buffer[BUFFER_SIZE] = {0};
    struct timeval timestamp;
    /* 等待获取新的一帧 */
    pthread_mutex_lock(&pglobal->in[input_number].db);
    pthread_cond_wait(&pglobal->in[input_number].db_update, &pglobal->in[input_number].db);
    /* 读取缓冲区 */
    frame_size = pglobal->in[input_number].size;
    /* 为这一帧分配一个缓冲区 */
    if((frame = malloc(frame_size + 1)) == NULL) {
        free(frame);
        pthread_mutex_unlock(&pglobal->in[input_number].db);
        send_error(fd, 500, "not enough memory");
        return;
    }
    /* 将 v4l2_buffer 的时间戳复制到用户空间 */
    timestamp = pglobal->in[input_number].timestamp;
    memcpy(frame, pglobal->in[input_number].buf, frame_size);
    DBG("got frame (size: %d kB)\n", frame_size / 1024);
    pthread_mutex_unlock(&pglobal->in[input_number].db);
    /* 写入响应 */
    sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
            STD_HEADER \
            "Content-type: image/jpeg\r\n" \
            "X-Timestamp: %d.%06d\r\n" \
            "\r\n", (int) timestamp.tv_sec, (int) timestamp.tv_usec);
    /* 现在发送头和图像 */
    if(write(fd, buffer, strlen(buffer)) < 0 || \
            write(fd, frame, frame_size) < 0) {
        free(frame);
        return;
    }
    free(frame);
}

send_stream

函数send_stream用于发送视频流给客户端。下面是对函数的概括:

准备HTTP响应头部信息,包括状态行、标准头部和多部分数据流的Content-Type等。

将响应头部发送给客户端。

在循环中,等待获取新的一帧数据。

锁定输入缓冲区的互斥锁,并等待输入缓冲区的更新条件变量。

读取输入缓冲区的帧大小。

检查帧缓冲区的大小是否足够,如果不够则增加缓冲区的大小。

将输入缓冲区的时间戳复制到用户空间。

将输入缓冲区的帧数据复制到帧缓冲区中。

解锁输入缓冲区的互斥锁。

构建帧的响应头部信息,包括Content-Type、Content-Length和时间戳等。

将帧的响应头部发送给客户端。

将帧数据发送给客户端。

发送分隔符boundary。

重复步骤3-13直到停止条件满足。

释放帧缓冲区的内存空间。

void send_stream(int fd, int input_number)
{
    unsigned char *frame = NULL, *tmp = NULL;
    int frame_size = 0, max_frame_size = 0;
    char buffer[BUFFER_SIZE] = {0};
    struct timeval timestamp;
    DBG("preparing header\n");
    sprintf(buffer, "HTTP/1.0 200 OK\r\n" \
            STD_HEADER \
            "Content-Type: multipart/x-mixed-replace;boundary=" BOUNDARY "\r\n" \
            "\r\n" \
            "--" BOUNDARY "\r\n");
    if(write(fd, buffer, strlen(buffer)) < 0) {
        free(frame);
        return;
    }
    DBG("Headers send, sending stream now\n");
    while(!pglobal->stop) {
        /* 等待获取新的一帧 */
        pthread_mutex_lock(&pglobal->in[input_number].db);
        pthread_cond_wait(&pglobal->in[input_number].db_update, &pglobal->in[input_number].db);
        /* 读取缓冲区 */
        frame_size = pglobal->in[input_number].size;
        /* 检查帧缓冲区是否足够大,如果不够大则增加缓冲区大小 */
        if(frame_size > max_frame_size) {
            DBG("增加缓冲区大小到 %d\n", frame_size);
            max_frame_size = frame_size + TEN_K;
            if((tmp = realloc(frame, max_frame_size)) == NULL) {
                free(frame);
                pthread_mutex_unlock(&pglobal->in[input_number].db);
                send_error(fd, 500, "内存不足");
                return;
            }
            frame = tmp;
        }
        /* 将 v4l2_buffer 的时间戳复制到用户空间 */
        timestamp = pglobal->in[input_number].timestamp;
        memcpy(frame, pglobal->in[input_number].buf, frame_size);
        DBG("got frame (size: %d kB)\n", frame_size / 1024);
        pthread_mutex_unlock(&pglobal->in[input_number].db);
        /*
         * 打印单个 mimetype 和长度
         * 发送内容长度可以修复在 firefox 中观察到的随机流中断
         */
        sprintf(buffer, "Content-Type: image/jpeg\r\n" \
                "Content-Length: %d\r\n" \
                "X-Timestamp: %d.%06d\r\n" \
                "\r\n", frame_size, (int)timestamp.tv_sec, (int)timestamp.tv_usec);
        DBG("sending intemdiate header\n");
        if(write(fd, buffer, strlen(buffer)) < 0) break;
        DBG("sending frame\n");
        if(write(fd, frame, frame_size) < 0) break;
        DBG("sending boundary\n");
        sprintf(buffer, "\r\n--" BOUNDARY "\r\n");
        if(write(fd, buffer, strlen(buffer)) < 0) break;
    }
    free(frame);
}


目录
相关文章
|
9月前
|
存储 编解码 安全
mjpg-streamer实现细节分析(上)
mjpg-streamer实现细节分析
139 0
|
8月前
|
缓存 iOS开发 C++
iOS 逆向编程(二十三)dsc_extractor 动态库提取器
iOS 逆向编程(二十三)dsc_extractor 动态库提取器
122 1
|
9月前
|
存储 网络协议 数据安全/隐私保护
mjpg-streamer框架分析
mjpg-streamer框架分析
47 0
|
9月前
|
编解码 C++
UVC调用过程部分细节分析
UVC调用过程部分细节分析
359 0
|
9月前
ENVI_IDL:批量重投影ModisSwath产品(调用二次开发接口)+解析
ENVI_IDL:批量重投影ModisSwath产品(调用二次开发接口)+解析
140 1
|
10月前
|
JSON Go API
如何基于 zap 封装一个更好用的日志库
如何基于 zap 封装一个更好用的日志库
345 0
|
11月前
|
人工智能 监控 搜索推荐
使用LangChain的自定义Tool+Agent, 构建全新的AIOps故障分析流程?
如果能够利用LangChain的Agent对问题的推理、任务的编排能力, 再进一步结合自定义的检查脚本工具, 是否就能够更好的实现故障分析的流程化智能编排和执行。
4640 0
|
NoSQL 数据处理 C#
基于C#的ArcEngine二次开发52:GDB数据处理过程中与Name相关的操作
基于C#的ArcEngine二次开发52:GDB数据处理过程中与Name相关的操作
基于C#的ArcEngine二次开发52:GDB数据处理过程中与Name相关的操作
|
Linux API Python
wrf模式学习记录--使用ERA5数据驱动WRF模式三层嵌套:数据下载以及模式处理
wrf模式学习记录--使用ERA5数据驱动WRF模式三层嵌套:数据下载以及模式处理
wrf模式学习记录--使用ERA5数据驱动WRF模式三层嵌套:数据下载以及模式处理
|
网络安全 开发工具
FreeSWITCH线上一次Crashes分析定位过程-ldns库问题
线上一次Crashes分析定位过程-ldns库问题