MQTT 开源代理mosquitto的网络层封装相当sucks

简介: MQTT 开源代理mosquitto的网络层封装相当sucks

最近学习MQTT协议,选择了当前比较流行的MQTT Broker “mosquitto”,但是在阅读代码过程中发现其网络底层库封装的相当差劲。


对于MQTT协议的变长头长度的读取上,基本上采取每次一个byte的方式进行读取判断,对于系统调用read的高代价来讲,真的是相当的浪费,也难怪其不能作为高并发的服务器进行处理。

 

当然mosquitto需要优化的地方还很多:


1. 使用poll而不是使用epoll (可能是处于跨平台考虑,如果linux下可以使用epoll替换),同时的就是刚才提到的 byte 读取网络数据

2. 订阅树的管理上,对于大量的请求断开或者重练效率比较低

3. 空闲空间管理机制优化和数据包发送方式的修改

4. 内存管理上malloc new 没有使用mem pool机制,在大并发情况下,内存管理容易出现问题

5. 锁遍地飞,如果采用reactor_


但是从另一个方面讲,mosquitto作为开源的实现,思路上还是比较清晰,为mqtt服务器开发提供了比较完备的参考,这也就是它的价值所在了。


#ifdef WITH_BROKER
int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
#else
int _mosquitto_packet_read(struct mosquitto *mosq)
#endif
{
    uint8_t byte;
    ssize_t read_length;
    int rc = 0;
    if(!mosq) return MOSQ_ERR_INVAL;
    if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
    if(mosq->state == mosq_cs_connect_pending){
        return MOSQ_ERR_SUCCESS;
    }
    /* This gets called if pselect() indicates that there is network data
     * available - ie. at least one byte.  What we do depends on what data we
     * already have.
     * If we've not got a command, attempt to read one and save it. This should
     * always work because it's only a single byte.
     * Then try to read the remaining length. This may fail because it is may
     * be more than one byte - will need to save data pending next read if it
     * does fail.
     * Then try to read the remaining payload, where 'payload' here means the
     * combined variable header and actual payload. This is the most likely to
     * fail due to longer length, so save current data and current position.
     * After all data is read, send to _mosquitto_handle_packet() to deal with.
     * Finally, free the memory and reset everything to starting conditions.
     */
    if(!mosq->in_packet.command){
        read_length = _mosquitto_net_read(mosq, &byte, 1);
        if(read_length == 1){
            mosq->in_packet.command = byte;
#ifdef WITH_BROKER
#  ifdef WITH_SYS_TREE
            g_bytes_received++;
#  endif
            /* Clients must send CONNECT as their first command. */
            if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
#endif
        }else{
            if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
#ifdef WIN32
            errno = WSAGetLastError();
#endif
            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                return MOSQ_ERR_SUCCESS;
            }else{
                switch(errno){
                    case COMPAT_ECONNRESET:
                        return MOSQ_ERR_CONN_LOST;
                    default:
                        return MOSQ_ERR_ERRNO;
                }
            }
        }
    }
    /* remaining_count is the number of bytes that the remaining_length
     * parameter occupied in this incoming packet. We don't use it here as such
     * (it is used when allocating an outgoing packet), but we must be able to
     * determine whether all of the remaining_length parameter has been read.
     * remaining_count has three states here:
     *   0 means that we haven't read any remaining_length bytes
     *   <0 means we have read some remaining_length bytes but haven't finished
     *   >0 means we have finished reading the remaining_length bytes.
     */
    if(mosq->in_packet.remaining_count <= 0){
        do{
            read_length = _mosquitto_net_read(mosq, &byte, 1);
            if(read_length == 1){
                mosq->in_packet.remaining_count--;
                /* Max 4 bytes length for remaining length as defined by protocol.
                 * Anything more likely means a broken/malicious client.
                 */
                if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL;
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
                g_bytes_received++;
#endif
                mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
                mosq->in_packet.remaining_mult *= 128;
            }else{
                if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
#ifdef WIN32
                errno = WSAGetLastError();
#endif
                if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                    return MOSQ_ERR_SUCCESS;
                }else{
                    switch(errno){
                        case COMPAT_ECONNRESET:
                            return MOSQ_ERR_CONN_LOST;
                        default:
                            return MOSQ_ERR_ERRNO;
                    }
                }
            }
        }while((byte & 128) != 0);
        /* We have finished reading remaining_length, so make remaining_count
         * positive. */
        mosq->in_packet.remaining_count *= -1;
        if(mosq->in_packet.remaining_length > 0){
            mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
            if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
            mosq->in_packet.to_process = mosq->in_packet.remaining_length;
        }
    }
    while(mosq->in_packet.to_process>0){
        read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
        if(read_length > 0){
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
            g_bytes_received += read_length;
#endif
            mosq->in_packet.to_process -= read_length;
            mosq->in_packet.pos += read_length;
        }else{
#ifdef WIN32
            errno = WSAGetLastError();
#endif
            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                if(mosq->in_packet.to_process > 1000){
                    /* Update last_msg_in time if more than 1000 bytes left to
                     * receive. Helps when receiving large messages.
                     * This is an arbitrary limit, but with some consideration.
                     * If a client can't send 1000 bytes in a second it
                     * probably shouldn't be using a 1 second keep alive. */
                    pthread_mutex_lock(&mosq->msgtime_mutex);
                    mosq->last_msg_in = mosquitto_time();
                    pthread_mutex_unlock(&mosq->msgtime_mutex);
                }
                return MOSQ_ERR_SUCCESS;
            }else{
                switch(errno){
                    case COMPAT_ECONNRESET:
                        return MOSQ_ERR_CONN_LOST;
                    default:
                        return MOSQ_ERR_ERRNO;
                }
            }
        }
    }
    /* All data for this packet is read. */
    mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
