RabbitMQ 在物联网 (IoT) 项目中的应用案例

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 【8月更文第28天】随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。

#

概述

随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。

设备管理

在物联网项目中,设备管理通常涉及设备注册、状态监控、配置更新等任务。RabbitMQ 可以作为一个中心化的消息传递平台,协调这些活动。

示例架构
  1. 设备注册

    • 设备向 RabbitMQ 发送注册请求。
    • 后端服务接收请求并验证设备信息。
    • 后端服务通过 RabbitMQ 向设备发送确认消息。
  2. 状态监控

    • 设备定期向 RabbitMQ 发送心跳消息以报告其状态。
    • 后端服务订阅这些消息并记录设备的状态变化。
  3. 配置更新

    • 后端服务通过 RabbitMQ 发送更新指令给特定设备或设备组。
    • 设备接收指令后执行更新操作,并将结果反馈给后端。
示例代码

假设我们使用 Python 的 pika 库与 RabbitMQ 交互。

设备端代码:

import pika
import json

def send_message(message):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='device_registration')
    channel.basic_publish(exchange='',
                          routing_key='device_registration',
                          body=json.dumps(message))
    print(" [x] Sent registration message")
    connection.close()

if __name__ == '__main__':
    device_id = "DEVICE_001"
    device_type = "Sensor"
    firmware_version = "1.0.0"
    message = {
   
        "device_id": device_id,
        "device_type": device_type,
        "firmware_version": firmware_version
    }
    send_message(message)

后端服务代码:

import pika
import json

def on_message_received(ch, method, properties, body):
    data = json.loads(body)
    print(f" [x] Received registration request: {data}")
    # 这里可以添加验证逻辑
    ch.basic_ack(delivery_tag=method.delivery_tag)

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='device_registration', durable=True)
    channel.basic_consume(queue='device_registration',
                          on_message_callback=on_message_received)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    main()

实时数据流处理

实时数据流处理是物联网应用中的关键部分,涉及到数据的收集、处理和存储。RabbitMQ 可以作为数据管道,将设备数据转发给多个处理服务。

示例架构
  • 数据收集
    • 设备向 RabbitMQ 发送数据。
  • 数据处理
    • 多个处理服务订阅 RabbitMQ 上的数据队列。
    • 处理服务可以是数据清洗、数据分析或数据存储服务。
  • 数据存储
    • 处理后的数据被发送到持久化存储系统。
示例代码

设备端代码(修改心跳消息为数据消息):

import pika
import json
import time

def send_data(data):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='sensor_data')
    channel.basic_publish(exchange='',
                          routing_key='sensor_data',
                          body=json.dumps(data))
    print(" [x] Sent sensor data")
    connection.close()

if __name__ == '__main__':
    while True:
        sensor_value = 25.3  # 假设这是传感器读数
        device_id = "DEVICE_001"
        data = {
   
            "device_id": device_id,
            "value": sensor_value,
            "timestamp": int(time.time())
        }
        send_data(data)
        time.sleep(10)  # 每10秒发送一次数据

数据处理服务代码:

import pika
import json

def on_message_received(ch, method, properties, body):
    data = json.loads(body)
    print(f" [x] Received sensor data: {data}")
    # 这里可以添加数据处理逻辑
    ch.basic_ack(delivery_tag=method.delivery_tag)

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='sensor_data', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='sensor_data',
                          on_message_callback=on_message_received)

    print(' [*] Waiting for sensor data. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    main()

总结

RabbitMQ 为物联网项目提供了强大的消息传递功能,支持设备管理和实时数据流处理。通过使用 RabbitMQ,可以构建高度可扩展且可靠的物联网解决方案。以上示例展示了如何使用 RabbitMQ 来实现基本的功能,实际应用中可能还需要考虑错误处理、安全性和性能优化等问题。

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
21天前
|
安全 物联网 网络安全
智能设备的安全隐患:物联网(IoT)安全指南
智能设备的安全隐患:物联网(IoT)安全指南
61 12
|
19天前
|
传感器 监控 安全
物联网(IoT):定义、影响与未来
物联网(IoT):定义、影响与未来
46 3
|
22天前
|
传感器 安全 算法
在物联网项目中使用 MicroPython 时如何确保数据安全
在物联网项目中使用MicroPython时,确保数据安全至关重要。可通过加密通信、安全固件更新、认证机制和定期审计等方法提升安全性,防止数据泄露和设备被恶意操控。
|
22天前
|
传感器 物联网 芯片
如何在物联网项目中使用 MicroPython
本指南介绍如何在物联网项目中使用MicroPython,涵盖设备选择、环境搭建、基础编程及网络通信等内容,助你快速上手MicroPython开发。
|
28天前
|
存储 JSON 运维
智能物联网平台:Azure IoT Hub在设备管理中的实践
【10月更文挑战第26天】随着物联网技术的发展,Azure IoT Hub成为企业管理和连接数百万台设备的强大平台。本文介绍Azure IoT Hub的设备管理功能,包括设备注册、设备孪生、直接方法和监控诊断,并通过示例代码展示其应用。
40 4
|
27天前
|
SQL 监控 物联网
ClickHouse在物联网(IoT)中的应用:实时监控与分析
【10月更文挑战第27天】随着物联网(IoT)技术的快速发展,越来越多的设备被连接到互联网上,产生了海量的数据。这些数据不仅包含了设备的状态信息,还包括用户的使用习惯、环境参数等。如何高效地处理和分析这些数据,成为了一个重要的挑战。作为一位数据工程师,我在一个物联网项目中深入使用了ClickHouse,以下是我的经验和思考。
61 0
|
2月前
|
人工智能 安全 物联网
|
3月前
|
存储 物联网 关系型数据库
PolarDB在物联网(IoT)数据存储中的应用探索
【9月更文挑战第6天】随着物联网技术的发展,海量设备数据对实时存储和处理提出了更高要求。传统数据库在扩展性、性能及实时性方面面临挑战。阿里云推出的PolarDB具备高性能、高可靠及高扩展性特点,能有效应对这些挑战。它采用分布式存储架构,支持多副本写入优化、并行查询等技术,确保数据实时写入与查询;多副本存储架构和数据持久化存储机制保证了数据安全;支持动态调整数据库规模,适应设备和数据增长。通过API或SDK接入IoT设备,实现数据实时写入、分布式存储与高效查询,展现出在IoT数据存储领域的巨大潜力。
70 1
|
3月前
|
传感器 监控 安全
物联网通信的基石:LoRa、Sigfox与NB-IoT详解
物联网通信的基石:LoRa、Sigfox与NB-IoT详解
341 0
|
3月前
|
机器学习/深度学习 人工智能 算法
物联网(IoT)就像是一个大型派对,无数的设备都在欢快地交流着信息
【9月更文挑战第4天】在这个万物互联的时代,物联网(IoT)犹如一场盛大的派对,各类设备欢聚一堂。然而,如何让这些设备互相理解并协同工作呢?这就需要机器学习与人工智能的助力。例如,智能空调通过学习你的使用习惯来调节温度,使你更加舒适;智能安防系统则能识别异常行为并及时报警,保障家庭安全。此外,智能农业、交通等领域也因机器学习和人工智能的应用变得更加高效。下面通过一个简单的温度预测代码示例,展示机器学习在物联网中的实际应用,让我们一起感受其强大潜力。
64 0