MQTT 开源代理mosquitto的网络层封装相当sucks

简介: MQTT 开源代理mosquitto的网络层封装相当sucks

最近学习MQTT协议,选择了当前比较流行的MQTT Broker “mosquitto”,但是在阅读代码过程中发现其网络底层库封装的相当差劲。


对于MQTT协议的变长头长度的读取上,基本上采取每次一个byte的方式进行读取判断,对于系统调用read的高代价来讲,真的是相当的浪费,也难怪其不能作为高并发的服务器进行处理。

 

当然mosquitto需要优化的地方还很多:


1. 使用poll而不是使用epoll (可能是处于跨平台考虑,如果linux下可以使用epoll替换),同时的就是刚才提到的 byte 读取网络数据

2. 订阅树的管理上,对于大量的请求断开或者重练效率比较低

3. 空闲空间管理机制优化和数据包发送方式的修改

4. 内存管理上malloc new 没有使用mem pool机制,在大并发情况下,内存管理容易出现问题

5. 锁遍地飞,如果采用reactor_


但是从另一个方面讲,mosquitto作为开源的实现,思路上还是比较清晰,为mqtt服务器开发提供了比较完备的参考,这也就是它的价值所在了。


#ifdef WITH_BROKER
int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
#else
int _mosquitto_packet_read(struct mosquitto *mosq)
#endif
{
    uint8_t byte;
    ssize_t read_length;
    int rc = 0;
    if(!mosq) return MOSQ_ERR_INVAL;
    if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
    if(mosq->state == mosq_cs_connect_pending){
        return MOSQ_ERR_SUCCESS;
    }
    /* This gets called if pselect() indicates that there is network data
     * available - ie. at least one byte.  What we do depends on what data we
     * already have.
     * If we've not got a command, attempt to read one and save it. This should
     * always work because it's only a single byte.
     * Then try to read the remaining length. This may fail because it is may
     * be more than one byte - will need to save data pending next read if it
     * does fail.
     * Then try to read the remaining payload, where 'payload' here means the
     * combined variable header and actual payload. This is the most likely to
     * fail due to longer length, so save current data and current position.
     * After all data is read, send to _mosquitto_handle_packet() to deal with.
     * Finally, free the memory and reset everything to starting conditions.
     */
    if(!mosq->in_packet.command){
        read_length = _mosquitto_net_read(mosq, &byte, 1);
        if(read_length == 1){
            mosq->in_packet.command = byte;
#ifdef WITH_BROKER
#  ifdef WITH_SYS_TREE
            g_bytes_received++;
#  endif
            /* Clients must send CONNECT as their first command. */
            if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
#endif
        }else{
            if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
#ifdef WIN32
            errno = WSAGetLastError();
#endif
            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                return MOSQ_ERR_SUCCESS;
            }else{
                switch(errno){
                    case COMPAT_ECONNRESET:
                        return MOSQ_ERR_CONN_LOST;
                    default:
                        return MOSQ_ERR_ERRNO;
                }
            }
        }
    }
    /* remaining_count is the number of bytes that the remaining_length
     * parameter occupied in this incoming packet. We don't use it here as such
     * (it is used when allocating an outgoing packet), but we must be able to
     * determine whether all of the remaining_length parameter has been read.
     * remaining_count has three states here:
     *   0 means that we haven't read any remaining_length bytes
     *   <0 means we have read some remaining_length bytes but haven't finished
     *   >0 means we have finished reading the remaining_length bytes.
     */
    if(mosq->in_packet.remaining_count <= 0){
        do{
            read_length = _mosquitto_net_read(mosq, &byte, 1);
            if(read_length == 1){
                mosq->in_packet.remaining_count--;
                /* Max 4 bytes length for remaining length as defined by protocol.
                 * Anything more likely means a broken/malicious client.
                 */
                if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL;
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
                g_bytes_received++;
#endif
                mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
                mosq->in_packet.remaining_mult *= 128;
            }else{
                if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
#ifdef WIN32
                errno = WSAGetLastError();
#endif
                if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                    return MOSQ_ERR_SUCCESS;
                }else{
                    switch(errno){
                        case COMPAT_ECONNRESET:
                            return MOSQ_ERR_CONN_LOST;
                        default:
                            return MOSQ_ERR_ERRNO;
                    }
                }
            }
        }while((byte & 128) != 0);
        /* We have finished reading remaining_length, so make remaining_count
         * positive. */
        mosq->in_packet.remaining_count *= -1;
        if(mosq->in_packet.remaining_length > 0){
            mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
            if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
            mosq->in_packet.to_process = mosq->in_packet.remaining_length;
        }
    }
    while(mosq->in_packet.to_process>0){
        read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
        if(read_length > 0){
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
            g_bytes_received += read_length;
#endif
            mosq->in_packet.to_process -= read_length;
            mosq->in_packet.pos += read_length;
        }else{
#ifdef WIN32
            errno = WSAGetLastError();
#endif
            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                if(mosq->in_packet.to_process > 1000){
                    /* Update last_msg_in time if more than 1000 bytes left to
                     * receive. Helps when receiving large messages.
                     * This is an arbitrary limit, but with some consideration.
                     * If a client can't send 1000 bytes in a second it
                     * probably shouldn't be using a 1 second keep alive. */
                    pthread_mutex_lock(&mosq->msgtime_mutex);
                    mosq->last_msg_in = mosquitto_time();
                    pthread_mutex_unlock(&mosq->msgtime_mutex);
                }
                return MOSQ_ERR_SUCCESS;
            }else{
                switch(errno){
                    case COMPAT_ECONNRESET:
                        return MOSQ_ERR_CONN_LOST;
                    default:
                        return MOSQ_ERR_ERRNO;
                }
            }
        }
    }
    /* All data for this packet is read. */
    mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