#  ifdef WITH_SYS_TREE
    g_msgs_received++;
    if(((mosq->in_packet.command)&0xF5) == PUBLISH){
        g_pub_msgs_received++;
    }
#  endif
    rc = mqtt3_packet_handle(db, mosq);
#else
    rc = _mosquitto_packet_handle(mosq);
#endif
    /* Free data and reset values */
    _mosquitto_packet_cleanup(&mosq->in_packet);
    pthread_mutex_lock(&mosq->msgtime_mutex);
    mosq->last_msg_in = mosquitto_time();
    pthread_mutex_unlock(&mosq->msgtime_mutex);
    return rc;
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 运维 Serverless
商业版vs开源版:一图看懂云消息队列 RocketMQ 版核心优势
自建开源 RocketMQ 集群,为保证业务稳定性,往往需要按照业务请求的峰值去配置集群资源。云消息队列 RocketMQ 版 Serverless 实例通过资源快速伸缩,实现资源使用量与实际业务负载贴近,并按实际使用量计费,有效降低企业的运维压力和使用成本。
315 19
|
3月前
|
负载均衡 网络协议 网络性能优化
动态IP代理技术详解及网络性能优化
动态IP代理技术通过灵活更换IP地址,广泛应用于数据采集、网络安全测试等领域。本文详细解析其工作原理,涵盖HTTP、SOCKS代理及代理池的实现方法,并提供代码示例。同时探讨配置动态代理IP后如何通过智能调度、负载均衡、优化协议选择等方式提升网络性能,确保高效稳定的网络访问。
455 2
|
14天前
|
机器学习/深度学习 存储 人工智能
SAFEARENA: 评估自主网络代理的安全性
基于大语言模型的智能体在解决基于网络的任务方面正变得越来越熟练。随着这一能力的增强,也随之带来了更大的被恶意利用的风险,例如在在线论坛上发布虚假信息,或在网站上销售非法物质。为了评估这些风险,我们提出了SAFEARENA,这是第一个专注于故意滥用网络代理的基准测试。SAFEARENA包含四个网站上共计500个任务,其中250个是安全的,250个是有害的。我们将有害任务分为五类:虚假信息、非法活动、骚扰、网络犯罪和社会偏见,旨在评估网络代理的真实滥用情况。我们对包括GPT-4o、Claude-3.5 Sonnet、Qwen-2-VL 72B和Llama-3.2 90B在内的领先基于大语言模型的网
62 11
SAFEARENA: 评估自主网络代理的安全性
|
23天前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
恭喜 Apache RocketMQ 荣获 2024 开源创新榜单“年度开源项目”
|
1月前
|
安全 网络协议 Java
Java网络编程封装
Java网络编程封装原理旨在隐藏底层通信细节,提供简洁、安全的高层接口。通过简化开发、提高安全性和增强可维护性,封装使开发者能更高效地进行网络应用开发。常见的封装层次包括套接字层(如Socket和ServerSocket类),以及更高层次的HTTP请求封装(如RestTemplate)。示例代码展示了如何使用RestTemplate简化HTTP请求的发送与处理,确保代码清晰易维护。
|
2月前
|
数据采集 人工智能 自然语言处理
FireCrawl:开源 AI 网络爬虫工具,自动爬取网站及子页面内容,预处理为结构化数据
FireCrawl 是一款开源的 AI 网络爬虫工具,专为处理动态网页内容、自动爬取网站及子页面而设计,支持多种数据提取和输出格式。
642 19
FireCrawl:开源 AI 网络爬虫工具,自动爬取网站及子页面内容,预处理为结构化数据
|
2月前
|
消息中间件 存储 Apache
恭喜 Apache RocketMQ、Apache Seata 荣获 2024 开源创新榜单“年度开源项目”
近日,以“新纪天工、开物焕彩——致敬开源的力量”为活动主题的“重大科技成就发布会(首场)”在国家科技传播中心成功举办,并隆重揭晓了 2024 开源创新榜单,旨在致敬中国开源力量,传播推广开源科技成就,营造中国开源创新生态。2024 年开源创新榜单由中国科协科学技术传播中心、中国计算机学会、中国通信学会、中国科学院软件研究所共同主办,中国开发者社区承办,以王怀民院士为首组建评审委员会,进行研讨评审,面向中国开源行业领域,遴选具有创新性、贡献度和影响力的开源项目、社区、应用场景与开源事件。在评审出的 10 个年度开源项目中,Apache RocketMQ、Apache Seata 成功入选。
111 14
|
2月前
|
机器学习/深度学习 人工智能 搜索推荐
PaSa:字节跳动开源学术论文检索智能体,自动调用搜索引擎、浏览相关论文并追踪引文网络
PaSa 是字节跳动推出的基于强化学习的学术论文检索智能体,能够自动调用搜索引擎、阅读论文并追踪引文网络,帮助用户快速获取精准的学术文献。
257 15
|
2月前
|
缓存 负载均衡 安全
Swift中的网络代理设置与数据传输
Swift中的网络代理设置与数据传输
|
4月前
|
消息中间件 弹性计算 运维
一图看懂云消息队列 RabbitMQ 版对比开源优势
一张图带您快速了解云消息队列 RabbitMQ 版对比开源版本的显著优势。
111 13

热门文章

最新文章