基于Netty实践搭建的物联网网关iot-gatway

简介: 物联网平台是很大的一个摊子,在设计上,此次上传了关系设计图,业务框架设计图欠奉。在代码上,我目前也只是做了两版版网关,支持多规约;多规约组服务,目前也只支持3761规约的组装和解析;接口做了一个框架,改了几版,开始就是提供jar包调用,后来改成zk+dubbo注册模式,后来改成springboot的Restful服务;数据二次处理也是搭了个框架,具体看业务。

前言介绍

物联网平台框架25.jpg

网关设计图

26.jpg

物联网平台是很大的一个摊子,在设计上,此次上传了关系设计图,业务框架设计图欠奉。在代码上,我目前也只是做了两版版网关,支持多规约;多规约组服务,目前也只支持3761规约的组装和解析;接口做了一个框架,改了几版,开始就是提供jar包调用,后来改成zk+dubbo注册模式,后来改成springboot的Restful服务;数据二次处理也是搭了个框架,具体看业务。

此次开发,按个人的开发与运维经验,结合以往的采集,做了一些功能添加和效率优化,代码完全个人重构。

netty网关,支持百万客户端连接,压力测试ing…,并优化了与服务端集群通信,以往轮询往多个服务器发消息,看似消息发送很平均,其实大大影响了效率,本次对平均算法做了优化,本次上传代码添加了很多功能,摒弃了以往只做心跳维护、数据转发的功能。

socket通信功能优化,提高系统利用率,每台服务器的处理能力比之前提高30%以上

系统功能

  • 心跳维护(增加了客户端在线,但未发心跳的处理功能)
  • 链路监控
  • 报文监控
  • 物联网卡流量监控
  • 踢掉在线终端
  • 在线维护
  • ip黑名单
  • 定向发到集群的某台服务器
  • 多规约支持(代码里就配置了645/698/376/104/二进制/MQTT这几种规约,其实支持更多,个人感觉能支持所有规约,至少目前我见到的都能支持)
  • 服务端序列化传输优化(protobuf序列化后的大小是json的10分之一,xml格式的20分之一,是二进制序列化的10分之一)

开发环境

1、jdk1.8【jdk1.7以下只能部分支持netty】

2、Netty4.1.36.Final【netty3.x 4.x 5每次的变化较大,接口类名也随着变化】

代码示例

itstack-demo-iot-gatewary
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.iot.gateway
    │           ├── connect
    │           │   ├── ConfigDesign.java
    │           │   ├── FlowListener.java
    │           │   ├── PoolDesign.java
    │           │   ├── ToClientDesign.java
    │           │   └── ToMasterDesign.java
    │           ├── divpro
    │           │   ├── Check104Handler.java
    │           │   ├── Check376Handler.java
    │           │   ├── CheckAllPurposeHandler.java
    │           │   ├── DivChannelInitializer.java
    │           │   ├── DivMultiprotocolSelection.java
    │           │   └── DivServerHandler.java
    │           ├── freeconfig
    │           │   └── GetProperties.java
    │           ├── jedis
    │           │   ├── JedisBean.java
    │           │   └── Jedisclusters.java
    │           ├── link
    │           │   └── LinkToMonitor.java
    │           ├── masterpro
    │           │   ├── MasterChannelInitializer.java
    │           │   ├── MasterClientHandler.java
    │           │   └── MasterDisconnectListener.java
    │           ├── protobuf
    │           │   ├── MasterMessage.java
    │           │   ├── MasterMessage.proto
    │           │   ├── Message.java
    │           │   └── MessageOrBuilder.java
    │           ├── start
    │           │   └── StartGate.java
    │           └── util
    │               └── BasicDataTypeTransUtils.java
    └── test
         └── java
             └── org.itstack.demo.test
                 └── ApiTest.java

部分代码模块讲解,全部代码,关注公众号:bugstack虫洞栈 | 回复iot-gateway获取完整代码

connect/ConfigDesign.java | 简要配置信息

/**
 * @Description:
 * @version: v1.0.0
 * @author: wbl
 * @date: 2019年8月30日 上午11:13:13
 */
