MQTT 开源代理mosquitto的网络层封装相当sucks

简介: MQTT 开源代理mosquitto的网络层封装相当sucks

最近学习MQTT协议,选择了当前比较流行的MQTT Broker “mosquitto”,但是在阅读代码过程中发现其网络底层库封装的相当差劲。


对于MQTT协议的变长头长度的读取上,基本上采取每次一个byte的方式进行读取判断,对于系统调用read的高代价来讲,真的是相当的浪费,也难怪其不能作为高并发的服务器进行处理。

 

当然mosquitto需要优化的地方还很多:


1. 使用poll而不是使用epoll (可能是处于跨平台考虑,如果linux下可以使用epoll替换),同时的就是刚才提到的 byte 读取网络数据

2. 订阅树的管理上,对于大量的请求断开或者重练效率比较低

3. 空闲空间管理机制优化和数据包发送方式的修改

4. 内存管理上malloc new 没有使用mem pool机制,在大并发情况下,内存管理容易出现问题

5. 锁遍地飞,如果采用reactor_


但是从另一个方面讲,mosquitto作为开源的实现,思路上还是比较清晰,为mqtt服务器开发提供了比较完备的参考,这也就是它的价值所在了。


#ifdef WITH_BROKER
int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)
#else
int _mosquitto_packet_read(struct mosquitto *mosq)
#endif
{
    uint8_t byte;
    ssize_t read_length;
    int rc = 0;
    if(!mosq) return MOSQ_ERR_INVAL;
    if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
    if(mosq->state == mosq_cs_connect_pending){
        return MOSQ_ERR_SUCCESS;
    }
    /* This gets called if pselect() indicates that there is network data
     * available - ie. at least one byte.  What we do depends on what data we
     * already have.
     * If we've not got a command, attempt to read one and save it. This should
     * always work because it's only a single byte.
     * Then try to read the remaining length. This may fail because it is may
     * be more than one byte - will need to save data pending next read if it
     * does fail.
     * Then try to read the remaining payload, where 'payload' here means the
     * combined variable header and actual payload. This is the most likely to
     * fail due to longer length, so save current data and current position.
     * After all data is read, send to _mosquitto_handle_packet() to deal with.
     * Finally, free the memory and reset everything to starting conditions.
     */
    if(!mosq->in_packet.command){
        read_length = _mosquitto_net_read(mosq, &byte, 1);
        if(read_length == 1){
            mosq->in_packet.command = byte;
#ifdef WITH_BROKER
#  ifdef WITH_SYS_TREE
            g_bytes_received++;
#  endif
            /* Clients must send CONNECT as their first command. */
            if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;
#endif
        }else{
            if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
#ifdef WIN32
            errno = WSAGetLastError();
#endif
            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                return MOSQ_ERR_SUCCESS;
            }else{
                switch(errno){
                    case COMPAT_ECONNRESET:
                        return MOSQ_ERR_CONN_LOST;
                    default:
                        return MOSQ_ERR_ERRNO;
                }
            }
        }
    }
    /* remaining_count is the number of bytes that the remaining_length
     * parameter occupied in this incoming packet. We don't use it here as such
     * (it is used when allocating an outgoing packet), but we must be able to
     * determine whether all of the remaining_length parameter has been read.
     * remaining_count has three states here:
     *   0 means that we haven't read any remaining_length bytes
     *   <0 means we have read some remaining_length bytes but haven't finished
     *   >0 means we have finished reading the remaining_length bytes.
     */
    if(mosq->in_packet.remaining_count <= 0){
        do{
            read_length = _mosquitto_net_read(mosq, &byte, 1);
            if(read_length == 1){
                mosq->in_packet.remaining_count--;
                /* Max 4 bytes length for remaining length as defined by protocol.
                 * Anything more likely means a broken/malicious client.
                 */
                if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL;
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
                g_bytes_received++;
#endif
                mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
                mosq->in_packet.remaining_mult *= 128;
            }else{
                if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */
#ifdef WIN32
                errno = WSAGetLastError();
#endif
                if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                    return MOSQ_ERR_SUCCESS;
                }else{
                    switch(errno){
                        case COMPAT_ECONNRESET:
                            return MOSQ_ERR_CONN_LOST;
                        default:
                            return MOSQ_ERR_ERRNO;
                    }
                }
            }
        }while((byte & 128) != 0);
        /* We have finished reading remaining_length, so make remaining_count
         * positive. */
        mosq->in_packet.remaining_count *= -1;
        if(mosq->in_packet.remaining_length > 0){
            mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
            if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;
            mosq->in_packet.to_process = mosq->in_packet.remaining_length;
        }
    }
    while(mosq->in_packet.to_process>0){
        read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
        if(read_length > 0){
#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)
            g_bytes_received += read_length;
#endif
            mosq->in_packet.to_process -= read_length;
            mosq->in_packet.pos += read_length;
        }else{
#ifdef WIN32
            errno = WSAGetLastError();
#endif
            if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
                if(mosq->in_packet.to_process > 1000){
                    /* Update last_msg_in time if more than 1000 bytes left to
                     * receive. Helps when receiving large messages.
                     * This is an arbitrary limit, but with some consideration.
                     * If a client can't send 1000 bytes in a second it
                     * probably shouldn't be using a 1 second keep alive. */
                    pthread_mutex_lock(&mosq->msgtime_mutex);
                    mosq->last_msg_in = mosquitto_time();
                    pthread_mutex_unlock(&mosq->msgtime_mutex);
                }
                return MOSQ_ERR_SUCCESS;
            }else{
                switch(errno){
                    case COMPAT_ECONNRESET:
                        return MOSQ_ERR_CONN_LOST;
                    default:
                        return MOSQ_ERR_ERRNO;
                }
            }
        }
    }
    /* All data for this packet is read. */
    mosq->in_packet.pos = 0;
