Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

简介: Kafka +深度学习+ MQTT搭建可扩展的物联网平台【附源码】

物联网+大数据+机器学习将会是以后的趋势,这里介绍一篇这方面的文章包含源码。


混合机器学习基础架构构建了一个场景,利用Apache Kafka作为可扩展的中枢神经系统。 公共云用于极大规模地训练分析模型(例如,通过Google ML Engine在Google Cloud Platform(GCP)上使用TensorFlow和TPU,预测(即模型推断)在本地Kafka基础设施的执行( 例如,利用Kafka Streams或KSQL进行流分析)。


本文重点介绍内部部署。 创建了一个带有KSQL UDF的Github项目,用于传感器分析。 它利用KSQL的新API功能,使用Java轻松构建UDF / UDAF函数,对传入事件进行连续流处理。



使用案例:Connected Cars - 使用深度学习的实时流分析



从连接设备(本例中的汽车传感器)连续处理数百万个事件:

519bae28f15d00e8c5e9e157e2af30d9.jpg为此构建了不同的分析模型。 他们在公共云上接受TensorFlow,H2O和Google ML Engine的训练。 模型创建不是此示例的重点。 最终模型已经可以投入生产,可以部署用于实时预测。


模型服务可以通过模型server 完成,也可以本地嵌入到流处理应用程序中。 参阅RPC与流处理的权衡,以获得模型部署和....



演示:使用MQTT,Kafka和KSQL在Edge进行模型推理



Github项目:深度学习+KSQL UDF 用于流式异常检测MQTT物联网传感器数据

(下载源码:3aa46c2372700ac963550e9d06bb7c3a.jpgksql-udf-deep-learning-mqtt-iot-master.zip (474.64 KB, 下载次数: 0)

该项目的重点是通过MQTT将数据提取到Kafka并通过KSQL处理数据:

713af52f209a9541fb90a639c6b7b933.jpg

Confluent MQTT Proxy的一大优势是无需MQTT Broker即可实现物联网方案的简单性。 可以通过MQTT代理将消息直接从MQTT设备转发到Kafka。 这显着降低了工作量和成本。 如果你“只是”想要在Kafka和MQTT设备之间进行通信,这是一个完美的解决方案。


如果你想看到另一部分(与Elasticsearch / Grafana等接收器应用程序集成),请查看Github项目“KSQL for streaming IoT data”。 这实现了通过Kafka Connect和Elastic连接器与ElasticSearch和Grafana的集成。(源码下载:链接: https://pan.baidu.com/s/1FCFgAoF9v1ihp9fyqHeKag 密码: 67sz)


KSQL UDF - 源代码



开发UDF非常容易。 只需在UDF类中的一个Java方法中实现该函数:

 

@Udf(description = "apply analytic model to sensor input")             public String anomaly(String sensorinput){ "YOUR LOGIC" }

       
这里是所有代码:

package com.github.megachucky.kafka.streams.machinelearning;
import java.util.Arrays;
import hex.genmodel.GenModel;
import hex.genmodel.easy.EasyPredictModelWrapper;
import hex.genmodel.easy.RowData;
import hex.genmodel.easy.exception.PredictException;
import hex.genmodel.easy.prediction.AutoEncoderModelPrediction;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
@UdfDescription(name = "anomaly", description = "anomaly detection using deep learning")
public class Anomaly {
    // Model built with H2O R API:
      // anomaly_model <- h2o.deeplearning(x = names(train_ecg),training_frame =
      // train_ecg,activation = "Tanh",autoencoder = TRUE,hidden =
      // c(50,20,50),sparse = TRUE,l1 = 1e-4,epochs = 100)
      // Name of the generated H2O model
      private static String modelClassName = "io.confluent.ksql.function.udf.ml"
                                             + ".DeepLearning_model_R_1509973865970_1"; 
  @Udf(description = "apply analytic model to sensor input")
  public String anomaly(String sensorinput) {
      System.out.println("Kai: DL-UDF starting");
      GenModel rawModel;
        try {
            rawModel = (hex.genmodel.GenModel) Class.forName(modelClassName).newInstance();
        EasyPredictModelWrapper model = new EasyPredictModelWrapper(rawModel);
        // Prepare input sensor data to be in correct data format for the autoencoder model (double[]):
        String[] inputStringArray = sensorinput.split("#");
        double[] doubleValues = Arrays.stream(inputStringArray)
                .mapToDouble(Double::parseDouble)
                .toArray();
        RowData row = new RowData();
        int j = 0;
        for (String colName : rawModel.getNames()) {
          row.put(colName, doubleValues[j]);
          j++;
        }
        AutoEncoderModelPrediction p = model.predictAutoEncoder(row);
        // System.out.println("original: " + java.util.Arrays.toString(p.original));
        // System.out.println("reconstructedrowData: " + p.reconstructedRowData);
        // System.out.println("reconstructed: " + java.util.Arrays.toString(p.reconstructed));
        double sum = 0;
        for (int i = 0; i < p.original.length; i++) {
          sum += (p.original[i] - p.reconstructed[i]) * (p.original[i] - p.reconstructed[i]);
        }
        // Calculate Mean Square Error => High reconstruction error means anomaly
        double mse = sum / p.original.length;
        System.out.println("MSE: " + mse);
        String mseString = "" + mse;
        return (mseString);
        } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
            System.out.println(e.toString());
        } catch (PredictException e) {
            System.out.println(e.toString());
        }
        return null;
  }
}


如何使用Apache Kafka和MQTT Proxy运行演示?



执行演示的所有步骤都在Github项目中描述。

你只需安装Confluent Platform,然后按照以下步骤部署UDF,创建MQTT事件并通过KSQL levera处理它们....

这里使用Mosquitto生成MQTT消息。 当然,也可以使用任何其他MQTT客户端。 这是开放和标准化协议的巨大好处。


目录
相关文章
|
5月前
|
消息中间件 安全 物联网
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
易易互联科技有限公司是吉利集团旗下专注于换电生态的全资子公司,致力于打造安全、便捷、便宜的智能换电网络。公司依托吉利GBRC换电平台,基于电池共享与车辆全生命周期运营,已布局超470座换电站,覆盖40多个城市,计划2027年达2000座。面对海量设备高并发连接、高实时性要求及数据洪峰挑战,易易互联采用阿里云MQTT与RocketMQ构建高效物联网通信架构,实现稳定接入、低延迟通信与弹性处理,全面支撑其全国换电网络规模化运营与智能化升级。
367 1
海量接入、毫秒响应:易易互联基于 Apache RocketMQ + MQTT 构筑高可用物联网消息中枢
|
9月前
|
物联网
(手把手)在华为云、阿里云搭建自己的物联网MQTT消息服务器,免费IOT平台
本文介绍如何在阿里云搭建自己的物联网MQTT消息服务器,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
2949 42
|
9月前
|
物联网
如何在腾讯云等平台搭建自己的物联网MQTT服务器Broker
物联网技术及MQTT协议被广泛应用于各种场景。本文介绍物联网MQTT服务助手下载,如何搭建自己的物联网平台,并使用 “MQTT客户端调试工具”模拟MQTT设备,接入平台进行消息收发。
692 37
|
11月前
|
监控 物联网 网络性能优化
【杂谈】-MQTT与HTTP在物联网中的比较:为什么MQTT是更好的选择
通过上述分析,可以看出MQTT在物联网应用中的确是更好的选择。其高效的通信模型、低带宽消耗、稳定的连接保持机制以及可靠的消息质量保证,使其在各种物联网场景中都能表现出色。开发者在设计和实现物联网系统时,应优先考虑采用MQTT协议,以充分发挥其在资源受限环境下的优势,提升系统的整体性能和可靠性。
2204 26
|
机器学习/深度学习 自然语言处理 物联网
深度学习入门:从理论到实践新技术趋势与应用:探讨新兴技术如区块链、物联网、虚拟现实等的发展趋势和应用场景
【8月更文挑战第30天】本文将介绍深度学习的基本原理和实践应用。我们将从深度学习的定义、历史和发展开始,然后深入探讨其工作原理和关键技术。接着,我们将通过一个简单的代码示例来展示如何实现深度学习模型。最后,我们将讨论深度学习在现实世界中的应用和挑战。无论你是初学者还是有经验的开发者,这篇文章都将为你提供深度学习的全面理解。
|
消息中间件 存储 监控
深度写作:深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(Message Queue, MQ)扮演着至关重要的角色。MQ不仅实现了应用间的解耦,还提供了异步消息处理、流量削峰等功能。而在MQ的众多特性中,长轮询(Long Polling)机制因其能有效提升消息处理的实时性和效率,备受关注。
511 12
|
消息中间件 存储 Java
深入源码理解MQ长轮询优化机制
【11月更文挑战第22天】在分布式系统中,消息队列(MQ)作为一种重要的中间件,广泛应用于解耦、异步处理、流量削峰等场景。其中,延时消息和定时消息作为MQ的高级功能,能够进一步满足复杂的业务需求。为了实现这些功能,MQ系统需要进行一系列优化,长轮询机制便是其中的关键一环。本文将深入探讨MQ如何设计延时消息和定时消息的优化机制,特别是长轮询机制的实现原理及其在Java中的模拟实现。
268 2
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
596 5
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【9月更文挑战第3天】物联网(IoT)的兴起催生了多种通信协议,如MQTT、CoAP、RESTful/HTTP和XMPP,各自适用于不同场景。本文将对比这些协议的特点、优缺点,并提供示例代码。MQTT轻量级且支持QoS,适合大规模部署;CoAP基于UDP,适用于低功耗网络;RESTful/HTTP易于集成但不适合资源受限设备;XMPP支持双向通信,适合复杂交互应用。通过本文,开发者可更好地选择合适的物联网通信协议。
319 2
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决

相关产品

  • 物联网平台