#  ifdef WITH_SYS_TREE
    g_msgs_received++;
    if(((mosq->in_packet.command)&0xF5) == PUBLISH){
        g_pub_msgs_received++;
    }
#  endif
    rc = mqtt3_packet_handle(db, mosq);
#else
    rc = _mosquitto_packet_handle(mosq);
#endif
    /* Free data and reset values */
    _mosquitto_packet_cleanup(&mosq->in_packet);
    pthread_mutex_lock(&mosq->msgtime_mutex);
    mosq->last_msg_in = mosquitto_time();
    pthread_mutex_unlock(&mosq->msgtime_mutex);
    return rc;
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
安全 网络协议 网络安全
代理IP、Socks5代理与网络安全:保护隐私与防御威胁的技术探索
代理IP、Socks5代理与网络安全:保护隐私与防御威胁的技术探索
41 0
|
2月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
201 3
|
2月前
|
传感器 网络协议 物联网
在Linux中搭建Mosquitto MQTT协议消息服务端并结合内网穿透工具实现公网访问
Mosquitto是一个开源的消息代理,它实现了MQTT协议版本3.1和3.1.1。它可以在不同的平台上运行,包括Windows、Linux、macOS等。mosquitto可以用于物联网、传感器、移动应用程序等场景,提供了一种轻量级的、可靠的、基于发布/订阅模式的消息传递机制。
|
5月前
|
存储 传感器 物联网
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
261 0
如何在Docker中配置Mosquitto MQTT代理,以便在容器化环境中运行和管理MQTT通信
|
2月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
253 2
|
5天前
|
运维 前端开发 Devops
云效产品使用报错问题之流水线打包docker镜像时报网络代理有问题如何解决
本合集将整理呈现用户在使用过程中遇到的报错及其对应的解决办法,包括但不限于账户权限设置错误、项目配置不正确、代码提交冲突、构建任务执行失败、测试环境异常、需求流转阻塞等问题。阿里云云效是一站式企业级研发协同和DevOps平台,为企业提供从需求规划、开发、测试、发布到运维、运营的全流程端到端服务和工具支撑,致力于提升企业的研发效能和创新能力。
|
8天前
|
缓存 安全 网络协议
代理ip会不会影响网络速度和稳定性
代理ip会不会影响网络速度和稳定性
|
14天前
|
JSON Kubernetes 网络架构
Kubernetes CNI 网络模型及常见开源组件
【4月更文挑战第13天】目前主流的容器网络模型是CoreOS 公司推出的 Container Network Interface(CNI)模型
|
25天前
|
编解码 Ubuntu 算法
【Linux】NUC977移植使用MQTT(基于mosquitto)
【Linux】NUC977移植使用MQTT(基于mosquitto)
|
1月前
|
机器学习/深度学习 安全 算法
网络代理技术:保障隐私与增强安全
网络代理技术:保障隐私与增强安全
28 0