#ifdef WITH_BROKER
#  ifdef WITH_SYS_TREE
    g_msgs_received++;
    if(((mosq->in_packet.command)&0xF5) == PUBLISH){
        g_pub_msgs_received++;
    }
#  endif
    rc = mqtt3_packet_handle(db, mosq);
#else
    rc = _mosquitto_packet_handle(mosq);
#endif
    /* Free data and reset values */
    _mosquitto_packet_cleanup(&mosq->in_packet);
    pthread_mutex_lock(&mosq->msgtime_mutex);
    mosq->last_msg_in = mosquitto_time();
    pthread_mutex_unlock(&mosq->msgtime_mutex);
    return rc;
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
缓存 数据安全/隐私保护 Kotlin
Kotlin 中的网络请求代理设置最佳实践
Kotlin 中的网络请求代理设置最佳实践
|
3天前
|
安全 Linux 网络安全
nmap 是一款强大的开源网络扫描工具,能检测目标的开放端口、服务类型和操作系统等信息
nmap 是一款强大的开源网络扫描工具,能检测目标的开放端口、服务类型和操作系统等信息。本文分三部分介绍 nmap:基本原理、使用方法及技巧、实际应用及案例分析。通过学习 nmap,您可以更好地了解网络拓扑和安全状况,提升网络安全管理和渗透测试能力。
20 5
|
10天前
|
消息中间件 弹性计算 运维
一图看懂云消息队列 RabbitMQ 版对比开源优势
一张图带您快速了解云消息队列 RabbitMQ 版对比开源版本的显著优势。
|
17天前
|
JSON 资源调度 网络性能优化
vue3中使用mqtt数据传输(封装)
vue3中使用mqtt数据传输(封装)
21 4
|
1月前
|
网络协议 物联网 虚拟化
|
17天前
|
存储 缓存 Dart
Flutter&鸿蒙next 封装 Dio 网络请求详解:登录身份验证与免登录缓存
本文详细介绍了如何在 Flutter 中使用 Dio 封装网络请求,实现用户登录身份验证及免登录缓存功能。首先在 `pubspec.yaml` 中添加 Dio 和 `shared_preferences` 依赖,然后创建 `NetworkService` 类封装 Dio 的功能,包括请求拦截、响应拦截、Token 存储和登录请求。最后,通过一个登录界面示例展示了如何在实际应用中使用 `NetworkService` 进行身份验证。希望本文能帮助你在 Flutter 中更好地处理网络请求和用户认证。
132 1
|
4天前
|
网络协议 Unix Linux
精选2款C#/.NET开源且功能强大的网络通信框架
精选2款C#/.NET开源且功能强大的网络通信框架
|
1月前
|
网络协议 Java 程序员
【网络】局域网LAN、广域网WAN、TCP/IP协议、封装和分用
【网络】局域网LAN、广域网WAN、TCP/IP协议、封装和分用
32 2
|
1月前
|
网络协议 网络架构
【第三期】计算机网络常识/网络分层模型与数据包封装传输过程
【第三期】计算机网络常识/网络分层模型与数据包封装传输过程
45 0
|
2月前
|
JSON 监控 编译器