public class ConfigDesign {
    //用于分配处理业务线程的线程组个数
    public static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors();//硬件线程数
    //如果代码的瓶颈是在CPU这块的话,我会有7个线程在同时 竞争CPU周期,而不是更合理的4个线程。如果我的瓶颈是在内存这的话,那这个测试我可以获得7倍的性能提升
    //业务处理线程大小
    public static final int BIZTHREADSIZE = 4;
    //客户端与服务端通信缓存
    //用来初始化服务端可连接队列
    //服务端处理客户端连接请求是按顺序处理的,所以同一时间只能处理一个客户端连接,
    //多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
    public static final int CACHESIZE = 1024;
    //下行只有唯一标志,这里需要维护一个area和ip的对应关系,0904设计给前置ip,回也回ip,就不需要CLIENT_AREA_IP了。
    //public static Map <String,String> CLIENT_AREA_IP = new HashMap<String,String>();//adr_code : ip
    public static Map<String, Channel> CLIENT_IP_CONNECT = new HashMap<String, Channel>();//ip : channel
    //这样的问题是,一个ip可以有多个终端,即换SIM卡操作,这个无法避免,若上层设置数据库白名单就太死板了。如果唯一标识也是主键,那就只能提示终端冲突了。
    //一般情况而言,一个ip就可以说一个终端,在线维护那里,直接就替换ip对应的唯一标识,并在登陆日志提示,ip对应唯一标识改变。这解决了设备唯一性
    //如果换sim卡了,就是新设备登录,正常无影响。死终端会在数据库,但不会再redis在线列表,这就涉及到,在线设备信息同步问题了。
    //上层以后可以做一个删除客户端的操作,应用层删除数据库信息,前置删除在线终端,网关关闭终端连接。即[分手以后,是陌生人]
    //id也是从第一次的0累加的。
    public static Map<String, Channel> Server_IP_CONNECT = new HashMap<String, Channel>();//id : channel
    //连接的服务端句柄数组,采用轮询操作,
    //这里要做好服务端挂了之后,发往其他的Channel,并尝试重连服务端,即心跳维护
    public static int MASTER_INDEX = 0;//轮询map中的前置,map中存活的size,置零。
    //统计每个ip通信的流量,上下行都写在这,统计物联网卡的流量,这个流量也需要初始化的时候从redis读取一下
    public static Map<String, String> FLOW_COUNT = new HashMap<String, String>();//ip : flow
    //  ip黑名单监控(这种配置型的,需要加载到redis,启动时一次性从redis读取到本地)
    public static List<String> BLACKLIST = new ArrayList<String>();
    //写入的话,验证一下sync结束的finally操作。要确定一定能写进去,不会存在bug,否认换设计
}

divpro/MasterSlotsPartition.java | 主站槽分区

/**
 * @Description: 主站槽分区
 * @version: v1.0.0
 * @author: wbl
 * @date: 2019年9月11日 下午5:28:51
 */
public class MasterSlotsPartition {
    public static MasterSlotsPartition mster = new MasterSlotsPartition();
    public static MasterSlotsPartition getInstance() {
        return mster;
    }
    //分片
    public Channel getActiveMaster() {
        Channel channel = null;
        int masterSize = ConfigDesign.Server_IP_CONNECT.size();//主站个数,从1开始
        int masterIndex = ConfigDesign.MASTER_INDEX;//当前步数 +无穷
        int pos = 0;//要发往的主站编号
        for (int i = 0; i < masterSize; i++) {
            if (masterIndex >= i * 1000 && masterIndex < (i + 1) * 1000) {
                pos = i;
            }
            if (masterIndex == masterSize * 1000) {
                ConfigDesign.MASTER_INDEX = 0;
            }
        }
        ConfigDesign.MASTER_INDEX++;//放置在0后面
        for (int i = 0; i < masterSize; i++) {
            channel = ConfigDesign.Server_IP_CONNECT.get("" + (pos + 1));
            System.out.println("当前线程连接的master:" + channel.remoteAddress().toString());
            if (channel.isActive()) {//isOpen()、isRegistered()、isActive()和isWritable()
                return channel;
            } else {
                pos++;
                if (pos == masterSize) {
                    pos = 0;
                }
                continue;
            }
        }
        return channel;
    }
    //轮询 效率没有分片高
    public static Channel getMasterAverage() {
        Channel channel = null;
        int masterSize = ConfigDesign.Server_IP_CONNECT.size();
        int masterIndex = ConfigDesign.MASTER_INDEX;
        for (int i = 0; i < masterSize; i++) {
            channel = ConfigDesign.Server_IP_CONNECT.get(masterIndex);
            ConfigDesign.MASTER_INDEX++;
            masterIndex++;
            if (masterIndex >= masterSize) {
                ConfigDesign.MASTER_INDEX = 0;
            }
            if (!channel.isActive()) {
                continue;
            }
            return channel;
        }
        return channel;
    }
}

protobuf/MasterMessage.proto | 消息协议定义

syntax = "proto3";
package org.itstack.demo.iot.gateway.protobuf;
option java_package = "org.itstack.demo.iot.gateway.protobuf";
option java_multiple_files = true;
option java_outer_classname = "MasterMessage";
message Message {
    string ip = 2;
    string port = 3;
    string protocolId = 4;
    string content = 5;
    string property = 6;
}

test/ApiTest.java

/**
 * 虫洞栈:https://bugstack.cn
 * 公众号:bugstack虫洞栈 | 关注公众号回复iot-gatewary,获取工程源码
 * Create by fuzhengwei on 2019
 */
public class ApiTest {
    public static void main(String[] args) {
        System.out.println("hi 微信公众号:bugstack虫洞栈 | 欢迎关注获取专题文章和源码");
    }
    /**
     * 编译proto文件
     * protoc.exe -I=E:\itstack\GIT\itstack.org\itstack-demo-iot-gatewary\src\main\java\org\itstack\demo\iot\gateway\protobuf --java_out=E:\itstack\GIT\itstack.org\itstack-demo-iot-gatewary\src\main\java\ MasterMessage.proto
     */
}

测试结果

启动StartGate服务

2019-09-17 16:00:37.639  INFO 5080 --- [           main] start.StartGate                          : Started StartGate in 3.989 seconds (JVM running for 4.436)
read master config:127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003
Link Listener: [Tue Sep 17 16:00:37 CST 2019]    Master.size:0   client.size:0...
read client config:10001
bind: [Tue Sep 17 16:00:38 CST 2019]   服务器开始监听端口10001,等待客户端连接.........
Login:[Tue Sep 17 16:00:38 CST 2019]   127.0.0.1:7001
Login:[Tue Sep 17 16:00:38 CST 2019]   127.0.0.1:7003
Login:[Tue Sep 17 16:00:38 CST 2019]   127.0.0.1:7002
Link Listener: [Tue Sep 17 16:00:47 CST 2019]    Master.size:1   client.size:0...
master heartbeat: [Tue Sep 17 16:00:51 CST 2019]  127.0.0.1:7002
Link Listener: [Tue Sep 17 16:00:57 CST 2019]    Master.size:2   client.size:0...
master heartbeat: [Tue Sep 17 16:01:01 CST 2019]  127.0.0.1:7002
Link Listener: [Tue Sep 17 16:01:07 CST 2019]    Master.size:3   client.size:0...
master heartbeat: [Tue Sep 17 16:01:11 CST 2019]  127.0.0.1:7002
master heartbeat: [Tue Sep 17 16:01:11 CST 2019]  127.0.0.1:7001
收到客户端连接:/127.0.0.1:59728
Link Listener: [Tue Sep 17 16:01:17 CST 2019]    Master.size:3   client.size:1...
物联网卡流量统计(首次):127.0.0.1:59728:4B
当前线程连接的master:/127.0.0.1:7002
master heartbeat: [Tue Sep 17 16:01:19 CST 2019]  127.0.0.1:7003
物联网卡流量监控:(缓存里不带单位):8B
当前线程连接的master:/127.0.0.1:7002
物联网卡流量监控:(缓存里不带单位):12B
当前线程连接的master:/127.0.0.1:7002
master heartbeat: [Tue Sep 17 16:01:21 CST 2019]  127.0.0.1:7001
物联网卡流量监控:(缓存里不带单位):16B
当前线程连接的master:/127.0.0.1:7002
物联网卡流量监控:(缓存里不带单位):20B
当前线程连接的master:/127.0.0.1:7002
客户端断开链接:/127.0.0.1:59728
Link Listener: [Tue Sep 17 16:01:27 CST 2019]    Master.size:3   client.size:0...
master heartbeat: [Tue Sep 17 16:01:29 CST 2019]  127.0.0.1:7003
master heartbeat: [Tue Sep 17 16:01:31 CST 2019]  127.0.0.1:7001
master heartbeat: [Tue Sep 17 16:01:32 CST 2019]  127.0.0.1:7002
相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
30天前
|
传感器 存储 机器学习/深度学习
物联网(IoT)简介:定义、技术与应用
【5月更文挑战第30天】物联网(IoT)是将物品通过嵌入式系统、传感器及通信技术连接至互联网,实现物物、物人交互和数据共享的技术。其关键包括传感器、通信、嵌入式系统、云计算和人工智能技术。物联网应用于智能家居、智慧城市、工业自动化、农业和健康医疗等领域,通过Arduino等平台可实现简单数据传输。随着技术发展,物联网将深远影响人们生活和工作方式。
128 3
|
3天前
|
供应链 安全 物联网
物联网(IoT)安全:风险与防护策略
【6月更文挑战第26天】物联网(IoT)安全风险包括数据泄露、设备劫持、DDoS攻击、超级漏洞和不安全设备。防护策略涉及强化设备安全设计、建立认证授权机制、加密数据传输、实施安全监控、加强供应链管理、提升用户安全意识及采用零信任模型。多层面合作以降低安全威胁,确保物联网稳定安全。
|
13天前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
IoTDB是面向物联网的时序数据库,专注于时间序列数据管理,提供高效的数据处理、集成Hadoop和Spark生态、支持多目录存储策略。它还具有InfluxDB协议适配器,允许无缝迁移原本使用InfluxDB的业务。文章讨论了IoTDB的体系结构,包括数据文件、系统文件和预写日志文件的存储策略,并介绍了如何配置数据存储目录。此外,还提及了InfluxDB版本和查询语法的支持情况。IoTDB在物联网数据管理和分析中扮演关键角色,尤其适合处理大规模实时数据。
29 5
|
11天前
|
传感器 人工智能 搜索推荐
人工智能(AI)与物联网(IoT)的融合是当今技术领域的一个重要趋势
人工智能(AI)与物联网(IoT)的融合是当今技术领域的一个重要趋势
|
12天前
|
安全 物联网 物联网安全
物联网(IoT)的兴起与挑战:技术变革的双刃剑
【6月更文挑战第17天】**物联网的崛起带来了智能家居、智慧城市等便利,但安全挑战、设备兼容性和带宽压力也随之而来。加强安全、统一标准及提升处理能力是关键。面对挑战,持续的技术创新和法规完善将推动其健康发展,物联网的未来充满希望。**
|
16天前
|
存储 分布式计算 物联网
Apache IoTDB进行IoT相关开发实践
物联网技术带来数据库管理挑战,特别是实时数据整合与安全性。IoTDB是一个专为时间序列数据设计的数据库,提供数据收集、存储和分析服务,适用于海量物联网数据。其架构包括数据文件、系统文件和预写日志文件的管理,并支持多目录存储策略。此外,IoTDB还开发了InfluxDB协议适配器,使得用户能无缝迁移原有InfluxDB业务。此适配器基于IoTDB的Java服务接口,转换InfluxDB的元数据格式,实现与IoTDB的数据交互。目前,适配器支持InfluxDB 1.x版本及部分查询语法。
60 5
|
1天前
|
传感器 安全 物联网
物联网(IoT)设备的硬件选型与集成技术博文
【6月更文挑战第28天】物联网设备硬件选型与集成聚焦关键要素:功能匹配、性能稳定性、兼容扩展及成本效益。嵌入式系统、通信协议、数据处理和安全性技术确保集成效果,支撑高效、智能的IoT系统,驱动家居、城市与工业自动化变革。
|
1月前
|
存储 关系型数据库 物联网
【PolarDB开源】PolarDB在物联网(IoT)数据存储中的应用探索
【5月更文挑战第27天】PolarDB,阿里云的高性能云数据库,针对物联网(IoT)数据存储的挑战,如大规模数据、实时性及多样性,展现出高扩展性、高性能和高可靠性。它采用分布式架构,支持动态扩展,保证99.95%的高可用性,并能处理结构化、半结构化和非结构化数据。通过SDK实现数据实时写入,支持SQL查询和冷热数据分层,有效降低成本。随着IoT发展,PolarDB在该领域的应用将更加广泛。
129 1
|
1月前
|
传感器 数据采集 监控
什么是物联网通信网关?
物联网通信网关是连接物联网设备与云或外部网络的关键桥梁。
29 2
|
22天前
|
机器学习/深度学习 传感器 算法
物联网(IoT)数据与机器学习的结合
【6月更文挑战第6天】物联网和机器学习加速融合,驱动数据收集与智能分析。通过机器学习算法处理 IoT 数据,实现智能家居、工业生产的智能化。示例代码展示如何用线性回归预测温度。结合带来的优势包括实时监测、预警、资源优化,但也面临数据质量、隐私安全、算法选择等挑战。未来需强化技术创新,应对挑战,推动社会智能化发展。
66 0

热门文章

最新文章

相关产品

  • 物联网平台